Modèle de flux de modifications Bigtable vers Pub/Sub

Le modèle de flux de modifications Bigtable vers Pub/Sub est un pipeline de traitement en flux continu qui diffuse les enregistrements de modifications des données Bigtable et les publie vers un sujet Pub/Sub à l'aide de Dataflow.

Un flux de modifications Bigtable vous permet de vous abonner aux mutations de données par table. Lorsque vous vous abonnez à des flux de modification de table, les contraintes suivantes s'appliquent :

  • Seuls les cellules et les descripteurs modifiés des opérations de suppression sont renvoyés.
  • Seule la nouvelle valeur d'une cellule modifiée est renvoyée.

Lorsque des enregistrements de modification de données sont publiés dans un sujet Pub/Sub, les messages peuvent être insérés dans le désordre par rapport à l'ordre d'horodatage de commit Bigtable d'origine.

Les enregistrements de modification de données Bigtable qui ne peuvent pas être publiés dans des sujets Pub/Sub sont temporairement placés dans un répertoire de file d'attente de lettres mortes (file d'attente de messages non traités) dans Storage. Une fois que le nombre maximal de tentatives infructueuses a été atteint, ces enregistrements sont indéfiniment placés dans le même répertoire de files d'attente de lettres mortes pour un examen manuel ou un traitement ultérieur réalisé par l'utilisateur.

Le pipeline nécessite que le sujet Pub/Sub de destination existe. Le sujet de destination peut être configuré pour valider les messages à l'aide d'un schéma. Lorsqu'un sujet Pub/Sub spécifie un schéma, le pipeline ne démarre que si ce schéma est valide. Selon le type de schéma, utilisez l'une des définitions de schéma suivantes pour le sujet de destination :

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangeLogEntryProto";

message ChangelogEntryProto{
  required bytes rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional bytes column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional bytes value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
  
{
    "name" : "ChangelogEntryMessage",
    "type" : "record",
    "namespace" : "com.google.cloud.teleport.bigtable",
    "fields" : [
      { "name" : "rowKey", "type" : "bytes"},
      {
        "name" : "modType",
        "type" : {
          "name": "ModType",
          "type": "enum",
          "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]}
      },
      { "name": "isGC", "type": "boolean" },
      { "name": "tieBreaker", "type": "int"},
      { "name": "columnFamily", "type": "string"},
      { "name": "commitTimestamp", "type" : "long"},
      { "name" : "sourceInstance", "type" : "string"},
      { "name" : "sourceCluster", "type" : "string"},
      { "name" : "sourceTable", "type" : "string"},
      { "name": "column", "type" : ["null", "bytes"]},
      { "name": "timestamp", "type" : ["null", "long"]},
      { "name": "timestampFrom", "type" : ["null", "long"]},
      { "name": "timestampTo", "type" : ["null", "long"]},
      { "name" : "value", "type" : ["null", "bytes"]}
   ]
}
    

Utilisez le schéma Protobuf suivant avec l'encodage de messages JSON :

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangelogEntryMessageText";

message ChangelogEntryText{
  required string rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional string column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional string value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
    

Chaque nouveau message Pub/Sub inclut une entrée d'un enregistrement de modification de données renvoyé par le flux de modification à partir de la ligne correspondante dans votre table Bigtable. Le modèle Pub/Sub aplatit les entrées de chaque enregistrement de modification de données en modifications individuelles au niveau de la cellule.

Description du message de sortie Pub/Sub

Nom du champ Description
rowKey Clé de ligne de la ligne modifiée. Arrive sous la forme d'un tableau d'octets. Lorsque l'encodage de messages JSON est configuré, les clés de ligne sont renvoyées sous forme de chaînes. Lorsque useBase64Rowkeys est spécifié, les clés de ligne sont encodées en base64. Sinon, un charset spécifié par bigtableChangeStreamCharset est utilisé pour décoder les octets de clés de ligne en une chaîne.
modType Type de mutation de ligne. Utilisez l'une des valeurs suivantes : SET_CELL, DELETE_CELLS ou DELETE_FAMILY.
columnFamily Famille de colonnes affectée par la mutation de ligne.
column Qualificatif de colonne affecté par la mutation de ligne. Pour le type de mutation DELETE_FAMILY, le champ de colonne n'est pas défini. Arrive sous la forme d'un tableau d'octets. Lorsque l'encodage de messages JSON est configuré, les colonnes sont renvoyées sous forme de chaînes. Lorsque useBase64ColumnQualifier est spécifié, le champ de la colonne est encodé en base64. Sinon, un charset spécifié par bigtableChangeStreamCharset est utilisé pour décoder les octets de clés de ligne en une chaîne.
commitTimestamp Date/heure à laquelle Bigtable applique la mutation. Ce temps est mesuré en microsecondes depuis l'epoch Unix (1er janvier 1970 à l'heure UTC).
timestamp Valeur d'horodatage de la cellule affectée par la mutation. Pour les types de mutation DELETE_CELLS et DELETE_FAMILY, l'horodatage n'est pas défini. Ce temps est mesuré en microsecondes depuis l'epoch Unix (1er janvier 1970 à l'heure UTC).
timestampFrom Décrit un début inclusif de l'intervalle d'horodatage pour toutes les cellules supprimées par la mutation DELETE_CELLS. Pour les autres types de mutation, timestampFrom n'est pas défini. Ce temps est mesuré en microsecondes depuis l'epoch Unix (1er janvier 1970 à l'heure UTC).
timestampTo Décrit une fin exclusive de l'intervalle d'horodatage pour toutes les cellules supprimées par la mutation DELETE_CELLS. Pour les autres types de mutation, timestampTo n'est pas défini.
isGC Valeur booléenne indiquant si la mutation est générée par un mécanisme de récupération de mémoire Bigtable.
tieBreaker Facultatif : lorsque deux mutations sont enregistrées en même temps par différents clusters Bigtable, la mutation avec la valeur tiebreaker la plus élevée est appliquée à la table source. Les mutations ayant des valeurs tiebreaker inférieures sont supprimées.
value Nouvelle valeur définie par la mutation. Sauf si l'option de pipeline stripValues est définie, la valeur est définie pour les mutations SET_CELL. Pour les autres types de mutation, la valeur n'est pas définie. Arrive sous la forme d'un tableau d'octets. Lorsque l'encodage des messages JSON est configuré, les valeurs sont renvoyées sous forme de chaînes. Lorsque useBase64Values est spécifié, la valeur est encodée en base64. Sinon, un charset spécifié par bigtableChangeStreamCharset est utilisé pour décoder les octets de valeur en une chaîne.
sourceInstance Nom de l'instance Bigtable qui a enregistré la mutation. Peut être utilisé lorsque plusieurs pipelines diffusent des modifications depuis différentes instances vers le même sujet Pub/Sub.
sourceCluster Nom du cluster Bigtable qui a enregistré la mutation. Peut être utilisé lorsque plusieurs pipelines diffusent des modifications depuis différentes instances vers le même sujet Pub/Sub.
sourceTable Nom de la table Bigtable ayant reçu la mutation. Peut être utilisé lorsque plusieurs pipelines diffusent des modifications depuis différentes tables vers le même sujet Pub/Sub.

Conditions requises pour ce pipeline

  • Instance source Bigtable spécifiée.
  • Table source Bigtable spécifiée. Les flux de modifications doivent être activés dans la table.
  • Profil d'application Bigtable spécifié.
  • Le sujet Pub/Sub spécifié doit exister.

Paramètres de modèle

Paramètres obligatoires

  • pubSubTopic: nom du sujet Pub/Sub de destination.
  • bigtableChangeStreamAppProfile: ID du profil d'application Bigtable. Le profil d'application doit utiliser un routage à cluster unique et autoriser les transactions à ligne unique.
  • bigtableReadInstanceId: ID de l'instance Bigtable source.
  • bigtableReadTableId: ID de la table Bigtable source.

Paramètres facultatifs

  • messageEncoding: encodage des messages à publier dans le sujet Pub/Sub. Lorsque le schéma du sujet de destination est configuré, l'encodage des messages est déterminé par les paramètres du sujet. Les valeurs suivantes sont acceptées : BINARY et JSON. La valeur par défaut est JSON.
  • messageFormat: encodage des messages à publier dans le sujet Pub/Sub. Lorsque le schéma du sujet de destination est configuré, l'encodage des messages est déterminé par les paramètres du sujet. Les valeurs suivantes sont acceptées : AVRO, PROTOCOL_BUFFERS et JSON. La valeur par défaut est JSON. Lorsque le format JSON est utilisé, les champs "rowKey", "column" et "value" du message sont des chaînes, dont le contenu est déterminé par les options de pipeline useBase64Rowkeys, useBase64ColumnQualifiers, useBase64Values et bigtableChangeStreamCharset.
  • stripValues: lorsque cette valeur est définie sur true, les mutations SET_CELL sont renvoyées sans nouvelle valeur définie. La valeur par défaut est false. Ce paramètre est utile lorsque vous n'avez pas besoin d'une nouvelle valeur (également appelé invalidation de cache) ou lorsque les valeurs sont extrêmement volumineuses et dépassent les limites de taille des messages Pub/Sub.
  • dlqDirectory: répertoire de la file d'attente de lettres mortes. Les enregistrements dont le traitement échoue sont stockés dans ce répertoire. Correspond par défaut à un répertoire situé sous l'emplacement temporaire du job Dataflow. Dans la plupart des cas, vous pouvez utiliser le chemin par défaut.
  • dlqRetryMinutes: nombre de minutes entre les tentatives d'exécution de la file d'attente de lettres mortes. La valeur par défaut est 10.
  • dlqMaxRetries: nombre maximal de tentatives d'exécution de lettres mortes. La valeur par défaut est 5.
  • useBase64Rowkeys: utilisé avec l'encodage de message JSON. Lorsque ce paramètre est défini sur true, le champ rowKey est une chaîne encodée en base64. Sinon, le rowKey est généré à l'aide de bigtableChangeStreamCharset pour décoder les octets en une chaîne. La valeur par défaut est false.
  • pubSubProjectId: ID du projet Bigtable. La valeur par défaut est le projet du job Dataflow.
  • useBase64ColumnQualifiers: utilisé avec l'encodage de message JSON. Lorsque ce paramètre est défini sur true, le champ column est une chaîne encodée en base64. Sinon, la colonne est générée à l'aide de bigtableChangeStreamCharset pour décoder les octets en une chaîne. La valeur par défaut est false.
  • useBase64Values: utilisé avec l'encodage de message JSON. Lorsque ce paramètre est défini sur true, le champ de valeur est une chaîne encodée en base64. Sinon, la valeur est générée à l'aide de bigtableChangeStreamCharset pour décoder les octets en une chaîne. La valeur par défaut est false.
  • disableDlqRetries: indique si les nouvelles tentatives doivent être désactivées pour la file d'attente de lettres mortes. La valeur par défaut est "false".
  • bigtableChangeStreamMetadataInstanceId: ID d'instance de métadonnées du flux de modifications Bigtable. La valeur par défaut est vide.
  • bigtableChangeStreamMetadataTableTableId: ID de la table de métadonnées du connecteur de flux de modifications Bigtable. Si aucune valeur n'est fournie, une table de métadonnées du connecteur de flux de modifications Bigtable est automatiquement créée pendant l'exécution du pipeline. La valeur par défaut est vide.
  • bigtableChangeStreamCharset: nom du charset de flux de modifications Bigtable. La valeur par défaut est UTF8.
  • bigtableChangeStreamStartTimestamp: code temporel de début (https://tools.ietf.org/html/rfc3339), inclusif, à utiliser pour la lecture des flux de modifications. Par exemple, 2022-05-05T07:59:59Z. La valeur par défaut est l'horodatage de l'heure de début du pipeline.
  • bigtableChangeStreamIgnoreColumnFamilies: liste des modifications de noms de familles de colonnes, séparées par une virgule, à ignorer. La valeur par défaut est vide.
  • bigtableChangeStreamIgnoreColumns: liste des modifications de noms de colonnes, séparées par une virgule, à ignorer. La valeur par défaut est vide.
  • bigtableChangeStreamName: nom unique pour le pipeline client. Permet de reprendre le traitement à partir du moment où un pipeline précédemment exécuté s'est arrêté. La valeur par défaut est un nom généré automatiquement. Consultez les journaux du job Dataflow pour connaître la valeur utilisée.
  • bigtableChangeStreamResume: lorsque ce paramètre est défini sur true, un nouveau pipeline reprend le traitement à partir du moment où un pipeline précédemment exécuté avec la même valeur bigtableChangeStreamName s'est arrêté. Si le pipeline avec la valeur bigtableChangeStreamName donnée n'a jamais été exécuté, aucun nouveau pipeline ne démarre. Lorsque ce paramètre est défini sur false, un nouveau pipeline démarre. Si un pipeline avec la même valeur bigtableChangeStreamName a déjà été exécuté pour la source donnée, aucun nouveau pipeline ne démarre. La valeur par défaut est false.
  • bigtableReadProjectId: ID du projet Bigtable. La valeur par défaut est le projet pour le job Dataflow.

Exécuter le modèle

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Bigtable change streams to Pub/Sub template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
pubSubTopic=PUBSUB_TOPIC

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • BIGTABLE_INSTANCE_ID : ID de votre instance Bigtable.
  • BIGTABLE_TABLE_ID : ID de votre table Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID : ID de votre profil d'application Bigtable.
  • PUBSUB_TOPIC : nom du sujet de destination Pub/Sub.

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "pubSubTopic": "PUBSUB_TOPIC"
    }
  }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • BIGTABLE_INSTANCE_ID : ID de votre instance Bigtable.
  • BIGTABLE_TABLE_ID : ID de votre table Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID : ID de votre profil d'application Bigtable.
  • PUBSUB_TOPIC : nom du sujet de destination Pub/Sub.
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub;

import com.google.cloud.Timestamp;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation.MutationType;
import com.google.cloud.bigtable.data.v2.models.DeleteCells;
import com.google.cloud.bigtable.data.v2.models.DeleteFamily;
import com.google.cloud.bigtable.data.v2.models.Entry;
import com.google.cloud.bigtable.data.v2.models.SetCell;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions;
import com.google.cloud.teleport.v2.bigtable.utils.UnsupportedEntryException;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.cdc.dlq.StringDeadLetterQueueSanitizer;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.options.BigtableChangeStreamsToPubSubOptions;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.FailsafePublisher.PublishModJsonToTopic;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.BigtableSource;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.MessageEncoding;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.MessageFormat;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.Mod;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.ModType;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.PubSubDestination;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.TestChangeStreamMutation;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.schemautils.PubSubUtils;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.GetTopicRequest;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.ValidateMessageRequest;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This pipeline ingests {@link ChangeStreamMutation} from Bigtable change stream. The {@link
 * ChangeStreamMutation} is then broken into {@link Mod}, which converted into PubsubMessage and
 * inserted into Pub/Sub topic.
 */
@Template(
    name = "Bigtable_Change_Streams_to_PubSub",
    category = TemplateCategory.STREAMING,
    displayName = "Cloud Bigtable Change Streams to PubSub",
    description =
        "Streaming pipeline. Streams Bigtable data change records and writes them into PubSub using Dataflow Runner V2.",
    optionsClass = BigtableChangeStreamsToPubSubOptions.class,
    optionsOrder = {
      BigtableChangeStreamsToPubSubOptions.class,
      BigtableCommonOptions.ReadChangeStreamOptions.class,
      BigtableCommonOptions.ReadOptions.class
    },
    skipOptions = {
      "bigtableReadAppProfile",
      "bigtableAdditionalRetryCodes",
      "bigtableRpcAttemptTimeoutMs",
      "bigtableRpcTimeoutMs"
    },
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-bigtable-change-streams-to-pubsub",
    flexContainerName = "bigtable-changestreams-to-pubsub",
    contactInformation = "https://cloud.google.com/support",
    streaming = true,
    supportsAtLeastOnce = true)
public final class BigtableChangeStreamsToPubSub {

  /** String/String Coder for {@link FailsafeElement}. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  private static final Logger LOG = LoggerFactory.getLogger(BigtableChangeStreamsToPubSub.class);

  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    LOG.info("Starting to replicate change records from Cloud Bigtable change streams to PubSub");

    BigtableChangeStreamsToPubSubOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(BigtableChangeStreamsToPubSubOptions.class);

    run(options);
  }

  private static void validateOptions(BigtableChangeStreamsToPubSubOptions options) {
    if (options.getDlqRetryMinutes() <= 0) {
      throw new IllegalArgumentException("dlqRetryMinutes must be positive.");
    }
    if (options.getDlqMaxRetries() < 0) {
      throw new IllegalArgumentException("dlqMaxRetries cannot be negative.");
    }
  }

  private static void setOptions(BigtableChangeStreamsToPubSubOptions options) {
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    // Add use_runner_v2 to the experiments option, since change streams connector is only supported
    // on Dataflow runner v2.
    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    boolean hasUseRunnerV2 = false;
    for (String experiment : experiments) {
      if (experiment.equalsIgnoreCase(USE_RUNNER_V2_EXPERIMENT)) {
        hasUseRunnerV2 = true;
        break;
      }
    }
    if (!hasUseRunnerV2) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(BigtableChangeStreamsToPubSubOptions options) {
    setOptions(options);
    validateOptions(options);

    String bigtableProject = getBigtableProjectId(options);

    // Retrieve and parse the startTimestamp
    Instant startTimestamp =
        options.getBigtableChangeStreamStartTimestamp().isEmpty()
            ? Instant.now()
            : toInstant(Timestamp.parseTimestamp(options.getBigtableChangeStreamStartTimestamp()));

    BigtableSource sourceInfo =
        new BigtableSource(
            options.getBigtableReadInstanceId(),
            options.getBigtableReadTableId(),
            getBigtableCharset(options),
            options.getBigtableChangeStreamIgnoreColumnFamilies(),
            options.getBigtableChangeStreamIgnoreColumns(),
            startTimestamp);

    Topic topic = null;
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      GetTopicRequest request =
          GetTopicRequest.newBuilder()
              .setTopic(
                  TopicName.ofProjectTopicName(
                          getPubSubProjectId(options), options.getPubSubTopic())
                      .toString())
              .build();
      topic = topicAdminClient.getTopic(request);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }

    try {
      if (!validateSchema(topic, options, sourceInfo)) {
        final String errorMessage = "Configured topic doesn't accept messages of configured format";
        throw new IllegalArgumentException(errorMessage);
      }

    } catch (Exception e) {
      throw new IllegalArgumentException(e);
    }

    PubSubDestination destinationInfo = newPubSubDestination(options, topic);
    PubSubUtils pubSub = new PubSubUtils(sourceInfo, destinationInfo);

    /*
     * Stages: 1) Read {@link ChangeStreamMutation} from change stream. 2) Create {@link
     * FailsafeElement} of {@link Mod} JSON and merge from: - {@link ChangeStreamMutation}. - GCS Dead
     * letter queue. 3) Convert {@link Mod} JSON into PubsubMessage and publish it to PubSub.
     * 4) Write Failures from 2) and 3) to GCS dead letter queue.
     */
    // Step 1
    Pipeline pipeline = Pipeline.create(options);

    // Register the coders for pipeline
    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(
        FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);

    DeadLetterQueueManager dlqManager = buildDlqManager(options);

    String dlqDirectory = dlqManager.getRetryDlqDirectoryWithDateTime();
    String tempDlqDirectory = dlqManager.getRetryDlqDirectory() + "tmp/";

    if (options.getDisableDlqRetries()) {
      LOG.info(
          "Disabling retries for the DLQ, directly writing into severe DLQ: {}",
          dlqManager.getSevereDlqDirectoryWithDateTime());
      dlqDirectory = dlqManager.getSevereDlqDirectoryWithDateTime();
      tempDlqDirectory = dlqManager.getSevereDlqDirectory() + "tmp/";
    }

    BigtableIO.ReadChangeStream readChangeStream =
        BigtableIO.readChangeStream()
            .withChangeStreamName(options.getBigtableChangeStreamName())
            .withExistingPipelineOptions(
                options.getBigtableChangeStreamResume()
                    ? BigtableIO.ExistingPipelineOptions.RESUME_OR_FAIL
                    : BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS)
            .withProjectId(bigtableProject)
            .withMetadataTableInstanceId(options.getBigtableChangeStreamMetadataInstanceId())
            .withInstanceId(options.getBigtableReadInstanceId())
            .withTableId(options.getBigtableReadTableId())
            .withAppProfileId(options.getBigtableChangeStreamAppProfile())
            .withStartTime(startTimestamp);

    if (!StringUtils.isBlank(options.getBigtableChangeStreamMetadataTableTableId())) {
      readChangeStream =
          readChangeStream.withMetadataTableTableId(
              options.getBigtableChangeStreamMetadataTableTableId());
    }
    // Step 2: just return the output for sending to pubSub and DLQ
    PCollection<ChangeStreamMutation> dataChangeRecord =
        pipeline
            .apply("Read from Cloud Bigtable Change Streams", readChangeStream)
            .apply(Values.create());

    PCollection<FailsafeElement<String, String>> sourceFailsafeModJson =
        dataChangeRecord
            .apply(
                "ChangeStreamMutation To Mod JSON",
                ParDo.of(new ChangeStreamMutationToModJsonFn(sourceInfo)))
            .apply(
                "Wrap Mod JSON In FailsafeElement",
                ParDo.of(
                    new DoFn<String, FailsafeElement<String, String>>() {
                      @ProcessElement
                      public void process(
                          @Element String input,
                          OutputReceiver<FailsafeElement<String, String>> receiver) {
                        receiver.output(FailsafeElement.of(input, input));
                      }
                    }))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    PCollectionTuple dlqModJson =
        dlqManager.getReconsumerDataTransform(
            pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes())));

    PCollection<FailsafeElement<String, String>> retryableDlqFailsafeModJson = null;
    if (options.getDisableDlqRetries()) {
      retryableDlqFailsafeModJson = pipeline.apply(Create.empty(FAILSAFE_ELEMENT_CODER));
    } else {
      retryableDlqFailsafeModJson =
          dlqModJson.get(DeadLetterQueueManager.RETRYABLE_ERRORS).setCoder(FAILSAFE_ELEMENT_CODER);
    }

    PCollection<FailsafeElement<String, String>> failsafeModJson =
        PCollectionList.of(sourceFailsafeModJson)
            .and(retryableDlqFailsafeModJson)
            .apply("Merge Source And DLQ Mod JSON", Flatten.pCollections());

    FailsafePublisher.FailsafeModJsonToPubsubMessageOptions failsafeModJsonToPubsubOptions =
        FailsafePublisher.FailsafeModJsonToPubsubMessageOptions.builder()
            .setCoder(FAILSAFE_ELEMENT_CODER)
            .build();

    PublishModJsonToTopic publishModJsonToTopic =
        new PublishModJsonToTopic(pubSub, failsafeModJsonToPubsubOptions);

    PCollection<FailsafeElement<String, String>> failedToPublish =
        failsafeModJson.apply("Publish Mod JSON To Pubsub", publishModJsonToTopic);

    PCollection<String> transformDlqJson =
        failedToPublish.apply(
            "Failed Mod JSON During Table Row Transformation",
            MapElements.via(new StringDeadLetterQueueSanitizer()));

    PCollectionList.of(transformDlqJson)
        .apply("Merge Failed Mod JSON From Transform And PubSub", Flatten.pCollections())
        .apply(
            "Write Failed Mod JSON To DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqDirectory)
                .withTmpDirectory(tempDlqDirectory)
                .setIncludePaneInfo(true)
                .build());

    PCollection<FailsafeElement<String, String>> nonRetryableDlqModJsonFailsafe =
        dlqModJson.get(DeadLetterQueueManager.PERMANENT_ERRORS).setCoder(FAILSAFE_ELEMENT_CODER);
    LOG.info(
        "DLQ manager severe DLQ directory with date time: {}",
        dlqManager.getSevereDlqDirectoryWithDateTime());
    LOG.info("DLQ manager severe DLQ directory: {}", dlqManager.getSevereDlqDirectory() + "tmp/");
    nonRetryableDlqModJsonFailsafe
        .apply(
            "Write Mod JSON With Non-retriable Error To DLQ",
            MapElements.via(new StringDeadLetterQueueSanitizer()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
                .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    return pipeline.run();
  }

  private static PubSubDestination newPubSubDestination(
      BigtableChangeStreamsToPubSubOptions options, Topic topic) {
    return new PubSubDestination(
        getPubSubProjectId(options),
        options.getPubSubTopic(),
        topic,
        options.getMessageFormat(),
        options.getMessageEncoding(),
        options.getUseBase64Rowkeys(),
        options.getUseBase64ColumnQualifiers(),
        options.getUseBase64Values(),
        options.getStripValues());
  }

  private static Instant toInstant(Timestamp timestamp) {
    if (timestamp == null) {
      return null;
    } else {
      return Instant.ofEpochMilli(timestamp.getSeconds() * 1000 + timestamp.getNanos() / 1000000);
    }
  }

  private static DeadLetterQueueManager buildDlqManager(
      BigtableChangeStreamsToPubSubOptions options) {
    String tempLocation =
        options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
            ? options.as(DataflowPipelineOptions.class).getTempLocation()
            : options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
    String dlqDirectory =
        options.getDlqDirectory().isEmpty() ? tempLocation + "dlq/" : options.getDlqDirectory();

    LOG.info("DLQ directory: {}", dlqDirectory);
    return DeadLetterQueueManager.create(dlqDirectory, options.getDlqMaxRetries());
  }

  private static String getBigtableCharset(BigtableChangeStreamsToPubSubOptions options) {
    return StringUtils.isEmpty(options.getBigtableChangeStreamCharset())
        ? "UTF-8"
        : options.getBigtableChangeStreamCharset();
  }

  private static String getBigtableProjectId(BigtableChangeStreamsToPubSubOptions options) {
    return StringUtils.isEmpty(options.getBigtableReadProjectId())
        ? options.getProject()
        : options.getBigtableReadProjectId();
  }

  private static String getPubSubProjectId(BigtableChangeStreamsToPubSubOptions options) {
    return StringUtils.isEmpty(options.getPubSubProjectId())
        ? options.getProject()
        : options.getPubSubProjectId();
  }

  private static Boolean validateSchema(
      Topic topic, BigtableChangeStreamsToPubSubOptions options, BigtableSource source)
      throws Exception {
    String messageFormatPath = topic.getSchemaSettings().getSchema();
    if (topic.getSchemaSettings().getSchema().isEmpty()) {
      validateIncompatibleEncoding(options);
      LOG.info(
          "Topic has no schema configured, pipeline will use message format: {}, message encoding: {}",
          options.getMessageFormat(),
          options.getMessageEncoding());
      return true;
    } else {
      SchemaName schemaName = SchemaName.parse(topic.getSchemaSettings().getSchema());
      Schema schema;
      try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {
        schema = schemaServiceClient.getSchema(schemaName);
      }

      options.setMessageEncoding(toMessageEncoding(topic.getSchemaSettings().getEncoding()));

      Schema.Type schemaType = schema.getType();
      switch (schemaType) {
        case AVRO:
          options.setMessageFormat(MessageFormat.AVRO);
          validateNoUseOfBase64(options);
          break;
        case PROTOCOL_BUFFER:
          if (options.getMessageEncoding() == MessageEncoding.JSON) {
            options.setMessageFormat(MessageFormat.JSON);
          } else {
            options.setMessageFormat(MessageFormat.PROTOCOL_BUFFERS);
            validateNoUseOfBase64(options);
          }
          break;
        case TYPE_UNSPECIFIED:
        case UNRECOGNIZED:
          // Not overriding messageFormat, will try what customer configured or the default if
          // not configured
          break;
        default:
          throw new IllegalArgumentException("Topic schema type is not supported: " + schemaType);
      }

      LOG.info("Topic has schema configured: {}", topic.getSchemaSettings().getSchema());
      LOG.info(
          "Pipeline will use message format: {}, message encoding: {}",
          options.getMessageFormat(),
          options.getMessageEncoding());

      PubSubDestination destination = newPubSubDestination(options, topic);
      PubSubUtils pubSub = new PubSubUtils(source, destination);

      ByteString testChangeMessageData = createTestChangeMessage(pubSub).getData();
      Encoding encoding = toPubSubEncoding(options.getMessageEncoding());
      try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {
        String testMessageEncoded = toBase64String(testChangeMessageData);
        LOG.info("Validating a test message (Base64 encoded): {}", testMessageEncoded);
        ValidateMessageRequest request =
            ValidateMessageRequest.newBuilder()
                .setParent("projects/" + pubSub.getDestination().getPubSubProject())
                .setEncoding(encoding)
                .setMessage(testChangeMessageData)
                .setName(messageFormatPath)
                .build();
        schemaServiceClient.validateMessage(request);
        LOG.info("Test message successfully validated.");
      } catch (Exception e) {
        throw new IllegalArgumentException("Failed to validate test message", e);
      }
    }
    return true;
  }

  private static void validateNoUseOfBase64(BigtableChangeStreamsToPubSubOptions options) {
    if (options.getUseBase64Values()) {
      throw new IllegalArgumentException(
          "useBase64Values values can only be used with topics accepting JSON messages");
    }
    if (options.getUseBase64Rowkeys()) {
      throw new IllegalArgumentException(
          "useBase64Rowkeys values can only be used with topics accepting JSON messages");
    }
    if (options.getUseBase64ColumnQualifiers()) {
      throw new IllegalArgumentException(
          "useBase64ColumnQualifiers values can only be used with topics accepting JSON messages");
    }
  }

  private static void validateIncompatibleEncoding(BigtableChangeStreamsToPubSubOptions options) {
    if (options.getMessageEncoding() == MessageEncoding.BINARY
        && options.getMessageFormat() == MessageFormat.JSON) {
      throw new IllegalArgumentException(
          "JSON message format is incompatible with BINARY message encoding");
    }
  }

  private static MessageEncoding toMessageEncoding(Encoding encoding) {
    if (encoding == null) {
      return MessageEncoding.JSON;
    }
    switch (encoding) {
      case JSON:
      case ENCODING_UNSPECIFIED:
      case UNRECOGNIZED:
        return MessageEncoding.JSON;
      case BINARY:
        return MessageEncoding.BINARY;
      default:
        throw new IllegalArgumentException("Topic has unsupported message encoding: " + encoding);
    }
  }

  private static String toBase64String(ByteString testChangeMessageData) {
    return Base64.getEncoder().encodeToString(testChangeMessageData.toByteArray());
  }

  private static Encoding toPubSubEncoding(MessageEncoding messageEncoding) {
    switch (messageEncoding) {
      case BINARY:
        return Encoding.BINARY;
      case JSON:
        return Encoding.JSON;
      default:
        throw new IllegalArgumentException("Unexpected message encoding: " + messageEncoding);
    }
  }

  private static PubsubMessage createTestChangeMessage(PubSubUtils pubSub) throws Exception {
    SetCell setCell =
        SetCell.create(
            "test_column_family",
            ByteString.copyFrom("test_column", Charset.defaultCharset()),
            1000L, // timestamp
            ByteString.copyFrom("test_value", Charset.defaultCharset()));

    TestChangeStreamMutation mutation =
        new TestChangeStreamMutation(
            "test_rowkey",
            MutationType.USER,
            "source_cluster",
            org.threeten.bp.Instant.now(), // commit timestamp
            1, // tiebreaker
            "token",
            org.threeten.bp.Instant.now(), // low watermark
            setCell);

    Mod mod = new Mod(pubSub.getSource(), mutation, setCell);

    switch (pubSub.getDestination().getMessageFormat()) {
      case AVRO:
        return pubSub.mapChangeJsonStringToPubSubMessageAsAvro(mod.getChangeJson());
      case PROTOCOL_BUFFERS:
        return pubSub.mapChangeJsonStringToPubSubMessageAsProto(mod.getChangeJson());
      case JSON:
        return pubSub.mapChangeJsonStringToPubSubMessageAsJson(mod.getChangeJson());
      default:
        throw new IllegalArgumentException(
            "Unexpected message format: " + pubSub.getDestination().getMessageFormat());
    }
  }

  /**
   * DoFn that converts a {@link ChangeStreamMutation} to multiple {@link Mod} in serialized JSON
   * format.
   */
  static class ChangeStreamMutationToModJsonFn extends DoFn<ChangeStreamMutation, String> {

    private final BigtableSource sourceInfo;

    ChangeStreamMutationToModJsonFn(BigtableSource source) {
      this.sourceInfo = source;
    }

    @ProcessElement
    public void process(@Element ChangeStreamMutation input, OutputReceiver<String> receiver)
        throws Exception {
      for (Entry entry : input.getEntries()) {
        ModType modType = getModType(entry);

        Mod mod = null;
        switch (modType) {
          case SET_CELL:
            mod = new Mod(sourceInfo, input, (SetCell) entry);
            break;
          case DELETE_CELLS:
            mod = new Mod(sourceInfo, input, (DeleteCells) entry);
            break;
          case DELETE_FAMILY:
            mod = new Mod(sourceInfo, input, (DeleteFamily) entry);
            break;
          default:
          case UNKNOWN:
            throw new UnsupportedEntryException(
                "Cloud Bigtable change stream entry of type "
                    + entry.getClass().getName()
                    + " is not supported. The entry was put into a DLQ directory. "
                    + "Please update your Dataflow template with the latest template version");
        }

        String modJsonString;

        try {
          modJsonString = mod.toJson();
        } catch (IOException e) {
          // Ignore exception and print bad format.
          modJsonString = String.format("\"%s\"", input);
        }
        receiver.output(modJsonString);
      }
    }

    private ModType getModType(Entry entry) {
      if (entry instanceof SetCell) {
        return ModType.SET_CELL;
      } else if (entry instanceof DeleteCells) {
        return ModType.DELETE_CELLS;
      } else if (entry instanceof DeleteFamily) {
        return ModType.DELETE_FAMILY;
      }
      return ModType.UNKNOWN;
    }
  }
}

Étape suivante