Pub/Sub Lite-Nachrichten mit Dataflow streamen
Alternativ zum Schreiben und Ausführen eigener Datenverarbeitungsprogramme können Sie Dataflow mit dem Pub/Sub Lite-E/A-Connector für Apache Beam verwenden. Dataflow ist ein vollständig verwalteter Dienst zum Transformieren und Anreichern von Daten im Stream-Modus (Echtzeit) und im Batchmodus mit gleicher Zuverlässigkeit und Aussagekraft. Dataflow führt zuverlässig Programme aus, die mit dem Apache Beam SDK entwickelt wurden, das einen erweiterbaren Satz leistungsstarker zustandsorientierter Verarbeitungsabstraktionen sowie E/A-Connectors zu anderen Streaming- und Batchsystemen enthält.
In dieser Kurzanleitung wird beschrieben, wie Sie eine Apache Beam-Pipeline schreiben, die folgende Aufgaben ausführt:
- Nachrichten aus Pub/Sub Lite lesen
- Windowing (oder Gruppieren) von Nachrichten nach Veröffentlichungszeitstempel
- Nachrichten in Cloud Storage schreiben
Außerdem erfahren Sie, wie Sie:
- Die Pipeline zur Ausführung in Dataflow senden
- Eine Flexible Dataflow-Vorlage aus einer Pipeline erstellen
Für diese Anleitung ist Maven erforderlich. Das Beispiel kann jedoch auch konvertiert werden, von Maven zu Gradle. Weitere Informationen finden Sie unter Optional: Von Maven zu Gradle konvertieren.
Hinweise
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:
gcloud services enable pubsublite.googleapis.com
dataflow.googleapis.com storage-api.googleapis.com logging.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:
gcloud services enable pubsublite.googleapis.com
dataflow.googleapis.com storage-api.googleapis.com logging.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
Pub/Sub Lite-Projekt einrichten
Erstellen Sie Variablen für Ihren Cloud Storage-Bucket, Ihr Projekt und Dataflow-Region. Cloud Storage-Bucket-Namen müssen global eindeutig sein. Die Dataflow-Region muss eine gültige Region sein, in der Sie Ihren Job ausführen können. Weitere Informationen zu Regionen und Standorten finden Sie unter Dataflow-Standorte.
export PROJECT_ID=$(gcloud config get-value project)
export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
export BUCKET=BUCKET_NAME
export DATAFLOW_REGION=DATAFLOW_REGION
Erstellen Sie einen Cloud Storage-Bucket, der zu diesem Projekt gehört:
gcloud storage buckets create gs://$BUCKET
Pub/Sub Lite-Zonen-Lite-Thema und -Abo erstellen
Erstellen Sie ein zonales Lite-Pub/Sub Lite-Thema und ein Lite-Abo.
Wählen Sie für den Lite-Standort einen unterstützten Pub/Sub Lite-Standort aus. Außerdem müssen Sie
eine Zone für die Region angeben. Beispiel: us-central1-a
.
export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \ --location=$LITE_LOCATION \ --partitions=1 \ --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \ --location=$LITE_LOCATION \ --topic=$TOPIC \ --starting-offset=beginning
Nachrichten an Dataflow streamen
Beispielcode der Kurzanleitung herunterladen
Klonen Sie das Schnellstart-Repository und rufen Sie das Verzeichnis mit dem Beispielcode auf.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics
Beispielcode
In diesem Beispielcode wird Dataflow für Folgendes verwendet:
- Nachrichten aus einem Pub/Sub Lite-Abo als unbegrenzte Quelle lesen.
- Gruppieren Sie Nachrichten anhand ihres Veröffentlichungszeitstempels, indem Sie feste Zeitfenster und die Standardtrigger.
Schreiben Sie die gruppierten Nachrichten in Dateien in Cloud Storage.
Java
Bevor Sie dieses Beispiel ausführen, folgen Sie den Schritten zur Einrichtung von Java in Pub/Sub Lite-Clientbibliotheken.
Dataflow-Pipeline starten
Führen Sie den folgenden Befehl aus, um die Pipeline in Dataflow zu starten:
mvn compile exec:java \
-Dexec.mainClass=examples.PubsubliteToGcs \
-Dexec.args=" \
--subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
--output=gs://$BUCKET/samples/output \
--windowSize=1 \
--project=$PROJECT_ID \
--region=$DATAFLOW_REGION \
--tempLocation=gs://$BUCKET/temp \
--runner=DataflowRunner \
--serviceAccount=$SERVICE_ACCOUNT"
Mit dem vorherigen Befehl wird ein Dataflow-Job gestartet. Folgen Sie dem Link. in die Konsolenausgabe ein, um auf den Job in Dataflow zuzugreifen. Monitoring-Konsole.
Jobfortschritt beobachten
Beobachten Sie den Fortschritt des Jobs in der Dataflow-Konsole.
Öffnen Sie die Ansicht mit den Auftragsdetails, um Folgendes zu sehen:
- Jobgrafik
- Ausführungsdetails
- Jobmesswerte
Veröffentlichen Sie einige Nachrichten in Ihrem Lite-Thema.
gcloud pubsub lite-topics publish $TOPIC \
--location=$LITE_LOCATION \
--message="Hello World!"
Es kann einige Minuten dauern, bis die Nachrichten in den Worker-Logs angezeigt werden.
Überprüfen Sie mit dem folgenden Befehl, in welche Dateien Cloud Storage
gcloud storage ls "gs://$BUCKET/samples/"
Die Ausgabe sollte so aussehen:
gs://$BUCKET/samples/output-19:41-19:42-0-of-1
gs://$BUCKET/samples/output-19:47-19:48-0-of-1
gs://$BUCKET/samples/output-19:48-19:49-0-of-1
Verwenden Sie den folgenden Befehl, um den Inhalt einer Datei aufzurufen:
gcloud storage cat "gs://$BUCKET/samples/your-filename"
Optional: Dataflow-Vorlage erstellen
Sie können optional eine benutzerdefinierte flexible Dataflow-Vorlage auf Basis Ihrer zu erstellen. Mit Dataflow-Vorlagen können Sie Jobs mit unterschiedlichen Eingabeparameter aus der Google Cloud Console oder der Befehlszeile eine vollständige Java-Entwicklungsumgebung einrichten.
Erstellen Sie eine Fat-JAR-Datei, die alle Abhängigkeiten Ihrer Pipeline enthält. Ich Nach dem Befehl sollte
target/pubsublite-streaming-bundled-1.0.jar
angezeigt werden. ausgeführt wurden.mvn clean package -DskipTests=true
Geben Sie Namen und Speicherorte für die Vorlagendatei und den Vorlagencontainer an. Bild.
export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
Erstellen Sie eine benutzerdefinierte Flex-Vorlage. Eine erforderliche
metadata.json
-Datei mit der erforderlichen Spezifikation zum Ausführen des Jobs.gcloud dataflow flex-template build $TEMPLATE_PATH \ --image-gcr-path $TEMPLATE_IMAGE \ --sdk-language "JAVA" \ --flex-template-base-image "JAVA11" \ --metadata-file "metadata.json" \ --jar "target/pubsublite-streaming-bundled-1.0.jar" \ --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
Führen Sie einen Job mit der benutzerdefinierten Flex-Vorlage aus.
Console
Geben Sie einen Jobnamen ein.
Geben Sie Ihre Dataflow-Region ein.
Wählen Sie Ihre benutzerdefinierte Vorlage aus.
Geben Sie den Vorlagenpfad ein.
Geben Sie die erforderlichen Parameter ein.
Klicken Sie auf Job ausführen.
gcloud
gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
--template-file-gcs-location $TEMPLATE_PATH \
--parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
--parameters output="gs://$BUCKET/samples/template-output" \
--parameters windowSize=1 \
--region $DATAFLOW_REGION \
--serviceAccount=$SERVICE_ACCOUNT
Bereinigen
Löschen Sie das Google Cloud-Projekt mit den Ressourcen, damit Ihrem Google Cloud-Konto die auf dieser Seite verwendeten Ressourcen nicht in Rechnung gestellt werden.
Beenden Sie den Job in der Dataflow-Konsole. Brechen Sie die Pipeline ab, anstatt sie zu leeren.
Löschen Sie das Thema und das Abo.
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
Löschen Sie die von der Pipeline erstellten Dateien.
gcloud storage rm "gs://$BUCKET/samples/*" --recursive --continue-on-error
gcloud storage rm "gs://$BUCKET/temp/*" --recursive --continue-on-error
Löschen Sie das Vorlagen-Image und die Vorlagendatei, sofern vorhanden.
gcloud container images delete $TEMPLATE_IMAGE
gcloud storage rm $TEMPLATE_PATH
Entfernen Sie den Cloud Storage-Bucket.
gcloud storage rm gs://$BUCKET --recursive
-
Löschen Sie das Dienstkonto:
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke