Diffuser des messages Pub/Sub Lite à l'aide de Dataflow
Au lieu d'écrire et d'exécuter vos propres programmes de traitement de données, vous pouvez utiliser Dataflow avec le connecteur d'E/S Pub/Sub Lite pour Apache Beam. Cloud Dataflow est un service entièrement géré permettant de transformer et d'enrichir les données en streaming (en temps réel) et par lot avec un niveau identique de fiabilité et d'expressivité. Il exécute de manière fiable des programmes développés à l'aide du SDK Apache Beam, qui offre un ensemble extensible d'abstractions de traitement avec état puissantes et de connecteurs d'E/S à d'autres systèmes de traitement par flux et par lot.
Ce guide de démarrage rapide explique comment écrire un pipeline Apache Beam pour :
- Lire les messages de Pub/Sub Lite
- Effectuer le fenêtrage (ou le regroupement) de messages par horodatage
- Écrire les messages dans Cloud Storage
Il vous indique également comment :
- Envoyer votre pipeline pour exécution sur Dataflow
- Créer un modèle Flex Dataflow à partir de votre pipeline
Ce tutoriel nécessite Maven, mais il est également possible de convertir l'exemple de projet Maven vers Gradle. Pour en savoir plus, consultez Facultatif: convertir de Maven à Gradle.
Avant de commencer
- Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
- Installez Google Cloud CLI.
-
Pour initialiser gcloudCLI, exécutez la commande suivante :
gcloud init
-
Créez ou sélectionnez un projet Google Cloud.
-
Créez un projet Google Cloud :
gcloud projects create PROJECT_ID
Remplacez
PROJECT_ID
par le nom du projet Google Cloud que vous créez. -
Sélectionnez le projet Google Cloud que vous avez créé :
gcloud config set project PROJECT_ID
Remplacez
PROJECT_ID
par le nom de votre projet Google Cloud.
-
-
Vérifiez que la facturation est activée pour votre projet Google Cloud.
-
Activer les API Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging :
gcloud services enable pubsublite.googleapis.com
dataflow.googleapis.com storage-api.googleapis.com logging.googleapis.com -
Configurez l'authentification :
-
Créez le compte de service :
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Remplacez
SERVICE_ACCOUNT_NAME
par le nom que vous souhaitez donner au compte de service. -
Attribuez des rôles au compte de service. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants :
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Remplacez les éléments suivants :
SERVICE_ACCOUNT_NAME
: nom du compte de service.PROJECT_ID
: ID du projet dans lequel vous avez créé le compte de service.ROLE
: rôle à accorder
-
Attribuez à votre compte Google un rôle vous permettant d'utiliser les rôles du compte de service et d'associer le compte de service à d'autres ressources :
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Remplacez les éléments suivants :
SERVICE_ACCOUNT_NAME
: nom du compte de service.PROJECT_ID
: ID du projet dans lequel vous avez créé le compte de service.USER_EMAIL
: adresse e-mail de votre compte Google
-
- Installez Google Cloud CLI.
-
Pour initialiser gcloudCLI, exécutez la commande suivante :
gcloud init
-
Créez ou sélectionnez un projet Google Cloud.
-
Créez un projet Google Cloud :
gcloud projects create PROJECT_ID
Remplacez
PROJECT_ID
par le nom du projet Google Cloud que vous créez. -
Sélectionnez le projet Google Cloud que vous avez créé :
gcloud config set project PROJECT_ID
Remplacez
PROJECT_ID
par le nom de votre projet Google Cloud.
-
-
Vérifiez que la facturation est activée pour votre projet Google Cloud.
-
Activer les API Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging :
gcloud services enable pubsublite.googleapis.com
dataflow.googleapis.com storage-api.googleapis.com logging.googleapis.com -
Configurez l'authentification :
-
Créez le compte de service :
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Remplacez
SERVICE_ACCOUNT_NAME
par le nom que vous souhaitez donner au compte de service. -
Attribuez des rôles au compte de service. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants :
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Remplacez les éléments suivants :
SERVICE_ACCOUNT_NAME
: nom du compte de service.PROJECT_ID
: ID du projet dans lequel vous avez créé le compte de service.ROLE
: rôle à accorder
-
Attribuez à votre compte Google un rôle vous permettant d'utiliser les rôles du compte de service et d'associer le compte de service à d'autres ressources :
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Remplacez les éléments suivants :
SERVICE_ACCOUNT_NAME
: nom du compte de service.PROJECT_ID
: ID du projet dans lequel vous avez créé le compte de service.USER_EMAIL
: adresse e-mail de votre compte Google
-
-
Créez des identifiants d'authentification locaux pour votre compte Google :
gcloud auth application-default login
Configurer votre projet Pub/Sub Lite
Créez des variables pour votre bucket, votre projet et votre région Dataflow dans Cloud Storage. Les noms des buckets Cloud Storage doivent être uniques. La région Dataflow doit être une région valide dans laquelle vous pouvez exécuter votre job. Pour en savoir plus sur les régions et les emplacements, consultez la page Emplacements Dataflow.
export PROJECT_ID=$(gcloud config get-value project)
export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
export BUCKET=BUCKET_NAME
export DATAFLOW_REGION=DATAFLOW_REGION
Créez un bucket Cloud Storage appartenant à ce projet :
gsutil mb gs://$BUCKET
Créer un sujet Lite et un abonnement Pub/Sub Lite zonaux
Créez un sujet Lite Pub/Sub Lite et un abonnement Lite zonaux.
Pour l'emplacement Lite, choisissez un emplacement Pub/Sub Lite compatible. Vous devez également spécifier une zone pour la région. Par exemple, us-central1-a
.
export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \ --location=$LITE_LOCATION \ --partitions=1 \ --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \ --location=$LITE_LOCATION \ --topic=$TOPIC \ --starting-offset=beginning
Diffuser des messages vers Dataflow
Télécharger l'exemple de code du guide de démarrage rapide
Clonez le dépôt de démarrage rapide et accédez au répertoire de l'exemple de code.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics
Exemple de code
Cet exemple de code utilise Dataflow pour effectuer les opérations suivantes :
- Lire les messages d'un abonnement Pub/Sub Lite en tant que source illimitée.
- Regroupez les messages en fonction de leur horodatage de publication, à l'aide de périodes fixes et du déclencheur par défaut.
Écrire les messages groupés dans des fichiers Cloud Storage.
Java
Avant d'exécuter cet exemple, suivez les instructions de configuration de Java décrites dans les bibliothèques clientes Pub/Sub Lite.
Démarrer le pipeline Dataflow
Pour démarrer le pipeline dans Dataflow, exécutez la commande suivante :
mvn compile exec:java \
-Dexec.mainClass=examples.PubsubliteToGcs \
-Dexec.args=" \
--subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
--output=gs://$BUCKET/samples/output \
--windowSize=1 \
--project=$PROJECT_ID \
--region=$DATAFLOW_REGION \
--tempLocation=gs://$BUCKET/temp \
--runner=DataflowRunner \
--serviceAccount=$SERVICE_ACCOUNT"
La commande précédente lance une tâche Dataflow. Suivez le lien dans la sortie de la console pour accéder à la tâche dans la console de surveillance Dataflow.
Observer la progression de la tâche
Observez la progression de la tâche dans la console Dataflow.
Ouvrez la vue "Détails de la tâche" pour afficher les éléments suivants :
- Graphique de la tâche
- Détails de l'exécution
- Métriques de tâche
Publiez des messages sur votre sujet Lite.
gcloud pubsub lite-topics publish $TOPIC \
--location=$LITE_LOCATION \
--message="Hello World!"
Vous devrez peut-être patienter quelques minutes avant que les messages ne s'affichent dans vos journaux de nœud de calcul.
Utilisez la commande ci-dessous pour vérifier quels fichiers ont été écrits dans Cloud Storage.
gsutil ls "gs://$BUCKET/samples/"
Le résultat doit se présenter sous la forme suivante :
gs://$BUCKET/samples/output-19:41-19:42-0-of-1
gs://$BUCKET/samples/output-19:47-19:48-0-of-1
gs://$BUCKET/samples/output-19:48-19:49-0-of-1
Utilisez la commande ci-dessous pour examiner le contenu d'un fichier :
gsutil cat "gs://$BUCKET/samples/your-filename"
Créer un modèle Dataflow (facultatif)
Vous pouvez éventuellement créer un modèle Flex Dataflow personnalisé basé sur votre pipeline. Les modèles Dataflow vous permettent d'exécuter des tâches avec différents paramètres d'entrée à partir de la console Google Cloud ou de la ligne de commande, sans avoir à configurer un environnement de développement Java complet.
Créez un fichier Fat JAR qui inclut toutes les dépendances de votre pipeline. Une fois la commande exécutée, le fichier
target/pubsublite-streaming-bundled-1.0.jar
devrait s'afficher.mvn clean package -DskipTests=true
Indiquez les noms et les emplacements de votre fichier de modèle et de votre image de conteneur de modèle.
export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
Créer un modèle Flex personnalisé Un fichier
metadata.json
obligatoire, qui contient les spécifications nécessaires à l'exécution de la tâche, a été fourni avec l'exemple.gcloud dataflow flex-template build $TEMPLATE_PATH \ --image-gcr-path $TEMPLATE_IMAGE \ --sdk-language "JAVA" \ --flex-template-base-image "JAVA11" \ --metadata-file "metadata.json" \ --jar "target/pubsublite-streaming-bundled-1.0.jar" \ --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
Exécutez une tâche à l'aide du modèle Flex personnalisé.
Console
Saisissez le nom de la tâche.
Saisissez votre région Dataflow.
Choisissez votre modèle personnalisé.
Saisissez le chemin d'accès du modèle.
Saisissez les paramètres requis.
Cliquez sur Run Job (Exécuter la tâche).
gcloud
gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
--template-file-gcs-location $TEMPLATE_PATH \
--parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
--parameters output="gs://$BUCKET/samples/template-output" \
--parameters windowSize=1 \
--region $DATAFLOW_REGION \
--serviceAccount=$SERVICE_ACCOUNT
Effectuer un nettoyage
Pour éviter que les ressources utilisées sur cette page ne soient facturées sur votre compte Google Cloud, supprimez le projet Google Cloud contenant les ressources.
Dans la console Dataflow, arrêtez la tâche. Annulez le pipeline au lieu de le drainer.
Supprimez le sujet et l'abonnement.
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
Supprimez les fichiers créés par le pipeline.
gsutil -m rm -rf "gs://$BUCKET/samples/*"
gsutil -m rm -rf "gs://$BUCKET/temp/*"
Supprimez l'image du modèle et le fichier de modèle, s'ils existent.
gcloud container images delete $TEMPLATE_IMAGE
gsutil rm $TEMPLATE_PATH
Supprimez le bucket Cloud Storage.
gsutil rb gs://$BUCKET
-
Supprimez le compte de service :
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Facultatif : Révoquez les identifiants d'authentification que vous avez créés et supprimez le fichier d'identifiants local.
gcloud auth application-default revoke
-
Facultatif : Révoquez les identifiants de la CLI gcloud.
gcloud auth revoke
Étapes suivantes
Apprenez-en plus sur la configuration des modèles Flex Dataflow
Découvrez les pipelines de streaming Dataflow.