Modello File di testo su Cloud Storage in Pub/Sub (stream)

Questo modello crea una pipeline di inserimento flussi che esegue continuamente sondaggi per rilevare nuovi file di testo caricati su Cloud Storage, legge ogni file riga per riga e pubblica le stringhe in un argomento Pub/Sub. Il modello pubblica in un argomento Pub/Sub i record contenuti in un file delimitato da accapo contenente record JSON o in un file CSV, per l'elaborazione in tempo reale. Puoi utilizzare questo modello per riprodurre i dati in Pub/Sub.

La pipeline viene eseguita a tempo indeterminato e deve essere terminata manualmente tramite un "cancel" e non un "drain", a causa dell'utilizzo della trasformazione "Watch", che è un "SplittableDoFn" che non supporta il draining.

Al momento, l'intervallo di polling è fisso e impostato su 10 secondi. Questo modello non imposta un timestamp nei singoli record, quindi l'ora dell'evento corrisponde a quella di pubblicazione durante l'esecuzione. Se la tua pipeline si basa su un'ora dell'evento accurata per l'elaborazione, non dovresti usarla.

Requisiti della pipeline

  • I file di input devono essere in formato JSON o CSV delimitato da nuova riga. I record che comprendono più righe nei file di origine possono causare problemi downstream, perché ogni riga presente nei file viene pubblicata come messaggio in Pub/Sub.
  • L'argomento Pub/Sub deve esistere prima dell'esecuzione.
  • La pipeline viene eseguita a tempo indeterminato e deve essere interrotta manualmente.

Parametri del modello

Parametri obbligatori

  • inputFilePattern: il pattern del file di input da cui leggere. Ad esempio, gs://bucket-name/files/*.json.
  • outputTopic: l'argomento di input Pub/Sub in cui scrivere. Il nome deve essere nel formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>. Ad esempio: projects/your-project-id/topics/your-topic-name.

Esegui il modello

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the Text Files on Cloud Storage to Pub/Sub (Stream) template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. (Facoltativo) Per passare dall'elaborazione exactly-once alla modalità di streaming Almeno una volta, seleziona Almeno una volta.
  8. Fai clic su Esegui job.

Nella shell o nel terminale, esegui il modello:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME\
    --staging-location STAGING_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

Sostituisci quanto segue:

  • JOB_NAME: un nome di job univoco a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • STAGING_LOCATION: la posizione per l'organizzazione in anteprima dei file locali (ad esempio gs://your-bucket/staging)
  • TOPIC_NAME: il nome dell'argomento Pub/Sub
  • BUCKET_NAME: il nome del bucket Cloud Storage
  • FILE_PATTERN: il pattern glob dei file da leggere nel bucket Cloud Storage (ad esempio path/*.csv)

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • STAGING_LOCATION: la posizione per l'organizzazione in anteprima dei file locali (ad esempio gs://your-bucket/staging)
  • TOPIC_NAME: il nome dell'argomento Pub/Sub
  • BUCKET_NAME: il nome del bucket Cloud Storage
  • FILE_PATTERN: il pattern glob dei file da leggere nel bucket Cloud Storage (ad esempio path/*.csv)
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.templates.TextToPubsub.Options;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Watch;
import org.joda.time.Duration;

/**
 * The {@code TextToPubsubStream} is a streaming version of {@code TextToPubsub} pipeline that
 * publishes records to Cloud Pub/Sub from a set of files. The pipeline continuously polls for new
 * files, reads them row-by-row and publishes each record as a string message. The polling interval
 * is fixed and equals to 10 seconds. At the moment, publishing messages with attributes is
 * unsupported.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Stream_GCS_Text_to_Cloud_PubSub.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Stream_GCS_Text_to_Cloud_PubSub",
    category = TemplateCategory.STREAMING,
    displayName = "Text Files on Cloud Storage to Pub/Sub",
    description = {
      "This template creates a streaming pipeline that continuously polls for new text files uploaded to Cloud Storage, reads each file line by line, and publishes strings to a Pub/Sub topic. "
          + "The template publishes records in a newline-delimited file containing JSON records or CSV file to a Pub/Sub topic for real-time processing. "
          + "You can use this template to replay data to Pub/Sub.\n",
      "The pipeline runs indefinitely and needs to be terminated manually via a <a href=\"https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#cancel\">cancel</a> and not a <a href=\"https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#drain\">drain</a>, due to its use of the <code>Watch</code> transform, which is a <code>SplittableDoFn</code> that does not support draining.\n",
      "Currently, the polling interval is fixed and set to 10 seconds. This template does not set any timestamp on the individual records, so the event time is equal to the publishing time during execution. "
          + "If your pipeline relies on an accurate event time for processing, you should not use this pipeline."
    },
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/text-to-pubsub-stream",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "Input files must be in newline-delimited JSON or CSV format. Records that span multiple lines in the source files can cause issues downstream, because each line within the files is published as a message to Pub/Sub.",
      "The Pub/Sub topic must exist prior to execution.",
      "The pipeline runs indefinitely and needs to be terminated manually.",
    },
    streaming = true,
    supportsAtLeastOnce = true,
    supportsExactlyOnce = true)
public class TextToPubsubStream {
  private static final Duration DEFAULT_POLL_INTERVAL = Duration.standardSeconds(10);

  /**
   * Main entry-point for the pipeline. Reads in the command-line arguments, parses them, and
   * executes the pipeline.
   *
   * @param args Arguments passed in from the command-line.
   */
  public static void main(String[] args) {

    // Parse the user options passed from the command-line
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    run(options);
  }

  /**
   * Executes the pipeline with the provided execution parameters.
   *
   * @param options The execution parameters.
   */
  public static PipelineResult run(Options options) {
    // Create the pipeline.
    Pipeline pipeline = Pipeline.create(options);

    /*
     * Steps:
     *  1) Read from the text source.
     *  2) Write each text record to Pub/Sub
     */
    pipeline
        .apply(
            "Read Text Data",
            TextIO.read()
                .from(options.getInputFilePattern())
                .watchForNewFiles(DEFAULT_POLL_INTERVAL, Watch.Growth.never()))
        .apply("Write to PubSub", PubsubIO.writeStrings().to(options.getOutputTopic()));

    return pipeline.run();
  }
}

Passaggi successivi