Plantilla de Pub/Sub a archivos Avro en Cloud Storage

La plantilla de Pub/Sub a archivos de Avro en Cloud Storage es una canalización de transmisión que lee datos de un tema de Pub/Sub y escribe archivos de Avro en el bucket de Cloud Storage especificado.

Requisitos de la canalización

  • El tema de entrada de Pub/Sub debe existir antes de la ejecución de la canalización.

Parámetros de la plantilla

Parámetros obligatorios

  • inputTopic: El tema de Pub/Sub al que se suscribirán para el consumo de mensajes. El nombre del tema debe estar en formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • outputDirectory: Es el directorio de salida en el que se archivan los archivos Avro de salida. Debe contener / al final. Por ejemplo: gs://example-bucket/example-directory/.
  • avroTempDirectory: Es el directorio para los archivos Avro temporales. Debe contener / al final. Por ejemplo: gs://example-bucket/example-directory/.

Parámetros opcionales

  • outputFilenamePrefix: Es el prefijo del nombre de archivo de salida para los archivos Avro. La configuración predeterminada es: output.
  • outputFilenameSuffix: Es el sufijo del nombre de archivo de salida para los archivos Avro. La configuración predeterminada es vacía.
  • outputShardTemplate: La plantilla de fragmentación define la parte dinámica de cada archivo con ventanas. De forma predeterminada, la canalización usa una única fragmentación de salida para el sistema de archivos dentro de cada ventana. Por lo tanto, todos los datos se envían a un solo archivo por ventana. El valor predeterminado outputShardTemplate es to W-P-SS-of-NN, en el que W es el período de la ventana, P es la información del panel, S es el número de fragmento y N es la cantidad de shards. En el caso de un solo archivo, la parte SS-of-NN de outputShardTemplate es 00-of-01.
  • yearPattern: Es el patrón para dar formato al año. Debe ser uno o más caracteres y o Y. Las mayúsculas y minúsculas no marcan diferencia en el año. De manera opcional, une el patrón con caracteres que no sean alfanuméricos o el carácter de directorio (/). El valor predeterminado es YYYY.
  • monthPattern: Es el patrón para dar formato al mes. Debe ser uno o más caracteres de M. De manera opcional, une el patrón con caracteres que no sean alfanuméricos o el carácter de directorio (/). La configuración predeterminada es MM.
  • dayPattern: Es el patrón para dar formato al día. Debe ser uno o más de d para el día del mes o D para el día del año. De manera opcional, une el patrón con caracteres que no sean alfanuméricos o el carácter de directorio (/). El valor predeterminado es dd.
  • hourPattern: Es el patrón para dar formato a la hora. Debe ser uno o más caracteres de H. De manera opcional, une el patrón con caracteres que no sean alfanuméricos o el carácter de directorio (/). La configuración predeterminada es HH.
  • minutePattern: Es el patrón para dar formato al minuto. Debe ser uno o más caracteres de m. De manera opcional, une el patrón con caracteres que no sean alfanuméricos o el carácter de directorio (/). La configuración predeterminada es mm.

Ejecuta la plantilla

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to Avro Files on Cloud Storage template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_Avro \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre del bucket de Cloud Storage.
  • FILENAME_PREFIX: Es el prefijo del nombre de archivo de salida que prefieras.
  • FILENAME_SUFFIX: Es el sufijo del nombre de archivo de salida que prefieras.
  • SHARD_TEMPLATE: Es la plantilla de fragmentación de salida que prefieras.

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre del bucket de Cloud Storage.
  • FILENAME_PREFIX: Es el prefijo del nombre de archivo de salida que prefieras.
  • FILENAME_SUFFIX: Es el sufijo del nombre de archivo de salida que prefieras.
  • SHARD_TEMPLATE: Es la plantilla de fragmentación de salida que prefieras.
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.teleport.avro.AvroPubsubMessageRecord;
import com.google.cloud.teleport.io.WindowedFilenamePolicy;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.options.WindowedFilenamePolicyOptions;
import com.google.cloud.teleport.templates.PubsubToAvro.Options;
import com.google.cloud.teleport.util.DurationUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.avro.io.AvroIO;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;

/**
 * This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
 * windowed Avro files at the specified output directory.
 *
 * <p>Files output will have the following schema:
 *
 * <pre>
 *   {
 *      "type": "record",
 *      "name": "AvroPubsubMessageRecord",
 *      "namespace": "com.google.cloud.teleport.avro",
 *      "fields": [
 *        {"name": "message", "type": {"type": "array", "items": "bytes"}},
 *        {"name": "attributes", "type": {"type": "map", "values": "string"}},
 *        {"name": "timestamp", "type": "long"}
 *      ]
 *   }
 * </pre>
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_PubSub_to_Avro.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_PubSub_to_Avro",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub to Avro Files on Cloud Storage",
    description =
        "The Pub/Sub to Avro files on Cloud Storage template is a streaming pipeline that reads data from a Pub/Sub "
            + "topic and writes Avro files into the specified Cloud Storage bucket.",
    optionsClass = Options.class,
    skipOptions = "inputSubscription",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-avro",
    contactInformation = "https://cloud.google.com/support",
    requirements = {"The input Pub/Sub topic must exist prior to pipeline execution."},
    streaming = true,
    supportsAtLeastOnce = true)
public class PubsubToAvro {

  /**
   * Options supported by the pipeline.
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options
      extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions {
    @TemplateParameter.PubsubSubscription(
        order = 1,
        groupName = "Source",
        description = "Pub/Sub input subscription",
        helpText = "The Pub/Sub subscription to read the input from.",
        example = "projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>")
    ValueProvider<String> getInputSubscription();

    void setInputSubscription(ValueProvider<String> value);

    @TemplateParameter.PubsubTopic(
        order = 2,
        groupName = "Source",
        description = "Pub/Sub input topic",
        helpText =
            "The Pub/Sub topic to subscribe to for message consumption. The topic name must be in the format `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.")
    ValueProvider<String> getInputTopic();

    void setInputTopic(ValueProvider<String> value);

    @TemplateCreationParameter(value = "false")
    @Description(
        "This determines whether the template reads from " + "a pub/sub subscription or a topic")
    @Default.Boolean(false)
    Boolean getUseSubscription();

    void setUseSubscription(Boolean value);

    @TemplateParameter.GcsWriteFolder(
        order = 4,
        groupName = "Target",
        description = "Output file directory in Cloud Storage",
        helpText =
            "The output directory where output Avro files are archived. Must contain `/` at the end. For example: `gs://example-bucket/example-directory/`")
    @Required
    ValueProvider<String> getOutputDirectory();

    void setOutputDirectory(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 5,
        groupName = "Target",
        optional = true,
        description = "Output filename prefix of the files to write",
        helpText = "The output filename prefix for the Avro files.",
        regexes = "^[a-zA-Z\\-]+$")
    @Default.String("output")
    ValueProvider<String> getOutputFilenamePrefix();

    void setOutputFilenamePrefix(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 6,
        groupName = "Target",
        optional = true,
        description = "Output filename suffix of the files to write",
        helpText = "The output filename suffix for the Avro files.")
    @Default.String("")
    ValueProvider<String> getOutputFilenameSuffix();

    void setOutputFilenameSuffix(ValueProvider<String> value);

    @TemplateParameter.GcsWriteFolder(
        order = 7,
        description = "Temporary Avro write directory",
        helpText =
            "The directory for temporary Avro files. Must contain `/` at the end. For example: `gs://example-bucket/example-directory/`.")
    @Required
    ValueProvider<String> getAvroTempDirectory();

    void setAvroTempDirectory(ValueProvider<String> value);
  }

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    options.setStreaming(true);

    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(Options options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    PCollection<PubsubMessage> messages = null;

    /*
     * Steps:
     *   1) Read messages from PubSub
     *   2) Window the messages into minute intervals specified by the executor.
     *   3) Output the windowed data into Avro files, one per window by default.
     */

    if (options.getUseSubscription()) {
      messages =
          pipeline.apply(
              "Read PubSub Events",
              PubsubIO.readMessagesWithAttributes()
                  .fromSubscription(options.getInputSubscription()));
    } else {
      messages =
          pipeline.apply(
              "Read PubSub Events",
              PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()));
    }
    messages
        .apply("Map to Archive", ParDo.of(new PubsubMessageToArchiveDoFn()))
        .apply(
            options.getWindowDuration() + " Window",
            Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))

        // Apply windowed file writes. Use a NestedValueProvider because the filename
        // policy requires a resourceId generated from the input value at runtime.
        .apply(
            "Write File(s)",
            AvroIO.write(AvroPubsubMessageRecord.class)
                .to(
                    WindowedFilenamePolicy.writeWindowedFiles()
                        .withOutputDirectory(options.getOutputDirectory())
                        .withOutputFilenamePrefix(options.getOutputFilenamePrefix())
                        .withShardTemplate(options.getOutputShardTemplate())
                        .withSuffix(options.getOutputFilenameSuffix())
                        .withYearPattern(options.getYearPattern())
                        .withMonthPattern(options.getMonthPattern())
                        .withDayPattern(options.getDayPattern())
                        .withHourPattern(options.getHourPattern())
                        .withMinutePattern(options.getMinutePattern()))
                .withTempDirectory(
                    NestedValueProvider.of(
                        options.getAvroTempDirectory(),
                        (SerializableFunction<String, ResourceId>)
                            input -> FileBasedSink.convertToFileResourceIfPossible(input)))
                /*.withTempDirectory(FileSystems.matchNewResource(
                options.getAvroTempDirectory(),
                Boolean.TRUE))
                */
                .withWindowedWrites()
                .withNumShards(options.getNumShards()));

    // Execute the pipeline and return the result.
    return pipeline.run();
  }

  /**
   * Converts an incoming {@link PubsubMessage} to the {@link AvroPubsubMessageRecord} class by
   * copying its fields and the timestamp of the message.
   */
  static class PubsubMessageToArchiveDoFn extends DoFn<PubsubMessage, AvroPubsubMessageRecord> {
    @ProcessElement
    public void processElement(ProcessContext context) {
      PubsubMessage message = context.element();
      context.output(
          new AvroPubsubMessageRecord(
              message.getPayload(), message.getAttributeMap(), context.timestamp().getMillis()));
    }
  }
}

¿Qué sigue?