Plantilla de archivos de texto en Cloud Storage a Pub/Sub (transmisión)

Esta plantilla crea una canalización de transmisión que sondea de forma continua los archivos de texto nuevos subidos a Cloud Storage, lee cada archivo línea por línea y publica strings en un tema de Pub/Sub. La plantilla publica registros en un archivo delimitado por saltos de línea que contiene registros JSON o archivos CSV en un tema de Pub/Sub para su procesamiento en tiempo real. Puedes usar esta plantilla para reproducir datos en Pub/Sub.

La canalización se ejecuta de forma indefinida, y se debe finalizar de forma manual mediante una “cancelación” y no un “drain”, debido a su uso de la transformación “Watch”, que es un “SplittableDoFn” que no admite el desvío.

Actualmente, el intervalo de sondeo es fijo y configurado en 10 segundos. Esta plantilla no establece una marca de tiempo en los registros individuales. Es por eso que la hora del evento es la misma que la hora de publicación durante la ejecución. Si tu canalización depende de una hora de evento precisa para el procesamiento, no debes usar esta canalización.

Requisitos de la canalización

  • Los archivos de entrada deben tener el formato JSON delimitado por saltos de línea o CSV. Los registros que abarcan varias líneas en los archivos de origen pueden causar problemas de bajada, ya que cada línea dentro de los archivos se publica como un mensaje en Pub/Sub.
  • El tema de Pub/Sub debe existir antes de la ejecución.
  • La canalización se ejecuta de forma indefinida, y deberás detenerla de forma manual.

Parámetros de la plantilla

Parámetros obligatorios

  • inputFilePattern: El patrón del archivo de entrada para leer. Por ejemplo, gs://bucket-name/files/*.json
  • outputTopic: El tema de entrada de Pub/Sub en el que se desea escribir. El nombre debe tener el formato de projects/<PROJECT_ID>/topics/<TOPIC_NAME>. Por ejemplo, projects/your-project-id/topics/your-topic-name.

Ejecuta la plantilla

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Text Files on Cloud Storage to Pub/Sub (Stream) template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Opcional: Para cambiar del procesamiento “exactamente una vez” al modo de transmisión al menos una vez, selecciona Al menos una vez.
  8. Haz clic en Ejecutar trabajo.

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME\
    --staging-location STAGING_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre de tu bucket de Cloud Storage.
  • FILE_PATTERN: Es el glob de patrón del archivo que se leerá en el bucket de Cloud Storage (por ejemplo, path/*.csv).

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre de tu bucket de Cloud Storage.
  • FILE_PATTERN: Es el glob de patrón del archivo que se leerá en el bucket de Cloud Storage (por ejemplo, path/*.csv).
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.templates.TextToPubsub.Options;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Watch;
import org.joda.time.Duration;

/**
 * The {@code TextToPubsubStream} is a streaming version of {@code TextToPubsub} pipeline that
 * publishes records to Cloud Pub/Sub from a set of files. The pipeline continuously polls for new
 * files, reads them row-by-row and publishes each record as a string message. The polling interval
 * is fixed and equals to 10 seconds. At the moment, publishing messages with attributes is
 * unsupported.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Stream_GCS_Text_to_Cloud_PubSub.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Stream_GCS_Text_to_Cloud_PubSub",
    category = TemplateCategory.STREAMING,
    displayName = "Text Files on Cloud Storage to Pub/Sub",
    description = {
      "This template creates a streaming pipeline that continuously polls for new text files uploaded to Cloud Storage, reads each file line by line, and publishes strings to a Pub/Sub topic. "
          + "The template publishes records in a newline-delimited file containing JSON records or CSV file to a Pub/Sub topic for real-time processing. "
          + "You can use this template to replay data to Pub/Sub.\n",
      "The pipeline runs indefinitely and needs to be terminated manually via a <a href=\"https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#cancel\">cancel</a> and not a <a href=\"https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#drain\">drain</a>, due to its use of the <code>Watch</code> transform, which is a <code>SplittableDoFn</code> that does not support draining.\n",
      "Currently, the polling interval is fixed and set to 10 seconds. This template does not set any timestamp on the individual records, so the event time is equal to the publishing time during execution. "
          + "If your pipeline relies on an accurate event time for processing, you should not use this pipeline."
    },
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/text-to-pubsub-stream",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "Input files must be in newline-delimited JSON or CSV format. Records that span multiple lines in the source files can cause issues downstream, because each line within the files is published as a message to Pub/Sub.",
      "The Pub/Sub topic must exist prior to execution.",
      "The pipeline runs indefinitely and needs to be terminated manually.",
    },
    streaming = true,
    supportsAtLeastOnce = true,
    supportsExactlyOnce = true)
public class TextToPubsubStream {
  private static final Duration DEFAULT_POLL_INTERVAL = Duration.standardSeconds(10);

  /**
   * Main entry-point for the pipeline. Reads in the command-line arguments, parses them, and
   * executes the pipeline.
   *
   * @param args Arguments passed in from the command-line.
   */
  public static void main(String[] args) {

    // Parse the user options passed from the command-line
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    run(options);
  }

  /**
   * Executes the pipeline with the provided execution parameters.
   *
   * @param options The execution parameters.
   */
  public static PipelineResult run(Options options) {
    // Create the pipeline.
    Pipeline pipeline = Pipeline.create(options);

    /*
     * Steps:
     *  1) Read from the text source.
     *  2) Write each text record to Pub/Sub
     */
    pipeline
        .apply(
            "Read Text Data",
            TextIO.read()
                .from(options.getInputFilePattern())
                .watchForNewFiles(DEFAULT_POLL_INTERVAL, Watch.Growth.never()))
        .apply("Write to PubSub", PubsubIO.writeStrings().to(options.getOutputTopic()));

    return pipeline.run();
  }
}

¿Qué sigue?