Vorlage „Textdateien in Cloud Storage für Pub/Sub (Stream)“

Diese Vorlage erstellt eine Streamingpipeline, die kontinuierlich nach neuen Textdateien sucht, die in Cloud Storage hochgeladen wurden, jede Datei Zeile für Zeile liest und Strings in einem Pub/Sub-Thema veröffentlicht. Die Vorlage veröffentlicht Datensätze aus einer JSON-Datei mit Zeilenumbruch oder einer CSV-Datei zur Echtzeitverarbeitung in einem Pub/Sub-Thema. Sie können diese Vorlage verwenden, um Daten in Pub/Sub wiederzugeben.

Die Pipeline wird auf unbestimmte Zeit ausgeführt und muss über eine „Cancel”-Anweisung und nicht durch einen „Drain” beendet werden. Die Verwendung der „Watch”-Transformation, bei der es sich um ein SplittableDoFn handelt, die kein Draining unterstützt.

Derzeit ist das Abfrageintervall fest auf zehn Sekunden festgelegt. Diese Vorlage legt keinen Zeitstempel für die einzelnen Datensätze fest, sodass die Ereigniszeit der Veröffentlichungszeit während der Ausführung entspricht. Wenn Ihre Pipeline für die Verarbeitung eine korrekte Ereigniszeit benötigt, sollten Sie diese Pipeline nicht verwenden.

Pipelineanforderungen

  • Die Eingabedateien müssen im JSON-Format mit Zeilenumbruch oder im CSV-Format vorliegen. Datensätze, die sich über mehrere Zeilen in den Quelldateien erstrecken, können zu Problemen in den nachgelagerten Prozessen führen, da jede Zeile in den Dateien als eine Nachricht an Pub/Sub veröffentlicht wird.
  • Das Pub/Sub-Thema muss vor der Ausführung vorhanden sein.
  • Die Pipeline wird unbefristet ausgeführt und muss manuell beendet werden.

Vorlagenparameter

Erforderliche Parameter

  • inputFilePattern: Das Muster der Eingabedatei, aus der gelesen werden soll. Beispiel: gs://bucket-name/files/*.json.
  • outputTopic: Das Pub/Sub-Eingabethema, in das geschrieben werden soll. Der Name muss das Format projects/<PROJECT_ID>/topics/<TOPIC_NAME> haben. Beispiel: projects/your-project-id/topics/your-topic-name

Führen Sie die Vorlage aus.

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Text Files on Cloud Storage to Pub/Sub (Stream) templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Optional: Wählen Sie Mindestens einmal aus, um von der genau einmaligen Verarbeitung zum Mindestens einmal-Streamingmodus zu wechseln.
  8. Klicken Sie auf Job ausführen.

gcloud

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME\
    --staging-location STAGING_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

Ersetzen Sie Folgendes:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
  • TOPIC_NAME: Der Name Ihres Pub/Sub-Themas
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
  • FILE_PATTERN: das Glob-Dateimuster, aus dem im Cloud Storage-Bucket gelesen werden soll (z. B. path/*.csv)

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Ersetzen Sie Folgendes:

  • PROJECT_ID: die Google Cloud -Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
  • TOPIC_NAME: Der Name Ihres Pub/Sub-Themas
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
  • FILE_PATTERN: das Glob-Dateimuster, aus dem im Cloud Storage-Bucket gelesen werden soll (z. B. path/*.csv)

Nächste Schritte