Bigtable-Änderungsstream verarbeiten


In dieser Anleitung wird gezeigt, wie Sie in Dataflow Echtzeitstream von Datenbankänderungen aus den Änderungsstream. Die Ausgabe der Pipeline wird Cloud Storage

Ein Beispiel-Dataset für eine Musikhören-Anwendung wird bereitgestellt. In dieser zeichnen Sie die angehörten Songs auf und platzieren Sie dann eine Rangliste der Top-5-Songs in einem Punkt.

Dieses Tutorial richtet sich an technisch versierte Nutzer, die mit dem Schreiben von Code und die Bereitstellung von Datenpipelines in Google Cloud.

Lernziele

In dieser Anleitung erfahren Sie, wie Sie Folgendes tun:

  • Erstellen Sie eine Bigtable-Tabelle mit einem aktivierten Änderungsstream.
  • Stellen Sie in Dataflow eine Pipeline bereit, die den den Änderungsstream.
  • Sehen Sie sich die Ergebnisse der Datenpipeline an.

Kosten

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Nach Abschluss der in diesem Dokument beschriebenen Aufgaben können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.

Hinweise

  1. Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. Installieren Sie die Google Cloud CLI.
  3. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  6. Aktivieren Sie die Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  7. Installieren Sie die Google Cloud CLI.
  8. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  11. Aktivieren Sie die Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  12. Aktualisieren und installieren Sie die cbt-Befehlszeile .
    gcloud components update
    gcloud components install cbt
    

Umgebung vorbereiten

Code abrufen

Klonen Sie das Repository, das den Beispielcode enthält. Wenn Sie diese Datei bereits heruntergeladen haben, Repository abrufen, um die neueste Version abzurufen.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

Bucket erstellen

  • Cloud Storage-Bucket erstellen:
    gcloud storage buckets create gs://BUCKET_NAME
    Ersetzen Sie BUCKET_NAME durch einen Bucket-Namen, der den Anforderungen für Bucket-Namen entspricht:
  • Bigtable-Instanz erstellen

    Sie können für diese Anleitung eine vorhandene Instanz verwenden oder eine Instanz erstellen. mit den Standardkonfigurationen in einer Region in Ihrer Nähe.

    Tabelle erstellen

    Die Beispiel-App zeichnet die Songs auf, die sich die Nutzer anhören, und speichert die auf Ereignisse in Bigtable warten. Tabelle mit Änderungsstream erstellen aktiviert ist, die über eine Spaltenfamilie (cf) und eine Spalte (song) verfügt und User-IDs verwendet für Zeilenschlüssel.

    Erstellen Sie die Tabelle.

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    Ersetzen Sie Folgendes:

    • PROJECT_ID: die ID des von Ihnen verwendeten Projekts
    • BIGTABLE_INSTANCE_ID: die ID der Instanz, die die neue Tabelle enthalten soll

    Pipeline starten

    Diese Pipeline transformiert den Änderungsstream auf folgende Weise:

    1. Änderungsstream lesen
    2. Ruft den Songnamen ab
    3. Gruppiert die Anhör-Ereignisse des Songs in N-Sekunden-Fenstern
    4. Zählt die fünf Top-Songs
    5. Gibt die Ergebnisse aus

    Pipeline ausführen.

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    Ersetzen Sie BIGTABLE_REGION durch die ID der Region, in der sich die Bigtable-Instanz befindet, z. B. us-east5.

    Informationen zur Pipeline

    Die folgenden Code-Snippets aus der Pipeline können Ihnen helfen, die Code, den Sie ausführen.

    Änderungsstream lesen

    Mit dem Code in diesem Beispiel wird der Quellstream mit den Parametern für den und einer bestimmten Bigtable-Instanz oder -Tabelle.

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    Songname abrufen

    Beim Anhören eines Liedes wird der Name in die Spaltenfamilie cf geschrieben. und Spaltenqualifizierer song, sodass der Code den Wert aus der Änderung extrahiert und gibt sie im nächsten Schritt der Pipeline aus.

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    Die fünf Top-Songs zählen

    Mit den integrierten Beam-Funktionen Count und Top.of erhalten Sie die fünf besten im aktuellen Fenster.

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    Ergebnisse ausgeben

    Diese Pipeline schreibt die Ergebnisse sowohl in Standard-Out als auch in Dateien. Für die werden die Schreibvorgänge in Gruppen von 10 Elementen oder einminütigen Segmenten unterteilt.

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    Pipeline ansehen

    1. Rufen Sie in der Google Cloud Console die Seite Dataflow auf.

      Zu Dataflow

    2. Klicken Sie auf den Job, dessen Name mit song-rank beginnt.

    3. Klicken Sie unten auf dem Bildschirm auf Anzeigen, um das Logfeld zu öffnen.

    4. Klicken Sie auf Worker-Logs, um die Ausgabelogs des Änderungsstreams zu überwachen.

    Streamschreibvorgänge

    Verwenden Sie die Methode cbt-Befehlszeile eine Reihe von Anhörungen für verschiedene Nutzende zu schreiben, der Tabelle song-rank. Dies ist darauf ausgelegt, über ein paar Minuten zu schreiben, um den der Song im Laufe der Zeit anhört.

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    Ausgabe ansehen

    Lesen Sie die Ausgabe in Cloud Storage, um die beliebtesten Titel zu sehen.

    gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    Beispielausgabe:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
    

    Bereinigen

    Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder Sie behalten das Projekt und löschen die einzelnen Ressourcen.

    Projekt löschen

      Google Cloud-Projekt löschen:

      gcloud projects delete PROJECT_ID

    Einzelne Ressourcen löschen

    1. Löschen Sie den Bucket und die Dateien.

      gcloud storage rm --recursive gs://BUCKET_NAME/
      
    2. Deaktivieren Sie den Änderungsstream für die Tabelle.

      gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \
      --clear-change-stream-retention-period
      
    3. Löschen Sie die Tabelle song-rank.

      cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
      
    4. Halten Sie die Änderungsstreampipeline an.

      1. Lassen Sie die Jobs auflisten, um die Job-ID zu erhalten.

        gcloud dataflow jobs list --region=BIGTABLE_REGION
        
      2. Brechen Sie den Job ab.

        gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
        

        Ersetzen Sie JOB_ID durch die Job-ID, die nach dem vorherigen Befehl angezeigt wird.

    Nächste Schritte