Dataflow ist ein vollständig verwalteter Dienst zum Transformieren und Anreichern von Daten im Stream- (Echtzeit-) und Batchmodus mit gleicher Zuverlässigkeit und Aussagekraft. Es bietet eine vereinfachte Pipeline-Entwicklungsumgebung mit dem Apache Beam SDK, das eine Vielzahl von Windowing- und Sitzungsanalyse-Primitiven sowie ein Ökosystem von Quell- und Sink-Connectors bietet. In diesem Schnellstart erfahren Sie, wie Sie Dataflow für Folgendes verwenden:
- Nachrichten lesen, die in einem Pub/Sub-Thema veröffentlicht wurden
- Windowing (oder Gruppieren) von Nachrichten nach Zeitstempel
- Nachrichten in Cloud Storage schreiben
Dieser Schnellstart bietet eine Einführung in die Verwendung von Dataflow in Java und Python. SQL wird ebenfalls unterstützt.
Sie können auch UI-basierte Dataflow-Vorlagen verwenden, wenn Sie keine benutzerdefinierte Datenverarbeitung durchführen möchten.
Hinweis
- Befolgen Sie die Anleitungen zum Installieren und Initialisieren des Cloud SDK.
- Aktivieren Sie die Abrechnung für Ihr Projekt.
Um diesen Schnellstart abzuschließen, müssen Sie die folgenden APIs aktivieren: Compute Engine, Google Cloud Operations Suite, Cloud Storage, Cloud Storage JSON, Pub/Sub, Cloud Scheduler, Resource Manager und App Engine.
Es kann einen Moment dauern, bis die APIs in der Konsole angezeigt werden.
Dienstkontoschlüssel erstellen
Dienstkontoschlüssel erstellen
- Wählen Sie aus der Liste Dienstkonto die Option Neues Dienstkonto aus.
- Geben Sie im Feld Name des Dienstkontos einen Namen ein.
- Wählen Sie in der Liste Rolle die Option Projekt > Inhaber aus.
- Klicken Sie auf Erstellen.
Der Schlüssel wird an den Standardordner für Downloads Ihres Browsers gesendet.
Setzen Sie die Umgebungsvariable
GOOGLE_APPLICATION_CREDENTIALS
so, dass sie auf den Dienstkontoschlüssel verweist.export GOOGLE_APPLICATION_CREDENTIALS=path/to/my/credentials.json
Erstellen Sie Variablen für Ihren Bucket, Ihr Projekt und Ihre Region. Cloud Storage-Bucket-Namen müssen global eindeutig sein. Wählen Sie eine Dataflow-Region aus, in der Sie die Befehle in dieser Kurzanleitung ausführen.
BUCKET_NAME=BUCKET_NAME PROJECT_NAME=$(gcloud config get-value project) REGION=DATAFLOW_REGION
Erstellen Sie einen Cloud Storage-Bucket, der zu diesem Projekt gehört:
gsutil mb gs://$BUCKET_NAME
Erstellen Sie ein Pub/Sub-Thema in diesem Projekt:
gcloud pubsub topics create cron-topic
Erstellen Sie einen Cloud Scheduler-Job in diesem Projekt. Der Job veröffentlicht eine Nachricht zu einem Cloud Pub/Sub-Thema in Intervallen von einer Minute.
Wenn für das Projekt keine App Engine-Anwendung vorhanden ist, wird bei diesem Schritt eine erstellt.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=cron-topic --message-body="Hello!"
Starten Sie den Job.
gcloud scheduler jobs run publisher-job
Verwenden Sie den folgenden Befehl, um das Schnellstart-Repository zu klonen und zum Beispielcodeverzeichnis zu gehen:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
Streamen Sie Nachrichten von Pub/Sub zu Cloud Storage
Codebeispiel
In diesem Beispielcode wird Dataflow für Folgendes verwendet:
- Pub/Sub-Nachrichten lesen
- Windowing (oder Gruppieren) von Nachrichten in festen Intervallen nach Veröffentlichungszeitstempeln.
Die Nachrichten in jedem Fenster in Dateien in Cloud Storage schreiben.
Java
Python
Pipeline starten
Führen Sie den folgenden Befehl aus, um die Pipeline zu starten:
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_NAME \ --region=$REGION \ --inputTopic=projects/$PROJECT_NAME/topics/cron-topic \ --output=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --windowSize=2"
Python
python PubSubToGCS.py \ --project=$PROJECT_NAME \ --region=$REGION \ --input_topic=projects/$PROJECT_NAME/topics/cron-topic \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --temp_location=gs://$BUCKET_NAME/temp
Der vorherige Befehl wird lokal ausgeführt und startet einen Dataflow-Job, der in der Cloud ausgeführt wird. Wenn der Befehl JOB_MESSAGE_DETAILED: Workers
have started successfully
zurückgibt, beenden Sie das lokale Programm mit Ctrl+C
.
Job- und Pipeline-Fortschritt beobachten
Sie können den Fortschritt des Jobs in der Dataflow-Konsole verfolgen.
Öffnen Sie die Ansicht mit den Auftragsdetails, um Folgendes zu sehen:
- Jobstruktur
- Jobprotokolle
- Anzeigebereich-Messwerte
Es kann einige Minuten dauern, bis die Ausgabedateien in Cloud Storage angezeigt werden.
Alternativ können Sie die folgende Befehlszeile verwenden, um zu prüfen, welche Dateien geschrieben wurden.
gsutil ls gs://${BUCKET_NAME}/samples/
Die Ausgabe sollte so aussehen:
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32 gs://{$BUCKET_NAME}/samples/output-22:32-22:34 gs://{$BUCKET_NAME}/samples/output-22:34-22:36 gs://{$BUCKET_NAME}/samples/output-22:36-22:38
Clean-up
Cloud Scheduler-Job löschen
gcloud scheduler jobs delete publisher-job
Beenden Sie den Job in der Dataflow-Konsole. Brechen Sie die Pipeline ab, ohne sie zu leeren.
Thema löschen
gcloud pubsub topics delete cron-topic
Löschen Sie die von der Pipeline erstellten Dateien.
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*" gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
Entfernen Sie den Cloud Storage-Bucket.
gsutil rb gs://${BUCKET_NAME}
Nächste Schritte
Wenn Sie Pub/Sub-Nachrichten nach einem benutzerdefinierten Zeitstempel als Fenster darstellen möchten, können Sie den Zeitstempel als Attribut in der Pub/Sub-Nachricht angeben und dann den benutzerdefinierten Zeitstempel mit PubsubIOs's verwenden
withTimestampAttribute
Sehen Sie sich die Open-Source-Dataflow-Vorlagen von Google für Streaming an.
Weitere Informationen zur Dataflow-Integration in Pub/Sub
Weitere Informationen zum Windowing finden Sie im Beispiel Apache Beam Mobile Gaming Pipeline.