Tema de Pub/Sub a archivos de texto en Cloud Storage

La plantilla de Pub/Sub a archivos de texto en Cloud Storage es una canalización de transmisión que lee registros de Pub/Sub y los guarda como una serie de archivos de Cloud Storage en formato de texto. La plantilla se puede usar como una forma rápida de guardar datos en Pub/Sub para su uso futuro. De forma predeterminada, la plantilla genera un archivo nuevo cada 5 minutos.

Requisitos de la canalización

  • El tema de Pub/Sub debe existir antes de la ejecución.
  • Los mensajes publicados en el tema deben tener formato de texto.
  • Los mensajes publicados en el tema no deben contener líneas nuevas. Ten en cuenta que cada mensaje de Pub/Sub se guarda como una sola línea en el archivo de salida.

Parámetros de la plantilla

Parámetros obligatorios

  • outputDirectory: Es la ruta de acceso y el prefijo del nombre de archivo para escribir los archivos de salida. Por ejemplo, gs://bucket-name/path/ El valor debe terminar con una barra.
  • outputFilenamePrefix: Es el prefijo para colocar en cada archivo con ventanas. Por ejemplo, output- La configuración predeterminada es: output.

Parámetros opcionales

  • inputTopic: El tema de Pub/Sub desde el que se lee la entrada. El nombre del tema debe estar en formato projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • userTempLocation: Es el directorio proporcionado por el usuario al que se exportarán los archivos temporales. Debe terminar con una barra.
  • outputFilenameSuffix: Es el sufijo para colocar en cada archivo con ventanas. Por lo general, es una extensión de archivo como .txt o .csv. La configuración predeterminada es vacía.
  • outputShardTemplate: La plantilla de fragmentación define la parte dinámica de cada archivo con ventanas. De forma predeterminada, la canalización usa una única fragmentación de salida para el sistema de archivos dentro de cada ventana. Por lo tanto, todos los datos se envían a un solo archivo por ventana. El valor predeterminado outputShardTemplate es to W-P-SS-of-NN, en el que W es el período de la ventana, P es la información del panel, S es el número de fragmento y N es la cantidad de shards. En el caso de un solo archivo, la parte SS-of-NN de outputShardTemplate es 00-of-01.
  • yearPattern: Es el patrón para dar formato al año. Debe ser uno o más caracteres y o Y. Las mayúsculas y minúsculas no marcan diferencia en el año. De manera opcional, une el patrón con caracteres que no sean alfanuméricos o el carácter de directorio (/). El valor predeterminado es YYYY.
  • monthPattern: Es el patrón para dar formato al mes. Debe ser uno o más caracteres de M. De manera opcional, une el patrón con caracteres que no sean alfanuméricos o el carácter de directorio (/). La configuración predeterminada es MM.
  • dayPattern: Es el patrón para dar formato al día. Debe ser uno o más de d para el día del mes o D para el día del año. De manera opcional, une el patrón con caracteres que no sean alfanuméricos o el carácter de directorio (/). El valor predeterminado es dd.
  • hourPattern: Es el patrón para dar formato a la hora. Debe ser uno o más caracteres de H. De manera opcional, une el patrón con caracteres que no sean alfanuméricos o el carácter de directorio (/). La configuración predeterminada es HH.
  • minutePattern: Es el patrón para dar formato al minuto. Debe ser uno o más caracteres de m. De manera opcional, une el patrón con caracteres que no sean alfanuméricos o el carácter de directorio (/). La configuración predeterminada es mm.

Ejecuta la plantilla

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to Text Files on Cloud Storage template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Opcional: Para cambiar del procesamiento “exactamente una vez” al modo de transmisión al menos una vez, selecciona Al menos una vez.
  8. Haz clic en Ejecutar trabajo.

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_GCS_Text \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre del bucket de Cloud Storage.

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • BUCKET_NAME: Es el nombre del bucket de Cloud Storage.
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.teleport.io.WindowedFilenamePolicy;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.options.WindowedFilenamePolicyOptions;
import com.google.cloud.teleport.templates.PubsubToText.Options;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.cloud.teleport.util.DurationUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;

/**
 * This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
 * windowed files at the specified output directory.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_PubSub_to_GCS_Text.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_PubSub_to_GCS_Text",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub to Text Files on Cloud Storage",
    description =
        "The Pub/Sub to Cloud Storage Text template is a streaming pipeline that reads records from Pub/Sub topic and "
            + "saves them as a series of Cloud Storage files in text format. "
            + "The template can be used as a quick way to save data in Pub/Sub for future use. "
            + "By default, the template generates a new file every 5 minutes.",
    optionsClass = Options.class,
    skipOptions = {"inputSubscription"},
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-topic-to-text",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Pub/Sub topic must exist prior to execution.",
      "The messages published to the topic must be in text format.",
      "The messages published to the topic must not contain any newlines. Note that each Pub/Sub message is saved as a single line in the output file."
    },
    streaming = true,
    supportsAtLeastOnce = true,
    supportsExactlyOnce = true)
public class PubsubToText {

  /**
   * Options supported by the pipeline.
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options
      extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions {

    @TemplateParameter.PubsubSubscription(
        order = 1,
        groupName = "Source",
        optional = true,
        description = "Pub/Sub input subscription",
        helpText =
            "Pub/Sub subscription to read the input from, in the format of"
                + " 'projects/your-project-id/subscriptions/your-subscription-name'",
        example = "projects/your-project-id/subscriptions/your-subscription-name")
    ValueProvider<String> getInputSubscription();

    void setInputSubscription(ValueProvider<String> value);

    @TemplateParameter.PubsubTopic(
        order = 2,
        groupName = "Source",
        optional = true,
        description = "Pub/Sub input topic",
        helpText =
            "The Pub/Sub topic to read the input from. The topic name should be in the format "
                + "`projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.")
    ValueProvider<String> getInputTopic();

    void setInputTopic(ValueProvider<String> value);

    @TemplateCreationParameter(value = "false")
    @Description(
        "This determines whether the template reads from a Pub/Sub subscription or a topic")
    @Default.Boolean(false)
    Boolean getUseSubscription();

    void setUseSubscription(Boolean value);

    @TemplateParameter.GcsWriteFolder(
        order = 3,
        groupName = "Target",
        description = "Output file directory in Cloud Storage",
        helpText =
            "The path and filename prefix for writing output files. For example, `gs://bucket-name/path/`. This value must end in a slash.")
    @Required
    ValueProvider<String> getOutputDirectory();

    void setOutputDirectory(ValueProvider<String> value);

    @TemplateParameter.GcsWriteFolder(
        order = 4,
        optional = true,
        description = "User provided temp location",
        helpText =
            "The user provided directory to output temporary files to. Must end with a slash.")
    ValueProvider<String> getUserTempLocation();

    void setUserTempLocation(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 5,
        groupName = "Target",
        description = "Output filename prefix of the files to write",
        helpText = "The prefix to place on each windowed file. For example, `output-`.")
    @Default.String("output")
    @Required
    ValueProvider<String> getOutputFilenamePrefix();

    void setOutputFilenamePrefix(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 6,
        groupName = "Target",
        optional = true,
        description = "Output filename suffix of the files to write",
        helpText =
            "The suffix to place on each windowed file. Typically a file extension such as `.txt` or `.csv`.")
    @Default.String("")
    ValueProvider<String> getOutputFilenameSuffix();

    void setOutputFilenameSuffix(ValueProvider<String> value);
  }

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    options.setStreaming(true);

    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(Options options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> messages = null;

    /*
     * Steps:
     *   1) Read string messages from PubSub
     *   2) Window the messages into minute intervals specified by the executor.
     *   3) Output the windowed files to GCS
     */
    if (options.getUseSubscription()) {
      messages =
          pipeline.apply(
              "Read PubSub Events",
              PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));
    } else {
      messages =
          pipeline.apply(
              "Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic()));
    }
    messages
        .apply(
            options.getWindowDuration() + " Window",
            Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))

        // Apply windowed file writes. Use a NestedValueProvider because the filename
        // policy requires a resourceId generated from the input value at runtime.
        .apply(
            "Write File(s)",
            TextIO.write()
                .withWindowedWrites()
                .withNumShards(options.getNumShards())
                .to(
                    WindowedFilenamePolicy.writeWindowedFiles()
                        .withOutputDirectory(options.getOutputDirectory())
                        .withOutputFilenamePrefix(options.getOutputFilenamePrefix())
                        .withShardTemplate(options.getOutputShardTemplate())
                        .withSuffix(options.getOutputFilenameSuffix())
                        .withYearPattern(options.getYearPattern())
                        .withMonthPattern(options.getMonthPattern())
                        .withDayPattern(options.getDayPattern())
                        .withHourPattern(options.getHourPattern())
                        .withMinutePattern(options.getMinutePattern()))
                .withTempDirectory(
                    NestedValueProvider.of(
                        maybeUseUserTempLocation(
                            options.getUserTempLocation(), options.getOutputDirectory()),
                        (SerializableFunction<String, ResourceId>)
                            input -> FileBasedSink.convertToFileResourceIfPossible(input))));

    // Execute the pipeline and return the result.
    return pipeline.run();
  }

  /**
   * Utility method for using optional parameter userTempLocation as TempDirectory. This is useful
   * when output bucket is locked and temporary data cannot be deleted.
   *
   * @param userTempLocation user provided temp location
   * @param outputLocation user provided outputDirectory to be used as the default temp location
   * @return userTempLocation if available, otherwise outputLocation is returned.
   */
  private static ValueProvider<String> maybeUseUserTempLocation(
      ValueProvider<String> userTempLocation, ValueProvider<String> outputLocation) {
    return DualInputNestedValueProvider.of(
        userTempLocation,
        outputLocation,
        new SerializableFunction<TranslatorInput<String, String>, String>() {
          @Override
          public String apply(TranslatorInput<String, String> input) {
            return (input.getX() != null) ? input.getX() : input.getY();
          }
        });
  }
}

¿Qué sigue?