Bigtable change streams to Pub/Sub テンプレート

Bigtable change streams to Pub/Sub テンプレートは、Dataflow を使用して Bigtable データ変更レコードをストリーミングし、Pub/Sub トピックにパブリッシュするストリーミング パイプラインです。

Bigtable 変更ストリームを使用すると、テーブルごとにデータ ミューテーションをサブスクライブできます。テーブル変更ストリームをサブスクライブすると、次の制約が適用されます。

  • 変更されたセルと削除オペレーションの記述子のみが返されます。
  • 変更されたセルの新しい値のみが返されます。

データ変更レコードが Pub/Sub トピックにパブリッシュされたときに、元の Bigtable commit タイムスタンプの順序と比べるとメッセージが順不同で挿入される可能性があります。

Pub/Sub トピックにパブリッシュできない Bigtable データ変更レコードは、Cloud Storage のデッドレター キュー(未処理メッセージ キュー)ディレクトリに一時的に配置されます。再試行の失敗回数が最大回数に達すると、これらのレコードは同じデッドレター キュー ディレクトリに無期限に配置されます。これは、人間による確認やユーザーの追加の処理に使用されます。

このパイプラインでは、宛先 Pub/Sub トピックが存在している必要があります。宛先トピックが、スキーマを使用してメッセージを検証するように構成されている場合があります。Pub/Sub トピックでスキーマを指定すると、スキーマが有効である場合にのみ、パイプラインが開始されます。スキーマタイプに応じて、宛先トピックに次のいずれかのスキーマ定義を使用します。

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangeLogEntryProto";

message ChangelogEntryProto{
  required bytes rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional bytes column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional bytes value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
  
{
    "name" : "ChangelogEntryMessage",
    "type" : "record",
    "namespace" : "com.google.cloud.teleport.bigtable",
    "fields" : [
      { "name" : "rowKey", "type" : "bytes"},
      {
        "name" : "modType",
        "type" : {
          "name": "ModType",
          "type": "enum",
          "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]}
      },
      { "name": "isGC", "type": "boolean" },
      { "name": "tieBreaker", "type": "int"},
      { "name": "columnFamily", "type": "string"},
      { "name": "commitTimestamp", "type" : "long"},
      { "name" : "sourceInstance", "type" : "string"},
      { "name" : "sourceCluster", "type" : "string"},
      { "name" : "sourceTable", "type" : "string"},
      { "name": "column", "type" : ["null", "bytes"]},
      { "name": "timestamp", "type" : ["null", "long"]},
      { "name": "timestampFrom", "type" : ["null", "long"]},
      { "name": "timestampTo", "type" : ["null", "long"]},
      { "name" : "value", "type" : ["null", "bytes"]}
   ]
}
    

JSON メッセージ エンコードで次の Protobuf スキーマを使用します。

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangelogEntryMessageText";

message ChangelogEntryText{
  required string rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional string column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional string value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
    

新しい Pub/Sub メッセージには、変更ストリームによって Bigtable テーブルの対応行から返されたデータ変更レコードのエントリが 1 つ含まれます。Pub/Sub テンプレートは、各データ変更レコードのエントリをフラット化して、個々のセルレベルの変更に変換します。

Pub/Sub 出力メッセージの説明

フィールド名 説明
rowKey 変更された行の行キー。バイト配列の形式で返されます。JSON メッセージ エンコードが構成されている場合、行キーは文字列として返されます。useBase64Rowkeys が指定されている場合、行キーは Base64 でエンコードされます。それ以外の場合は、bigtableChangeStreamCharset で指定された文字セットを使用して、行キーのバイトが文字列にデコードされます。
modType 行ミューテーションのタイプ。SET_CELLDELETE_CELLSDELETE_FAMILY のいずれかの値を使用します。
columnFamily 行ミューテーションの影響を受ける列ファミリー。
column 行ミューテーションの影響を受ける列修飾子。DELETE_FAMILY ミューテーション タイプの場合、列フィールドは設定されません。バイト配列の形式で返されます。JSON メッセージ エンコードが構成されている場合、列は文字列として返されます。useBase64ColumnQualifier が指定された場合、列フィールドは Base64 でエンコードされます。それ以外の場合は、bigtableChangeStreamCharset で指定された文字セットを使用して、行キーのバイトが文字列にデコードされます。
commitTimestamp Bigtable がミューテーションを適用する時間。この時間は、Unix エポック(UTC 1970 年 1 月 1 日)からのマイクロ秒単位で測定されます。
timestamp ミューテーションの影響を受けるセルのタイムスタンプ値。DELETE_CELLS および DELETE_FAMILY ミューテーション タイプの場合、タイムスタンプは設定されません。この時間は、Unix エポック(UTC 1970 年 1 月 1 日)からのマイクロ秒単位で測定されます。
timestampFrom DELETE_CELLS ミューテーションによって削除されたすべてのセルのタイムスタンプ間隔の包括的な開始点を示します。他のミューテーション タイプの場合、timestampFrom は設定されません。この時間は、Unix エポック(UTC 1970 年 1 月 1 日)からのマイクロ秒単位で測定されます。
timestampTo DELETE_CELLS ミューテーションによって削除されたすべてのセルのタイムスタンプ間隔の排他的終了点を示します。他のミューテーション タイプの場合、timestampTo は設定されません。
isGC ミューテーションが Bigtable ガベージ コレクション メカニズムによって生成されたかどうかを示すブール値。
tieBreaker 異なる Bigtable クラスタによって 2 つのミューテーションが同時に登録された場合、tiebreaker 値が最も高いミューテーションがソーステーブルに適用されます。tiebreaker 値が小さいミューテーションは破棄されます。
value ミューテーションによって設定された新しい値。stripValues パイプライン オプションが設定されていない限り、SET_CELL ミューテーションに値が設定されます。他のミューテーション タイプの場合、値は設定されません。バイト配列の形式で返されます。JSON メッセージ エンコードが構成されている場合、値は文字列として返されます。useBase64Values が指定された場合、値は Base64 でエンコードされます。それ以外の場合は、bigtableChangeStreamCharset で指定された文字セットを使用して、値のバイトが文字列にデコードされます。
sourceInstance ミューテーションを登録した Bigtable インスタンスの名前。複数のパイプラインが異なるインスタンスから同じ Pub/Sub トピックに変更をストリーミングする場合があります。
sourceCluster ミューテーションを登録した Bigtable クラスタの名前。複数のパイプラインが異なるインスタンスから同じ Pub/Sub トピックに変更をストリーミングする場合に使用されます。
sourceTable ミューテーションを受け取った Bigtable テーブルの名前。複数のパイプラインが異なるテーブルから同じ Pub/Sub トピックに変更をストリーミングする場合に使用します。

パイプラインの要件

  • 指定された Bigtable ソース インスタンス。
  • 指定された Bigtable ソーステーブル。テーブルで変更ストリームが有効になっている必要があります。
  • 指定された Bigtable アプリケーション プロファイル。
  • 指定した Pub/Sub トピックが存在している必要があります。

テンプレートのパラメータ

必須パラメータ

  • pubSubTopic: 宛先 Pub/Sub トピックの名前。
  • bigtableChangeStreamAppProfile: Bigtable アプリケーション プロファイル ID。アプリケーション プロファイルでは、単一クラスタ ルーティングを使用し、単一行のトランザクションを許可する必要があります。
  • bigtableReadInstanceId: ソース Bigtable インスタンス ID。
  • bigtableReadTableId: ソース Bigtable テーブル ID。

オプション パラメータ

  • messageEncoding: Pub/Sub トピックにパブリッシュされるメッセージのエンコード。宛先トピックのスキーマが構成されている場合、メッセージ エンコードはトピックの設定によって決まります。サポートされている値は BINARYJSON です。デフォルトは JSON です。
  • messageFormat: Pub/Sub トピックにパブリッシュするメッセージのエンコード。宛先トピックのスキーマが構成されている場合、メッセージ エンコードはトピックの設定によって決まります。サポートされている値は、AVROPROTOCOL_BUFFERSJSON です。デフォルト値は JSON です。JSON 形式が使用されている場合、メッセージの rowKey、column、value フィールドは文字列であり、その内容は useBase64RowkeysuseBase64ColumnQualifiersuseBase64ValuesbigtableChangeStreamCharset のパイプライン オプションによって決まります。
  • stripValues: true に設定すると、新しい値が設定されずに SET_CELL ミューテーションが返されます。デフォルトは false です。このパラメータは、新しい値が存在する必要がない場合(キャッシュの無効化とも呼ばれる)や、値が非常に大きく Pub/Sub メッセージのサイズ上限を超えている場合に便利です。
  • dlqDirectory: デッドレター キューのディレクトリ。処理に失敗したレコードは、このディレクトリに保存されます。デフォルトは、Dataflow ジョブの一時的な保存場所の下にあるディレクトリです。ほとんどの場合、デフォルトのパスを使用できます。
  • dlqRetryMinutes: デッドレター キューの再試行間隔(分)。デフォルトは 10 です。
  • dlqMaxRetries: デッドレターの最大再試行回数。デフォルトは 5 です。
  • useBase64Rowkeys: JSON メッセージのエンコードで使用されます。true に設定した場合、rowKey フィールドは Base64 でエンコードされた文字列です。それ以外の場合、rowKey は、bigtableChangeStreamCharset を使用してバイトを文字列にデコードすることで生成されます。デフォルトは false です。
  • pubSubProjectId: Bigtable プロジェクト ID。デフォルトは Dataflow ジョブのプロジェクトです。
  • useBase64ColumnQualifiers: JSON メッセージのエンコードで使用されます。true に設定した場合、column フィールドは Base64 でエンコードされた文字列です。それ以外の場合、この列は bigtableChangeStreamCharset を使用してバイトを文字列にデコードすることによって生成されます。デフォルトは false です。
  • useBase64Values: JSON メッセージのエンコードで使用されます。true に設定した場合、value フィールドは Base64 でエンコードされた文字列です。それ以外の場合、bigtableChangeStreamCharset を使用してバイトを文字列にデコードすることで値が生成されます。デフォルトは false です。
  • disableDlqRetries: DLQ の再試行を無効にするかどうか。デフォルトは false です。
  • bigtableChangeStreamMetadataInstanceId: Bigtable 変更ストリーム メタデータのインスタンス ID。デフォルトは空です。
  • bigtableChangeStreamMetadataTableTableId: Bigtable 変更ストリーム コネクタのメタデータ テーブル ID。指定しない場合、パイプライン実行中に Bigtable 変更ストリーム コネクタのメタデータ テーブルが自動的に作成されます。デフォルトは空です。
  • bigtableChangeStreamCharset: Bigtable 変更ストリームの文字セット名。デフォルトは UTF-8 です。
  • bigtableChangeStreamStartTimestamp: 変更ストリームの読み取りに使用される開始タイムスタンプ(https://tools.ietf.org/html/rfc3339)。例: 2022-05-05T07:59:59Z。デフォルトは、パイプラインの開始時間のタイムスタンプです。
  • bigtableChangeStreamIgnoreColumnFamilies: 無視する列ファミリー名の変更のカンマ区切りのリスト。デフォルトは空です。
  • bigtableChangeStreamIgnoreColumns: 無視する列名の変更のカンマ区切りのリスト。デフォルトは空です。
  • bigtableChangeStreamName: クライアント パイプラインの一意の名前。実行中のパイプラインが停止した時点から処理を再開できます。デフォルトは自動生成された名前です。使用される値については、Dataflow ジョブのログをご覧ください。
  • bigtableChangeStreamResume: true に設定すると、同じ bigtableChangeStreamName 値で実行中のパイプラインが停止した時点から、新しいパイプラインが処理を再開します。指定された bigtableChangeStreamName 値を持つパイプラインが一度も実行されていない場合、新しいパイプラインは開始されません。false に設定すると、新しいパイプラインが開始されます。特定のソースに対して同じ bigtableChangeStreamName 値を持つパイプラインがすでに実行されている場合、新しいパイプラインは開始されません。デフォルトは false です。
  • bigtableReadProjectId: Bigtable プロジェクト ID。デフォルトは Dataflow ジョブのプロジェクトです。

テンプレートを実行する

コンソールgcloudAPI
  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Bigtable change streams to Pub/Sub template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
pubSubTopic=PUBSUB_TOPIC

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • BIGTABLE_INSTANCE_ID: Bigtable インスタンス ID。
  • BIGTABLE_TABLE_ID: Bigtable テーブル ID。
  • BIGTABLE_APPLICATION_PROFILE_ID: Bigtable アプリケーション プロファイル ID。
  • PUBSUB_TOPIC: Pub/Sub の宛先トピック名

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "pubSubTopic": "PUBSUB_TOPIC"
    }
  }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • BIGTABLE_INSTANCE_ID: Bigtable インスタンス ID。
  • BIGTABLE_TABLE_ID: Bigtable テーブル ID。
  • BIGTABLE_APPLICATION_PROFILE_ID: Bigtable アプリケーション プロファイル ID。
  • PUBSUB_TOPIC: Pub/Sub の宛先トピック名
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub;

import com.google.cloud.Timestamp;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation.MutationType;
import com.google.cloud.bigtable.data.v2.models.DeleteCells;
import com.google.cloud.bigtable.data.v2.models.DeleteFamily;
import com.google.cloud.bigtable.data.v2.models.Entry;
import com.google.cloud.bigtable.data.v2.models.SetCell;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions;
import com.google.cloud.teleport.v2.bigtable.utils.UnsupportedEntryException;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.cdc.dlq.StringDeadLetterQueueSanitizer;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.options.BigtableChangeStreamsToPubSubOptions;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.FailsafePublisher.PublishModJsonToTopic;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.BigtableSource;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.MessageEncoding;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.MessageFormat;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.Mod;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.ModType;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.PubSubDestination;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.model.TestChangeStreamMutation;
import com.google.cloud.teleport.v2.templates.bigtablechangestreamstopubsub.schemautils.PubSubUtils;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.GetTopicRequest;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.ValidateMessageRequest;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This pipeline ingests {@link ChangeStreamMutation} from Bigtable change stream. The {@link
 * ChangeStreamMutation} is then broken into {@link Mod}, which converted into PubsubMessage and
 * inserted into Pub/Sub topic.
 */
@Template(
    name = "Bigtable_Change_Streams_to_PubSub",
    category = TemplateCategory.STREAMING,
    displayName = "Cloud Bigtable Change Streams to PubSub",
    description =
        "Streaming pipeline. Streams Bigtable data change records and writes them into PubSub using Dataflow Runner V2.",
    optionsClass = BigtableChangeStreamsToPubSubOptions.class,
    optionsOrder = {
      BigtableChangeStreamsToPubSubOptions.class,
      BigtableCommonOptions.ReadChangeStreamOptions.class,
      BigtableCommonOptions.ReadOptions.class
    },
    skipOptions = {
      "bigtableReadAppProfile",
      "bigtableAdditionalRetryCodes",
      "bigtableRpcAttemptTimeoutMs",
      "bigtableRpcTimeoutMs"
    },
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-bigtable-change-streams-to-pubsub",
    flexContainerName = "bigtable-changestreams-to-pubsub",
    contactInformation = "https://cloud.google.com/support",
    streaming = true,
    supportsAtLeastOnce = true)
public final class BigtableChangeStreamsToPubSub {

  /** String/String Coder for {@link FailsafeElement}. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  private static final Logger LOG = LoggerFactory.getLogger(BigtableChangeStreamsToPubSub.class);

  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    LOG.info("Starting to replicate change records from Cloud Bigtable change streams to PubSub");

    BigtableChangeStreamsToPubSubOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(BigtableChangeStreamsToPubSubOptions.class);

    run(options);
  }

  private static void validateOptions(BigtableChangeStreamsToPubSubOptions options) {
    if (options.getDlqRetryMinutes() <= 0) {
      throw new IllegalArgumentException("dlqRetryMinutes must be positive.");
    }
    if (options.getDlqMaxRetries() < 0) {
      throw new IllegalArgumentException("dlqMaxRetries cannot be negative.");
    }
  }

  private static void setOptions(BigtableChangeStreamsToPubSubOptions options) {
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    // Add use_runner_v2 to the experiments option, since change streams connector is only supported
    // on Dataflow runner v2.
    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    boolean hasUseRunnerV2 = false;
    for (String experiment : experiments) {
      if (experiment.equalsIgnoreCase(USE_RUNNER_V2_EXPERIMENT)) {
        hasUseRunnerV2 = true;
        break;
      }
    }
    if (!hasUseRunnerV2) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(BigtableChangeStreamsToPubSubOptions options) {
    setOptions(options);
    validateOptions(options);

    String bigtableProject = getBigtableProjectId(options);

    // Retrieve and parse the startTimestamp
    Instant startTimestamp =
        options.getBigtableChangeStreamStartTimestamp().isEmpty()
            ? Instant.now()
            : toInstant(Timestamp.parseTimestamp(options.getBigtableChangeStreamStartTimestamp()));

    BigtableSource sourceInfo =
        new BigtableSource(
            options.getBigtableReadInstanceId(),
            options.getBigtableReadTableId(),
            getBigtableCharset(options),
            options.getBigtableChangeStreamIgnoreColumnFamilies(),
            options.getBigtableChangeStreamIgnoreColumns(),
            startTimestamp);

    Topic topic = null;
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      GetTopicRequest request =
          GetTopicRequest.newBuilder()
              .setTopic(
                  TopicName.ofProjectTopicName(
                          getPubSubProjectId(options), options.getPubSubTopic())
                      .toString())
              .build();
      topic = topicAdminClient.getTopic(request);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }

    try {
      if (!validateSchema(topic, options, sourceInfo)) {
        final String errorMessage = "Configured topic doesn't accept messages of configured format";
        throw new IllegalArgumentException(errorMessage);
      }

    } catch (Exception e) {
      throw new IllegalArgumentException(e);
    }

    PubSubDestination destinationInfo = newPubSubDestination(options, topic);
    PubSubUtils pubSub = new PubSubUtils(sourceInfo, destinationInfo);

    /*
     * Stages: 1) Read {@link ChangeStreamMutation} from change stream. 2) Create {@link
     * FailsafeElement} of {@link Mod} JSON and merge from: - {@link ChangeStreamMutation}. - GCS Dead
     * letter queue. 3) Convert {@link Mod} JSON into PubsubMessage and publish it to PubSub.
     * 4) Write Failures from 2) and 3) to GCS dead letter queue.
     */
    // Step 1
    Pipeline pipeline = Pipeline.create(options);

    // Register the coders for pipeline
    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(
        FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);

    DeadLetterQueueManager dlqManager = buildDlqManager(options);

    String dlqDirectory = dlqManager.getRetryDlqDirectoryWithDateTime();
    String tempDlqDirectory = dlqManager.getRetryDlqDirectory() + "tmp/";

    if (options.getDisableDlqRetries()) {
      LOG.info(
          "Disabling retries for the DLQ, directly writing into severe DLQ: {}",
          dlqManager.getSevereDlqDirectoryWithDateTime());
      dlqDirectory = dlqManager.getSevereDlqDirectoryWithDateTime();
      tempDlqDirectory = dlqManager.getSevereDlqDirectory() + "tmp/";
    }

    BigtableIO.ReadChangeStream readChangeStream =
        BigtableIO.readChangeStream()
            .withChangeStreamName(options.getBigtableChangeStreamName())
            .withExistingPipelineOptions(
                options.getBigtableChangeStreamResume()
                    ? BigtableIO.ExistingPipelineOptions.RESUME_OR_FAIL
                    : BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS)
            .withProjectId(bigtableProject)
            .withMetadataTableInstanceId(options.getBigtableChangeStreamMetadataInstanceId())
            .withInstanceId(options.getBigtableReadInstanceId())
            .withTableId(options.getBigtableReadTableId())
            .withAppProfileId(options.getBigtableChangeStreamAppProfile())
            .withStartTime(startTimestamp);

    if (!StringUtils.isBlank(options.getBigtableChangeStreamMetadataTableTableId())) {
      readChangeStream =
          readChangeStream.withMetadataTableTableId(
              options.getBigtableChangeStreamMetadataTableTableId());
    }
    // Step 2: just return the output for sending to pubSub and DLQ
    PCollection<ChangeStreamMutation> dataChangeRecord =
        pipeline
            .apply("Read from Cloud Bigtable Change Streams", readChangeStream)
            .apply(Values.create());

    PCollection<FailsafeElement<String, String>> sourceFailsafeModJson =
        dataChangeRecord
            .apply(
                "ChangeStreamMutation To Mod JSON",
                ParDo.of(new ChangeStreamMutationToModJsonFn(sourceInfo)))
            .apply(
                "Wrap Mod JSON In FailsafeElement",
                ParDo.of(
                    new DoFn<String, FailsafeElement<String, String>>() {
                      @ProcessElement
                      public void process(
                          @Element String input,
                          OutputReceiver<FailsafeElement<String, String>> receiver) {
                        receiver.output(FailsafeElement.of(input, input));
                      }
                    }))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    PCollectionTuple dlqModJson =
        dlqManager.getReconsumerDataTransform(
            pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes())));

    PCollection<FailsafeElement<String, String>> retryableDlqFailsafeModJson = null;
    if (options.getDisableDlqRetries()) {
      retryableDlqFailsafeModJson = pipeline.apply(Create.empty(FAILSAFE_ELEMENT_CODER));
    } else {
      retryableDlqFailsafeModJson =
          dlqModJson.get(DeadLetterQueueManager.RETRYABLE_ERRORS).setCoder(FAILSAFE_ELEMENT_CODER);
    }

    PCollection<FailsafeElement<String, String>> failsafeModJson =
        PCollectionList.of(sourceFailsafeModJson)
            .and(retryableDlqFailsafeModJson)
            .apply("Merge Source And DLQ Mod JSON", Flatten.pCollections());

    FailsafePublisher.FailsafeModJsonToPubsubMessageOptions failsafeModJsonToPubsubOptions =
        FailsafePublisher.FailsafeModJsonToPubsubMessageOptions.builder()
            .setCoder(FAILSAFE_ELEMENT_CODER)
            .build();

    PublishModJsonToTopic publishModJsonToTopic =
        new PublishModJsonToTopic(pubSub, failsafeModJsonToPubsubOptions);

    PCollection<FailsafeElement<String, String>> failedToPublish =
        failsafeModJson.apply("Publish Mod JSON To Pubsub", publishModJsonToTopic);

    PCollection<String> transformDlqJson =
        failedToPublish.apply(
            "Failed Mod JSON During Table Row Transformation",
            MapElements.via(new StringDeadLetterQueueSanitizer()));

    PCollectionList.of(transformDlqJson)
        .apply("Merge Failed Mod JSON From Transform And PubSub", Flatten.pCollections())
        .apply(
            "Write Failed Mod JSON To DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqDirectory)
                .withTmpDirectory(tempDlqDirectory)
                .setIncludePaneInfo(true)
                .build());

    PCollection<FailsafeElement<String, String>> nonRetryableDlqModJsonFailsafe =
        dlqModJson.get(DeadLetterQueueManager.PERMANENT_ERRORS).setCoder(FAILSAFE_ELEMENT_CODER);
    LOG.info(
        "DLQ manager severe DLQ directory with date time: {}",
        dlqManager.getSevereDlqDirectoryWithDateTime());
    LOG.info("DLQ manager severe DLQ directory: {}", dlqManager.getSevereDlqDirectory() + "tmp/");
    nonRetryableDlqModJsonFailsafe
        .apply(
            "Write Mod JSON With Non-retriable Error To DLQ",
            MapElements.via(new StringDeadLetterQueueSanitizer()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
                .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    return pipeline.run();
  }

  private static PubSubDestination newPubSubDestination(
      BigtableChangeStreamsToPubSubOptions options, Topic topic) {
    return new PubSubDestination(
        getPubSubProjectId(options),
        options.getPubSubTopic(),
        topic,
        options.getMessageFormat(),
        options.getMessageEncoding(),
        options.getUseBase64Rowkeys(),
        options.getUseBase64ColumnQualifiers(),
        options.getUseBase64Values(),
        options.getStripValues());
  }

  private static Instant toInstant(Timestamp timestamp) {
    if (timestamp == null) {
      return null;
    } else {
      return Instant.ofEpochMilli(timestamp.getSeconds() * 1000 + timestamp.getNanos() / 1000000);
    }
  }

  private static DeadLetterQueueManager buildDlqManager(
      BigtableChangeStreamsToPubSubOptions options) {
    String tempLocation =
        options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
            ? options.as(DataflowPipelineOptions.class).getTempLocation()
            : options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
    String dlqDirectory =
        options.getDlqDirectory().isEmpty() ? tempLocation + "dlq/" : options.getDlqDirectory();

    LOG.info("DLQ directory: {}", dlqDirectory);
    return DeadLetterQueueManager.create(dlqDirectory, options.getDlqMaxRetries());
  }

  private static String getBigtableCharset(BigtableChangeStreamsToPubSubOptions options) {
    return StringUtils.isEmpty(options.getBigtableChangeStreamCharset())
        ? "UTF-8"
        : options.getBigtableChangeStreamCharset();
  }

  private static String getBigtableProjectId(BigtableChangeStreamsToPubSubOptions options) {
    return StringUtils.isEmpty(options.getBigtableReadProjectId())
        ? options.getProject()
        : options.getBigtableReadProjectId();
  }

  private static String getPubSubProjectId(BigtableChangeStreamsToPubSubOptions options) {
    return StringUtils.isEmpty(options.getPubSubProjectId())
        ? options.getProject()
        : options.getPubSubProjectId();
  }

  private static Boolean validateSchema(
      Topic topic, BigtableChangeStreamsToPubSubOptions options, BigtableSource source)
      throws Exception {
    String messageFormatPath = topic.getSchemaSettings().getSchema();
    if (topic.getSchemaSettings().getSchema().isEmpty()) {
      validateIncompatibleEncoding(options);
      LOG.info(
          "Topic has no schema configured, pipeline will use message format: {}, message encoding: {}",
          options.getMessageFormat(),
          options.getMessageEncoding());
      return true;
    } else {
      SchemaName schemaName = SchemaName.parse(topic.getSchemaSettings().getSchema());
      Schema schema;
      try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {
        schema = schemaServiceClient.getSchema(schemaName);
      }

      options.setMessageEncoding(toMessageEncoding(topic.getSchemaSettings().getEncoding()));

      Schema.Type schemaType = schema.getType();
      switch (schemaType) {
        case AVRO:
          options.setMessageFormat(MessageFormat.AVRO);
          validateNoUseOfBase64(options);
          break;
        case PROTOCOL_BUFFER:
          if (options.getMessageEncoding() == MessageEncoding.JSON) {
            options.setMessageFormat(MessageFormat.JSON);
          } else {
            options.setMessageFormat(MessageFormat.PROTOCOL_BUFFERS);
            validateNoUseOfBase64(options);
          }
          break;
        case TYPE_UNSPECIFIED:
        case UNRECOGNIZED:
          // Not overriding messageFormat, will try what customer configured or the default if
          // not configured
          break;
        default:
          throw new IllegalArgumentException("Topic schema type is not supported: " + schemaType);
      }

      LOG.info("Topic has schema configured: {}", topic.getSchemaSettings().getSchema());
      LOG.info(
          "Pipeline will use message format: {}, message encoding: {}",
          options.getMessageFormat(),
          options.getMessageEncoding());

      PubSubDestination destination = newPubSubDestination(options, topic);
      PubSubUtils pubSub = new PubSubUtils(source, destination);

      ByteString testChangeMessageData = createTestChangeMessage(pubSub).getData();
      Encoding encoding = toPubSubEncoding(options.getMessageEncoding());
      try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {
        String testMessageEncoded = toBase64String(testChangeMessageData);
        LOG.info("Validating a test message (Base64 encoded): {}", testMessageEncoded);
        ValidateMessageRequest request =
            ValidateMessageRequest.newBuilder()
                .setParent("projects/" + pubSub.getDestination().getPubSubProject())
                .setEncoding(encoding)
                .setMessage(testChangeMessageData)
                .setName(messageFormatPath)
                .build();
        schemaServiceClient.validateMessage(request);
        LOG.info("Test message successfully validated.");
      } catch (Exception e) {
        throw new IllegalArgumentException("Failed to validate test message", e);
      }
    }
    return true;
  }

  private static void validateNoUseOfBase64(BigtableChangeStreamsToPubSubOptions options) {
    if (options.getUseBase64Values()) {
      throw new IllegalArgumentException(
          "useBase64Values values can only be used with topics accepting JSON messages");
    }
    if (options.getUseBase64Rowkeys()) {
      throw new IllegalArgumentException(
          "useBase64Rowkeys values can only be used with topics accepting JSON messages");
    }
    if (options.getUseBase64ColumnQualifiers()) {
      throw new IllegalArgumentException(
          "useBase64ColumnQualifiers values can only be used with topics accepting JSON messages");
    }
  }

  private static void validateIncompatibleEncoding(BigtableChangeStreamsToPubSubOptions options) {
    if (options.getMessageEncoding() == MessageEncoding.BINARY
        && options.getMessageFormat() == MessageFormat.JSON) {
      throw new IllegalArgumentException(
          "JSON message format is incompatible with BINARY message encoding");
    }
  }

  private static MessageEncoding toMessageEncoding(Encoding encoding) {
    if (encoding == null) {
      return MessageEncoding.JSON;
    }
    switch (encoding) {
      case JSON:
      case ENCODING_UNSPECIFIED:
      case UNRECOGNIZED:
        return MessageEncoding.JSON;
      case BINARY:
        return MessageEncoding.BINARY;
      default:
        throw new IllegalArgumentException("Topic has unsupported message encoding: " + encoding);
    }
  }

  private static String toBase64String(ByteString testChangeMessageData) {
    return Base64.getEncoder().encodeToString(testChangeMessageData.toByteArray());
  }

  private static Encoding toPubSubEncoding(MessageEncoding messageEncoding) {
    switch (messageEncoding) {
      case BINARY:
        return Encoding.BINARY;
      case JSON:
        return Encoding.JSON;
      default:
        throw new IllegalArgumentException("Unexpected message encoding: " + messageEncoding);
    }
  }

  private static PubsubMessage createTestChangeMessage(PubSubUtils pubSub) throws Exception {
    SetCell setCell =
        SetCell.create(
            "test_column_family",
            ByteString.copyFrom("test_column", Charset.defaultCharset()),
            1000L, // timestamp
            ByteString.copyFrom("test_value", Charset.defaultCharset()));

    TestChangeStreamMutation mutation =
        new TestChangeStreamMutation(
            "test_rowkey",
            MutationType.USER,
            "source_cluster",
            org.threeten.bp.Instant.now(), // commit timestamp
            1, // tiebreaker
            "token",
            org.threeten.bp.Instant.now(), // low watermark
            setCell);

    Mod mod = new Mod(pubSub.getSource(), mutation, setCell);

    switch (pubSub.getDestination().getMessageFormat()) {
      case AVRO:
        return pubSub.mapChangeJsonStringToPubSubMessageAsAvro(mod.getChangeJson());
      case PROTOCOL_BUFFERS:
        return pubSub.mapChangeJsonStringToPubSubMessageAsProto(mod.getChangeJson());
      case JSON:
        return pubSub.mapChangeJsonStringToPubSubMessageAsJson(mod.getChangeJson());
      default:
        throw new IllegalArgumentException(
            "Unexpected message format: " + pubSub.getDestination().getMessageFormat());
    }
  }

  /**
   * DoFn that converts a {@link ChangeStreamMutation} to multiple {@link Mod} in serialized JSON
   * format.
   */
  static class ChangeStreamMutationToModJsonFn extends DoFn<ChangeStreamMutation, String> {

    private final BigtableSource sourceInfo;

    ChangeStreamMutationToModJsonFn(BigtableSource source) {
      this.sourceInfo = source;
    }

    @ProcessElement
    public void process(@Element ChangeStreamMutation input, OutputReceiver<String> receiver)
        throws Exception {
      for (Entry entry : input.getEntries()) {
        ModType modType = getModType(entry);

        Mod mod = null;
        switch (modType) {
          case SET_CELL:
            mod = new Mod(sourceInfo, input, (SetCell) entry);
            break;
          case DELETE_CELLS:
            mod = new Mod(sourceInfo, input, (DeleteCells) entry);
            break;
          case DELETE_FAMILY:
            mod = new Mod(sourceInfo, input, (DeleteFamily) entry);
            break;
          default:
          case UNKNOWN:
            throw new UnsupportedEntryException(
                "Cloud Bigtable change stream entry of type "
                    + entry.getClass().getName()
                    + " is not supported. The entry was put into a DLQ directory. "
                    + "Please update your Dataflow template with the latest template version");
        }

        String modJsonString;

        try {
          modJsonString = mod.toJson();
        } catch (IOException e) {
          // Ignore exception and print bad format.
          modJsonString = String.format("\"%s\"", input);
        }
        receiver.output(modJsonString);
      }
    }

    private ModType getModType(Entry entry) {
      if (entry instanceof SetCell) {
        return ModType.SET_CELL;
      } else if (entry instanceof DeleteCells) {
        return ModType.DELETE_CELLS;
      } else if (entry instanceof DeleteFamily) {
        return ModType.DELETE_FAMILY;
      }
      return ModType.UNKNOWN;
    }
  }
}

次のステップ