Vorlage „Cloud Storage Text für Pub/Sub (Batch)“

Diese Vorlage erstellt eine Batchpipeline, die Datensätze aus Textdateien liest, die in Cloud Storage gespeichert sind. Diese Datensätze werden dann in einem Pub/Sub-Thema veröffentlicht. Die Vorlage kann zum Veröffentlichen von Datensätzen in einer JSON-Datei mit Zeilenumbruch oder in einer CSV-Datei für ein Pub/Sub-Thema zur Echtzeitverarbeitung genutzt werden. Sie können diese Vorlage verwenden, um Daten in Pub/Sub wiederzugeben.

Diese Vorlage legt keinen Zeitstempel für die einzelnen Datensätze fest. Die Ereigniszeit entspricht der Veröffentlichungszeit während der Ausführung. Wenn Ihre Pipeline für die Verarbeitung eine korrekte Ereigniszeit benötigt, müssen Sie diese Pipeline nicht verwenden.

Pipelineanforderungen

  • Die zu lesenden Dateien müssen in einem durch Zeilenumbruch getrennten JSON- oder CSV-Format vorliegen. Einträge, die sich über mehrere Zeilen in den Quelldateien erstrecken, können später Probleme verursachen, da jede Zeile in den Dateien als Nachricht an Pub/Sub veröffentlicht wird.
  • Das Pub/Sub-Thema muss vorhanden sein, bevor die Pipeline ausgeführt wird.

Vorlagenparameter

Erforderliche Parameter

  • inputFilePattern: Das Muster der Eingabedatei, aus der gelesen werden soll. Beispiel: gs://bucket-name/files/*.json.
  • outputTopic: Das Pub/Sub-Eingabethema, in das geschrieben werden soll. Der Name muss das Format projects/<PROJECT_ID>/topics/<TOPIC_NAME> haben. Beispiel: projects/your-project-id/topics/your-topic-name

Führen Sie die Vorlage aus.

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Text Files on Cloud Storage to Pub/Sub (Batch) templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/files/*.json,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • TOPIC_NAME: Der Name Ihres Pub/Sub-Themas
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/files/*.json",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   },
   "environment": { "zone": "us-central1-f" }
}

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • TOPIC_NAME: Der Name Ihres Pub/Sub-Themas
  • BUCKET_NAME: der Name Ihres Cloud Storage-Buckets
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.TextToPubsub.Options;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;

/**
 * The {@code TextToPubsub} pipeline publishes records to Cloud Pub/Sub from a set of files. The
 * pipeline reads each file row-by-row and publishes each record as a string message. At the moment,
 * publishing messages with attributes is unsupported.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_Text_to_Cloud_PubSub.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "GCS_Text_to_Cloud_PubSub",
    category = TemplateCategory.BATCH,
    displayName = "Cloud Storage Text File to Pub/Sub (Batch)",
    description = {
      "This template creates a batch pipeline that reads records from text files stored in Cloud Storage and publishes them to a Pub/Sub topic. "
          + "The template can be used to publish records in a newline-delimited file containing JSON records or CSV file to a Pub/Sub topic for real-time processing. "
          + "You can use this template to replay data to Pub/Sub.\n",
      "This template does not set any timestamp on the individual records. The event time is equal to the publishing time during execution. "
          + "If your pipeline relies on an accurate event time for processing, you must not use this pipeline."
    },
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-pubsub",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The files to read need to be in newline-delimited JSON or CSV format. Records spanning multiple lines in the source files might cause issues downstream because each line within the files will be published as a message to Pub/Sub.",
      "The Pub/Sub topic must exist before running the pipeline."
    })
public class TextToPubsub {

  /** The custom options supported by the pipeline. Inherits standard configuration options. */
  public interface Options extends PipelineOptions {
    @TemplateParameter.GcsReadFile(
        order = 1,
        groupName = "Source",
        description = "Cloud Storage Input File(s)",
        helpText = "The input file pattern to read from.",
        example = "gs://bucket-name/files/*.json")
    @Required
    ValueProvider<String> getInputFilePattern();

    void setInputFilePattern(ValueProvider<String> value);

    @TemplateParameter.PubsubTopic(
        order = 2,
        groupName = "Target",
        description = "Output Pub/Sub topic",
        helpText =
            "The Pub/Sub input topic to write to. The name must be in the format `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.",
        example = "projects/your-project-id/topics/your-topic-name")
    @Required
    ValueProvider<String> getOutputTopic();

    void setOutputTopic(ValueProvider<String> value);
  }

  /**
   * Main entry-point for the pipeline. Reads in the command-line arguments, parses them, and
   * executes the pipeline.
   *
   * @param args Arguments passed in from the command-line.
   */
  public static void main(String[] args) {

    // Parse the user options passed from the command-line
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    run(options);
  }

  /**
   * Executes the pipeline with the provided execution parameters.
   *
   * @param options The execution parameters.
   */
  public static PipelineResult run(Options options) {
    // Create the pipeline.
    Pipeline pipeline = Pipeline.create(options);

    /*
     * Steps:
     *  1) Read from the text source.
     *  2) Write each text record to Pub/Sub
     */
    pipeline
        .apply("Read Text Data", TextIO.read().from(options.getInputFilePattern()))
        .apply("Write to PubSub", PubsubIO.writeStrings().to(options.getOutputTopic()));

    return pipeline.run();
  }
}

Nächste Schritte