In dieser Anleitung wird gezeigt, wie Sie eine Datenpipeline für einen Echtzeitstream von Datenbankänderungen in Dataflow bereitstellen, die aus dem Änderungsstream einer Bigtable-Tabelle stammen. Die Ausgabe der Pipeline wird in eine Reihe von Dateien in Cloud Storage geschrieben.
Es wird ein Beispiel-Dataset für eine Musikwiedergabe-App bereitgestellt. In diesem Tutorial erfahren Sie, welche Titel angehört wurden, und ordnen dann die fünf besten Songs über einen bestimmten Zeitraum zu.
Diese Anleitung richtet sich an technische Nutzer, die mit dem Schreiben von Code und dem Bereitstellen von Datenpipelines in Google Cloud vertraut sind.
Lernziele
In dieser Anleitung erfahren Sie, wie Sie Folgendes tun:
- Erstellen Sie eine Bigtable-Tabelle mit aktiviertem Änderungsstream.
- Eine Pipeline in Dataflow bereitstellen, die den Änderungsstream transformiert und ausgibt.
- Sehen Sie sich die Ergebnisse der Datenpipeline an.
Kosten
In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:
Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen.
Nach Abschluss der in diesem Dokument beschriebenen Aufgaben können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.
Hinweise
- Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
- Installieren Sie die Google Cloud CLI.
-
Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.
-
Aktivieren Sie die Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:
gcloud services enable dataflow.googleapis.com
bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com - Installieren Sie die Google Cloud CLI.
-
Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.
-
Aktivieren Sie die Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:
gcloud services enable dataflow.googleapis.com
bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com - Aktualisieren und installieren Sie die
cbt
-Befehlszeile.gcloud components update gcloud components install cbt
Umgebung vorbereiten
Code abrufen
Klonen Sie das Repository, das den Beispielcode enthält. Wenn Sie dieses Repository bereits heruntergeladen haben, rufen Sie die aktuelle Version ab.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams
Bucket erstellen
gcloud storage buckets create gs://BUCKET_NAMEErsetzen Sie
BUCKET_NAME
durch einen Bucket-Namen, der den Anforderungen für Bucket-Namen entspricht:
Bigtable-Instanz erstellen
Sie können für diese Anleitung eine vorhandene Instanz verwenden oder in einer Region in Ihrer Nähe eine Instanz mit den Standardkonfigurationen erstellen.
Tabelle erstellen
Die Beispielanwendung verfolgt die Songs, die Nutzer abhören, und speichert die Listener-Ereignisse in Bigtable. Erstellen Sie eine Tabelle mit aktiviertem Änderungsstream, die eine Spaltenfamilie (cf) und eine Spalte (Song) hat und Nutzer-IDs für Zeilenschlüssel verwendet.
Erstellen Sie die Tabelle.
gcloud bigtable instances tables create song-rank \
--column-families=cf --change-stream-retention-period=7d \
--instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
Ersetzen Sie Folgendes:
- PROJECT_ID: die ID des von Ihnen verwendeten Projekts
- BIGTABLE_INSTANCE_ID: die ID der Instanz, die die neue Tabelle enthält
Pipeline starten
Diese Pipeline transformiert den Änderungsstream, indem sie Folgendes ausführt:
- Änderungsstream lesen
- Ruft den Namen des Titels ab
- Gruppiert die Ereignisse zum Anhören von Songs in N-Sekunden-Fenster
- Zählt die Top-5-Songs
- Gibt die Ergebnisse aus
Pipeline ausführen.
mvn compile exec:java -Dexec.mainClass=SongRank \
"-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
--bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
--outputLocation=gs://BUCKET_NAME/ \
--runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
Ersetzen Sie BIGTABLE_REGION durch die ID der Region, in der sich Ihre Bigtable-Instanz befindet, z. B. us-east5
.
Informationen zur Pipeline
Die folgenden Code-Snippets aus der Pipeline können Ihnen helfen, den Code zu verstehen, den Sie ausführen.
Änderungsstream lesen
Der Code in diesem Beispiel konfiguriert den Quellstream mit den Parametern für die jeweilige Bigtable-Instanz und -Tabelle.
Name des Titels wird abgerufen
Wenn ein Titel angehört wird, wird der Songname in die Spaltenfamilie cf
und den Spaltenqualifizierer song
geschrieben, sodass der Code den Wert aus der Änderungsstreammutation extrahiert und für den nächsten Schritt der Pipeline ausgibt.
Die fünf Top-Songs zählen
Sie können die integrierten Beam-Funktionen Count
und Top.of
verwenden, um die fünf Top-Titel im aktuellen Fenster abzurufen.
Ausgabe der Ergebnisse
Diese Pipeline schreibt die Ergebnisse sowohl in die Standardausgabe als auch in Dateien. Für die Dateien werden die Schreibvorgänge in Gruppen von 10 Elementen oder Ein-Minuten-Segmenten unterteilt.
Pipeline ansehen
Rufen Sie in der Google Cloud Console die Seite Dataflow auf.
Klicken Sie auf den Job, dessen Name mit song-rank beginnt.
Klicken Sie unten auf dem Bildschirm auf Show (Anzeigen), um den Logbereich zu öffnen.
Klicken Sie auf Worker-Logs, um die Ausgabelogs des Änderungsstreams zu überwachen.
Streamschreibvorgänge
Verwenden Sie die cbt
-Befehlszeile, um eine Reihe von Songs, die verschiedene Nutzer angehört haben, in die Tabelle song-rank
zu schreiben. Dies ist so konzipiert, dass über einige Minuten geschrieben wird, um das Streamen von Songs im Laufe der Zeit zu simulieren.
cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
song-rank song-rank-data.csv column-family=cf batch-size=1
Ausgabe ansehen
In der Ausgabe in Cloud Storage finden Sie die beliebtesten Titel.
gsutil cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
Beispielausgabe:
2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
Bereinigen
Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder Sie behalten das Projekt und löschen die einzelnen Ressourcen.
Projekt löschen
Google Cloud-Projekt löschen:
gcloud projects delete PROJECT_ID
Einzelne Ressourcen löschen
Löschen Sie den Bucket und die Dateien.
gcloud storage rm --recursive gs://BUCKET_NAME/
Den Änderungsstream für die Tabelle deaktivieren.
gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \ --clear-change-stream-retention-period
Löschen Sie die Tabelle
song-rank
.cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
Beenden Sie die Änderungsstreampipeline.
Listen Sie die Jobs auf, um die Job-ID zu erhalten.
gcloud dataflow jobs list --region=BIGTABLE_REGION
Brechen Sie den Job ab.
gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
Ersetzen Sie JOB_ID durch die Job-ID, die nach dem vorherigen Befehl angezeigt wird.