Nachrichten aus Pub/Sub mit Dataflow streamen
Dataflow ist ein vollständig verwalteter Dienst zum Transformieren und Anreichern von Daten im Stream- (Echtzeit-) und Batchmodus mit gleicher Zuverlässigkeit und Aussagekraft. Es bietet eine vereinfachte Pipeline-Entwicklungsumgebung mit dem Apache Beam SDK, das eine Vielzahl von Windowing- und Sitzungsanalyse-Primitiven sowie ein Ökosystem von Quell- und Sink-Connectors bietet. In diesem Schnellstart erfahren Sie, wie Sie Dataflow für Folgendes verwenden:
- Nachrichten lesen, die in einem Pub/Sub-Thema veröffentlicht wurden
- Windowing (oder Gruppieren) von Nachrichten nach Zeitstempel
- Nachrichten in Cloud Storage schreiben
Dieser Schnellstart bietet eine Einführung in die Verwendung von Dataflow in Java und Python. SQL wird ebenfalls unterstützt. Diese Kurzanleitung wird auch als Google Cloud Skills Boost-Anleitung mit temporären Anmeldedaten für den Einstieg angeboten.
Sie können auch UI-basierte Dataflow-Vorlagen verwenden, wenn Sie keine benutzerdefinierte Datenverarbeitung durchführen möchten.
Hinweis
- Melden Sie sich bei Ihrem Google Cloud-Konto an. Wenn Sie mit Google Cloud noch nicht vertraut sind, erstellen Sie ein Konto, um die Leistungsfähigkeit unserer Produkte in der Praxis sehen und bewerten zu können. Neukunden erhalten außerdem ein Guthaben von 300 $, um Arbeitslasten auszuführen, zu testen und bereitzustellen.
- Installieren Sie die Google Cloud CLI.
-
Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:
gcloud init
-
Google Cloud-Projekt erstellen oder auswählen.
-
Cloud-Projekt erstellen:
gcloud projects create PROJECT_ID
-
Wählen Sie das von Ihnen erstellte Cloud-Projekt aus:
gcloud config set project PROJECT_ID
-
-
Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist.
-
Aktivieren Sie die Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Richten Sie die Authentifizierung ein:
-
Erstellen Sie das Dienstkonto:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Ersetzen Sie
SERVICE_ACCOUNT_NAME
mit einem Namen für das Dienstkonto. -
Weisen Sie dem Dienstkonto Rollen zu. Führen Sie den Befehl
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
für jede der folgenden IAM-Rollen einmal aus:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Dabei gilt:
SERVICE_ACCOUNT_NAME
: der Name des DienstkontosPROJECT_ID
: die Projekt-ID, unter der Sie das Dienstkonto erstellt habenROLE
: die zu gewährende Rolle
-
Gewähren Sie Ihrem Google-Konto eine Rolle, mit der Sie die Rollen des Dienstkontos verwenden und das Dienstkonto an andere Ressourcen anhängen können:
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Dabei gilt:
SERVICE_ACCOUNT_NAME
: der Name des DienstkontosPROJECT_ID
: die Projekt-ID, unter der Sie das Dienstkonto erstellt habenUSER_EMAIL
: E-Mail-Adresse Ihres Google-Kontos
-
- Installieren Sie die Google Cloud CLI.
-
Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:
gcloud init
-
Google Cloud-Projekt erstellen oder auswählen.
-
Cloud-Projekt erstellen:
gcloud projects create PROJECT_ID
-
Wählen Sie das von Ihnen erstellte Cloud-Projekt aus:
gcloud config set project PROJECT_ID
-
-
Die Abrechnung für das Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist.
-
Aktivieren Sie die Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Richten Sie die Authentifizierung ein:
-
Erstellen Sie das Dienstkonto:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Ersetzen Sie
SERVICE_ACCOUNT_NAME
mit einem Namen für das Dienstkonto. -
Weisen Sie dem Dienstkonto Rollen zu. Führen Sie den Befehl
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
für jede der folgenden IAM-Rollen einmal aus:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Dabei gilt:
SERVICE_ACCOUNT_NAME
: der Name des DienstkontosPROJECT_ID
: die Projekt-ID, unter der Sie das Dienstkonto erstellt habenROLE
: die zu gewährende Rolle
-
Gewähren Sie Ihrem Google-Konto eine Rolle, mit der Sie die Rollen des Dienstkontos verwenden und das Dienstkonto an andere Ressourcen anhängen können:
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Dabei gilt:
SERVICE_ACCOUNT_NAME
: der Name des DienstkontosPROJECT_ID
: die Projekt-ID, unter der Sie das Dienstkonto erstellt habenUSER_EMAIL
: E-Mail-Adresse Ihres Google-Kontos
-
-
Erstellen Sie Anmeldedaten zur Authentifizierung für Ihr Google-Konto:
gcloud auth application-default login
Pub/Sub-Projekt einrichten
-
Erstellen Sie Variablen für Ihren Bucket, Ihr Projekt und Ihre Region. Cloud Storage-Bucket-Namen müssen global eindeutig sein. Wählen Sie eine Dataflow-Region in der Nähe des Standorts, an dem die Befehle in dieser Kurzanleitung ausgeführt werden. Der Wert der Variablen
REGION
muss ein gültiger Regionsname sein. Weitere Informationen zu Regionen und Standorten finden Sie unter Dataflow-Standorte.BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
-
Erstellen Sie einen Cloud Storage-Bucket, der zu diesem Projekt gehört:
gsutil mb gs://$BUCKET_NAME
-
Erstellen Sie ein Pub/Sub-Thema in diesem Projekt:
gcloud pubsub topics create $TOPIC_ID
-
Erstellen Sie einen Cloud Scheduler-Job in diesem Projekt. Der Job veröffentlicht eine Nachricht zu einem Pub/Sub-Thema in Intervallen von einer Minute.
Wenn für das Projekt keine App Engine-Anwendung vorhanden ist, wird in diesem Schritt eine erstellt.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
Starten Sie den Job.
gcloud scheduler jobs run publisher-job --location=$REGION
-
Verwenden Sie die folgenden Befehle, um das Schnellstart-Repository zu klonen und zum Beispielcodeverzeichnis zu gehen:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
Streamen Sie Nachrichten von Pub/Sub zu Cloud Storage
Codebeispiel
In diesem Beispielcode wird Dataflow für Folgendes verwendet:
- Pub/Sub-Nachrichten lesen
- Windowing (oder Gruppieren) von Nachrichten in festen Intervallen nach Veröffentlichungszeitstempeln.
Die Nachrichten in jedem Fenster in Dateien in Cloud Storage schreiben.
Java
Python
Pipeline starten
Führen Sie den folgenden Befehl aus, um die Pipeline zu starten:
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
Der vorherige Befehl wird lokal ausgeführt und startet einen Dataflow-Job, der in der Cloud ausgeführt wird. Wenn der Befehl JOB_MESSAGE_DETAILED: Workers
have started successfully
zurückgibt, beenden Sie das lokale Programm mit Ctrl+C
.
Job- und Pipeline-Fortschritt beobachten
Sie können den Fortschritt des Jobs in der Dataflow-Konsole verfolgen.
Öffnen Sie die Ansicht mit den Auftragsdetails, um Folgendes zu sehen:
- Jobstruktur
- Jobprotokolle
- Anzeigebereich-Messwerte
Es kann einige Minuten dauern, bis die Ausgabedateien in Cloud Storage angezeigt werden.
Alternativ können Sie die folgende Befehlszeile verwenden, um zu prüfen, welche Dateien geschrieben wurden.
gsutil ls gs://${BUCKET_NAME}/samples/
Die Ausgabe sollte so aussehen:
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
Bereinigen
Löschen Sie das Cloud-Projekt mit den Ressourcen, damit Ihrem Google Cloud-Konto die auf dieser Seite verwendeten Ressourcen nicht in Rechnung gestellt werden.
Cloud Scheduler-Job löschen
gcloud scheduler jobs delete publisher-job --location=$REGION
Beenden Sie den Job in der Dataflow-Konsole. Brechen Sie die Pipeline ab, ohne sie zu leeren.
Thema löschen
gcloud pubsub topics delete $TOPIC_ID
Löschen Sie die von der Pipeline erstellten Dateien.
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*" gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
Entfernen Sie den Cloud Storage-Bucket.
gsutil rb gs://${BUCKET_NAME}
-
Löschen Sie das Dienstkonto:
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Optional: Widerrufen Sie die von Ihnen erstellten Anmeldedaten für die Authentifizierung und löschen Sie die lokale Datei mit den Anmeldedaten:
gcloud auth application-default revoke
-
Optional: Widerrufen Sie Anmeldedaten von der gcloud-CLI.
gcloud auth revoke
Nächste Schritte
Wenn Sie Pub/Sub-Nachrichten nach einem benutzerdefinierten Zeitstempel als Fenster darstellen möchten, können Sie den Zeitstempel als Attribut in der Pub/Sub-Nachricht angeben und dann den benutzerdefinierten Zeitstempel mit PubsubIOs's verwenden
withTimestampAttribute
Sehen Sie sich die Open-Source-Dataflow-Vorlagen von Google für Streaming an.
Weitere Informationen zur Dataflow-Integration in Pub/Sub
Beachten Sie diese Anleitung zu Lesevorgängen aus Pub/Sub und Schreibvorgängen in BigQuery mit Dataflow Flex-Vorlagen.
Weitere Informationen zum Windowing finden Sie im Beispiel Apache Beam Mobile Gaming Pipeline.