Ce tutoriel explique comment déployer un pipeline de données dans Dataflow pour un flux en temps réel des modifications apportées à la base de données, provenant du flux de modifications d'une table Bigtable. La sortie du pipeline est écrite dans une série de fichiers sur Cloud Storage.
Un exemple de jeu de données pour une application d'écoute de musique est fourni. Dans ce tutoriel, vous allez suivre les chansons écoutées, puis classer les cinq premières sur une période donnée.
Ce tutoriel est destiné aux utilisateurs techniques qui savent écrire du code et déployer des pipelines de données sur Google Cloud.
Objectifs
Ce tutoriel vous explique comment effectuer les tâches suivantes :
- Créez une table Bigtable avec un flux de modifications activé.
- Déployez un pipeline sur Dataflow qui transforme et génère le flux de modifications.
- Affichez les résultats de votre pipeline de données.
Coûts
Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :
Pour obtenir une estimation des coûts en fonction de votre utilisation prévue, utilisez le simulateur de coût.
Une fois que vous avez terminé les tâches décrites dans ce document, vous pouvez éviter de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir plus, consultez la section Effectuer un nettoyage.
Avant de commencer
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name. -
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name. - Mettez à jour et installez la CLI
cbt
.gcloud components update gcloud components install cbt
-
Create a Cloud Storage bucket and configure it as follows:
-
Remplacez
STORAGE_CLASS
par la classe de stockage de votre choix. -
Remplacez
LOCATION
par l'emplacement de votre choix (ASIA
,EU
ouUS
) -
Remplacez
BUCKET_NAME
par Ce nom doit respecter les exigences relatives aux noms de buckets. - PROJECT_ID : ID du projet que vous utilisez
- BIGTABLE_INSTANCE_ID : ID de l'instance qui contiendra la nouvelle table.
- Lit le flux de modifications
- Récupère le nom du titre
- Regroupe les événements d'écoute de titres dans des fenêtres de N secondes.
- Compte les cinq titres les plus écoutés
- Génère les résultats
Dans la console Google Cloud , accédez à la page Dataflow.
Cliquez sur le job dont le nom commence par song-rank.
En bas de l'écran, cliquez sur Afficher pour ouvrir le panneau des journaux.
Cliquez sur Journaux des nœuds de calcul pour surveiller les journaux de sortie du flux de modifications.
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STORAGE_CLASS --location LOCATION
Créer une instance Bigtable
Vous pouvez utiliser une instance existante pour ce tutoriel ou créer une instance avec les configurations par défaut dans une région proche de vous.
Créer une table
L'application exemple suit les titres écoutés par les utilisateurs et stocke les événements d'écoute dans Bigtable. Créez une table avec un flux de modifications activé, qui comporte une famille de colonnes (cf) et une colonne (song), et qui utilise des ID utilisateur pour les clés de ligne.
Créez la table.
gcloud bigtable instances tables create song-rank \ --column-families=cf --change-stream-retention-period=7d \ --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
Remplacez les éléments suivants :
Démarrer le pipeline
Ce pipeline transforme le flux de modifications en effectuant les opérations suivantes :
Exécutez le pipeline.
mvn compile exec:java -Dexec.mainClass=SongRank \ "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \ --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \ --outputLocation=gs://BUCKET_NAME/ \ --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
Remplacez BIGTABLE_REGION par l'ID de la région dans laquelle se trouve votre instance Bigtable, par exemple
us-east5
.Comprendre le pipeline
Les extraits de code suivants du pipeline peuvent vous aider à comprendre le code que vous exécutez.
Lire le flux de modifications
Le code de cet exemple configure le flux source avec les paramètres de l'instance et de la table Bigtable spécifiques.
Obtenir le nom du titre
Lorsqu'un utilisateur écoute un titre, son nom est écrit dans la famille de colonnes
cf
et le qualificatif de colonnesong
. Le code extrait donc la valeur de la mutation du flux de modifications et la transmet à l'étape suivante du pipeline.Compter les cinq titres les plus écoutés
Vous pouvez utiliser les fonctions Beam intégrées
Count
etTop.of
pour obtenir les cinq titres les plus écoutés dans la fenêtre actuelle.Afficher les résultats
Ce pipeline écrit les résultats dans la sortie standard et dans des fichiers. Pour les fichiers, il regroupe les écritures par groupes de 10 éléments ou par segments d'une minute.
Afficher le pipeline
Écritures de flux
Utilisez la CLI
cbt
pour écrire le nombre d'écoutes de titres pour différents utilisateurs dans la tablesong-rank
. Il est conçu pour écrire pendant quelques minutes afin de simuler des écoutes de titres en streaming au fil du temps.cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \ song-rank song-rank-data.csv column-family=cf batch-size=1
Consulter le résultat
Lisez les résultats sur Cloud Storage pour découvrir les titres les plus populaires.
gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
Exemple de résultat :
2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}] 2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}] 2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
-
Remplacez
Supprimez le bucket et les fichiers.
gcloud storage rm --recursive gs://BUCKET_NAME/
Désactivez le flux de modifications dans la table.
gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \ --clear-change-stream-retention-period
Supprimez la table
song-rank
.cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
Arrêtez le pipeline de flux de modifications.
Listez les jobs pour obtenir l'ID du job.
gcloud dataflow jobs list --region=BIGTABLE_REGION
Annulez la tâche.
gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
Remplacez JOB_ID par l'ID de tâche affiché après la commande précédente.
Installez Google Cloud CLI. Après l'installation, initialisez Google Cloud CLI en exécutant la commande suivante :
gcloud init
Si vous utilisez un fournisseur d'identité (IdP) externe, vous devez d'abord vous connecter à la gcloud CLI avec votre identité fédérée.
Create or select a Google Cloud project.
Make sure that billing is enabled for your Google Cloud project.
Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:
gcloud services enable dataflow.googleapis.combigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
Installez Google Cloud CLI. Après l'installation, initialisez Google Cloud CLI en exécutant la commande suivante :
gcloud init
Si vous utilisez un fournisseur d'identité (IdP) externe, vous devez d'abord vous connecter à la gcloud CLI avec votre identité fédérée.
Create or select a Google Cloud project.
Make sure that billing is enabled for your Google Cloud project.
Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:
gcloud services enable dataflow.googleapis.combigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
Préparer l'environnement
Obtenir le code
Clonez le dépôt contenant l'exemple de code. Si vous avez déjà téléchargé ce dépôt, extrayez-le pour obtenir la dernière version.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams
Créer un bucket
Effectuer un nettoyage
Pour éviter que les ressources utilisées lors de ce tutoriel soient facturées sur votre compte Google Cloud, supprimez le projet contenant les ressources, ou conservez le projet et supprimez les ressources individuelles.
Supprimer le projet
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID