Diffuser des messages Pub/Sub Lite à l'aide de Dataflow
Au lieu d'écrire et d'exécuter vos propres programmes de traitement de données, vous pouvez utiliser Dataflow avec le connecteur d'E/S Pub/Sub Lite pour Apache Beam. Cloud Dataflow est un service entièrement géré permettant de transformer et d'enrichir les données en streaming (en temps réel) et par lot avec un niveau identique de fiabilité et d'expressivité. Il exécute de manière fiable des programmes développés à l'aide du SDK Apache Beam, qui offre un ensemble extensible d'abstractions de traitement avec état puissantes et de connecteurs d'E/S à d'autres systèmes de traitement par flux et par lot.
Ce guide de démarrage rapide explique comment écrire un pipeline Apache Beam pour :
- Lire les messages de Pub/Sub Lite
- Effectuer le fenêtrage (ou le regroupement) de messages par horodatage
- Écrire les messages dans Cloud Storage
Il vous indique également comment :
- Envoyer votre pipeline pour exécution sur Dataflow
- Créer un modèle Flex Dataflow à partir de votre pipeline
Ce tutoriel nécessite Maven, mais il est également possible de convertir l'exemple de projet de Maven vers Gradle. Pour en savoir plus, consultez Facultatif: passer de Maven à Gradle.
Avant de commencer
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:
gcloud services enable pubsublite.googleapis.com
dataflow.googleapis.com storage-api.googleapis.com logging.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:
gcloud services enable pubsublite.googleapis.com
dataflow.googleapis.com storage-api.googleapis.com logging.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
Configurer votre projet Pub/Sub Lite
Créer des variables pour votre bucket Cloud Storage, projet et Région Dataflow. Les noms des buckets Cloud Storage doivent être uniques. La région Dataflow doit être une région valide dans laquelle vous pouvez exécuter votre tâche. Pour en savoir plus sur les régions et les emplacements, consultez la page Emplacements Dataflow.
export PROJECT_ID=$(gcloud config get-value project)
export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
export BUCKET=BUCKET_NAME
export DATAFLOW_REGION=DATAFLOW_REGION
Créez un bucket Cloud Storage appartenant à ce projet :
gcloud storage buckets create gs://$BUCKET
Créer un sujet et un abonnement zonaux Pub/Sub Lite
Créez un sujet Pub/Sub Lite zonal et un abonnement Lite.
Pour l'emplacement Lite, choisissez un emplacement Pub/Sub Lite compatible. Vous devez également spécifier une zone pour la région. Exemple :us-central1-a
export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \ --location=$LITE_LOCATION \ --partitions=1 \ --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \ --location=$LITE_LOCATION \ --topic=$TOPIC \ --starting-offset=beginning
Diffuser des messages vers Dataflow
Télécharger l'exemple de code du guide de démarrage rapide
Clonez le dépôt du guide de démarrage rapide et accédez au répertoire de l'exemple de code.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics
Exemple de code
Cet exemple de code utilise Dataflow pour effectuer les opérations suivantes :
- Lire les messages d'un abonnement Pub/Sub Lite en tant que source illimitée.
- Regroupez les messages en fonction de leur code temporel de publication, à l'aide de fenêtres temporelles fixes et du déclencheur par défaut.
Écrire les messages groupés dans des fichiers Cloud Storage.
Java
Avant d'exécuter cet exemple, suivez les instructions de configuration de Java dans la section Bibliothèques clientes de Pub/Sub Lite.
Démarrer le pipeline Dataflow
Pour démarrer le pipeline dans Dataflow, exécutez la commande suivante :
mvn compile exec:java \
-Dexec.mainClass=examples.PubsubliteToGcs \
-Dexec.args=" \
--subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
--output=gs://$BUCKET/samples/output \
--windowSize=1 \
--project=$PROJECT_ID \
--region=$DATAFLOW_REGION \
--tempLocation=gs://$BUCKET/temp \
--runner=DataflowRunner \
--serviceAccount=$SERVICE_ACCOUNT"
La commande précédente lance une tâche Dataflow. Suivez le lien dans la sortie de la console pour accéder à la tâche dans la console de surveillance Dataflow.
Observer la progression de la tâche
Observez la progression de la tâche dans la console Dataflow.
Ouvrez la vue "Détails de la tâche" pour afficher les éléments suivants :
- Graphique de la tâche
- Détails de l'exécution
- Métriques de tâche
Publiez des messages sur votre sujet Lite.
gcloud pubsub lite-topics publish $TOPIC \
--location=$LITE_LOCATION \
--message="Hello World!"
Vous devrez peut-être patienter quelques minutes avant que les messages ne s'affichent dans vos journaux de nœud de calcul.
Utilisez la commande ci-dessous pour vérifier quels fichiers ont été écrits dans Cloud Storage.
gcloud storage ls "gs://$BUCKET/samples/"
Le résultat doit se présenter sous la forme suivante :
gs://$BUCKET/samples/output-19:41-19:42-0-of-1
gs://$BUCKET/samples/output-19:47-19:48-0-of-1
gs://$BUCKET/samples/output-19:48-19:49-0-of-1
Utilisez la commande ci-dessous pour examiner le contenu d'un fichier :
gcloud storage cat "gs://$BUCKET/samples/your-filename"
Facultatif: Créer un modèle Dataflow
Vous pouvez éventuellement créer un modèle Flex Dataflow personnalisé basé sur votre pipeline. Les modèles Dataflow vous permettent d'exécuter des tâches avec différents paramètres d'entrée à partir de la console Google Cloud ou de la ligne de commande sans avoir à configurer un environnement de développement Java complet.
Créez un fichier Fat JAR qui inclut toutes les dépendances de votre pipeline. Une fois la commande exécutée, le fichier
target/pubsublite-streaming-bundled-1.0.jar
devrait s'afficher.mvn clean package -DskipTests=true
Indiquez les noms et les emplacements de votre fichier de modèle et de votre image de conteneur de modèle.
export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
Créer un modèle Flex personnalisé Un fichier
metadata.json
obligatoire, qui contient les spécifications nécessaires à l'exécution de la tâche, a été fourni avec l'exemple.gcloud dataflow flex-template build $TEMPLATE_PATH \ --image-gcr-path $TEMPLATE_IMAGE \ --sdk-language "JAVA" \ --flex-template-base-image "JAVA11" \ --metadata-file "metadata.json" \ --jar "target/pubsublite-streaming-bundled-1.0.jar" \ --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
Exécutez une tâche à l'aide du modèle Flex personnalisé.
Console
Saisissez le nom de la tâche.
Saisissez votre région Dataflow.
Choisissez votre modèle personnalisé.
Saisissez le chemin d'accès du modèle.
Saisissez les paramètres requis.
Cliquez sur Run Job (Exécuter la tâche).
gcloud
gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
--template-file-gcs-location $TEMPLATE_PATH \
--parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
--parameters output="gs://$BUCKET/samples/template-output" \
--parameters windowSize=1 \
--region $DATAFLOW_REGION \
--serviceAccount=$SERVICE_ACCOUNT
Effectuer un nettoyage
Pour éviter que les ressources utilisées sur cette page ne soient facturées sur votre compte Google Cloud, supprimez le projet Google Cloud contenant les ressources.
Dans la console Dataflow, arrêtez la tâche. Annulez le pipeline au lieu de le drainer.
Supprimez le sujet et l'abonnement.
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
Supprimez les fichiers créés par le pipeline.
gcloud storage rm "gs://$BUCKET/samples/*" --recursive --continue-on-error
gcloud storage rm "gs://$BUCKET/temp/*" --recursive --continue-on-error
Supprimez l'image du modèle et le fichier de modèle, s'ils existent.
gcloud container images delete $TEMPLATE_IMAGE
gcloud storage rm $TEMPLATE_PATH
Supprimez le bucket Cloud Storage.
gcloud storage rm gs://$BUCKET --recursive
-
Supprimez le compte de service :
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
Étape suivante
Apprenez-en plus sur la configuration des modèles Flex Dataflow
Découvrez les pipelines de streaming Dataflow.