Bigtable-Änderungsstream verarbeiten


In dieser Anleitung wird gezeigt, wie Sie eine Datenpipeline für einen Echtzeitstream von Datenbankänderungen in Dataflow bereitstellen, die aus dem Änderungsstream einer Bigtable-Tabelle stammen. Die Ausgabe der Pipeline wird in eine Reihe von Dateien in Cloud Storage geschrieben.

Es wird ein Beispiel-Dataset für eine Musikwiedergabe-App bereitgestellt. In diesem Tutorial erfahren Sie, welche Titel angehört wurden, und ordnen dann die fünf besten Songs über einen bestimmten Zeitraum zu.

Diese Anleitung richtet sich an technische Nutzer, die mit dem Schreiben von Code und dem Bereitstellen von Datenpipelines in Google Cloud vertraut sind.

Lernziele

In dieser Anleitung erfahren Sie, wie Sie Folgendes tun:

  • Erstellen Sie eine Bigtable-Tabelle mit aktiviertem Änderungsstream.
  • Eine Pipeline in Dataflow bereitstellen, die den Änderungsstream transformiert und ausgibt.
  • Sehen Sie sich die Ergebnisse der Datenpipeline an.

Kosten

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Nach Abschluss der in diesem Dokument beschriebenen Aufgaben können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.

Hinweise

  1. Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
  2. Installieren Sie die Google Cloud CLI.
  3. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  6. Aktivieren Sie die Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  7. Installieren Sie die Google Cloud CLI.
  8. Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  11. Aktivieren Sie die Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  12. Aktualisieren und installieren Sie die cbt-Befehlszeile.
    gcloud components update
    gcloud components install cbt
    

Umgebung vorbereiten

Code abrufen

Klonen Sie das Repository, das den Beispielcode enthält. Wenn Sie dieses Repository bereits heruntergeladen haben, rufen Sie die aktuelle Version ab.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

Bucket erstellen

  • Cloud Storage-Bucket erstellen:
    gcloud storage buckets create gs://BUCKET_NAME
    Ersetzen Sie BUCKET_NAME durch einen Bucket-Namen, der den Anforderungen für Bucket-Namen entspricht:
  • Bigtable-Instanz erstellen

    Sie können für diese Anleitung eine vorhandene Instanz verwenden oder in einer Region in Ihrer Nähe eine Instanz mit den Standardkonfigurationen erstellen.

    Tabelle erstellen

    Die Beispielanwendung verfolgt die Songs, die Nutzer abhören, und speichert die Listener-Ereignisse in Bigtable. Erstellen Sie eine Tabelle mit aktiviertem Änderungsstream, die eine Spaltenfamilie (cf) und eine Spalte (Song) hat und Nutzer-IDs für Zeilenschlüssel verwendet.

    Erstellen Sie die Tabelle.

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    Ersetzen Sie Folgendes:

    • PROJECT_ID: die ID des von Ihnen verwendeten Projekts
    • BIGTABLE_INSTANCE_ID: die ID der Instanz, die die neue Tabelle enthält

    Pipeline starten

    Diese Pipeline transformiert den Änderungsstream, indem sie Folgendes ausführt:

    1. Änderungsstream lesen
    2. Ruft den Namen des Titels ab
    3. Gruppiert die Ereignisse zum Anhören von Songs in N-Sekunden-Fenster
    4. Zählt die Top-5-Songs
    5. Gibt die Ergebnisse aus

    Pipeline ausführen.

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    Ersetzen Sie BIGTABLE_REGION durch die ID der Region, in der sich Ihre Bigtable-Instanz befindet, z. B. us-east5.

    Informationen zur Pipeline

    Die folgenden Code-Snippets aus der Pipeline können Ihnen helfen, den Code zu verstehen, den Sie ausführen.

    Änderungsstream lesen

    Der Code in diesem Beispiel konfiguriert den Quellstream mit den Parametern für die jeweilige Bigtable-Instanz und -Tabelle.

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    Name des Titels wird abgerufen

    Wenn ein Titel angehört wird, wird der Songname in die Spaltenfamilie cf und den Spaltenqualifizierer song geschrieben, sodass der Code den Wert aus der Änderungsstreammutation extrahiert und für den nächsten Schritt der Pipeline ausgibt.

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    Die fünf Top-Songs zählen

    Sie können die integrierten Beam-Funktionen Count und Top.of verwenden, um die fünf Top-Titel im aktuellen Fenster abzurufen.

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    Ausgabe der Ergebnisse

    Diese Pipeline schreibt die Ergebnisse sowohl in die Standardausgabe als auch in Dateien. Für die Dateien werden die Schreibvorgänge in Gruppen von 10 Elementen oder Ein-Minuten-Segmenten unterteilt.

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    Pipeline ansehen

    1. Rufen Sie in der Google Cloud Console die Seite Dataflow auf.

      Zu Dataflow

    2. Klicken Sie auf den Job, dessen Name mit song-rank beginnt.

    3. Klicken Sie unten auf dem Bildschirm auf Show (Anzeigen), um den Logbereich zu öffnen.

    4. Klicken Sie auf Worker-Logs, um die Ausgabelogs des Änderungsstreams zu überwachen.

    Streamschreibvorgänge

    Verwenden Sie die cbt-Befehlszeile, um eine Reihe von Songs, die verschiedene Nutzer angehört haben, in die Tabelle song-rank zu schreiben. Dies ist so konzipiert, dass über einige Minuten geschrieben wird, um das Streamen von Songs im Laufe der Zeit zu simulieren.

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    Ausgabe ansehen

    In der Ausgabe in Cloud Storage finden Sie die beliebtesten Titel.

    gsutil cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    Beispielausgabe:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
    

    Bereinigen

    Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder Sie behalten das Projekt und löschen die einzelnen Ressourcen.

    Projekt löschen

      Google Cloud-Projekt löschen:

      gcloud projects delete PROJECT_ID

    Einzelne Ressourcen löschen

    1. Löschen Sie den Bucket und die Dateien.

      gcloud storage rm --recursive gs://BUCKET_NAME/
      
    2. Den Änderungsstream für die Tabelle deaktivieren.

      gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \
      --clear-change-stream-retention-period
      
    3. Löschen Sie die Tabelle song-rank.

      cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
      
    4. Beenden Sie die Änderungsstreampipeline.

      1. Listen Sie die Jobs auf, um die Job-ID zu erhalten.

        gcloud dataflow jobs list --region=BIGTABLE_REGION
        
      2. Brechen Sie den Job ab.

        gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
        

        Ersetzen Sie JOB_ID durch die Job-ID, die nach dem vorherigen Befehl angezeigt wird.

    Nächste Schritte