Diffuser des messages depuis Pub/Sub à l'aide de Dataflow
Dataflow est un service entièrement géré permettant de transformer et d'enrichir les données par flux (en temps réel) et par lots avec un niveau identique de fiabilité et d'expressivité. Il fournit un environnement de développement de pipeline simplifié à l'aide du SDK Apache Beam, qui offre de nombreuses primitives de fenêtrage et d'analyse de sessions, ainsi qu'un écosystème de connecteurs de sources et de récepteurs. Ce guide de démarrage rapide vous explique comment effectuer les opérations suivantes à l'aide de Dataflow :
- Lire les messages publiés dans un sujet Pub/Sub
- Effectuer le fenêtrage (ou le regroupement) de messages par horodatage
- Écrire les messages dans Cloud Storage
Ce guide de démarrage rapide vous explique comment utiliser Dataflow en Java et Python. SQL est également compatible. Ce guide de démarrage rapide est également proposé en tant que tutoriel Google Cloud Skills, qui fournit des identifiants temporaires pour vous aider à démarrer.
Vous pouvez également commencer par utiliser des modèles Dataflow basés sur l'interface utilisateur si vous n'avez pas l'intention d'effectuer un traitement de données personnalisé.
Avant de commencer
- Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
- Installez Google Cloud CLI.
-
Pour initialiser gcloudCLI, exécutez la commande suivante :
gcloud init
-
Créez ou sélectionnez un projet Google Cloud.
-
Créez un projet Cloud :
gcloud projects create PROJECT_ID
-
Sélectionnez le projet Cloud que vous avez créé :
gcloud config set project PROJECT_ID
-
-
Assurez-vous que la facturation est activée pour votre projet Cloud. Découvrez comment vérifier si la facturation est activée sur un projet.
-
Activer les API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler :
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Configurez l'authentification :
-
Créez le compte de service :
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Remplacez
SERVICE_ACCOUNT_NAME
par le nom que vous souhaitez donner au compte de service. -
Attribuez des rôles au compte de service. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants :
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Remplacez les éléments suivants :
SERVICE_ACCOUNT_NAME
: nom du compte de service.PROJECT_ID
: ID du projet dans lequel vous avez créé le compte de service.ROLE
: rôle à accorder
-
Attribuez à votre compte Google un rôle vous permettant d'utiliser les rôles du compte de service et d'associer le compte de service à d'autres ressources :
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Remplacez les éléments suivants :
SERVICE_ACCOUNT_NAME
: nom du compte de service.PROJECT_ID
: ID du projet dans lequel vous avez créé le compte de service.USER_EMAIL
: adresse e-mail de votre compte Google
-
- Installez Google Cloud CLI.
-
Pour initialiser gcloudCLI, exécutez la commande suivante :
gcloud init
-
Créez ou sélectionnez un projet Google Cloud.
-
Créez un projet Cloud :
gcloud projects create PROJECT_ID
-
Sélectionnez le projet Cloud que vous avez créé :
gcloud config set project PROJECT_ID
-
-
Assurez-vous que la facturation est activée pour votre projet Cloud. Découvrez comment vérifier si la facturation est activée sur un projet.
-
Activer les API Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler :
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Configurez l'authentification :
-
Créez le compte de service :
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Remplacez
SERVICE_ACCOUNT_NAME
par le nom que vous souhaitez donner au compte de service. -
Attribuez des rôles au compte de service. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants :
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Remplacez les éléments suivants :
SERVICE_ACCOUNT_NAME
: nom du compte de service.PROJECT_ID
: ID du projet dans lequel vous avez créé le compte de service.ROLE
: rôle à accorder
-
Attribuez à votre compte Google un rôle vous permettant d'utiliser les rôles du compte de service et d'associer le compte de service à d'autres ressources :
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Remplacez les éléments suivants :
SERVICE_ACCOUNT_NAME
: nom du compte de service.PROJECT_ID
: ID du projet dans lequel vous avez créé le compte de service.USER_EMAIL
: adresse e-mail de votre compte Google
-
-
Créez des identifiants d'authentification pour votre compte Google :
gcloud auth application-default login
Configurer votre projet Pub/Sub
-
Créez des variables pour votre bucket, votre projet et votre région. Les noms des buckets Cloud Storage doivent être uniques. Sélectionnez une région Dataflow proche de l'emplacement où vous exécutez les commandes dans ce guide de démarrage rapide. La valeur de la variable
REGION
doit être un nom de région valide. Pour en savoir plus sur les régions et les emplacements, consultez la page Emplacements Dataflow.BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
-
Créez un bucket Cloud Storage appartenant à ce projet :
gsutil mb gs://$BUCKET_NAME
-
Créez un sujet Pub/Sub dans ce projet :
gcloud pubsub topics create $TOPIC_ID
-
Créez une tâche Cloud Scheduler dans ce projet. La tâche publie un message sur un sujet Cloud Pub/Sub chaque minute.
Si une application App Engine n'existe pas pour le projet, cette étape va en créer une.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
Démarrez la tâche.
gcloud scheduler jobs run publisher-job --location=$REGION
-
Utilisez la commande suivante pour cloner le dépôt du guide de démarrage rapide et accéder au répertoire de l'exemple de code :
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
Diffuser des messages depuis Pub/Sub vers Cloud Storage
Exemple de code
Cet exemple de code utilise Dataflow pour effectuer les opérations suivantes :
- Lire les messages Pub/Sub
- Effectuer le fenêtrage (ou le regroupement) de messages en intervalles fixes par données d'horodatage
Écrire les messages dans chaque fenêtre dans des fichiers dans Cloud Storage
Java
Python
Démarrer le pipeline
Pour démarrer le pipeline, exécutez la commande suivante :
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
La commande précédente s'exécute localement et lance une tâche Dataflow dans le cloud. Lorsque la commande renvoie JOB_MESSAGE_DETAILED: Workers
have started successfully
, quittez le programme local à l'aide de Ctrl+C
.
Observer la progression de la tâche et du pipeline
Vous pouvez observer la progression de la tâche dans la console Dataflow.
Ouvrez la vue "Détails de la tâche" pour afficher les éléments suivants :
- Structure de la tâche
- Journaux de la tâche
- Métriques de l'étape
Vous devrez peut-être patienter quelques minutes avant que les fichiers de sortie ne s'affichent dans Cloud Storage.
Vous pouvez également utiliser la ligne de commande ci-dessous pour afficher les fichiers qui ont été écrits.
gsutil ls gs://${BUCKET_NAME}/samples/
Le résultat doit se présenter sous la forme suivante :
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
Effectuer un nettoyage
Pour éviter que les ressources utilisées sur cette page ne soient facturées sur votre compte Google Cloud, supprimez le projet Cloud contenant les ressources.
Supprimez la tâche Cloud Scheduler.
gcloud scheduler jobs delete publisher-job --location=$REGION
Dans la console Dataflow, arrêtez la tâche. Annulez le pipeline sans le drainer.
Supprimez le sujet.
gcloud pubsub topics delete $TOPIC_ID
Supprimez les fichiers créés par le pipeline.
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*" gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
Supprimez le bucket Cloud Storage.
gsutil rb gs://${BUCKET_NAME}
-
Supprimez le compte de service :
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Facultatif : Révoquez les identifiants d'authentification que vous avez créés et supprimez le fichier d'identifiants local.
gcloud auth application-default revoke
-
Facultatif : Révoquez les identifiants de la CLI gcloud.
gcloud auth revoke
Étapes suivantes
Si vous souhaitez afficher les messages Pub/Sub selon un horodatage personnalisé, vous pouvez spécifier l'horodatage en tant qu'attribut dans le message Pub/Sub, puis utiliser cet horodatage personnalisé avec la classe
withTimestampAttribute
de PubsubIO.Consultez les modèles Dataflow Open Source de Google conçus pour la diffusion en flux continu.
Découvrez comment Dataflow est intégré à Pub/Sub.
Suivez ce tutoriel qui lit des données depuis Pub/Sub et écrit dans BigQuery à l'aide de modèles Flex Dataflow.
Pour en savoir plus sur le fenêtrage, consultez l'exemple Pipeline pour les jeux mobiles d'Apache Beam.