Traiter un flux de modifications Bigtable


Ce tutoriel explique comment déployer un pipeline de données dans Dataflow pour flux en temps réel de modifications de la base de données provenant de la table Bigtable flux de modifications. Le résultat du pipeline est écrit dans une série de fichiers Cloud Storage.

Un exemple d'ensemble de données pour une application d'écoute musicale est fourni. Dans ce vous suivez les chansons que vous écoutez, puis vous classez les cinq premiers période.

Ce tutoriel est destiné aux utilisateurs techniques familiarisés avec l'écriture de code et déployer des pipelines de données sur Google Cloud.

Objectifs

Ce tutoriel vous explique comment effectuer les opérations suivantes:

  • Créer une table Bigtable avec un flux de modifications activé
  • Déployer sur Dataflow un pipeline qui transforme et génère la flux de modifications.
  • Affichez les résultats de votre pipeline de données.

Coûts

Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Une fois que vous avez terminé les tâches décrites dans ce document, vous pouvez éviter de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir plus, consultez la section Effectuer un nettoyage.

Avant de commencer

  1. Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  6. Activer les API Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage :

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  11. Activer les API Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage :

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  12. Mettez à jour et installez CLI cbt pour en savoir plus.
    gcloud components update
    gcloud components install cbt
    

Préparer l'environnement

Obtenir le code

Clonez le dépôt contenant l'exemple de code. Si vous avez déjà téléchargé et extrayez-les pour obtenir la dernière version.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

Créer un bucket

  • Créez un bucket Cloud Storage :
    gcloud storage buckets create gs://BUCKET_NAME
    Remplacez BUCKET_NAME par un nom qui répond aux conditions requises pour le nom des buckets :
  • Créer une instance Bigtable

    Vous pouvez utiliser une instance existante pour ce tutoriel ou créer une instance avec les configurations par défaut dans une région proche de chez vous.

    Créer une table

    L'application exemple suit les chansons que les utilisateurs écoutent et stocke les des événements d'écoute dans Bigtable. Créer une table avec un flux de modifications qui comporte une famille de colonnes (cf) et une colonne (chanson) et utilise des ID utilisateur pour les clés de ligne.

    Créez la table.

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    Remplacez les éléments suivants :

    • PROJECT_ID: ID du projet que vous utilisez
    • BIGTABLE_INSTANCE_ID: ID de l'instance qui contiendra la nouvelle table

    Démarrer le pipeline

    Ce pipeline transforme le flux de modifications en procédant comme suit:

    1. Lit le flux de modifications.
    2. Récupère le nom de la chanson.
    3. Regrouper les événements d'écoute du titre dans des fenêtres de N secondes
    4. Comptabilise les cinq titres les plus écoutés
    5. Génère les résultats

    Exécutez le pipeline.

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    Remplacez BIGTABLE_REGION par l'ID de la région dans laquelle se trouve votre instance Bigtable, par exemple us-east5.

    Comprendre le pipeline

    Les extraits de code suivants du pipeline peuvent vous aider à comprendre du code que vous exécutez.

    Lire le flux de modifications

    Le code de cet exemple configure le flux source avec les paramètres pour une instance et une table Bigtable spécifiques.

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    Obtenir le nom de la chanson

    Lorsqu'un titre est écouté, son nom est écrit dans la famille de colonnes cf et le qualificatif de colonne song, de sorte que le code extrait la valeur de la modification et le transmet en sortie à l'étape suivante du pipeline.

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    Compter les cinq titres les plus écoutés

    Vous pouvez utiliser les fonctions Beam intégrées Count et Top.of pour obtenir les cinq titres dans la fenêtre actuelle.

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    Générer les résultats

    Ce pipeline écrit les résultats dans la sortie standard ainsi que dans des fichiers. Pour le les écritures en groupes de 10 éléments ou segments d'une minute.

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    Afficher le pipeline

    1. Dans la console Google Cloud, accédez à la page Dataflow.

      Accéder à Dataflow

    2. Cliquez sur la tâche dont le nom commence par song-rank.

    3. En bas de l'écran, cliquez sur Show (Afficher) pour ouvrir le panneau des journaux.

    4. Cliquez sur Journaux de nœud de calcul pour surveiller les journaux de sortie du flux de modifications.

    Diffusion d'écritures

    Utilisez le CLI cbt d'écrire un certain nombre de titres écoutés pour différents utilisateurs la table song-rank. Il est conçu pour s'écrire en quelques minutes afin de simuler le titre écouté en streaming au fil du temps.

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    Consulter le résultat

    Lisez le résultat sur Cloud Storage pour identifier les titres les plus populaires.

    gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    Exemple de résultat :

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
    

    Nettoyer

    Pour éviter que les ressources utilisées lors de ce tutoriel soient facturées sur votre compte Google Cloud, supprimez le projet contenant les ressources, ou conservez le projet et supprimez les ressources individuelles.

    Supprimer le projet

      Supprimez un projet Google Cloud :

      gcloud projects delete PROJECT_ID

    Supprimer des ressources individuelles

    1. Supprimez le bucket et les fichiers.

      gcloud storage rm --recursive gs://BUCKET_NAME/
      
    2. Désactivez le flux de modifications sur la table.

      gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \
      --clear-change-stream-retention-period
      
    3. Supprimez la table song-rank.

      cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
      
    4. Arrêtez le pipeline de flux de modifications.

      1. Répertoriez les tâches pour obtenir l'ID de la tâche.

        gcloud dataflow jobs list --region=BIGTABLE_REGION
        
      2. Annuler le job

        gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
        

        Remplacez JOB_ID par l'ID de tâche affiché après la commande précédente.

    Étape suivante