Diese Vorlage erstellt eine Streamingpipeline, die kontinuierlich nach neuen Textdateien sucht, die in Cloud Storage hochgeladen wurden, jede Datei Zeile für Zeile liest und Strings in einem Pub/Sub-Thema veröffentlicht. Die Vorlage veröffentlicht Datensätze aus einer JSON-Datei mit Zeilenumbruch oder einer CSV-Datei zur Echtzeitverarbeitung in einem Pub/Sub-Thema. Sie können diese Vorlage verwenden, um Daten in Pub/Sub wiederzugeben.
Die Pipeline wird auf unbestimmte Zeit ausgeführt und muss über eine „Cancel”-Anweisung und nicht durch einen „Drain” beendet werden. Die Verwendung der „Watch”-Transformation, bei der es sich um ein SplittableDoFn handelt, die kein Draining unterstützt.
Derzeit ist das Abfrageintervall fest auf zehn Sekunden festgelegt. Diese Vorlage legt keinen Zeitstempel für die einzelnen Datensätze fest, sodass die Ereigniszeit der Veröffentlichungszeit während der Ausführung entspricht. Wenn Ihre Pipeline für die Verarbeitung eine korrekte Ereigniszeit benötigt, sollten Sie diese Pipeline nicht verwenden.
Pipelineanforderungen
- Die Eingabedateien müssen im JSON-Format mit Zeilenumbruch oder im CSV-Format vorliegen. Datensätze, die sich über mehrere Zeilen in den Quelldateien erstrecken, können zu Problemen in den nachgelagerten Prozessen führen, da jede Zeile in den Dateien als eine Nachricht an Pub/Sub veröffentlicht wird.
- Das Pub/Sub-Thema muss vor der Ausführung vorhanden sein.
- Die Pipeline wird unbefristet ausgeführt und muss manuell beendet werden.
Vorlagenparameter
Erforderliche Parameter
- inputFilePattern: Das Muster der Eingabedatei, aus der gelesen werden soll. Beispiel:
gs://bucket-name/files/*.json
. - outputTopic: Das Pub/Sub-Eingabethema, in das geschrieben werden soll. Der Name muss das Format
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
haben. Beispiel:projects/your-project-id/topics/your-topic-name
Führen Sie die Vorlage aus.
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Text Files on Cloud Storage to Pub/Sub (Stream) templateaus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Optional: Wählen Sie Mindestens einmal aus, um von der genau einmaligen Verarbeitung zum Mindestens einmal-Streamingmodus zu wechseln.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Stream_GCS_Text_to_Cloud_PubSub \ --region REGION_NAME\ --staging-location STAGING_LOCATION\ --parameters \ inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME
Ersetzen Sie dabei Folgendes:
JOB_NAME
: ein eindeutiger Jobname Ihrer WahlREGION_NAME
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
STAGING_LOCATION
: der Speicherort für das Staging lokaler Dateien (z. B.gs://your-bucket/staging
)TOPIC_NAME
: Der Name Ihres Pub/Sub-ThemasBUCKET_NAME
: der Name Ihres Cloud Storage-BucketsFILE_PATTERN
: das Glob-Dateimuster, aus dem im Cloud Storage-Bucket gelesen werden soll (z. B.path/*.csv
)
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Stream_GCS_Text_to_Cloud_PubSub { "jobName": "JOB_NAME", "environment": { "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" } }
Ersetzen Sie dabei Folgendes:
PROJECT_ID
: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlLOCATION
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
STAGING_LOCATION
: der Speicherort für das Staging lokaler Dateien (z. B.gs://your-bucket/staging
)TOPIC_NAME
: Der Name Ihres Pub/Sub-ThemasBUCKET_NAME
: der Name Ihres Cloud Storage-BucketsFILE_PATTERN
: das Glob-Dateimuster, aus dem im Cloud Storage-Bucket gelesen werden soll (z. B.path/*.csv
)
Nächste Schritte
- Dataflow-Vorlagen
- Sehen Sie sich die Liste der von Google bereitgestellten Vorlagen an.