Modèle de flux de modifications Bigtable vers BigQuery

Le modèle de flux de modifications Bigtable vers BigQuery est un pipeline de flux de données qui diffuse les enregistrements de modification de données Bigtable et les écrit dans des tables BigQuery à l'aide de Dataflow.

Un flux de modifications Bigtable vous permet de vous abonner aux mutations de données par table. Lorsque vous vous abonnez à des flux de modification de table, les contraintes suivantes s'appliquent :

  • Seuls les cellules et les descripteurs modifiés des opérations de suppression sont renvoyés.
  • Seule la nouvelle valeur d'une cellule modifiée est renvoyée.

Lorsque des enregistrements de modification de données sont écrits dans BigQuery, les lignes peuvent être insérées dans le désordre par rapport à l'ordre d'horodatage de commit Bigtable d'origine.

Les lignes de la table du journal des modifications ne pouvant pas être écrites dans BigQuery en raison d'une erreur persistante sont définitivement placées dans une file d'attente de lettres mortes (file d'attente de messages non traités) dans Cloud Storage pour un examen manuel ou un traitement ultérieur réalisé par l'utilisateur.

Si la table BigQuery nécessaire n'existe pas, le pipeline la crée. Sinon, une table BigQuery existante est utilisée. Le schéma des tables BigQuery existantes doit contenir les colonnes dans la table suivante.

Chaque nouvelle ligne BigQuery inclut un enregistrement de modification de données renvoyé par le flux de modifications depuis la ligne correspondante dans la table Bigtable.

Schéma de la table de sortie BigQuery

Nom de la colonne Type Nullable Description
row_key STRING ou BYTES Non Clé de ligne de la ligne modifiée. Lorsque l'option de pipeline writeRowkeyAsBytes est définie sur true, le type de la colonne doit être BYTES. Sinon, utilisez le type STRING.
mod_type STRING Non Type de mutation de ligne. Utilisez l'une des valeurs suivantes : SET_CELL, DELETE_CELLS ou DELETE_FAMILY.
column_family STRING Non Famille de colonnes affectée par la mutation de ligne.
column STRING Oui Qualificatif de colonne affecté par la mutation de ligne. Pour le type de mutation DELETE_FAMILY, définissez cette valeur sur NULL.
commit_timestamp TIMESTAMP Non Date/heure à laquelle Bigtable applique la mutation.
big_query_commit_timestamp TIMESTAMP Oui Facultatif : spécifie la date et l'heure auxquelles BigQuery écrit la ligne dans une table de sortie. Le champ n'est pas renseigné si le nom de la colonne est présent dans la valeur de l'option de pipeline bigQueryChangelogTableFieldsToIgnore.
timestamp TIMESTAMP ou INT64 Oui Valeur d'horodatage de la cellule affectée par la mutation. Lorsque l'option de pipeline writeNumericTimestamps est définie sur true, le type de la colonne doit être INT64. Sinon, utilisez le type TIMESTAMP. Pour les types de mutation DELETE_CELLS et DELETE_FAMILY, définissez la valeur sur NULL.
timestamp_from TIMESTAMP ou INT64 Oui Décrit un début inclusif de l'intervalle d'horodatage pour toutes les cellules supprimées par la mutation DELETE_CELLS. Pour les autres types de mutation, définissez cette valeur sur NULL.
timestamp_to TIMESTAMP ou INT64 Oui Décrit une fin exclusive de l'intervalle d'horodatage pour toutes les cellules supprimées par la mutation DELETE_CELLS. Pour les autres types de mutation, définissez cette valeur sur NULL.
is_gc BOOL Non Facultatif : lorsque la mutation est déclenchée par une stratégie de récupération de mémoire, définissez cette valeur sur true. Dans tous les autres cas, définissez cette valeur sur false. Le champ n'est pas renseigné lorsque le nom de la colonne est présent dans la valeur de l'option de pipeline bigQueryChangelogTableFieldsToIgnore.
source_instance STRING Non Facultatif : décrit le nom de l'instance Bigtable d'où provient la mutation. Le champ n'est pas renseigné lorsque le nom de la colonne est présent dans la valeur de l'option de pipeline bigQueryChangelogTableFieldsToIgnore.
source_cluster STRING Non Facultatif : décrit le nom du cluster Bigtable d'où provient la mutation. Le champ n'est pas renseigné lorsque le nom de la colonne est présent dans la valeur de l'option de pipeline bigQueryChangelogTableFieldsToIgnore.
source_table STRING Non Facultatif : décrit le nom de la table Bigtable à laquelle la mutation s'applique. La valeur dans cette colonne peut être utile si plusieurs tables Bigtable diffusent des modifications vers la même table BigQuery. Le champ n'est pas renseigné lorsque le nom de la colonne est présent dans la valeur de l'option de pipeline bigQueryChangelogTableFieldsToIgnore.
tiebreaker INT64 Non Facultatif : lorsque deux mutations sont enregistrées en même temps par différents clusters Bigtable, la mutation avec la valeur tiebreaker la plus élevée est appliquée à la table source. Les mutations ayant des valeurs tiebreaker inférieures sont supprimées. Le champ n'est pas renseigné lorsque le nom de la colonne est présent dans la valeur de l'option de pipeline bigQueryChangelogTableFieldsToIgnore.
value STRING ou BYTES Oui Nouvelle valeur définie par la mutation. Lorsque l'option de pipeline writeValuesAsBytes est définie sur true, le type de la colonne doit être BYTES. Sinon, utilisez le type STRING. La valeur est définie pour les mutations SET_CELL. Pour les autres types de mutation, la valeur est définie sur NULL.

Conditions requises pour ce pipeline

  • Instance source Bigtable spécifiée.
  • Table source Bigtable spécifiée. Les flux de modifications doivent être activés dans la table.
  • Profil d'application Bigtable spécifié.
  • Ensemble de données de destination BigQuery spécifié.

Paramètres de modèle

Paramètres obligatoires

  • bigQueryDataset: nom de l'ensemble de données de la table BigQuery de destination.
  • bigtableChangeStreamAppProfile: ID du profil d'application Bigtable. Le profil d'application doit utiliser un routage à cluster unique et autoriser les transactions à ligne unique.
  • bigtableReadInstanceId: ID de l'instance Bigtable source.
  • bigtableReadTableId: ID de la table Bigtable source.

Paramètres facultatifs

  • writeRowkeyAsBytes: indique s'il faut écrire les clés de ligne en tant que BYTES BigQuery. Lorsque ce paramètre est défini sur true, les clés de ligne sont écrites dans la colonne BYTES. Sinon, les clés de ligne sont écrites dans la colonne STRING. La valeur par défaut est false.
  • writeValuesAsBytes: lorsque ce paramètre est défini sur true, les valeurs sont écrites dans une colonne de type BYTES. Sinon, elles sont écrites dans une colonne de type STRING. La valeur par défaut est : false.
  • writeNumericTimestamps: indique s'il faut écrire le code temporel Bigtable en tant que INT64 BigQuery. Lorsque ce paramètre est défini sur true, les valeurs sont écrites dans la colonne INT64. Sinon, les valeurs sont écrites dans la colonne TIMESTAMP. Colonnes concernées: timestamp, timestamp_from et timestamp_to. La valeur par défaut est false. Lorsque ce paramètre est défini sur true, le temps est mesuré en microsecondes depuis l'époque Unix (1er janvier 1970 à l'heure UTC).
  • bigQueryProjectId: ID de projet de l'ensemble de données BigQuery. La valeur par défaut est le projet pour le job Dataflow.
  • bigQueryChangelogTableName: nom de la table BigQuery de destination. Si cette option n'est pas spécifiée, la valeur bigtableReadTableId + "_changelog" est utilisée. La valeur par défaut est vide.
  • bigQueryChangelogTablePartitionGranularity: spécifie la précision pour le partitionnement de la table des journaux de modifications. Lorsque ce paramètre est défini, la table est partitionnée. Utilisez l'une des valeurs acceptées suivantes: HOUR, DAY, MONTH ou YEAR. Par défaut, la table n'est pas partitionnée.
  • bigQueryChangelogTablePartitionExpirationMs: définit le délai d'expiration de la partition de table des journaux de modifications, en millisecondes. Lorsque ce paramètre est défini sur true, les partitions antérieures au nombre de millisecondes spécifié sont supprimées. Par défaut, aucun délai d'expiration n'est défini.
  • bigQueryChangelogTableFieldsToIgnore: liste des colonnes du journal des modifications, séparées par une virgule, qui ne sont pas créées ni renseignées si ce paramètre est spécifié. Utilisez l'une des valeurs acceptées suivantes : is_gc, source_instance, source_cluster, source_table, tiebreaker ou big_query_commit_timestamp. Par défaut, toutes les colonnes sont renseignées.
  • dlqDirectory: répertoire à utiliser pour la file d'attente de lettres mortes. Les enregistrements dont le traitement échoue sont stockés dans ce répertoire. La valeur par défaut est un répertoire situé sous l'emplacement temporaire du job Dataflow. Dans la plupart des cas, vous pouvez utiliser le chemin par défaut.
  • bigtableChangeStreamMetadataInstanceId: ID d'instance de métadonnées du flux de modifications Bigtable. La valeur par défaut est vide.
  • bigtableChangeStreamMetadataTableTableId: ID de la table de métadonnées du connecteur de flux de modifications Bigtable. Si aucune valeur n'est fournie, une table de métadonnées du connecteur de flux de modifications Bigtable est automatiquement créée pendant l'exécution du pipeline. La valeur par défaut est vide.
  • bigtableChangeStreamCharset: nom du charset de flux de modifications Bigtable. La valeur par défaut est UTF8.
  • bigtableChangeStreamStartTimestamp: code temporel de début (https://tools.ietf.org/html/rfc3339), inclusif, à utiliser pour la lecture des flux de modifications. Par exemple, 2022-05-05T07:59:59Z. La valeur par défaut est l'horodatage de l'heure de début du pipeline.
  • bigtableChangeStreamIgnoreColumnFamilies: liste des modifications de noms de familles de colonnes, séparées par une virgule, à ignorer. La valeur par défaut est vide.
  • bigtableChangeStreamIgnoreColumns: liste des modifications de noms de colonnes, séparées par une virgule, à ignorer. La valeur par défaut est vide.
  • bigtableChangeStreamName: nom unique pour le pipeline client. Permet de reprendre le traitement à partir du moment où un pipeline précédemment exécuté s'est arrêté. La valeur par défaut est un nom généré automatiquement. Consultez les journaux du job Dataflow pour connaître la valeur utilisée.
  • bigtableChangeStreamResume: lorsque ce paramètre est défini sur true, un nouveau pipeline reprend le traitement à partir du moment où un pipeline précédemment exécuté avec la même valeur bigtableChangeStreamName s'est arrêté. Si le pipeline avec la valeur bigtableChangeStreamName donnée n'a jamais été exécuté, aucun nouveau pipeline ne démarre. Lorsque ce paramètre est défini sur false, un nouveau pipeline démarre. Si un pipeline avec la même valeur bigtableChangeStreamName a déjà été exécuté pour la source donnée, aucun nouveau pipeline ne démarre. La valeur par défaut est false.
  • bigtableReadProjectId: ID du projet Bigtable. La valeur par défaut est le projet pour le job Dataflow.

Exécuter le modèle

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Bigtable change streams to BigQuery template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
bigQueryDataset=BIGQUERY_DESTINATION_DATASET

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • BIGTABLE_INSTANCE_ID : ID de votre instance Bigtable.
  • BIGTABLE_TABLE_ID : ID de votre table Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID : ID de votre profil d'application Bigtable.
  • BIGQUERY_DESTINATION_DATASET : nom de l'ensemble de données de destination BigQuery.

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET"
    }
  }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • BIGTABLE_INSTANCE_ID : ID de votre instance Bigtable.
  • BIGTABLE_TABLE_ID : ID de votre table Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID : ID de votre profil d'application Bigtable.
  • BIGQUERY_DESTINATION_DATASET : nom de l'ensemble de données de destination BigQuery.
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.Timestamp;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.DeleteCells;
import com.google.cloud.bigtable.data.v2.models.DeleteFamily;
import com.google.cloud.bigtable.data.v2.models.Entry;
import com.google.cloud.bigtable.data.v2.models.SetCell;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions.ReadChangeStreamOptions;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions.ReadOptions;
import com.google.cloud.teleport.v2.bigtable.utils.UnsupportedEntryException;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.options.BigtableChangeStreamToBigQueryOptions;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.BigQueryDestination;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.BigtableSource;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.Mod;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.model.ModType;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigquery.schemautils.BigQueryUtils;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.ExistingPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This pipeline ingests {@link ChangeStreamMutation} from Bigtable change stream. The {@link
 * ChangeStreamMutation} is then broken into {@link Mod}, which converted into {@link TableRow} and
 * inserted into BigQuery table.
 */
@Template(
    name = "Bigtable_Change_Streams_to_BigQuery",
    category = TemplateCategory.STREAMING,
    displayName = "Cloud Bigtable Change Streams to BigQuery",
    description =
        "Streaming pipeline. Streams Bigtable data change records and writes them into BigQuery using Dataflow Runner V2.",
    optionsClass = BigtableChangeStreamToBigQueryOptions.class,
    optionsOrder = {
      BigtableChangeStreamToBigQueryOptions.class,
      ReadChangeStreamOptions.class,
      ReadOptions.class
    },
    skipOptions = {
      "bigtableReadAppProfile",
      "bigtableAdditionalRetryCodes",
      "bigtableRpcAttemptTimeoutMs",
      "bigtableRpcTimeoutMs"
    },
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-bigtable-change-streams-to-bigquery",
    flexContainerName = "bigtable-changestreams-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    streaming = true)
public final class BigtableChangeStreamsToBigQuery {
  private static final Logger LOG = LoggerFactory.getLogger(BigtableChangeStreamsToBigQuery.class);

  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    LOG.info("Starting to replicate change records from Cloud Bigtable change streams to BigQuery");

    BigtableChangeStreamToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(BigtableChangeStreamToBigQueryOptions.class);

    run(options);
  }

  private static void setOptions(BigtableChangeStreamToBigQueryOptions options) {
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    // Add use_runner_v2 to the experiments option, since change streams connector is only supported
    // on Dataflow runner v2.
    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    boolean hasUseRunnerV2 = false;
    for (String experiment : experiments) {
      if (experiment.equalsIgnoreCase(USE_RUNNER_V2_EXPERIMENT)) {
        hasUseRunnerV2 = true;
        break;
      }
    }
    if (!hasUseRunnerV2) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(BigtableChangeStreamToBigQueryOptions options) {
    setOptions(options);

    String changelogTableName = getBigQueryChangelogTableName(options);
    String bigtableProject = getBigtableProjectId(options);
    String bigQueryProject = getBigQueryProjectId(options);
    String bigQueryDataset = options.getBigQueryDataset();

    // If dataset doesn't exist and not checked, pipeline will start failing only after it sees the
    // first change from Cloud Bigtable. BigQueryIO can create table if it doesn't exist, but it
    // cannot create a dataset
    validateBigQueryDatasetExists(bigQueryProject, bigQueryDataset);

    // Retrieve and parse the startTimestamp
    Instant startTimestamp =
        options.getBigtableChangeStreamStartTimestamp().isEmpty()
            ? Instant.now()
            : toInstant(Timestamp.parseTimestamp(options.getBigtableChangeStreamStartTimestamp()));

    BigtableSource sourceInfo =
        new BigtableSource(
            options.getBigtableReadInstanceId(),
            options.getBigtableReadTableId(),
            getBigtableCharset(options),
            options.getBigtableChangeStreamIgnoreColumnFamilies(),
            options.getBigtableChangeStreamIgnoreColumns(),
            startTimestamp);

    BigQueryDestination destinationInfo =
        new BigQueryDestination(
            bigQueryProject,
            bigQueryDataset,
            changelogTableName,
            options.getWriteRowkeyAsBytes(),
            options.getWriteValuesAsBytes(),
            options.getWriteNumericTimestamps(),
            options.getBigQueryChangelogTablePartitionGranularity(),
            options.getBigQueryChangelogTablePartitionExpirationMs(),
            options.getBigQueryChangelogTableFieldsToIgnore());

    BigQueryUtils bigQuery = new BigQueryUtils(sourceInfo, destinationInfo);

    Pipeline pipeline = Pipeline.create(options);
    DeadLetterQueueManager dlqManager = buildDlqManager(options);

    BigtableIO.ReadChangeStream readChangeStream =
        BigtableIO.readChangeStream()
            .withChangeStreamName(options.getBigtableChangeStreamName())
            .withExistingPipelineOptions(
                options.getBigtableChangeStreamResume()
                    ? ExistingPipelineOptions.RESUME_OR_FAIL
                    : ExistingPipelineOptions.FAIL_IF_EXISTS)
            .withProjectId(bigtableProject)
            .withMetadataTableInstanceId(options.getBigtableChangeStreamMetadataInstanceId())
            .withInstanceId(options.getBigtableReadInstanceId())
            .withTableId(options.getBigtableReadTableId())
            .withAppProfileId(options.getBigtableChangeStreamAppProfile())
            .withStartTime(startTimestamp);

    if (!StringUtils.isBlank(options.getBigtableChangeStreamMetadataTableTableId())) {
      readChangeStream =
          readChangeStream.withMetadataTableTableId(
              options.getBigtableChangeStreamMetadataTableTableId());
    }

    PCollection<ChangeStreamMutation> dataChangeRecord =
        pipeline
            .apply("Read from Cloud Bigtable Change Streams", readChangeStream)
            .apply(Values.create());

    PCollection<TableRow> changeStreamMutationToTableRow =
        dataChangeRecord.apply(
            "ChangeStreamMutation To TableRow",
            ParDo.of(new ChangeStreamMutationToTableRowFn(sourceInfo, bigQuery)));

    Write<TableRow> bigQueryWrite =
        BigQueryIO.<TableRow>write()
            .to(destinationInfo.getBigQueryTableReference())
            .withSchema(bigQuery.getDestinationTableSchema())
            .withFormatFunction(element -> element)
            .withFormatRecordOnFailureFunction(element -> element)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withExtendedErrorInfo()
            .withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE)
            .withNumStorageWriteApiStreams(0)
            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors());

    if (destinationInfo.isPartitioned()) {
      bigQueryWrite = bigQueryWrite.withTimePartitioning(bigQuery.getTimePartitioning());
    }

    // Unfortunately, due to https://github.com/apache/beam/issues/24090, it is no longer possible
    // to pass metadata via fake columns when writing to BigQuery. Previously we'd pass something
    // like retry count and then format it out before writing, but BQ would return original object
    // which would allow us to increment retry count and store it to DLQ with incremented number.
    // Because WRITE API doesn't allow access to original object, all metadata values are stripped
    // and we can only rely on retry policy and put all other persistently failing rows to DLQ as
    // a non-retriable severe failure.
    //
    // Since we're not going to be retrying such failures, we'll not use any reading from DLQ
    // capability.

    WriteResult writeResult =
        changeStreamMutationToTableRow.apply("Write To BigQuery", bigQueryWrite);

    writeResult
        .getFailedStorageApiInserts()
        .apply(
            "Failed Mod JSON During BigQuery Writes",
            MapElements.via(new BigQueryDeadLetterQueueSanitizer()))
        .apply(
            "Write rejected TableRow JSON To DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectory() + "YYYY/MM/dd/HH/mm/")
                .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    return pipeline.run();
  }

  private static void validateBigQueryDatasetExists(
      String bigQueryProject, String bigQueryDataset) {
    BigQueryOptions options = BigQueryOptions.newBuilder().build();
    options.setThrowNotFound(true);

    BigQuery bigQuery = options.getService();
    bigQuery.getDataset(DatasetId.of(bigQueryProject, bigQueryDataset));
  }

  private static Instant toInstant(Timestamp timestamp) {
    if (timestamp == null) {
      return null;
    } else {
      return Instant.ofEpochMilli(timestamp.getSeconds() * 1000 + timestamp.getNanos() / 1000000);
    }
  }

  private static DeadLetterQueueManager buildDlqManager(
      BigtableChangeStreamToBigQueryOptions options) {
    String tempLocation =
        options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
            ? options.as(DataflowPipelineOptions.class).getTempLocation()
            : options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
    String dlqDirectory =
        options.getDlqDirectory().isEmpty() ? tempLocation + "dlq/" : options.getDlqDirectory();

    LOG.info("Dead letter queue directory: {}", dlqDirectory);
    return DeadLetterQueueManager.create(dlqDirectory, 1);
  }

  private static String getBigtableCharset(BigtableChangeStreamToBigQueryOptions options) {
    return StringUtils.isEmpty(options.getBigtableChangeStreamCharset())
        ? "UTF-8"
        : options.getBigtableChangeStreamCharset();
  }

  private static String getBigtableProjectId(BigtableChangeStreamToBigQueryOptions options) {
    return StringUtils.isEmpty(options.getBigtableReadProjectId())
        ? options.getProject()
        : options.getBigtableReadProjectId();
  }

  private static String getBigQueryChangelogTableName(
      BigtableChangeStreamToBigQueryOptions options) {
    return StringUtils.isEmpty(options.getBigQueryChangelogTableName())
        ? options.getBigtableReadTableId() + "_changelog"
        : options.getBigQueryChangelogTableName();
  }

  private static String getBigQueryProjectId(BigtableChangeStreamToBigQueryOptions options) {
    return StringUtils.isEmpty(options.getBigQueryProjectId())
        ? options.getProject()
        : options.getBigQueryProjectId();
  }

  /**
   * DoFn that converts a {@link ChangeStreamMutation} to multiple {@link Mod} in serialized JSON
   * format.
   */
  static class ChangeStreamMutationToTableRowFn extends DoFn<ChangeStreamMutation, TableRow> {
    private final BigtableSource sourceInfo;
    private final BigQueryUtils bigQuery;

    ChangeStreamMutationToTableRowFn(BigtableSource source, BigQueryUtils bigQuery) {
      this.sourceInfo = source;
      this.bigQuery = bigQuery;
    }

    @ProcessElement
    public void process(@Element ChangeStreamMutation input, OutputReceiver<TableRow> receiver)
        throws Exception {
      for (Entry entry : input.getEntries()) {
        ModType modType = getModType(entry);

        Mod mod = null;
        switch (modType) {
          case SET_CELL:
            mod = new Mod(sourceInfo, input, (SetCell) entry);
            break;
          case DELETE_CELLS:
            mod = new Mod(sourceInfo, input, (DeleteCells) entry);
            break;
          case DELETE_FAMILY:
            mod = new Mod(sourceInfo, input, (DeleteFamily) entry);
            break;
          default:
          case UNKNOWN:
            throw new UnsupportedEntryException(
                "Cloud Bigtable change stream entry of type "
                    + entry.getClass().getName()
                    + " is not supported. The entry was put into a dead letter queue directory. "
                    + "Please update your Dataflow template with the latest template version");
        }

        TableRow tableRow = new TableRow();
        if (bigQuery.setTableRowFields(mod, tableRow)) {
          receiver.output(tableRow);
        }
      }
    }

    private ModType getModType(Entry entry) {
      if (entry instanceof SetCell) {
        return ModType.SET_CELL;
      } else if (entry instanceof DeleteCells) {
        return ModType.DELETE_CELLS;
      } else if (entry instanceof DeleteFamily) {
        return ModType.DELETE_FAMILY;
      }
      return ModType.UNKNOWN;
    }
  }
}

Étape suivante