Vorlage "Bigtable-Änderungsstreams für Pub/Sub"

Die Vorlage "Bigtable-Änderungsstreams für Pub/Sub" ist eine Streamingpipeline, die Bigtable-Datenänderungsdatensätze streamt und sie mithilfe von Dataflow in einem Pub/Sub-Thema veröffentlicht.

Mit einem Bigtable-Änderungsstream können Sie Datenmutationen auf Tabellenbasis abonnieren. Wenn Sie Änderungsstreams für Tabellen abonnieren, gelten die folgenden Einschränkungen:

  • Es werden nur geänderte Zellen und Deskriptoren von Löschvorgängen zurückgegeben.
  • Es wird nur der neue Wert einer geänderten Zelle zurückgegeben.

Wenn Datensätze zur Datenänderung in einem Pub/Sub-Thema veröffentlicht werden, könnten Nachrichten in der falschen Reihenfolge eingefügt werden im Vergleich zur ursprünglichen Reihenfolge des Bigtable-Commits.

Bigtable-Datenänderungseinträge, die nicht in Pub/Sub-Themen veröffentlicht werden können, werden vorübergehend in einem Verzeichnis für Dead-Letter-Warteschlangen (unverarbeitete Nachrichtenwarteschlange) in Cloud Storage abgelegt. Nach der maximalen Anzahl fehlgeschlagener Wiederholungsversuche werden diese Einträge für eine manuelle Überprüfung oder weitere Verarbeitung durch den Nutzer auf unbestimmte Zeit im selben Warteschlangenverzeichnis für unzustellbare Nachrichten abgelegt.

Für die Pipeline ist es erforderlich, dass das Ziel-Pub/Sub-Thema vorhanden ist. Das Zielthema kann so konfiguriert sein, dass Nachrichten mit einem Schema validiert werden. Wenn für ein Pub/Sub-Thema ein Schema angegeben ist, wird die Pipeline nur gestartet, wenn das Schema gültig ist. Verwenden Sie je nach Schematyp eine der folgenden Schemadefinitionen für das Zielthema:

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangeLogEntryProto";

message ChangelogEntryProto{
  required bytes rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional bytes column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional bytes value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
  
{
    "name" : "ChangelogEntryMessage",
    "type" : "record",
    "namespace" : "com.google.cloud.teleport.bigtable",
    "fields" : [
      { "name" : "rowKey", "type" : "bytes"},
      {
        "name" : "modType",
        "type" : {
          "name": "ModType",
          "type": "enum",
          "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]}
      },
      { "name": "isGC", "type": "boolean" },
      { "name": "tieBreaker", "type": "int"},
      { "name": "columnFamily", "type": "string"},
      { "name": "commitTimestamp", "type" : "long"},
      { "name" : "sourceInstance", "type" : "string"},
      { "name" : "sourceCluster", "type" : "string"},
      { "name" : "sourceTable", "type" : "string"},
      { "name": "column", "type" : ["null", "bytes"]},
      { "name": "timestamp", "type" : ["null", "long"]},
      { "name": "timestampFrom", "type" : ["null", "long"]},
      { "name": "timestampTo", "type" : ["null", "long"]},
      { "name" : "value", "type" : ["null", "bytes"]}
   ]
}
    

Verwenden Sie das folgende Protobuf-Schema mit JSON-Nachrichtencodierung:

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangelogEntryMessageText";

message ChangelogEntryText{
  required string rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional string column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional string value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
    

Jede neue Pub/Sub-Nachricht enthält einen Eintrag aus einem Datensatz zu Datenänderungen aus der entsprechenden Zeile in Ihrer Bigtable-Tabelle. In der Pub/Sub-Vorlage werden die Einträge in jedem Datenänderungseintrag zu einzelnen Änderungen auf Zellebene zusammengeführt.

Beschreibung der Pub/Sub-Ausgabenachricht

Feldname Beschreibung
rowKey Der Zeilenschlüssel der geänderten Zeile. Wird in Form eines Byte-Arrays empfangen. Wenn die JSON-Nachrichtencodierung konfiguriert ist, werden Zeilenschlüssel als Strings zurückgegeben. Wenn useBase64Rowkeys angegeben ist, werden Zeilenschlüssel Base64-codiert. Andernfalls wird ein durch bigtableChangeStreamCharset angegebenes Zeichensatz verwendet, um Zeilenschlüsselbyte in einen String zu decodieren.
modType Der Typ der Zeilenmutation. Verwenden Sie einen der folgenden Werte: SET_CELL, DELETE_CELLS oder DELETE_FAMILY.
columnFamily Die Spaltenfamilie, die von der Zeilenmutation betroffen ist.
column Der Spaltenqualifizierer, der von der Zeilenmutation betroffen ist. Beim Mutationstyp DELETE_FAMILY ist das Spaltenfeld nicht festgelegt. Wird in Form eines Byte-Arrays empfangen. Wenn die JSON-Nachrichtencodierung konfiguriert ist, werden Spalten als Strings zurückgegeben. Wenn useBase64ColumnQualifier angegeben ist, wird das Spaltenfeld Base64-codiert. Andernfalls wird ein durch bigtableChangeStreamCharset angegebenes Zeichensatz verwendet, um Zeilenschlüsselbyte in einen String zu decodieren.
commitTimestamp Der Zeitpunkt, an dem Bigtable die Mutation anwendet. Die Zeit wird in Mikrosekunden seit der Unix-Epoche gemessen (1. Januar 1970 um UTC).
timestamp Der Zeitstempelwert der Zelle, die von der Mutation betroffen ist. Bei den Mutationstypen DELETE_CELLS und DELETE_FAMILY ist der Zeitstempel nicht festgelegt. Die Zeit wird in Mikrosekunden seit der Unix-Epoche gemessen (1. Januar 1970 um UTC).
timestampFrom Beschreibt den inclusive Beginn des Zeitstempelintervalls für alle Zellen, die durch die DELETE_CELLS-Mutation gelöscht wurden. Bei anderen Mutationstypen ist timestampFrom nicht festgelegt. Die Zeit wird in Mikrosekunden seit der Unix-Epoche gemessen (1. Januar 1970 um UTC).
timestampTo Beschreibt ein exklusives Ende des Zeitstempelintervalls für alle Zellen, die durch die DELETE_CELLS-Mutation gelöscht wurden. Bei anderen Mutationstypen ist timestampTo nicht festgelegt.
isGC Ein boolescher Wert, mit dem angegeben wird, ob die Mutation von einem Speicherbereinigungsmechanismus von Bigtable generiert wird.
tieBreaker Wenn zwei Mutationen von verschiedenen Bigtable-Clustern gleichzeitig registriert werden, wird die Mutation mit dem höchsten tiebreaker-Wert auf die Quelltabelle angewendet. Mutationen mit niedrigeren tiebreaker-Werten werden verworfen.
value Der neue Wert, der durch die Mutation festgelegt wurde. Sofern die Pipelineoption stripValues nicht festgelegt ist, wird der Wert für SET_CELL-Mutationen festgelegt. Bei anderen Mutationstypen ist der Wert nicht festgelegt. Wird in Form eines Byte-Arrays empfangen. Wenn die JSON-Nachrichtencodierung konfiguriert ist, werden Werte als Strings zurückgegeben. Wenn useBase64Values angegeben ist, wird der Wert Base64-codiert. Andernfalls wird ein durch bigtableChangeStreamCharset angegebener Zeichensatz verwendet, um Wertbyte in einen String zu decodieren.
sourceInstance Der Name der Bigtable-Instanz, in der die Mutation registriert wurde. Kann vorkommen, wenn mehrere Pipelines Änderungen aus verschiedenen Instanzen zum selben Pub/Sub-Thema streamen.
sourceCluster Der Name des Bigtable-Clusters, in dem die Mutation registriert wurde. Kann verwendet werden, wenn mehrere Pipelines Änderungen aus verschiedenen Instanzen zum selben Pub/Sub-Thema streamen.
sourceTable Der Name der Bigtable-Tabelle, in der die Mutation empfangen wurde. Kann verwendet werden, wenn mehrere Pipelines Änderungen von verschiedenen Tabellen zum selben Pub/Sub-Thema streamen.

Pipelineanforderungen

  • Die angegebene Bigtable-Quellinstanz.
  • Die angegebene Bigtable-Quelltabelle In der Tabelle müssen Änderungsstreams aktiviert sein.
  • Das angegebene Bigtable-Anwendungsprofil.
  • Das angegebene Pub/Sub-Thema muss vorhanden sein.

Vorlagenparameter

Erforderliche Parameter

  • pubSubTopic: Der Name des Pub/Sub-Zielthemas.
  • bigtableChangeStreamAppProfile: Die Bigtable-Anwendungsprofil-ID. Das Anwendungsprofil muss Single-Cluster-Routing verwenden und Transaktionen für einzelne Zeilen zulassen.
  • bigtableReadInstanceId: Die Bigtable-Quellinstanz-ID.
  • bigtableReadTableId: Die Bigtable-Quelltabellen-ID.

Optionale Parameter

  • messageEncoding: Die Codierung der Nachrichten, die im Pub/Sub-Thema veröffentlicht werden sollen. Wenn das Schema des Ziels konfiguriert ist, wird die Nachrichtencodierung durch die Themeneinstellungen bestimmt. Die folgenden Werte werden unterstützt: BINARY und JSON. Die Standardeinstellung ist JSON.
  • messageFormat: Die Codierung der Nachrichten, die im Pub/Sub-Thema veröffentlicht werden sollen. Wenn das Schema des Ziels konfiguriert ist, wird die Nachrichtencodierung durch die Themeneinstellungen bestimmt. Die folgenden Werte werden unterstützt: AVRO, PROTOCOL_BUFFERS und JSON. Der Standardwert ist JSON. Wenn das Format JSON verwendet wird, sind die Zeilen- und Wertfelder der Nachricht Strings, deren Inhalt durch die Pipelineoptionen useBase64Rowkeys, useBase64ColumnQualifiers, useBase64Values und bigtableChangeStreamCharset bestimmt wird.
  • stripValues: Wenn dieser Parameter auf true gesetzt ist, werden die SET_CELL-Mutationen zurückgegeben, ohne dass neue Werte festgelegt werden. Die Standardeinstellung ist false. Dieser Parameter ist nützlich, wenn Sie keinen neuen Wert haben müssen, der als Cache-Entwertung bezeichnet wird, oder wenn die Werte extrem groß sind und die Größenbeschränkungen für Pub/Sub-Nachrichten überschreiten.
  • dlqDirectory: Das Verzeichnis für die Warteschlange für unzustellbare Nachrichten. Datensätze, die nicht verarbeitet werden können, werden in diesem Verzeichnis gespeichert. Standardmäßig ist dies ein Verzeichnis unter dem temporären Speicherort des Dataflow-Jobs. In den meisten Fällen können Sie den Standardpfad verwenden.
  • dlqRetryMinutes: Die Anzahl der Minuten zwischen DLQ-Wiederholungen (Dead Letter Queue). Die Standardeinstellung ist 10.
  • dlqMaxRetries: Die maximale Anzahl der Wiederholungsversuche für die Dead-Letter-Queue. Die Standardeinstellung ist 5.
  • useBase64Rowkeys: Wird mit der JSON-Nachrichtencodierung verwendet. Wenn es auf true gesetzt ist, ist das Feld rowKey ein Base64-codierter String. Andernfalls wird rowKey mit bigtableChangeStreamCharset erstellt, um Bytes in einen String zu decodieren. Die Standardeinstellung ist false.
  • pubSubProjectId: Die Bigtable-Projekt-ID. Der Standardwert ist das Projekt für den Dataflow-Job.
  • useBase64ColumnQualifiers: Wird bei der JSON-Nachrichtencodierung verwendet. Wenn es auf true gesetzt ist, ist das Feld column ein Base64-codierter String. Andernfalls wird die Spalte erstellt, indem mit bigtableChangeStreamCharset Bytes in einen String decodiert werden. Die Standardeinstellung ist false.
  • useBase64Values: Wird mit der JSON-Nachrichtencodierung verwendet. Wenn „true“ festgelegt ist, ist das Wertfeld ein Base64-codierter String. Andernfalls wird der Wert durch Decodieren von Bytes in einen String mit bigtableChangeStreamCharset erstellt. Die Standardeinstellung ist false.
  • disableDlqRetries: Gibt an, ob Wiederholungsversuche für den DLQ deaktiviert werden sollen. Die Standardeinstellung ist "false".
  • bigtableChangeStreamMetadataInstanceId: Die Metadateninstanz-ID des Bigtable-Änderungsstreams. Die Standardeinstellung ist leer.
  • bigtableChangeStreamMetadataTableTableId: Die ID der Metadatentabelle des Bigtable-Änderungsstream-Connectors. Wenn nicht angegeben, wird während der Pipelineausführung automatisch eine Metadatentabelle für Bigtable-Änderungsstreams erstellt. Die Standardeinstellung ist leer.
  • bigtableChangeStreamCharset: Der Zeichensatzname des Bigtable-Änderungsstreams. Standardmäßig ist dies auf UTF8 eingestellt.
  • bigtableChangeStreamStartTimestamp: Der Startzeitstempel (https://tools.ietf.org/html/rfc3339) (einschließlich), der zum Lesen von Änderungsstreams verwendet wird. Beispiel: 2022-05-05T07:59:59Z Standardmäßig ist der Zeitstempel für den Start der Pipeline festgelegt.
  • bigtableChangeStreamIgnoreColumnFamilies: Eine durch Kommas getrennte Liste von Änderungen an den Namen der Spaltenfamilien, die ignoriert werden sollen. Die Standardeinstellung ist leer.
  • bigtableChangeStreamIgnoreColumns: Eine durch Kommas getrennte Liste von Änderungen an den Spaltennamen, die ignoriert werden sollen. Die Standardeinstellung ist leer.
  • bigtableChangeStreamName: Ein eindeutiger Name für die Client-Pipeline. Damit wird die Verarbeitung ab dem Punkt fortgesetzt, an dem eine zuvor ausgeführte Pipeline beendet wurde. Standardmäßig wird ein automatisch generierter Name verwendet. Den verwendeten Wert finden Sie in den Dataflow-Joblogs.
  • bigtableChangeStreamResume: Wenn dieser Wert auf true gesetzt ist, setzt eine neue Pipeline die Verarbeitung ab dem Punkt fort, an dem eine zuvor ausgeführte Pipeline mit demselben bigtableChangeStreamName-Wert gestoppt wurde. Wenn die Pipeline mit dem angegebenen bigtableChangeStreamName-Wert noch nie ausgeführt wurde, wird keine neue Pipeline gestartet. Wenn dieser Wert auf false gesetzt ist, wird eine neue Pipeline gestartet. Wenn eine Pipeline mit demselben bigtableChangeStreamName-Wert bereits für die angegebene Quelle ausgeführt wurde, wird keine neue Pipeline gestartet. Die Standardeinstellung ist false.
  • bigtableReadProjectId: Die Bigtable-Projekt-ID. Der Standardwert ist das Projekt für den Dataflow-Job.

Führen Sie die Vorlage aus.

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Bigtable change streams to Pub/Sub templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
pubSubTopic=PUBSUB_TOPIC

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • BIGTABLE_INSTANCE_ID: Ihre Bigtable-Instanz-ID.
  • BIGTABLE_TABLE_ID: Ihre Bigtable-Tabellen-ID.
  • BIGTABLE_APPLICATION_PROFILE_ID: Ihre Bigtable-Anwendungsprofil-ID.
  • PUBSUB_TOPIC: der Name des Pub/Sub-Zielthemas

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "pubSubTopic": "PUBSUB_TOPIC"
    }
  }
}

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • BIGTABLE_INSTANCE_ID: Ihre Bigtable-Instanz-ID.
  • BIGTABLE_TABLE_ID: Ihre Bigtable-Tabellen-ID.
  • BIGTABLE_APPLICATION_PROFILE_ID: Ihre Bigtable-Anwendungsprofil-ID.
  • PUBSUB_TOPIC: der Name des Pub/Sub-Zielthemas
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub;

import com.google.cloud.Timestamp;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation.MutationType;
import com.google.cloud.bigtable.data.v2.models.DeleteCells;
import com.google.cloud.bigtable.data.v2.models.DeleteFamily;
import com.google.cloud.bigtable.data.v2.models.Entry;
import com.google.cloud.bigtable.data.v2.models.SetCell;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions;
import com.google.cloud.teleport.v2.bigtable.utils.UnsupportedEntryException;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.cdc.dlq.StringDeadLetterQueueSanitizer;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.options.BigtableChangeStreamsToPubSubOptions;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.FailsafePublisher.PublishModJsonToTopic;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.BigtableSource;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.MessageEncoding;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.MessageFormat;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.Mod;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.ModType;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.PubSubDestination;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.TestChangeStreamMutation;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.schemautils.PubSubUtils;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.GetTopicRequest;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.ValidateMessageRequest;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This pipeline ingests {@link ChangeStreamMutation} from Bigtable change stream. The {@link
 * ChangeStreamMutation} is then broken into {@link Mod}, which converted into PubsubMessage and
 * inserted into Pub/Sub topic.
 */
@Template(
    name = "Bigtable_Change_Streams_to_PubSub",
    category = TemplateCategory.STREAMING,
    displayName = "Cloud Bigtable Change Streams to PubSub",
    description =
        "Streaming pipeline. Streams Bigtable data change records and writes them into PubSub using Dataflow Runner V2.",
    optionsClass = BigtableChangeStreamsToPubSubOptions.class,
    optionsOrder = {
      BigtableChangeStreamsToPubSubOptions.class,
      BigtableCommonOptions.ReadChangeStreamOptions.class,
      BigtableCommonOptions.ReadOptions.class
    },
    skipOptions = {
      "bigtableReadAppProfile",
      "bigtableAdditionalRetryCodes",
      "bigtableRpcAttemptTimeoutMs",
      "bigtableRpcTimeoutMs"
    },
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-bigtable-change-streams-to-pubsub",
    flexContainerName = "bigtable-changestreams-to-pubsub",
    contactInformation = "https://cloud.google.com/support",
    streaming = true,
    supportsAtLeastOnce = true)
public final class BigtableChangeStreamsToPubSub {

  /** String/String Coder for {@link FailsafeElement}. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  private static final Logger LOG = LoggerFactory.getLogger(BigtableChangeStreamsToPubSub.class);

  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    LOG.info("Starting to replicate change records from Cloud Bigtable change streams to PubSub");

    BigtableChangeStreamsToPubSubOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(BigtableChangeStreamsToPubSubOptions.class);

    run(options);
  }

  private static void validateOptions(BigtableChangeStreamsToPubSubOptions options) {
    if (options.getDlqRetryMinutes() <= 0) {
      throw new IllegalArgumentException("dlqRetryMinutes must be positive.");
    }
    if (options.getDlqMaxRetries() < 0) {
      throw new IllegalArgumentException("dlqMaxRetries cannot be negative.");
    }
  }

  private static void setOptions(BigtableChangeStreamsToPubSubOptions options) {
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    // Add use_runner_v2 to the experiments option, since change streams connector is only supported
    // on Dataflow runner v2.
    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    boolean hasUseRunnerV2 = false;
    for (String experiment : experiments) {
      if (experiment.equalsIgnoreCase(USE_RUNNER_V2_EXPERIMENT)) {
        hasUseRunnerV2 = true;
        break;
      }
    }
    if (!hasUseRunnerV2) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(BigtableChangeStreamsToPubSubOptions options) {
    setOptions(options);
    validateOptions(options);

    String bigtableProject = getBigtableProjectId(options);

    // Retrieve and parse the startTimestamp
    Instant startTimestamp =
        options.getBigtableChangeStreamStartTimestamp().isEmpty()
            ? Instant.now()
            : toInstant(Timestamp.parseTimestamp(options.getBigtableChangeStreamStartTimestamp()));

    BigtableSource sourceInfo =
        new BigtableSource(
            options.getBigtableReadInstanceId(),
            options.getBigtableReadTableId(),
            getBigtableCharset(options),
            options.getBigtableChangeStreamIgnoreColumnFamilies(),
            options.getBigtableChangeStreamIgnoreColumns(),
            startTimestamp);

    Topic topic = null;
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      GetTopicRequest request =
          GetTopicRequest.newBuilder()
              .setTopic(
                  TopicName.ofProjectTopicName(
                          getPubSubProjectId(options), options.getPubSubTopic())
                      .toString())
              .build();
      topic = topicAdminClient.getTopic(request);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }

    try {
      if (!validateSchema(topic, options, sourceInfo)) {
        final String errorMessage = "Configured topic doesn't accept messages of configured format";
        throw new IllegalArgumentException(errorMessage);
      }

    } catch (Exception e) {
      throw new IllegalArgumentException(e);
    }

    PubSubDestination destinationInfo = newPubSubDestination(options, topic);
    PubSubUtils pubSub = new PubSubUtils(sourceInfo, destinationInfo);

    /*
     * Stages: 1) Read {@link ChangeStreamMutation} from change stream. 2) Create {@link
     * FailsafeElement} of {@link Mod} JSON and merge from: - {@link ChangeStreamMutation}. - GCS Dead
     * letter queue. 3) Convert {@link Mod} JSON into PubsubMessage and publish it to PubSub.
     * 4) Write Failures from 2) and 3) to GCS dead letter queue.
     */
    // Step 1
    Pipeline pipeline = Pipeline.create(options);

    // Register the coders for pipeline
    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(
        FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);

    DeadLetterQueueManager dlqManager = buildDlqManager(options);

    String dlqDirectory = dlqManager.getRetryDlqDirectoryWithDateTime();
    String tempDlqDirectory = dlqManager.getRetryDlqDirectory() + "tmp/";

    if (options.getDisableDlqRetries()) {
      LOG.info(
          "Disabling retries for the DLQ, directly writing into severe DLQ: {}",
          dlqManager.getSevereDlqDirectoryWithDateTime());
      dlqDirectory = dlqManager.getSevereDlqDirectoryWithDateTime();
      tempDlqDirectory = dlqManager.getSevereDlqDirectory() + "tmp/";
    }

    BigtableIO.ReadChangeStream readChangeStream =
        BigtableIO.readChangeStream()
            .withChangeStreamName(options.getBigtableChangeStreamName())
            .withExistingPipelineOptions(
                options.getBigtableChangeStreamResume()
                    ? BigtableIO.ExistingPipelineOptions.RESUME_OR_FAIL
                    : BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS)
            .withProjectId(bigtableProject)
            .withMetadataTableInstanceId(options.getBigtableChangeStreamMetadataInstanceId())
            .withInstanceId(options.getBigtableReadInstanceId())
            .withTableId(options.getBigtableReadTableId())
            .withAppProfileId(options.getBigtableChangeStreamAppProfile())
            .withStartTime(startTimestamp);

    if (!StringUtils.isBlank(options.getBigtableChangeStreamMetadataTableTableId())) {
      readChangeStream =
          readChangeStream.withMetadataTableTableId(
              options.getBigtableChangeStreamMetadataTableTableId());
    }
    // Step 2: just return the output for sending to pubSub and DLQ
    PCollection<ChangeStreamMutation> dataChangeRecord =
        pipeline
            .apply("Read from Cloud Bigtable Change Streams", readChangeStream)
            .apply(Values.create());

    PCollection<FailsafeElement<String, String>> sourceFailsafeModJson =
        dataChangeRecord
            .apply(
                "ChangeStreamMutation To Mod JSON",
                ParDo.of(new ChangeStreamMutationToModJsonFn(sourceInfo)))
            .apply(
                "Wrap Mod JSON In FailsafeElement",
                ParDo.of(
                    new DoFn<String, FailsafeElement<String, String>>() {
                      @ProcessElement
                      public void process(
                          @Element String input,
                          OutputReceiver<FailsafeElement<String, String>> receiver) {
                        receiver.output(FailsafeElement.of(input, input));
                      }
                    }))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    PCollectionTuple dlqModJson =
        dlqManager.getReconsumerDataTransform(
            pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes())));

    PCollection<FailsafeElement<String, String>> retryableDlqFailsafeModJson = null;
    if (options.getDisableDlqRetries()) {
      retryableDlqFailsafeModJson = pipeline.apply(Create.empty(FAILSAFE_ELEMENT_CODER));
    } else {
      retryableDlqFailsafeModJson =
          dlqModJson.get(DeadLetterQueueManager.RETRYABLE_ERRORS).setCoder(FAILSAFE_ELEMENT_CODER);
    }

    PCollection<FailsafeElement<String, String>> failsafeModJson =
        PCollectionList.of(sourceFailsafeModJson)
            .and(retryableDlqFailsafeModJson)
            .apply("Merge Source And DLQ Mod JSON", Flatten.pCollections());

    FailsafePublisher.FailsafeModJsonToPubsubMessageOptions failsafeModJsonToPubsubOptions =
        FailsafePublisher.FailsafeModJsonToPubsubMessageOptions.builder()
            .setCoder(FAILSAFE_ELEMENT_CODER)
            .build();

    PublishModJsonToTopic publishModJsonToTopic =
        new PublishModJsonToTopic(pubSub, failsafeModJsonToPubsubOptions);

    PCollection<FailsafeElement<String, String>> failedToPublish =
        failsafeModJson.apply("Publish Mod JSON To Pubsub", publishModJsonToTopic);

    PCollection<String> transformDlqJson =
        failedToPublish.apply(
            "Failed Mod JSON During Table Row Transformation",
            MapElements.via(new StringDeadLetterQueueSanitizer()));

    PCollectionList.of(transformDlqJson)
        .apply("Merge Failed Mod JSON From Transform And PubSub", Flatten.pCollections())
        .apply(
            "Write Failed Mod JSON To DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqDirectory)
                .withTmpDirectory(tempDlqDirectory)
                .setIncludePaneInfo(true)
                .build());

    PCollection<FailsafeElement<String, String>> nonRetryableDlqModJsonFailsafe =
        dlqModJson.get(DeadLetterQueueManager.PERMANENT_ERRORS).setCoder(FAILSAFE_ELEMENT_CODER);
    LOG.info(
        "DLQ manager severe DLQ directory with date time: {}",
        dlqManager.getSevereDlqDirectoryWithDateTime());
    LOG.info("DLQ manager severe DLQ directory: {}", dlqManager.getSevereDlqDirectory() + "tmp/");
    nonRetryableDlqModJsonFailsafe
        .apply(
            "Write Mod JSON With Non-retriable Error To DLQ",
            MapElements.via(new StringDeadLetterQueueSanitizer()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
                .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    return pipeline.run();
  }

  private static PubSubDestination newPubSubDestination(
      BigtableChangeStreamsToPubSubOptions options, Topic topic) {
    return new PubSubDestination(
        getPubSubProjectId(options),
        options.getPubSubTopic(),
        topic,
        options.getMessageFormat(),
        options.getMessageEncoding(),
        options.getUseBase64Rowkeys(),
        options.getUseBase64ColumnQualifiers(),
        options.getUseBase64Values(),
        options.getStripValues());
  }

  private static Instant toInstant(Timestamp timestamp) {
    if (timestamp == null) {
      return null;
    } else {
      return Instant.ofEpochMilli(timestamp.getSeconds() * 1000 + timestamp.getNanos() / 1000000);
    }
  }

  private static DeadLetterQueueManager buildDlqManager(
      BigtableChangeStreamsToPubSubOptions options) {
    String tempLocation =
        options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
            ? options.as(DataflowPipelineOptions.class).getTempLocation()
            : options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
    String dlqDirectory =
        options.getDlqDirectory().isEmpty() ? tempLocation + "dlq/" : options.getDlqDirectory();

    LOG.info("DLQ directory: {}", dlqDirectory);
    return DeadLetterQueueManager.create(dlqDirectory, options.getDlqMaxRetries());
  }

  private static String getBigtableCharset(BigtableChangeStreamsToPubSubOptions options) {
    return StringUtils.isEmpty(options.getBigtableChangeStreamCharset())
        ? "UTF-8"
        : options.getBigtableChangeStreamCharset();
  }

  private static String getBigtableProjectId(BigtableChangeStreamsToPubSubOptions options) {
    return StringUtils.isEmpty(options.getBigtableReadProjectId())
        ? options.getProject()
        : options.getBigtableReadProjectId();
  }

  private static String getPubSubProjectId(BigtableChangeStreamsToPubSubOptions options) {
    return StringUtils.isEmpty(options.getPubSubProjectId())
        ? options.getProject()
        : options.getPubSubProjectId();
  }

  private static Boolean validateSchema(
      Topic topic, BigtableChangeStreamsToPubSubOptions options, BigtableSource source)
      throws Exception {
    String messageFormatPath = topic.getSchemaSettings().getSchema();
    if (topic.getSchemaSettings().getSchema().isEmpty()) {
      validateIncompatibleEncoding(options);
      LOG.info(
          "Topic has no schema configured, pipeline will use message format: {}, message encoding: {}",
          options.getMessageFormat(),
          options.getMessageEncoding());
      return true;
    } else {
      SchemaName schemaName = SchemaName.parse(topic.getSchemaSettings().getSchema());
      Schema schema;
      try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {
        schema = schemaServiceClient.getSchema(schemaName);
      }

      options.setMessageEncoding(toMessageEncoding(topic.getSchemaSettings().getEncoding()));

      Schema.Type schemaType = schema.getType();
      switch (schemaType) {
        case AVRO:
          options.setMessageFormat(MessageFormat.AVRO);
          validateNoUseOfBase64(options);
          break;
        case PROTOCOL_BUFFER:
          if (options.getMessageEncoding() == MessageEncoding.JSON) {
            options.setMessageFormat(MessageFormat.JSON);
          } else {
            options.setMessageFormat(MessageFormat.PROTOCOL_BUFFERS);
            validateNoUseOfBase64(options);
          }
          break;
        case TYPE_UNSPECIFIED:
        case UNRECOGNIZED:
          // Not overriding messageFormat, will try what customer configured or the default if
          // not configured
          break;
        default:
          throw new IllegalArgumentException("Topic schema type is not supported: " + schemaType);
      }

      LOG.info("Topic has schema configured: {}", topic.getSchemaSettings().getSchema());
      LOG.info(
          "Pipeline will use message format: {}, message encoding: {}",
          options.getMessageFormat(),
          options.getMessageEncoding());

      PubSubDestination destination = newPubSubDestination(options, topic);
      PubSubUtils pubSub = new PubSubUtils(source, destination);

      ByteString testChangeMessageData = createTestChangeMessage(pubSub).getData();
      Encoding encoding = toPubSubEncoding(options.getMessageEncoding());
      try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {
        String testMessageEncoded = toBase64String(testChangeMessageData);
        LOG.info("Validating a test message (Base64 encoded): {}", testMessageEncoded);
        ValidateMessageRequest request =
            ValidateMessageRequest.newBuilder()
                .setParent("projects/" + pubSub.getDestination().getPubSubProject())
                .setEncoding(encoding)
                .setMessage(testChangeMessageData)
                .setName(messageFormatPath)
                .build();
        schemaServiceClient.validateMessage(request);
        LOG.info("Test message successfully validated.");
      } catch (Exception e) {
        throw new IllegalArgumentException("Failed to validate test message", e);
      }
    }
    return true;
  }

  private static void validateNoUseOfBase64(BigtableChangeStreamsToPubSubOptions options) {
    if (options.getUseBase64Values()) {
      throw new IllegalArgumentException(
          "useBase64Values values can only be used with topics accepting JSON messages");
    }
    if (options.getUseBase64Rowkeys()) {
      throw new IllegalArgumentException(
          "useBase64Rowkeys values can only be used with topics accepting JSON messages");
    }
    if (options.getUseBase64ColumnQualifiers()) {
      throw new IllegalArgumentException(
          "useBase64ColumnQualifiers values can only be used with topics accepting JSON messages");
    }
  }

  private static void validateIncompatibleEncoding(BigtableChangeStreamsToPubSubOptions options) {
    if (options.getMessageEncoding() == MessageEncoding.BINARY
        && options.getMessageFormat() == MessageFormat.JSON) {
      throw new IllegalArgumentException(
          "JSON message format is incompatible with BINARY message encoding");
    }
  }

  private static MessageEncoding toMessageEncoding(Encoding encoding) {
    if (encoding == null) {
      return MessageEncoding.JSON;
    }
    switch (encoding) {
      case JSON:
      case ENCODING_UNSPECIFIED:
      case UNRECOGNIZED:
        return MessageEncoding.JSON;
      case BINARY:
        return MessageEncoding.BINARY;
      default:
        throw new IllegalArgumentException("Topic has unsupported message encoding: " + encoding);
    }
  }

  private static String toBase64String(ByteString testChangeMessageData) {
    return Base64.getEncoder().encodeToString(testChangeMessageData.toByteArray());
  }

  private static Encoding toPubSubEncoding(MessageEncoding messageEncoding) {
    switch (messageEncoding) {
      case BINARY:
        return Encoding.BINARY;
      case JSON:
        return Encoding.JSON;
      default:
        throw new IllegalArgumentException("Unexpected message encoding: " + messageEncoding);
    }
  }

  private static PubsubMessage createTestChangeMessage(PubSubUtils pubSub) throws Exception {
    SetCell setCell =
        SetCell.create(
            "test_column_family",
            ByteString.copyFrom("test_column", Charset.defaultCharset()),
            1000L, // timestamp
            ByteString.copyFrom("test_value", Charset.defaultCharset()));

    TestChangeStreamMutation mutation =
        new TestChangeStreamMutation(
            "test_rowkey",
            MutationType.USER,
            "source_cluster",
            org.threeten.bp.Instant.now(), // commit timestamp
            1, // tiebreaker
            "token",
            org.threeten.bp.Instant.now(), // low watermark
            setCell);

    Mod mod = new Mod(pubSub.getSource(), mutation, setCell);

    switch (pubSub.getDestination().getMessageFormat()) {
      case AVRO:
        return pubSub.mapChangeJsonStringToPubSubMessageAsAvro(mod.getChangeJson());
      case PROTOCOL_BUFFERS:
        return pubSub.mapChangeJsonStringToPubSubMessageAsProto(mod.getChangeJson());
      case JSON:
        return pubSub.mapChangeJsonStringToPubSubMessageAsJson(mod.getChangeJson());
      default:
        throw new IllegalArgumentException(
            "Unexpected message format: " + pubSub.getDestination().getMessageFormat());
    }
  }

  /**
   * DoFn that converts a {@link ChangeStreamMutation} to multiple {@link Mod} in serialized JSON
   * format.
   */
  static class ChangeStreamMutationToModJsonFn extends DoFn<ChangeStreamMutation, String> {

    private final BigtableSource sourceInfo;

    ChangeStreamMutationToModJsonFn(BigtableSource source) {
      this.sourceInfo = source;
    }

    @ProcessElement
    public void process(@Element ChangeStreamMutation input, OutputReceiver<String> receiver)
        throws Exception {
      for (Entry entry : input.getEntries()) {
        ModType modType = getModType(entry);

        Mod mod = null;
        switch (modType) {
          case SET_CELL:
            mod = new Mod(sourceInfo, input, (SetCell) entry);
            break;
          case DELETE_CELLS:
            mod = new Mod(sourceInfo, input, (DeleteCells) entry);
            break;
          case DELETE_FAMILY:
            mod = new Mod(sourceInfo, input, (DeleteFamily) entry);
            break;
          default:
          case UNKNOWN:
            throw new UnsupportedEntryException(
                "Cloud Bigtable change stream entry of type "
                    + entry.getClass().getName()
                    + " is not supported. The entry was put into a DLQ directory. "
                    + "Please update your Dataflow template with the latest template version");
        }

        String modJsonString;

        try {
          modJsonString = mod.toJson();
        } catch (IOException e) {
          // Ignore exception and print bad format.
          modJsonString = String.format("\"%s\"", input);
        }
        receiver.output(modJsonString);
      }
    }

    private ModType getModType(Entry entry) {
      if (entry instanceof SetCell) {
        return ModType.SET_CELL;
      } else if (entry instanceof DeleteCells) {
        return ModType.DELETE_CELLS;
      } else if (entry instanceof DeleteFamily) {
        return ModType.DELETE_FAMILY;
      }
      return ModType.UNKNOWN;
    }
  }
}

Nächste Schritte