Diffuser des messages depuis Pub/Sub à l'aide de Dataflow
Dataflow est un service entièrement géré permettant de transformer et d'enrichir les données par flux (en temps réel) et par lots avec un niveau identique de fiabilité et d'expressivité. Il fournit un environnement de développement de pipeline simplifié à l'aide du SDK Apache Beam, qui offre de nombreuses primitives de fenêtrage et d'analyse de sessions, ainsi qu'un écosystème de connecteurs de sources et de récepteurs. Ce guide de démarrage rapide vous explique comment effectuer les opérations suivantes à l'aide de Dataflow :
- Lire les messages publiés dans un sujet Pub/Sub
- Effectuer le fenêtrage (ou le regroupement) de messages par horodatage
- Écrire les messages dans Cloud Storage
Ce guide de démarrage rapide vous explique comment utiliser Dataflow en Java et Python. SQL est également compatible. Ce guide de démarrage rapide est également proposé en tant que tutoriel Google Cloud Skills, qui fournit des identifiants temporaires pour vous aider à démarrer.
Vous pouvez également commencer par utiliser des modèles Dataflow basés sur l'interface utilisateur si vous n'avez pas l'intention d'effectuer un traitement de données personnalisé.
Avant de commencer
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
Configurer votre projet Pub/Sub
-
Créez des variables pour votre bucket, votre projet et votre région. Les noms des buckets Cloud Storage doivent être uniques. Sélectionnez une région Dataflow proche de l'emplacement où vous exécutez les commandes dans ce guide de démarrage rapide. La valeur de la variable
REGION
doit être un nom de région valide. Pour en savoir plus sur les régions et les emplacements, consultez la page Emplacements Dataflow.BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
-
Créez un bucket Cloud Storage appartenant à ce projet :
gcloud storage buckets create gs://$BUCKET_NAME
-
Créez un sujet Pub/Sub dans ce projet :
gcloud pubsub topics create $TOPIC_ID
-
Créez une tâche Cloud Scheduler dans ce projet. La tâche publie un message sur un sujet Cloud Pub/Sub chaque minute.
Si une application App Engine n'existe pas pour le projet, cette étape va en créer une.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
Démarrez la tâche.
gcloud scheduler jobs run publisher-job --location=$REGION
-
Utilisez la commande suivante pour cloner le dépôt du guide de démarrage rapide et accéder au répertoire de l'exemple de code :
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
Diffuser des messages depuis Pub/Sub vers Cloud Storage
Exemple de code
Cet exemple de code utilise Dataflow pour effectuer les opérations suivantes :
- Lire les messages Pub/Sub
- Effectuer le fenêtrage (ou le regroupement) de messages en intervalles fixes par données d'horodatage
Écrire les messages dans chaque fenêtre dans des fichiers dans Cloud Storage
Java
Python
Démarrer le pipeline
Pour démarrer le pipeline, exécutez la commande suivante :
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
La commande précédente s'exécute localement et lance une tâche Dataflow dans le cloud. Lorsque la commande renvoie JOB_MESSAGE_DETAILED: Workers
have started successfully
, quittez le programme local à l'aide de Ctrl+C
.
Observer la progression de la tâche et du pipeline
Vous pouvez observer la progression de la tâche dans la console Dataflow.
Ouvrez la vue "Détails de la tâche" pour afficher les éléments suivants :
- Structure de la tâche
- Journaux de la tâche
- Métriques de l'étape
Vous devrez peut-être patienter quelques minutes avant que les fichiers de sortie ne s'affichent dans Cloud Storage.
Vous pouvez également utiliser la ligne de commande ci-dessous pour afficher les fichiers qui ont été écrits.
gcloud storage ls gs://${BUCKET_NAME}/samples/
Le résultat doit se présenter sous la forme suivante :
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
Effectuer un nettoyage
Pour éviter que les ressources utilisées sur cette page ne soient facturées sur votre compte Google Cloud, supprimez le projet Google Cloud contenant les ressources.
Supprimez la tâche Cloud Scheduler.
gcloud scheduler jobs delete publisher-job --location=$REGION
Dans la console Dataflow, arrêtez la tâche. Annulez le pipeline sans le drainer.
Supprimez le sujet.
gcloud pubsub topics delete $TOPIC_ID
Supprimez les fichiers créés par le pipeline.
gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
Supprimez le bucket Cloud Storage.
gcloud storage rm gs://${BUCKET_NAME} --recursive
-
Supprimez le compte de service :
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
Étape suivante
Si vous souhaitez afficher les messages Pub/Sub selon un horodatage personnalisé, vous pouvez spécifier l'horodatage en tant qu'attribut dans le message Pub/Sub, puis utiliser cet horodatage personnalisé avec la classe
withTimestampAttribute
de PubsubIO.Consultez les modèles Dataflow Open Source de Google conçus pour la diffusion en flux continu.
Découvrez comment Dataflow est intégré à Pub/Sub.
Suivez ce tutoriel qui lit des données depuis Pub/Sub et écrit dans BigQuery à l'aide de modèles Flex Dataflow.
Pour en savoir plus sur le fenêtrage, consultez l'exemple Pipeline pour les jeux mobiles d'Apache Beam.