In dieser Anleitung wird gezeigt, wie Sie in Dataflow Echtzeitstream von Datenbankänderungen aus den Änderungsstream. Die Ausgabe der Pipeline wird Cloud Storage
Ein Beispiel-Dataset für eine Musikhören-Anwendung wird bereitgestellt. In dieser zeichnen Sie die angehörten Songs auf und platzieren Sie dann eine Rangliste der Top-5-Songs in einem Punkt.
Dieses Tutorial richtet sich an technisch versierte Nutzer, die mit dem Schreiben von Code und die Bereitstellung von Datenpipelines in Google Cloud.
Lernziele
In dieser Anleitung erfahren Sie, wie Sie Folgendes tun:
- Erstellen Sie eine Bigtable-Tabelle mit einem aktivierten Änderungsstream.
- Stellen Sie in Dataflow eine Pipeline bereit, die den den Änderungsstream.
- Sehen Sie sich die Ergebnisse der Datenpipeline an.
Kosten
In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:
Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen.
Nach Abschluss der in diesem Dokument beschriebenen Aufgaben können Sie weitere Kosten vermeiden, indem Sie die erstellten Ressourcen löschen. Weitere Informationen finden Sie unter Bereinigen.
Hinweise
- Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
- Installieren Sie die Google Cloud CLI.
-
Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.
-
Aktivieren Sie die Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:
gcloud services enable dataflow.googleapis.com
bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com - Installieren Sie die Google Cloud CLI.
-
Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.
-
Aktivieren Sie die Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:
gcloud services enable dataflow.googleapis.com
bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com - Aktualisieren und installieren Sie die
cbt
-Befehlszeile .gcloud components update gcloud components install cbt
Umgebung vorbereiten
Code abrufen
Klonen Sie das Repository, das den Beispielcode enthält. Wenn Sie diese Datei bereits heruntergeladen haben, Repository abrufen, um die neueste Version abzurufen.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams
Bucket erstellen
gcloud storage buckets create gs://BUCKET_NAMEErsetzen Sie
BUCKET_NAME
durch einen Bucket-Namen, der den Anforderungen für Bucket-Namen entspricht:
Bigtable-Instanz erstellen
Sie können für diese Anleitung eine vorhandene Instanz verwenden oder eine Instanz erstellen. mit den Standardkonfigurationen in einer Region in Ihrer Nähe.
Tabelle erstellen
Die Beispiel-App zeichnet die Songs auf, die sich die Nutzer anhören, und speichert die auf Ereignisse in Bigtable warten. Tabelle mit Änderungsstream erstellen aktiviert ist, die über eine Spaltenfamilie (cf) und eine Spalte (song) verfügt und User-IDs verwendet für Zeilenschlüssel.
Erstellen Sie die Tabelle.
gcloud bigtable instances tables create song-rank \
--column-families=cf --change-stream-retention-period=7d \
--instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
Ersetzen Sie Folgendes:
- PROJECT_ID: die ID des von Ihnen verwendeten Projekts
- BIGTABLE_INSTANCE_ID: die ID der Instanz, die die neue Tabelle enthalten soll
Pipeline starten
Diese Pipeline transformiert den Änderungsstream auf folgende Weise:
- Änderungsstream lesen
- Ruft den Songnamen ab
- Gruppiert die Anhör-Ereignisse des Songs in N-Sekunden-Fenstern
- Zählt die fünf Top-Songs
- Gibt die Ergebnisse aus
Pipeline ausführen.
mvn compile exec:java -Dexec.mainClass=SongRank \
"-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
--bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
--outputLocation=gs://BUCKET_NAME/ \
--runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
Ersetzen Sie BIGTABLE_REGION durch die ID der Region, in der sich die Bigtable-Instanz befindet, z. B. us-east5
.
Informationen zur Pipeline
Die folgenden Code-Snippets aus der Pipeline können Ihnen helfen, die Code, den Sie ausführen.
Änderungsstream lesen
Mit dem Code in diesem Beispiel wird der Quellstream mit den Parametern für den und einer bestimmten Bigtable-Instanz oder -Tabelle.
Songname abrufen
Beim Anhören eines Liedes wird der Name in die Spaltenfamilie cf
geschrieben.
und Spaltenqualifizierer song
, sodass der Code den Wert aus der Änderung extrahiert
und gibt sie im nächsten Schritt der Pipeline aus.
Die fünf Top-Songs zählen
Mit den integrierten Beam-Funktionen Count
und Top.of
erhalten Sie die fünf besten
im aktuellen Fenster.
Ergebnisse ausgeben
Diese Pipeline schreibt die Ergebnisse sowohl in Standard-Out als auch in Dateien. Für die werden die Schreibvorgänge in Gruppen von 10 Elementen oder einminütigen Segmenten unterteilt.
Pipeline ansehen
Rufen Sie in der Google Cloud Console die Seite Dataflow auf.
Klicken Sie auf den Job, dessen Name mit song-rank beginnt.
Klicken Sie unten auf dem Bildschirm auf Anzeigen, um das Logfeld zu öffnen.
Klicken Sie auf Worker-Logs, um die Ausgabelogs des Änderungsstreams zu überwachen.
Streamschreibvorgänge
Verwenden Sie die Methode
cbt
-Befehlszeile
eine Reihe von Anhörungen für verschiedene Nutzende zu schreiben,
der Tabelle song-rank
. Dies ist darauf ausgelegt, über ein paar Minuten zu schreiben, um
den der Song im Laufe der Zeit anhört.
cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
song-rank song-rank-data.csv column-family=cf batch-size=1
Ausgabe ansehen
Lesen Sie die Ausgabe in Cloud Storage, um die beliebtesten Titel zu sehen.
gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
Beispielausgabe:
2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
Bereinigen
Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder Sie behalten das Projekt und löschen die einzelnen Ressourcen.
Projekt löschen
Google Cloud-Projekt löschen:
gcloud projects delete PROJECT_ID
Einzelne Ressourcen löschen
Löschen Sie den Bucket und die Dateien.
gcloud storage rm --recursive gs://BUCKET_NAME/
Deaktivieren Sie den Änderungsstream für die Tabelle.
gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \ --clear-change-stream-retention-period
Löschen Sie die Tabelle
song-rank
.cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
Halten Sie die Änderungsstreampipeline an.
Lassen Sie die Jobs auflisten, um die Job-ID zu erhalten.
gcloud dataflow jobs list --region=BIGTABLE_REGION
Brechen Sie den Job ab.
gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
Ersetzen Sie JOB_ID durch die Job-ID, die nach dem vorherigen Befehl angezeigt wird.