Modèle Pub/Sub vers Pub/Sub

Le modèle Pub/Sub vers Pub/Sub est un pipeline de streaming qui lit les messages d'un abonnement Pub/Sub et les écrit dans un autre sujet Pub/Sub. Le pipeline accepte également une clé facultative d'attribut de message et une valeur qui peut être utilisée pour filtrer les messages devant être écrits dans le sujet Pub/Sub. Vous pouvez utiliser ce modèle pour copier des messages d'un abonnement Pub/Sub à un autre sujet Pub/Sub avec un filtre de message facultatif.

Conditions requises pour ce pipeline

  • L'abonnement Pub/Sub source doit exister avant l'exécution.
  • L'abonnement Pub/Sub source doit être un abonnement pull.
  • Le sujet Pub/Sub de destination doit exister avant l'exécution.

Paramètres de modèle

Paramètres obligatoires

  • inputSubscription: abonnement Pub/Sub à partir duquel lire l'entrée. Exemple :projects/your-project-id/subscriptions/your-subscription-name
  • outputTopic: sujet Pub/Sub dans lequel écrire la sortie. Exemple :projects/your-project-id/topics/your-topic-name

Paramètres facultatifs

  • filterKey: clé de l'attribut à utiliser pour filtrer les événements. Aucun filtre n'est appliqué si filterKey n'est pas spécifié.
  • filterValue: valeur d'attribut à utiliser pour filtrer les événements lorsqu'un filterKey est fourni. Par défaut, une valeur filterValue nulle est utilisée.

Exécuter le modèle

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to Pub/Sub template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Facultatif : Pour passer du traitement de type "exactement une fois" au mode de traitement en flux continu de type "au moins une fois", sélectionnez Au moins une fois.
  8. Cliquez sur Run Job (Exécuter la tâche).

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

Remplacez les éléments suivants :

  • JOB_NAME : nom de job unique de votre choix
  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • STAGING_LOCATION : emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
  • SUBSCRIPTION_NAME : nom de l'abonnement Pub/Sub
  • TOPIC_NAME : nom du sujet Pub/Sub
  • FILTER_KEY : clé d'attribut utilisée pour le filtrage des événements. Aucun filtre n'est appliqué si aucune clé n'est spécifiée.
  • FILTER_VALUE : valeur d'attribut de filtre à utiliser si une clé de filtre d'événement est fournie. Accepte une chaîne de l'expression régulière Java valide en tant que valeur de filtre d'événement. En cas d'expression régulière, l'expression complète doit correspondre pour que le message soit filtré. Les correspondances partielles (telles que les sous-chaînes) ne sont pas filtrées. Une valeur de filtre d'événement "null" est utilisée par défaut.

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • STAGING_LOCATION : emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
  • SUBSCRIPTION_NAME : nom de l'abonnement Pub/Sub
  • TOPIC_NAME : nom du sujet Pub/Sub
  • FILTER_KEY : clé d'attribut utilisée pour le filtrage des événements. Aucun filtre n'est appliqué si aucune clé n'est spécifiée.
  • FILTER_VALUE : valeur d'attribut de filtre à utiliser si une clé de filtre d'événement est fournie. Accepte une chaîne de l'expression régulière Java valide en tant que valeur de filtre d'événement. En cas d'expression régulière, l'expression complète doit correspondre pour que le message soit filtré. Les correspondances partielles (telles que les sous-chaînes) ne sont pas filtrées. Une valeur de filtre d'événement "null" est utilisée par défaut.
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.auto.value.AutoValue;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.PubsubToPubsub.Options;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A template that copies messages from one Pubsub subscription to another Pubsub topic. Allows
 * filtering specific messages.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_PubSub_to_Cloud_PubSub.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_PubSub_to_Cloud_PubSub",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub to Pub/Sub",
    description =
        "The Pub/Sub to Pub/Sub template is a streaming pipeline that reads messages from a Pub/Sub subscription and "
            + "writes the messages to another Pub/Sub topic. The pipeline also accepts an optional message attribute key and a value that can be used to filter the messages that should be written to the Pub/Sub topic. You can use this template to copy messages from a Pub/Sub subscription to another Pub/Sub topic with an optional message filter.",
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-pubsub",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The source Pub/Sub subscription must exist prior to execution.",
      "The source Pub/Sub subscription must be a <a href=\"https://cloud.google.com/pubsub/docs/pull\">pull subscription</a>.",
      "The destination Pub/Sub topic must exist prior to execution."
    },
    streaming = true,
    supportsAtLeastOnce = true,
    supportsExactlyOnce = true)
public class PubsubToPubsub {

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {

    // Parse the user options passed from the command-line
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    options.setStreaming(true);

    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(Options options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    /*
     Steps:

     <p>1) Read PubSubMessage with attributes from input PubSub subscription.

     <p>2) Apply any filters if an attribute=value pair is provided.

     <p>3) Write each PubSubMessage to output PubSub topic.
    */
    pipeline
        .apply(
            "Read PubSub Events",
            PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
        .apply(
            "Filter Events If Enabled",
            ParDo.of(
                ExtractAndFilterEventsFn.newBuilder()
                    .withFilterKey(options.getFilterKey())
                    .withFilterValue(options.getFilterValue())
                    .build()))
        .apply("Write PubSub Events", PubsubIO.writeMessages().to(options.getOutputTopic()));

    // Execute the pipeline and return the result.
    return pipeline.run();
  }

  /**
   * Options supported by {@link PubsubToPubsub}.
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options extends PipelineOptions, StreamingOptions {
    @TemplateParameter.PubsubSubscription(
        order = 1,
        groupName = "Source",
        description = "Pub/Sub input subscription",
        helpText = "The Pub/Sub subscription to read the input from.",
        example = "projects/your-project-id/subscriptions/your-subscription-name")
    @Validation.Required
    ValueProvider<String> getInputSubscription();

    void setInputSubscription(ValueProvider<String> inputSubscription);

    @TemplateParameter.PubsubTopic(
        order = 2,
        groupName = "Target",
        description = "Output Pub/Sub topic",
        helpText = "The Pub/Sub topic to write the output to.",
        example = "projects/your-project-id/topics/your-topic-name")
    @Validation.Required
    ValueProvider<String> getOutputTopic();

    void setOutputTopic(ValueProvider<String> outputTopic);

    @TemplateParameter.Text(
        order = 3,
        optional = true,
        description = "Event filter key",
        helpText =
            "The attribute key to use to filter events. No filters are applied if `filterKey` is not specified.")
    ValueProvider<String> getFilterKey();

    void setFilterKey(ValueProvider<String> filterKey);

    @TemplateParameter.Text(
        order = 4,
        optional = true,
        description = "Event filter value",
        helpText =
            "The attribute value to use to filter events when a `filterKey` is provided. By default, a null `filterValue` is used.")
    ValueProvider<String> getFilterValue();

    void setFilterValue(ValueProvider<String> filterValue);
  }

  /**
   * DoFn that will determine if events are to be filtered. If filtering is enabled, it will only
   * publish events that pass the filter else, it will publish all input events.
   */
  @AutoValue
  public abstract static class ExtractAndFilterEventsFn extends DoFn<PubsubMessage, PubsubMessage> {

    private static final Logger LOG = LoggerFactory.getLogger(ExtractAndFilterEventsFn.class);

    // Counter tracking the number of incoming Pub/Sub messages.
    private static final Counter INPUT_COUNTER =
        Metrics.counter(ExtractAndFilterEventsFn.class, "inbound-messages");

    // Counter tracking the number of output Pub/Sub messages after the user provided filter
    // is applied.
    private static final Counter OUTPUT_COUNTER =
        Metrics.counter(ExtractAndFilterEventsFn.class, "filtered-outbound-messages");

    private Boolean doFilter;
    private String inputFilterKey;
    private Pattern inputFilterValueRegex;
    private Boolean isNullFilterValue;

    public static Builder newBuilder() {
      return new AutoValue_PubsubToPubsub_ExtractAndFilterEventsFn.Builder();
    }

    @Nullable
    abstract ValueProvider<String> filterKey();

    @Nullable
    abstract ValueProvider<String> filterValue();

    @Setup
    public void setup() {

      if (this.doFilter != null) {
        return; // Filter has been evaluated already
      }

      inputFilterKey = (filterKey() == null ? null : filterKey().get());

      if (inputFilterKey == null) {

        // Disable input message filtering.
        this.doFilter = false;

      } else {

        this.doFilter = true; // Enable filtering.

        String inputFilterValue = (filterValue() == null ? null : filterValue().get());

        if (inputFilterValue == null) {

          LOG.warn(
              "User provided a NULL for filterValue. Only messages with a value of NULL for the"
                  + " filterKey: {} will be filtered forward",
              inputFilterKey);

          // For backward compatibility, we are allowing filtering by null filterValue.
          this.isNullFilterValue = true;
          this.inputFilterValueRegex = null;
        } else {

          this.isNullFilterValue = false;
          try {
            inputFilterValueRegex = getFilterPattern(inputFilterValue);
          } catch (PatternSyntaxException e) {
            LOG.error("Invalid regex pattern for supplied filterValue: {}", inputFilterValue);
            throw new RuntimeException(e);
          }
        }

        LOG.info(
            "Enabling event filter [key: " + inputFilterKey + "][value: " + inputFilterValue + "]");
      }
    }

    @ProcessElement
    public void processElement(ProcessContext context) {

      INPUT_COUNTER.inc();
      if (!this.doFilter) {

        // Filter is not enabled
        writeOutput(context, context.element());
      } else {

        PubsubMessage message = context.element();
        String extractedValue = message.getAttribute(this.inputFilterKey);

        if (this.isNullFilterValue) {

          if (extractedValue == null) {
            // If we are filtering for null and the extracted value is null, we forward
            // the message.
            writeOutput(context, message);
          }

        } else {

          if (extractedValue != null
              && this.inputFilterValueRegex.matcher(extractedValue).matches()) {
            // If the extracted value is not null and it matches the filter,
            // we forward the message.
            writeOutput(context, message);
          }
        }
      }
    }

    /**
     * Write a {@link PubsubMessage} and increment the output counter.
     *
     * @param context {@link ProcessContext} to write {@link PubsubMessage} to.
     * @param message {@link PubsubMessage} output.
     */
    private void writeOutput(ProcessContext context, PubsubMessage message) {
      OUTPUT_COUNTER.inc();
      context.output(message);
    }

    /**
     * Return a {@link Pattern} based on a user provided regex string.
     *
     * @param regex Regex string to compile.
     * @return {@link Pattern}
     * @throws PatternSyntaxException If the string is an invalid regex.
     */
    private Pattern getFilterPattern(String regex) throws PatternSyntaxException {
      checkNotNull(regex, "Filter regex cannot be null.");
      return Pattern.compile(regex);
    }

    /** Builder class for {@link ExtractAndFilterEventsFn}. */
    @AutoValue.Builder
    abstract static class Builder {

      abstract Builder setFilterKey(ValueProvider<String> filterKey);

      abstract Builder setFilterValue(ValueProvider<String> filterValue);

      abstract ExtractAndFilterEventsFn build();

      /**
       * Method to set the filterKey used for filtering messages.
       *
       * @param filterKey Lookup key for the {@link PubsubMessage} attribute map.
       * @return {@link Builder}
       */
      public Builder withFilterKey(ValueProvider<String> filterKey) {
        checkArgument(filterKey != null, "withFilterKey(filterKey) called with null input.");
        return setFilterKey(filterKey);
      }

      /**
       * Method to set the filterValue used for filtering messages.
       *
       * @param filterValue Lookup value for the {@link PubsubMessage} attribute map.
       * @return {@link Builder}
       */
      public Builder withFilterValue(ValueProvider<String> filterValue) {
        checkArgument(filterValue != null, "withFilterValue(filterValue) called with null input.");
        return setFilterValue(filterValue);
      }
    }
  }
}

Étape suivante