Bigtable change streams to Pub/Sub テンプレート

Bigtable change streams to Pub/Sub テンプレートは、Dataflow を使用して Bigtable データ変更レコードをストリーミングし、Pub/Sub トピックにパブリッシュするストリーミング パイプラインです。

Bigtable 変更ストリームを使用すると、テーブルごとにデータ ミューテーションをサブスクライブできます。テーブル変更ストリームをサブスクライブすると、次の制約が適用されます。

  • 変更されたセルと削除オペレーションの記述子のみが返されます。
  • 変更されたセルの新しい値のみが返されます。

データ変更レコードが Pub/Sub トピックにパブリッシュされたときに、元の Bigtable commit タイムスタンプの順序と比べるとメッセージが順不同で挿入される可能性があります。

Pub/Sub トピックにパブリッシュできない Bigtable データ変更レコードは、Cloud Storage のデッドレター キュー(未処理メッセージ キュー)ディレクトリに一時的に配置されます。再試行の失敗回数が最大回数に達すると、これらのレコードは同じデッドレター キュー ディレクトリに無期限に配置されます。これは、人間による確認やユーザーの追加の処理に使用されます。

このパイプラインでは、宛先 Pub/Sub トピックが存在している必要があります。宛先トピックが、スキーマを使用してメッセージを検証するように構成されている場合があります。Pub/Sub トピックでスキーマを指定すると、スキーマが有効である場合にのみ、パイプラインが開始されます。スキーマタイプに応じて、宛先トピックに次のいずれかのスキーマ定義を使用します。

プロトコル バッファ

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangeLogEntryProto";

message ChangelogEntryProto{
  required bytes rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional bytes column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional bytes value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
  

Avro

{
    "name" : "ChangelogEntryMessage",
    "type" : "record",
    "namespace" : "com.google.cloud.teleport.bigtable",
    "fields" : [
      { "name" : "rowKey", "type" : "bytes"},
      {
        "name" : "modType",
        "type" : {
          "name": "ModType",
          "type": "enum",
          "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]}
      },
      { "name": "isGC", "type": "boolean" },
      { "name": "tieBreaker", "type": "int"},
      { "name": "columnFamily", "type": "string"},
      { "name": "commitTimestamp", "type" : "long"},
      { "name" : "sourceInstance", "type" : "string"},
      { "name" : "sourceCluster", "type" : "string"},
      { "name" : "sourceTable", "type" : "string"},
      { "name": "column", "type" : ["null", "bytes"]},
      { "name": "timestamp", "type" : ["null", "long"]},
      { "name": "timestampFrom", "type" : ["null", "long"]},
      { "name": "timestampTo", "type" : ["null", "long"]},
      { "name" : "value", "type" : ["null", "bytes"]}
   ]
}
    

JSON

JSON メッセージ エンコードで次の Protobuf スキーマを使用します。

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangelogEntryMessageText";

message ChangelogEntryText{
  required string rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional string column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional string value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
    

新しい Pub/Sub メッセージには、変更ストリームによって Bigtable テーブルの対応行から返されたデータ変更レコードのエントリが 1 つ含まれます。Pub/Sub テンプレートは、各データ変更レコードのエントリをフラット化して、個々のセルレベルの変更に変換します。

Pub/Sub 出力メッセージの説明

フィールド名 説明
rowKey 変更された行の行キー。バイト配列の形式で返されます。JSON メッセージ エンコードが構成されている場合、行キーは文字列として返されます。useBase64Rowkeys が指定されている場合、行キーは Base64 でエンコードされます。それ以外の場合は、bigtableChangeStreamCharset で指定された文字セットを使用して、行キーのバイトが文字列にデコードされます。
modType 行ミューテーションのタイプ。SET_CELLDELETE_CELLSDELETE_FAMILY のいずれかの値を使用します。
columnFamily 行ミューテーションの影響を受ける列ファミリー。
column 行ミューテーションの影響を受ける列修飾子。DELETE_FAMILY ミューテーション タイプの場合、列フィールドは設定されません。バイト配列の形式で返されます。JSON メッセージ エンコードが構成されている場合、列は文字列として返されます。useBase64ColumnQualifier が指定された場合、列フィールドは Base64 でエンコードされます。それ以外の場合は、bigtableChangeStreamCharset で指定された文字セットを使用して、行キーのバイトが文字列にデコードされます。
commitTimestamp Bigtable がミューテーションを適用する時間。この時間は、Unix エポック(UTC 1970 年 1 月 1 日)からのマイクロ秒単位で測定されます。
timestamp ミューテーションの影響を受けるセルのタイムスタンプ値。DELETE_CELLS および DELETE_FAMILY ミューテーション タイプの場合、タイムスタンプは設定されません。この時間は、Unix エポック(UTC 1970 年 1 月 1 日)からのマイクロ秒単位で測定されます。
timestampFrom DELETE_CELLS ミューテーションによって削除されたすべてのセルのタイムスタンプ間隔の包括的な開始点を示します。他のミューテーション タイプの場合、timestampFrom は設定されません。この時間は、Unix エポック(UTC 1970 年 1 月 1 日)からのマイクロ秒単位で測定されます。
timestampTo DELETE_CELLS ミューテーションによって削除されたすべてのセルのタイムスタンプ間隔の排他的終了点を示します。他のミューテーション タイプの場合、timestampTo は設定されません。
isGC ミューテーションが Bigtable ガベージ コレクション メカニズムによって生成されたかどうかを示すブール値。
tieBreaker 異なる Bigtable クラスタによって 2 つのミューテーションが同時に登録された場合、tiebreaker 値が最も高いミューテーションがソーステーブルに適用されます。tiebreaker 値が小さいミューテーションは破棄されます。
value ミューテーションによって設定された新しい値。stripValues パイプライン オプションが設定されていない限り、SET_CELL ミューテーションに値が設定されます。他のミューテーション タイプの場合、値は設定されません。バイト配列の形式で返されます。JSON メッセージ エンコードが構成されている場合、値は文字列として返されます。useBase64Values が指定された場合、値は Base64 でエンコードされます。それ以外の場合は、bigtableChangeStreamCharset で指定された文字セットを使用して、値のバイトが文字列にデコードされます。
sourceInstance ミューテーションを登録した Bigtable インスタンスの名前。複数のパイプラインが異なるインスタンスから同じ Pub/Sub トピックに変更をストリーミングする場合があります。
sourceCluster ミューテーションを登録した Bigtable クラスタの名前。複数のパイプラインが異なるインスタンスから同じ Pub/Sub トピックに変更をストリーミングする場合に使用されます。
sourceTable ミューテーションを受け取った Bigtable テーブルの名前。複数のパイプラインが異なるテーブルから同じ Pub/Sub トピックに変更をストリーミングする場合に使用します。

パイプラインの要件

  • 指定された Bigtable ソース インスタンス。
  • 指定された Bigtable ソーステーブル。テーブルで変更ストリームが有効になっている必要があります。
  • 指定された Bigtable アプリケーション プロファイル。
  • 指定した Pub/Sub トピックが存在している必要があります。

テンプレートのパラメータ

必須パラメータ

  • pubSubTopic: 宛先 Pub/Sub トピックの名前。
  • bigtableChangeStreamAppProfile: Bigtable アプリケーション プロファイル ID。アプリケーション プロファイルでは、単一クラスタ ルーティングを使用し、単一行のトランザクションを許可する必要があります。
  • bigtableReadInstanceId: ソース Bigtable インスタンス ID。
  • bigtableReadTableId: ソース Bigtable テーブル ID。

オプション パラメータ

  • messageEncoding: Pub/Sub トピックにパブリッシュされるメッセージのエンコード。宛先トピックのスキーマが構成されている場合、メッセージ エンコードはトピックの設定によって決まります。サポートされている値は BINARYJSON です。デフォルトは JSON です。
  • messageFormat: Pub/Sub トピックにパブリッシュするメッセージのエンコード。宛先トピックのスキーマが構成されている場合、メッセージ エンコードはトピックの設定によって決まります。サポートされている値は、AVROPROTOCOL_BUFFERSJSON です。デフォルト値は JSON です。JSON 形式が使用されている場合、メッセージの rowKey、column、value フィールドは文字列であり、その内容は useBase64RowkeysuseBase64ColumnQualifiersuseBase64ValuesbigtableChangeStreamCharset のパイプライン オプションによって決まります。
  • stripValues: true に設定すると、新しい値が設定されずに SET_CELL ミューテーションが返されます。デフォルトは false です。このパラメータは、新しい値が存在する必要がない場合(キャッシュの無効化とも呼ばれる)や、値が非常に大きく Pub/Sub メッセージのサイズ上限を超えている場合に便利です。
  • dlqDirectory: デッドレター キューのディレクトリ。処理に失敗したレコードは、このディレクトリに保存されます。デフォルトは、Dataflow ジョブの一時的な保存場所の下にあるディレクトリです。ほとんどの場合、デフォルトのパスを使用できます。
  • dlqRetryMinutes: デッドレター キューの再試行間隔(分)。デフォルトは 10 です。
  • dlqMaxRetries: デッドレターの最大再試行回数。デフォルトは 5 です。
  • useBase64Rowkeys: JSON メッセージのエンコードで使用されます。true に設定した場合、rowKey フィールドは Base64 でエンコードされた文字列です。それ以外の場合、rowKey は、bigtableChangeStreamCharset を使用してバイトを文字列にデコードすることで生成されます。デフォルトは false です。
  • pubSubProjectId: Bigtable プロジェクト ID。デフォルトは Dataflow ジョブのプロジェクトです。
  • useBase64ColumnQualifiers: JSON メッセージのエンコードで使用されます。true に設定した場合、column フィールドは Base64 でエンコードされた文字列です。それ以外の場合、この列は bigtableChangeStreamCharset を使用してバイトを文字列にデコードすることによって生成されます。デフォルトは false です。
  • useBase64Values: JSON メッセージのエンコードで使用されます。true に設定した場合、value フィールドは Base64 でエンコードされた文字列です。それ以外の場合、bigtableChangeStreamCharset を使用してバイトを文字列にデコードすることで値が生成されます。デフォルトは false です。
  • disableDlqRetries: DLQ の再試行を無効にするかどうか。デフォルトは false です。
  • bigtableChangeStreamMetadataInstanceId: Bigtable 変更ストリーム メタデータのインスタンス ID。デフォルトは空です。
  • bigtableChangeStreamMetadataTableTableId: Bigtable 変更ストリーム コネクタのメタデータ テーブル ID。指定しない場合、パイプライン実行中に Bigtable 変更ストリーム コネクタのメタデータ テーブルが自動的に作成されます。デフォルトは空です。
  • bigtableChangeStreamCharset: Bigtable 変更ストリームの文字セット名。デフォルトは UTF-8 に設定されます。
  • bigtableChangeStreamStartTimestamp: 変更ストリームの読み取りに使用される開始タイムスタンプ(https://tools.ietf.org/html/rfc3339)。たとえば、2022-05-05T07:59:59Z のようにします。デフォルトは、パイプラインの開始時間のタイムスタンプです。
  • bigtableChangeStreamIgnoreColumnFamilies: 無視する列ファミリー名の変更のカンマ区切りのリスト。デフォルトは空です。
  • bigtableChangeStreamIgnoreColumns: 無視する列名の変更のカンマ区切りのリスト。デフォルトは空です。
  • bigtableChangeStreamName: クライアント パイプラインの一意の名前。実行中のパイプラインが停止した時点から処理を再開できます。デフォルトは自動生成された名前です。使用される値については、Dataflow ジョブのログをご覧ください。
  • bigtableChangeStreamResume: true に設定すると、同じ bigtableChangeStreamName 値で実行中のパイプラインが停止した時点から、新しいパイプラインが処理を再開します。指定された bigtableChangeStreamName 値を持つパイプラインが一度も実行されていない場合、新しいパイプラインは開始されません。false に設定すると、新しいパイプラインが開始されます。特定のソースに対して同じ bigtableChangeStreamName 値を持つパイプラインがすでに実行されている場合、新しいパイプラインは開始されません。デフォルトは false です。
  • bigtableReadProjectId: Bigtable プロジェクト ID。デフォルトは Dataflow ジョブのプロジェクトです。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Bigtable change streams to Pub/Sub template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
pubSubTopic=PUBSUB_TOPIC

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • BIGTABLE_INSTANCE_ID: Bigtable インスタンス ID。
  • BIGTABLE_TABLE_ID: Bigtable テーブル ID。
  • BIGTABLE_APPLICATION_PROFILE_ID: Bigtable アプリケーション プロファイル ID。
  • PUBSUB_TOPIC: Pub/Sub の宛先トピック名

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "pubSubTopic": "PUBSUB_TOPIC"
    }
  }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • BIGTABLE_INSTANCE_ID: Bigtable インスタンス ID。
  • BIGTABLE_TABLE_ID: Bigtable テーブル ID。
  • BIGTABLE_APPLICATION_PROFILE_ID: Bigtable アプリケーション プロファイル ID。
  • PUBSUB_TOPIC: Pub/Sub の宛先トピック名

次のステップ