Tema de Pub/Sub o suscripción a archivos de texto en la planilla de Cloud Storage

El tema de Pub/Sub o la suscripción a archivos de texto en Cloud Storage es una canalización de transmisión que lee registros de Pub/Sub y los guarda como una serie de archivos de Cloud Storage en formato de texto. La plantilla se puede usar como una forma rápida de guardar datos en Pub/Sub para su uso futuro. De forma predeterminada, la plantilla genera un archivo nuevo cada 5 minutos.

Requisitos de la canalización

  • El tema o la suscripción a Pub/Sub deben existir antes de la ejecución.
  • Los mensajes publicados en el tema deben tener formato de texto.
  • Los mensajes publicados en el tema no deben contener líneas nuevas. Ten en cuenta que cada mensaje de Pub/Sub se guarda como una sola línea en el archivo de salida.

Parámetros de la plantilla

Parámetros obligatorios

  • outputDirectory: La ruta de acceso y el prefijo del nombre de archivo donde se escribirán los archivos de salida. El valor debe terminar con una barra. Por ejemplo, gs://your-bucket/your-path/

Parámetros opcionales

  • inputTopic: El tema de Pub/Sub desde el que se lee la entrada. Si se proporciona este parámetro, no uses inputSubscription. Por ejemplo, projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • inputSubscription: Es la suscripción a Pub/Sub desde la que se lee la entrada. Si se proporciona este parámetro, no uses inputTopic. Por ejemplo, projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_NAME>.
  • userTempLocation: Es el directorio proporcionado por el usuario al que se exportarán los archivos temporales. Debe terminar con una barra.
  • outputFilenamePrefix: Es el prefijo para colocar en cada archivo con ventanas. Por ejemplo, output- La configuración predeterminada es: output.
  • outputFilenameSuffix: Es el sufijo para colocar en cada archivo con ventanas, por lo general, una extensión de archivo como .txt o .csv. Por ejemplo, .txt. La configuración predeterminada es vacía.
  • outputShardTemplate: La plantilla de fragmentación define la parte dinámica de cada archivo con ventanas. De forma predeterminada, la canalización usa una única fragmentación de salida para el sistema de archivos dentro de cada ventana. Esto significa que todos los datos se envían a un solo archivo por ventana. El valor predeterminado outputShardTemplate es W-P-SS-of-NN, en el que W es el período de la ventana, P es la información del panel, S es el número de fragmento y N es la cantidad de shards. En el caso de un solo archivo, la parte SS-of-NN de outputShardTemplate es 00-of-01.
  • numShards: Es la cantidad máxima de fragmentos de salida que se produce con la escritura. Una mayor cantidad de fragmentos implica una mayor capacidad de procesamiento para la escritura en Cloud Storage, pero, también, un mayor costo de agregación de datos entre fragmentos cuando se procesan archivos de salida de Cloud Storage. La configuración predeterminada es 0.
  • windowDuration: La duración de la ventana es el intervalo en el que se escriben los datos en el directorio de salida. Configura la duración en función de la capacidad de procesamiento de la canalización. Por ejemplo, una capacidad de procesamiento mayor puede requerir tamaños de ventana más pequeños para que los datos se ajusten a la memoria. El valor predeterminado es 5m (5 minutos), con un mínimo de 1s (1 segundo). Los formatos permitidos son: [int]s (para segundos, por ejemplo, 5s), [int]m (para minutos, por ejemplo, 12m), [int]h (para horas, por ejemplo, 2h). Por ejemplo, 5m.
  • yearPattern: Es el patrón para dar formato al año. Debe ser uno o más caracteres y o Y. Las mayúsculas y minúsculas no marcan diferencia en el año. De forma opcional, el patrón se puede unir con caracteres que no sean alfanuméricos o el carácter de directorio (/). El valor predeterminado es YYYY.
  • monthPattern: Es el patrón para dar formato al mes. Debe ser uno o más caracteres de M. De forma opcional, el patrón se puede unir con caracteres que no sean alfanuméricos o el carácter de directorio (/). La configuración predeterminada es MM.
  • dayPattern: Es el patrón para dar formato al día. Debe ser uno o más de d para el día del mes o D para el día del año. Las mayúsculas y minúsculas no marcan diferencia en el año. De forma opcional, el patrón se puede unir con caracteres que no sean alfanuméricos o el carácter de directorio (/). El valor predeterminado es dd.
  • hourPattern: Es el patrón para dar formato a la hora. Debe ser uno o más caracteres de H. De forma opcional, el patrón se puede unir con caracteres que no sean alfanuméricos o el carácter de directorio (/). La configuración predeterminada es HH.
  • minutePattern: Es el patrón para dar formato al minuto. Debe ser uno o más caracteres de m. De forma opcional, el patrón se puede unir con caracteres que no sean alfanuméricos o el carácter de directorio (/). La configuración predeterminada es mm.

Ejecuta la plantilla

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • BUCKET_NAME: Es el nombre de tu bucket de Cloud Storage.

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex",
  }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • BUCKET_NAME: Es el nombre de tu bucket de Cloud Storage.
Java
/*
 * Copyright (C) 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates.pubsubtotext;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.io.WindowedFilenamePolicy;
import com.google.cloud.teleport.v2.options.WindowedFilenamePolicyOptions;
import com.google.cloud.teleport.v2.templates.pubsubtotext.PubsubToText.Options;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import com.google.common.base.Strings;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;

/**
 * This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
 * windowed files at the specified output directory.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Cloud_PubSub_to_GCS_Text_Flex.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_PubSub_to_GCS_Text_Flex",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub Subscription or Topic to Text Files on Cloud Storage",
    description =
        "The Pub/Sub Topic or Subscription to Cloud Storage Text template is a streaming pipeline that reads records "
            + "from Pub/Sub and saves them as a series of Cloud Storage files in text format. The template can be used as a quick way to save data in Pub/Sub for future use. By default, the template generates a new file every 5 minutes.",
    optionsClass = Options.class,
    flexContainerName = "pubsub-to-text",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-topic-subscription-to-text",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Pub/Sub topic or subscription must exist prior to execution.",
      "The messages published to the topic must be in text format.",
      "The messages published to the topic must not contain any newlines. Note that each Pub/Sub message is saved as a single line in the output file."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class PubsubToText {

  /**
   * Options supported by the pipeline.
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options
      extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions {

    @TemplateParameter.PubsubTopic(
        order = 1,
        groupName = "Source",
        optional = true,
        description = "Pub/Sub input topic",
        helpText =
            "The Pub/Sub topic to read the input from. If this parameter is provided "
                + "don't use `inputSubscription`.",
        example = "projects/<PROJECT_ID>/topics/<TOPIC_NAME>")
    String getInputTopic();

    void setInputTopic(String value);

    @TemplateParameter.PubsubSubscription(
        order = 2,
        groupName = "Source",
        optional = true,
        description = "Pub/Sub input subscription",
        helpText =
            "The Pub/Sub subscription to read the input from. If this parameter is "
                + "provided, don't use `inputTopic`.",
        example = "projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_NAME>")
    String getInputSubscription();

    void setInputSubscription(String value);

    @TemplateParameter.GcsWriteFolder(
        order = 3,
        groupName = "Target",
        description = "Output file directory in Cloud Storage",
        helpText =
            "The path and filename prefix to write write output files to. "
                + "This value must end in a slash.",
        example = "gs://your-bucket/your-path/")
    @Required
    String getOutputDirectory();

    void setOutputDirectory(String value);

    @TemplateParameter.GcsWriteFolder(
        order = 4,
        optional = true,
        description = "User provided temp location",
        helpText =
            "The user provided directory to output temporary files to. Must end with a slash.")
    String getUserTempLocation();

    void setUserTempLocation(String value);

    @TemplateParameter.Text(
        order = 5,
        groupName = "Target",
        optional = true,
        description = "Output filename prefix of the files to write",
        helpText = "The prefix to place on each windowed file.",
        example = "output-")
    @Default.String("output")
    @Required
    String getOutputFilenamePrefix();

    void setOutputFilenamePrefix(String value);

    @TemplateParameter.Text(
        order = 6,
        groupName = "Target",
        optional = true,
        description = "Output filename suffix of the files to write",
        helpText =
            "The suffix to place on each windowed file, typically a file extension such as `.txt` or `.csv`.",
        example = ".txt")
    @Default.String("")
    String getOutputFilenameSuffix();

    void setOutputFilenameSuffix(String value);
  }

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    options.setStreaming(true);

    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(Options options) {
    boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription());
    boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic());
    if (useInputSubscription == useInputTopic) {
      throw new IllegalArgumentException(
          "Either input topic or input subscription must be provided, but not both.");
    }

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> messages = null;

    /*
     * Steps:
     *   1) Read string messages from PubSub
     *   2) Window the messages into minute intervals specified by the executor.
     *   3) Output the windowed files to GCS
     */
    if (useInputSubscription) {
      messages =
          pipeline.apply(
              "Read PubSub Events",
              PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));
    } else {
      messages =
          pipeline.apply(
              "Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic()));
    }
    messages
        .apply(
            options.getWindowDuration() + " Window",
            Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))

        // Apply windowed file writes
        .apply(
            "Write File(s)",
            TextIO.write()
                .withWindowedWrites()
                .withNumShards(options.getNumShards())
                .to(
                    WindowedFilenamePolicy.writeWindowedFiles()
                        .withOutputDirectory(options.getOutputDirectory())
                        .withOutputFilenamePrefix(options.getOutputFilenamePrefix())
                        .withShardTemplate(options.getOutputShardTemplate())
                        .withSuffix(options.getOutputFilenameSuffix())
                        .withYearPattern(options.getYearPattern())
                        .withMonthPattern(options.getMonthPattern())
                        .withDayPattern(options.getDayPattern())
                        .withHourPattern(options.getHourPattern())
                        .withMinutePattern(options.getMinutePattern()))
                .withTempDirectory(
                    FileBasedSink.convertToFileResourceIfPossible(
                        maybeUseUserTempLocation(
                            options.getUserTempLocation(), options.getOutputDirectory()))));

    // Execute the pipeline and return the result.
    return pipeline.run();
  }

  /**
   * Utility method for using optional parameter userTempLocation as TempDirectory. This is useful
   * when output bucket is locked and temporary data cannot be deleted.
   *
   * @param userTempLocation user provided temp location
   * @param outputLocation user provided outputDirectory to be used as the default temp location
   * @return userTempLocation if available, otherwise outputLocation is returned.
   */
  private static String maybeUseUserTempLocation(String userTempLocation, String outputLocation) {
    return !Strings.isNullOrEmpty(userTempLocation) ? userTempLocation : outputLocation;
  }
}

¿Qué sigue?