In dieser Anleitung wird gezeigt, wie Sie in Dataflow Echtzeitstream von Datenbankänderungen aus den Änderungsstream. Die Ausgabe der Pipeline wird in eine Reihe von Dateien in Cloud Storage geschrieben.
Es wird ein Beispieldatensatz für eine Musik-App bereitgestellt. In dieser Anleitung erfassen Sie die angehörten Titel und sortieren die Top 5 über einen bestimmten Zeitraum.
Diese Anleitung richtet sich an technische Nutzer, die mit dem Schreiben von Code und dem Bereitstellen von Datenpipelines in Google Cloud vertraut sind.
Lernziele
In dieser Anleitung wird Folgendes beschrieben:
- Erstellen Sie eine Bigtable-Tabelle mit einem aktivierten Änderungsstream.
- Stellen Sie eine Pipeline in Dataflow bereit, die den Änderungsstream transformiert und ausgibt.
- Sehen Sie sich die Ergebnisse der Datenpipeline an.
Kosten
In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:
Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen.
Nach Abschluss der in diesem Dokument beschriebenen Aufgaben können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.
Hinweise
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:
gcloud services enable dataflow.googleapis.com
bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com - Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:
gcloud services enable dataflow.googleapis.com
bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com - Aktualisieren und installieren Sie die
cbt
CLI.gcloud components update gcloud components install cbt
Umgebung vorbereiten
Code abrufen
Klonen Sie das Repository, das den Beispielcode enthält. Wenn Sie diese Datei bereits heruntergeladen haben, Repository abrufen, um die neueste Version abzurufen.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams
Bucket erstellen
gcloud storage buckets create gs://BUCKET_NAME
BUCKET_NAME
with a bucket name
that meets the bucket naming requirements.
Bigtable-Instanz erstellen
Sie können für diese Anleitung eine vorhandene Instanz verwenden oder eine Instanz mit den Standardkonfigurationen in einer Region in Ihrer Nähe erstellen.
Tabelle erstellen
Die Beispiel-App zeichnet die Songs auf, die sich die Nutzer anhören, und speichert die auf Ereignisse in Bigtable warten. Erstellen Sie eine Tabelle mit einem aktivierten Änderungsstream, die eine Spaltenfamilie (cf) und eine Spalte (song) enthält und Nutzer-IDs als Zeilenschlüssel verwendet.
Erstellen Sie die Tabelle.
gcloud bigtable instances tables create song-rank \
--column-families=cf --change-stream-retention-period=7d \
--instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
Ersetzen Sie Folgendes:
- PROJECT_ID: die ID des von Ihnen verwendeten Projekts
- BIGTABLE_INSTANCE_ID: die ID der Instanz, die die neue Tabelle enthalten soll
Pipeline starten
Diese Pipeline transformiert den Änderungsstream auf folgende Weise:
- Liest den Änderungsstream
- Ruft den Songnamen ab
- Gruppiert die Wiedergabeereignisse des Titels in Zeitfenster von N Sekunden
- Die fünf meistgespielten Titel werden gezählt.
- Gibt die Ergebnisse aus
Pipeline ausführen.
mvn compile exec:java -Dexec.mainClass=SongRank \
"-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
--bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
--outputLocation=gs://BUCKET_NAME/ \
--runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
Ersetzen Sie BIGTABLE_REGION durch die ID der Region, in der sich die Bigtable-Instanz befindet, z. B. us-east5
.
Pipeline
Die folgenden Code-Snippets aus der Pipeline können Ihnen helfen, den ausgeführten Code besser zu verstehen.
Änderungsstream lesen
Mit dem Code in diesem Beispiel wird der Quellstream mit den Parametern für den und einer bestimmten Bigtable-Instanz oder -Tabelle.
Songtitel abrufen
Beim Anhören eines Liedes wird der Name in die Spaltenfamilie cf
geschrieben.
und den Spaltenqualifizierer song
, sodass der Code den Wert aus der Änderung extrahiert
und gibt sie im nächsten Schritt der Pipeline aus.
Die fünf Top-Songs zählen
Mit den integrierten Beam-Funktionen Count
und Top.of
kannst du die fünf beliebtesten Titel im aktuellen Fenster abrufen.
Ergebnisse ausgeben
Diese Pipeline schreibt die Ergebnisse sowohl in Standard-Out als auch in Dateien. Bei den Dateien werden die Schreibvorgänge in Gruppen von 10 Elementen oder einminütigen Segmenten zusammengefasst.
Pipeline ansehen
Rufen Sie in der Google Cloud Console die Seite Dataflow auf.
Klicken Sie auf den Job, dessen Name mit song-rank beginnt.
Klicken Sie unten auf dem Bildschirm auf Anzeigen, um das Logfeld zu öffnen.
Klicken Sie auf Worker-Logs, um die Ausgabeprotokolle des Änderungsstreams zu überwachen.
Streamschreibvorgänge
Verwenden Sie die cbt
-Befehlszeile, um die Anzahl der Songwiedergaben für verschiedene Nutzer in die Tabelle song-rank
zu schreiben. Die Daten werden über einen Zeitraum von mehreren Minuten geschrieben, um die Wiedergabe von Songs zu simulieren, die im Laufe der Zeit gestreamt werden.
cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
song-rank song-rank-data.csv column-family=cf batch-size=1
Ausgabe ansehen
Lesen Sie die Ausgabe in Cloud Storage, um die beliebtesten Titel zu sehen.
gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
Beispielausgabe:
2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
Bereinigen
Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder Sie behalten das Projekt und löschen die einzelnen Ressourcen.
Projekt löschen
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Einzelne Ressourcen löschen
Löschen Sie den Bucket und die Dateien.
gcloud storage rm --recursive gs://BUCKET_NAME/
Deaktivieren Sie den Änderungsstream für die Tabelle.
gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \ --clear-change-stream-retention-period
Löschen Sie die Tabelle
song-rank
.cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
Halten Sie die Änderungsstream-Pipeline an.
Listen Sie die Jobs auf, um die Job-ID zu erhalten.
gcloud dataflow jobs list --region=BIGTABLE_REGION
Brechen Sie den Job ab.
gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
Ersetzen Sie JOB_ID durch die Job-ID, die nach dem vorherigen Befehl angezeigt wird.