Modello Testo Cloud Storage in Pub/Sub (batch)

Questo modello crea una pipeline batch che legge i record dai file di testo archiviati in Cloud Storage e li pubblica in un argomento Pub/Sub. Il modello può essere utilizzato per pubblicare in un argomento Pub/Sub i record contenuti in un file delimitato da accapo contenente record JSON o in un file CSV, per l'elaborazione in tempo reale. Puoi utilizzare questo modello per riprodurre i dati in Pub/Sub.

Questo modello non imposta un timestamp nei singoli record. L'ora dell'evento è uguale all'ora di pubblicazione durante l'esecuzione. Se la tua pipeline necessita di un'indicazione accurata sull'ora dell'evento per l'elaborazione, non devi utilizzarla.

Requisiti della pipeline

  • I file da leggere devono essere in formato JSON delimitato da nuova riga o in formato CSV. I record che comprendono più righe nei file di origine possono causare problemi downstream perché ogni riga presente nei file verrà pubblicata come messaggio in Pub/Sub.
  • L'argomento Pub/Sub deve esistere prima dell'esecuzione della pipeline.

Parametri del modello

Parametri obbligatori

  • inputFilePattern: il pattern del file di input da cui leggere. Ad esempio, gs://bucket-name/files/*.json.
  • outputTopic: l'argomento di input Pub/Sub in cui scrivere. Il nome deve essere nel formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>. Ad esempio: projects/your-project-id/topics/your-topic-name.

Esegui il modello

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the Text Files on Cloud Storage to Pub/Sub (Batch) template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

Nella shell o nel terminale, esegui il modello:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME \
    --parameters \

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • TOPIC_NAME: il nome dell'argomento Pub/Sub
  • BUCKET_NAME: il nome del bucket Cloud Storage

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/files/*.json",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   "environment": { "zone": "us-central1-f" }

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • TOPIC_NAME: il nome dell'argomento Pub/Sub
  • BUCKET_NAME: il nome del bucket Cloud Storage
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;

 * The {@code TextToPubsub} pipeline publishes records to Cloud Pub/Sub from a set of files. The
 * pipeline reads each file row-by-row and publishes each record as a string message. At the moment,
 * publishing messages with attributes is unsupported.
 * <p>Check out <a
 * href="">README</a>
 * for instructions on how to use or modify this template.
    name = "GCS_Text_to_Cloud_PubSub",
    category = TemplateCategory.BATCH,
    displayName = "Cloud Storage Text File to Pub/Sub (Batch)",
    description = {
      "This template creates a batch pipeline that reads records from text files stored in Cloud Storage and publishes them to a Pub/Sub topic. "
          + "The template can be used to publish records in a newline-delimited file containing JSON records or CSV file to a Pub/Sub topic for real-time processing. "
          + "You can use this template to replay data to Pub/Sub.\n",
      "This template does not set any timestamp on the individual records. The event time is equal to the publishing time during execution. "
          + "If your pipeline relies on an accurate event time for processing, you must not use this pipeline."
    optionsClass = Options.class,
    documentation =
    contactInformation = "",
    requirements = {
      "The files to read need to be in newline-delimited JSON or CSV format. Records spanning multiple lines in the source files might cause issues downstream because each line within the files will be published as a message to Pub/Sub.",
      "The Pub/Sub topic must exist before running the pipeline."
public class TextToPubsub {

  /** The custom options supported by the pipeline. Inherits standard configuration options. */
  public interface Options extends PipelineOptions {
        order = 1,
        groupName = "Source",
        description = "Cloud Storage Input File(s)",
        helpText = "The input file pattern to read from.",
        example = "gs://bucket-name/files/*.json")
    ValueProvider<String> getInputFilePattern();

    void setInputFilePattern(ValueProvider<String> value);

        order = 2,
        groupName = "Target",
        description = "Output Pub/Sub topic",
        helpText =
            "The Pub/Sub input topic to write to. The name must be in the format `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.",
        example = "projects/your-project-id/topics/your-topic-name")
    ValueProvider<String> getOutputTopic();

    void setOutputTopic(ValueProvider<String> value);

   * Main entry-point for the pipeline. Reads in the command-line arguments, parses them, and
   * executes the pipeline.
   * @param args Arguments passed in from the command-line.
  public static void main(String[] args) {

    // Parse the user options passed from the command-line
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);


   * Executes the pipeline with the provided execution parameters.
   * @param options The execution parameters.
  public static PipelineResult run(Options options) {
    // Create the pipeline.
    Pipeline pipeline = Pipeline.create(options);

     * Steps:
     *  1) Read from the text source.
     *  2) Write each text record to Pub/Sub
        .apply("Read Text Data",
        .apply("Write to PubSub", PubsubIO.writeStrings().to(options.getOutputTopic()));


