Plantilla de Pub/Sub a Pub/Sub

La plantilla de Pub/Sub a Pub/Sub es una canalización de transmisión que lee mensajes de una suscripción de Pub/Sub y los escribe en otro tema de Pub/Sub. La canalización también acepta una clave de atributo de mensaje opcional y un valor que se puede usar para filtrar los mensajes que se deben escribir en el tema de Pub/Sub. Puedes usar esta plantilla para copiar mensajes de una suscripción de Pub/Sub a otro tema de Pub/Sub con un filtro de mensajes opcional.

Requisitos de la canalización

  • La suscripción a Pub/Sub de origen debe existir antes de la ejecución.
  • La suscripción de Pub/Sub de origen debe ser una suscripción de extracción.
  • El tema de Pub/Sub de destino debe existir antes de la ejecución.

Parámetros de la plantilla

Parámetros obligatorios

  • inputSubscription: Es la suscripción a Pub/Sub desde la que se lee la entrada. Por ejemplo, projects/your-project-id/subscriptions/your-subscription-name
  • outputTopic: El tema de Pub/Sub en el que se escribirá el resultado. Por ejemplo, projects/your-project-id/topics/your-topic-name

Parámetros opcionales

  • filterKey: Es la clave de atributo que se usará para filtrar eventos. No se aplican filtros si no se especifica filterKey.
  • filterValue: Es el valor del atributo que se usará para filtrar eventos cuando se proporciona un filterKey. De forma predeterminada, se usa un filterValue nulo.

Ejecuta la plantilla

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to Pub/Sub template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Opcional: Para cambiar del procesamiento “exactamente una vez” al modo de transmisión al menos una vez, selecciona Al menos una vez.
  8. Haz clic en Ejecutar trabajo.

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • FILTER_KEY: Es la clave de atributo que se usa para filtrar los eventos. No se aplicará ningún filtro si no se especifica una clave.
  • FILTER_VALUE: Es el valor del atributo del filtro que se debe usar si se proporciona una clave de filtro de evento. Acepta una string de regex de Java válida como valor de filtro de evento. En caso de que se proporcione una regex, la expresión completa debe coincidir para que el mensaje se filtre. Las coincidencias parciales (como una substring) no se filtran. De forma predeterminada, se usa un valor de filtro de evento nulo.

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • STAGING_LOCATION: la ubicación para los archivos locales de etapa de pruebas (por ejemplo, gs://your-bucket/staging).
  • SUBSCRIPTION_NAME: Es el nombre de la suscripción a Pub/Sub.
  • TOPIC_NAME: Es el nombre del tema de Pub/Sub.
  • FILTER_KEY: Es la clave de atributo que se usa para filtrar los eventos. No se aplicará ningún filtro si no se especifica una clave.
  • FILTER_VALUE: Es el valor del atributo del filtro que se debe usar si se proporciona una clave de filtro de evento. Acepta una string de regex de Java válida como valor de filtro de evento. En caso de que se proporcione una regex, la expresión completa debe coincidir para que el mensaje se filtre. Las coincidencias parciales (como una substring) no se filtran. De forma predeterminada, se usa un valor de filtro de evento nulo.
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.auto.value.AutoValue;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.PubsubToPubsub.Options;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A template that copies messages from one Pubsub subscription to another Pubsub topic. Allows
 * filtering specific messages.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_PubSub_to_Cloud_PubSub.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_PubSub_to_Cloud_PubSub",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub to Pub/Sub",
    description =
        "The Pub/Sub to Pub/Sub template is a streaming pipeline that reads messages from a Pub/Sub subscription and "
            + "writes the messages to another Pub/Sub topic. The pipeline also accepts an optional message attribute key and a value that can be used to filter the messages that should be written to the Pub/Sub topic. You can use this template to copy messages from a Pub/Sub subscription to another Pub/Sub topic with an optional message filter.",
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-pubsub",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The source Pub/Sub subscription must exist prior to execution.",
      "The source Pub/Sub subscription must be a <a href=\"https://cloud.google.com/pubsub/docs/pull\">pull subscription</a>.",
      "The destination Pub/Sub topic must exist prior to execution."
    },
    streaming = true,
    supportsAtLeastOnce = true,
    supportsExactlyOnce = true)
public class PubsubToPubsub {

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {

    // Parse the user options passed from the command-line
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    options.setStreaming(true);

    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(Options options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    /*
     Steps:

     <p>1) Read PubSubMessage with attributes from input PubSub subscription.

     <p>2) Apply any filters if an attribute=value pair is provided.

     <p>3) Write each PubSubMessage to output PubSub topic.
    */
    pipeline
        .apply(
            "Read PubSub Events",
            PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
        .apply(
            "Filter Events If Enabled",
            ParDo.of(
                ExtractAndFilterEventsFn.newBuilder()
                    .withFilterKey(options.getFilterKey())
                    .withFilterValue(options.getFilterValue())
                    .build()))
        .apply("Write PubSub Events", PubsubIO.writeMessages().to(options.getOutputTopic()));

    // Execute the pipeline and return the result.
    return pipeline.run();
  }

  /**
   * Options supported by {@link PubsubToPubsub}.
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options extends PipelineOptions, StreamingOptions {
    @TemplateParameter.PubsubSubscription(
        order = 1,
        groupName = "Source",
        description = "Pub/Sub input subscription",
        helpText = "The Pub/Sub subscription to read the input from.",
        example = "projects/your-project-id/subscriptions/your-subscription-name")
    @Validation.Required
    ValueProvider<String> getInputSubscription();

    void setInputSubscription(ValueProvider<String> inputSubscription);

    @TemplateParameter.PubsubTopic(
        order = 2,
        groupName = "Target",
        description = "Output Pub/Sub topic",
        helpText = "The Pub/Sub topic to write the output to.",
        example = "projects/your-project-id/topics/your-topic-name")
    @Validation.Required
    ValueProvider<String> getOutputTopic();

    void setOutputTopic(ValueProvider<String> outputTopic);

    @TemplateParameter.Text(
        order = 3,
        optional = true,
        description = "Event filter key",
        helpText =
            "The attribute key to use to filter events. No filters are applied if `filterKey` is not specified.")
    ValueProvider<String> getFilterKey();

    void setFilterKey(ValueProvider<String> filterKey);

    @TemplateParameter.Text(
        order = 4,
        optional = true,
        description = "Event filter value",
        helpText =
            "The attribute value to use to filter events when a `filterKey` is provided. By default, a null `filterValue` is used.")
    ValueProvider<String> getFilterValue();

    void setFilterValue(ValueProvider<String> filterValue);
  }

  /**
   * DoFn that will determine if events are to be filtered. If filtering is enabled, it will only
   * publish events that pass the filter else, it will publish all input events.
   */
  @AutoValue
  public abstract static class ExtractAndFilterEventsFn extends DoFn<PubsubMessage, PubsubMessage> {

    private static final Logger LOG = LoggerFactory.getLogger(ExtractAndFilterEventsFn.class);

    // Counter tracking the number of incoming Pub/Sub messages.
    private static final Counter INPUT_COUNTER =
        Metrics.counter(ExtractAndFilterEventsFn.class, "inbound-messages");

    // Counter tracking the number of output Pub/Sub messages after the user provided filter
    // is applied.
    private static final Counter OUTPUT_COUNTER =
        Metrics.counter(ExtractAndFilterEventsFn.class, "filtered-outbound-messages");

    private Boolean doFilter;
    private String inputFilterKey;
    private Pattern inputFilterValueRegex;
    private Boolean isNullFilterValue;

    public static Builder newBuilder() {
      return new AutoValue_PubsubToPubsub_ExtractAndFilterEventsFn.Builder();
    }

    @Nullable
    abstract ValueProvider<String> filterKey();

    @Nullable
    abstract ValueProvider<String> filterValue();

    @Setup
    public void setup() {

      if (this.doFilter != null) {
        return; // Filter has been evaluated already
      }

      inputFilterKey = (filterKey() == null ? null : filterKey().get());

      if (inputFilterKey == null) {

        // Disable input message filtering.
        this.doFilter = false;

      } else {

        this.doFilter = true; // Enable filtering.

        String inputFilterValue = (filterValue() == null ? null : filterValue().get());

        if (inputFilterValue == null) {

          LOG.warn(
              "User provided a NULL for filterValue. Only messages with a value of NULL for the"
                  + " filterKey: {} will be filtered forward",
              inputFilterKey);

          // For backward compatibility, we are allowing filtering by null filterValue.
          this.isNullFilterValue = true;
          this.inputFilterValueRegex = null;
        } else {

          this.isNullFilterValue = false;
          try {
            inputFilterValueRegex = getFilterPattern(inputFilterValue);
          } catch (PatternSyntaxException e) {
            LOG.error("Invalid regex pattern for supplied filterValue: {}", inputFilterValue);
            throw new RuntimeException(e);
          }
        }

        LOG.info(
            "Enabling event filter [key: " + inputFilterKey + "][value: " + inputFilterValue + "]");
      }
    }

    @ProcessElement
    public void processElement(ProcessContext context) {

      INPUT_COUNTER.inc();
      if (!this.doFilter) {

        // Filter is not enabled
        writeOutput(context, context.element());
      } else {

        PubsubMessage message = context.element();
        String extractedValue = message.getAttribute(this.inputFilterKey);

        if (this.isNullFilterValue) {

          if (extractedValue == null) {
            // If we are filtering for null and the extracted value is null, we forward
            // the message.
            writeOutput(context, message);
          }

        } else {

          if (extractedValue != null
              && this.inputFilterValueRegex.matcher(extractedValue).matches()) {
            // If the extracted value is not null and it matches the filter,
            // we forward the message.
            writeOutput(context, message);
          }
        }
      }
    }

    /**
     * Write a {@link PubsubMessage} and increment the output counter.
     *
     * @param context {@link ProcessContext} to write {@link PubsubMessage} to.
     * @param message {@link PubsubMessage} output.
     */
    private void writeOutput(ProcessContext context, PubsubMessage message) {
      OUTPUT_COUNTER.inc();
      context.output(message);
    }

    /**
     * Return a {@link Pattern} based on a user provided regex string.
     *
     * @param regex Regex string to compile.
     * @return {@link Pattern}
     * @throws PatternSyntaxException If the string is an invalid regex.
     */
    private Pattern getFilterPattern(String regex) throws PatternSyntaxException {
      checkNotNull(regex, "Filter regex cannot be null.");
      return Pattern.compile(regex);
    }

    /** Builder class for {@link ExtractAndFilterEventsFn}. */
    @AutoValue.Builder
    abstract static class Builder {

      abstract Builder setFilterKey(ValueProvider<String> filterKey);

      abstract Builder setFilterValue(ValueProvider<String> filterValue);

      abstract ExtractAndFilterEventsFn build();

      /**
       * Method to set the filterKey used for filtering messages.
       *
       * @param filterKey Lookup key for the {@link PubsubMessage} attribute map.
       * @return {@link Builder}
       */
      public Builder withFilterKey(ValueProvider<String> filterKey) {
        checkArgument(filterKey != null, "withFilterKey(filterKey) called with null input.");
        return setFilterKey(filterKey);
      }

      /**
       * Method to set the filterValue used for filtering messages.
       *
       * @param filterValue Lookup value for the {@link PubsubMessage} attribute map.
       * @return {@link Builder}
       */
      public Builder withFilterValue(ValueProvider<String> filterValue) {
        checkArgument(filterValue != null, "withFilterValue(filterValue) called with null input.");
        return setFilterValue(filterValue);
      }
    }
  }
}

¿Qué sigue?