Ce tutoriel explique comment déployer un pipeline de données sur Dataflow pour un flux en temps réel de modifications de la base de données provenant du flux de modifications d'une table Bigtable. La sortie du pipeline est écrite dans une série de fichiers sur Cloud Storage.
Un exemple d'ensemble de données pour une application d'écoute de musique est fourni. Dans ce tutoriel, vous suivez les titres écoutés, puis vous classez les cinq meilleurs titres sur une période donnée.
Ce tutoriel est destiné aux utilisateurs techniques qui connaissent bien l'écriture de code et le déploiement de pipelines de données sur Google Cloud.
Objectifs
Ce tutoriel vous explique comment effectuer les opérations suivantes:
- Créez une table Bigtable avec un flux de modifications activé.
- Déployer sur Dataflow un pipeline qui transforme et génère le flux de modifications
- Affichez les résultats de votre pipeline de données.
Coûts
Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :
Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût.
Une fois que vous avez terminé les tâches décrites dans ce document, vous pouvez éviter de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir plus, consultez la section Effectuer un nettoyage.
Avant de commencer
- Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
- Installez Google Cloud CLI.
-
Pour initialiser gcloudCLI, exécutez la commande suivante :
gcloud init
-
Créez ou sélectionnez un projet Google Cloud.
-
Créez un projet Google Cloud :
gcloud projects create PROJECT_ID
Remplacez
PROJECT_ID
par le nom du projet Google Cloud que vous créez. -
Sélectionnez le projet Google Cloud que vous avez créé :
gcloud config set project PROJECT_ID
Remplacez
PROJECT_ID
par le nom de votre projet Google Cloud.
-
-
Vérifiez que la facturation est activée pour votre projet Google Cloud.
-
Activer les API Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage :
gcloud services enable dataflow.googleapis.com
bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com - Installez Google Cloud CLI.
-
Pour initialiser gcloudCLI, exécutez la commande suivante :
gcloud init
-
Créez ou sélectionnez un projet Google Cloud.
-
Créez un projet Google Cloud :
gcloud projects create PROJECT_ID
Remplacez
PROJECT_ID
par le nom du projet Google Cloud que vous créez. -
Sélectionnez le projet Google Cloud que vous avez créé :
gcloud config set project PROJECT_ID
Remplacez
PROJECT_ID
par le nom de votre projet Google Cloud.
-
-
Vérifiez que la facturation est activée pour votre projet Google Cloud.
-
Activer les API Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage :
gcloud services enable dataflow.googleapis.com
bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com - Mettez à jour et installez la CLI
cbt
.gcloud components update gcloud components install cbt
Préparer l'environnement
Obtenir le code
Clonez le dépôt qui contient l'exemple de code. Si vous avez déjà téléchargé ce dépôt, extrayez-le pour obtenir la dernière version.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams
Créer un bucket
gcloud storage buckets create gs://BUCKET_NAMERemplacez
BUCKET_NAME
par un nom qui répond aux conditions requises pour le nom des buckets :
Créer une instance Bigtable
Vous pouvez utiliser une instance existante pour ce tutoriel ou créer une instance avec les configurations par défaut dans une région proche de vous.
Créer une table
L'exemple d'application suit les chansons que les utilisateurs écoutent et stocke les événements d'écoute dans Bigtable. Créez une table avec un flux de modifications activé, comportant une famille de colonnes (cf) et une colonne (chanson) et utilisant des ID utilisateur pour les clés de ligne.
Créez la table.
gcloud bigtable instances tables create song-rank \
--column-families=cf --change-stream-retention-period=7d \
--instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
Remplacez les éléments suivants :
- PROJECT_ID: ID du projet que vous utilisez.
- BIGTABLE_INSTANCE_ID: ID de l'instance qui contiendra la nouvelle table.
Démarrer le pipeline
Ce pipeline transforme le flux de modifications en procédant comme suit:
- Lit le flux de modifications
- Récupère le nom du titre.
- Regroupe les événements d'écoute du titre dans des fenêtres de N secondes.
- Compte les cinq meilleurs titres
- Génère les résultats
Exécutez le pipeline.
mvn compile exec:java -Dexec.mainClass=SongRank \
"-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
--bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
--outputLocation=gs://BUCKET_NAME/ \
--runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
Remplacez BIGTABLE_REGION par l'ID de la région dans laquelle se trouve votre instance Bigtable, par exemple us-east5
.
Comprendre le pipeline
Les extraits de code suivants du pipeline peuvent vous aider à comprendre le code que vous exécutez.
Lire le flux de modifications
Le code de cet exemple configure le flux source avec les paramètres de l'instance et de la table Bigtable spécifiques.
Obtenir le nom de la chanson
Lorsqu'une chanson est écoutée, son nom est écrit dans la famille de colonnes cf
et le qualificatif de colonne song
. Le code extrait donc la valeur de la mutation du flux de modifications et la génère à l'étape suivante du pipeline.
Compter les cinq meilleurs titres
Vous pouvez utiliser les fonctions Beam intégrées Count
et Top.of
pour obtenir le top 5 des titres de la fenêtre actuelle.
Générer les résultats
Ce pipeline écrit les résultats dans la sortie standard ainsi que dans des fichiers. Pour les fichiers, elle fenêtre les écritures en groupes de 10 éléments ou segments d'une minute.
Afficher le pipeline
Dans la console Google Cloud, accédez à la page Dataflow.
Cliquez sur le job dont le nom commence par song-rank.
Au bas de l'écran, cliquez sur Afficher pour ouvrir le panneau des journaux.
Cliquez sur Worker logs (Journaux des nœuds de calcul) pour surveiller les journaux de sortie du flux de modifications.
Écritures en flux continu
Utilisez la CLI cbt
pour écrire un certain nombre de titres écoutés pour différents utilisateurs dans la table song-rank
. Ce texte est conçu pour s'étendre sur quelques minutes afin de simuler l'écoute d'un titre en streaming au fil du temps.
cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
song-rank song-rank-data.csv column-family=cf batch-size=1
Consulter le résultat
Lisez le résultat sur Cloud Storage pour voir les titres les plus populaires.
gsutil cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
Exemple de résultat :
2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
Effectuer un nettoyage
Pour éviter que les ressources utilisées lors de ce tutoriel soient facturées sur votre compte Google Cloud, supprimez le projet contenant les ressources, ou conservez le projet et supprimez les ressources individuelles.
Supprimer le projet
Supprimez un projet Google Cloud :
gcloud projects delete PROJECT_ID
Supprimer des ressources individuelles
Supprimez le bucket et les fichiers.
gcloud storage rm --recursive gs://BUCKET_NAME/
Désactivez le flux de modifications sur la table.
gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \ --clear-change-stream-retention-period
Supprimez la table
song-rank
.cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
Arrêtez le pipeline de flux de modifications.
Répertoriez les tâches pour obtenir l'ID de la tâche.
gcloud dataflow jobs list --region=BIGTABLE_REGION
Annuler la tâche
gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
Remplacez JOB_ID par l'ID de tâche affiché après la commande précédente.