Bigtable change streams to BigQuery テンプレート

Bigtable change streams to BigQuery テンプレートは、Dataflow を使用して Bigtable データ変更レコードをストリーミングし、BigQuery テーブルに書き込むストリーミング パイプラインです。

Bigtable 変更ストリームを使用すると、テーブルごとにデータ ミューテーションをサブスクライブできます。テーブル変更ストリームをサブスクライブすると、次の制約が適用されます。

  • 変更されたセルと削除オペレーションの記述子のみが返されます。
  • 変更されたセルの新しい値のみが返されます。

データ変更レコードが BigQuery に書き込まれたときに、元の Bigtable commit タイムスタンプの順序と比べると行が順不同で挿入される可能性があります。

永続的なエラーのために BigQuery に書き込めない変更履歴テーブルの行は、人間による確認またはユーザーによる追加の処理のため、Cloud Storage のデッドレター キュー(未処理メッセージ キュー)ディレクトリに永続的に配置されます。

必要な BigQuery テーブルが存在しない場合は、パイプラインによって作成されます。それ以外の場合は、既存の BigQuery テーブルが使用されます。既存の BigQuery テーブルのスキーマには、次の表の列が含まれている必要があります。

新しい BigQuery の各行には、変更ストリームによって Bigtable テーブルの対応行から返された 1 つのデータ変更レコードが含まれます。

BigQuery 出力テーブルのスキーマ

列名 タイプ null 可能性 説明
row_key STRING または BYTES いいえ 変更された行の行キー。writeRowkeyAsBytes パイプライン オプションが true に設定されている場合、列のタイプは BYTES にする必要があります。それ以外の場合は、STRING タイプを使用します。
mod_type STRING いいえ 行ミューテーションのタイプ。SET_CELLDELETE_CELLSDELETE_FAMILY のいずれかの値を使用します。
column_family STRING いいえ 行ミューテーションの影響を受ける列ファミリー。
column STRING はい 行ミューテーションの影響を受ける列修飾子。DELETE_FAMILY ミューテーション タイプの場合は、NULL に設定します。
commit_timestamp TIMESTAMP いいえ Bigtable がミューテーションを適用する時間。
big_query_commit_timestamp TIMESTAMP はい 省略可: BigQuery が行を出力テーブルに書き込む時間を指定します。bigQueryChangelogTableFieldsToIgnore パイプライン オプション値に列名が存在する場合、このフィールドには値が入力されません。
timestamp TIMESTAMP または INT64 はい ミューテーションの影響を受けるセルのタイムスタンプ値。writeNumericTimestamps パイプライン オプションが true に設定されている場合、列のタイプは INT64 にする必要があります。それ以外の場合は、TIMESTAMP タイプを使用します。ミューテーション タイプが DELETE_CELLS および DELETE_FAMILY の場合は、NULL に設定します。
timestamp_from TIMESTAMP または INT64 はい DELETE_CELLS ミューテーションによって削除されたすべてのセルのタイムスタンプ間隔の包括的な開始点を示します。他のミューテーション タイプの場合は、NULL に設定します。
timestamp_to TIMESTAMP または INT64 はい DELETE_CELLS ミューテーションによって削除されたすべてのセルのタイムスタンプ間隔の排他的終了点を示します。他のミューテーション タイプの場合は、NULL に設定します。
is_gc BOOL いいえ 省略可: ミューテーションがガベージ コレクション ポリシーによってトリガーされた場合は、true に設定します。それ以外の場合は false に設定します。bigQueryChangelogTableFieldsToIgnore パイプライン オプション値に列名が存在する場合、このフィールドには値が入力されません。
source_instance STRING いいえ (省略可)ミューテーションの取得元である Bigtable インスタンスの名前を記述します。bigQueryChangelogTableFieldsToIgnore パイプライン オプション値に列名が存在する場合、このフィールドには値が入力されません。
source_cluster STRING いいえ (省略可)ミューテーションの取得元である Bigtable クラスタの名前を記述します。bigQueryChangelogTableFieldsToIgnore パイプライン オプション値に列名が存在する場合、このフィールドには値が入力されません。
source_table STRING いいえ 省略可: ミューテーションが適用される Bigtable テーブルの名前を記述します。複数の Bigtable テーブルが同じ BigQuery テーブルに変更をストリーミングする場合、この列の値が役に立つことがあります。bigQueryChangelogTableFieldsToIgnore パイプライン オプション値に列名が存在する場合、このフィールドには値が入力されません。
tiebreaker INT64 いいえ (省略可)異なる Bigtable クラスタによって 2 つのミューテーションが同時に登録された場合、tiebreaker 値が最も高いミューテーションがソーステーブルに適用されます。tiebreaker 値が小さいミューテーションは破棄されます。bigQueryChangelogTableFieldsToIgnore パイプライン オプション値に列名が存在する場合、このフィールドには値が入力されません。
value STRING または BYTES はい ミューテーションによって設定された新しい値。writeValuesAsBytes パイプライン オプションが true に設定されている場合、列のタイプは BYTES にする必要があります。それ以外の場合は、STRING タイプを使用します。SET_CELL ミューテーションには値が設定されます。他のミューテーション タイプの場合、値は NULL に設定されます。

パイプラインの要件

  • 指定された Bigtable ソース インスタンス。
  • 指定された Bigtable ソーステーブル。テーブルで変更ストリームが有効になっている必要があります。
  • 指定された Bigtable アプリケーション プロファイル。
  • 指定された BigQuery の宛先データセット。

テンプレートのパラメータ

必須パラメータ

  • bigQueryDataset: 宛先 BigQuery テーブルのデータセット名
  • bigtableChangeStreamAppProfile: Bigtable アプリケーション プロファイル ID。アプリケーション プロファイルでは、単一クラスタ ルーティングを使用し、単一行のトランザクションを許可する必要があります。
  • bigtableReadInstanceId: ソース Bigtable インスタンス ID。
  • bigtableReadTableId: ソース Bigtable テーブル ID。

オプション パラメータ

  • writeRowkeyAsBytes: 行キーを BigQuery BYTES として書き込むかどうか。true に設定すると、行キーは BYTES 列に書き込まれます。それ以外の場合、行キーは STRING 列に書き込まれます。デフォルトは false です。
  • writeValuesAsBytes: true に設定すると、値が BYTES 列に書き込まれます。それ以外の場合は、STRING 列に書き込まれます。デフォルトは false です。
  • writeNumericTimestamps: Bigtable のタイムスタンプを BigQuery の INT64 として書き込むかどうか。true に設定すると、値が INT64 列に書き込まれます。それ以外の場合、値は TIMESTAMP 列に書き込まれます。影響を受ける列: timestamptimestamp_fromtimestamp_to。デフォルトは false です。true に設定すると、時間は Unix エポック(1970 年 1 月 1 日 UTC)を起点とするマイクロ秒単位で測定されます。
  • bigQueryProjectId: BigQuery データセット プロジェクト ID。デフォルトは Dataflow ジョブのプロジェクトです。
  • bigQueryChangelogTableName: 宛先 BigQuery テーブルの名前。指定しない場合、値 bigtableReadTableId + "_changelog" が使用されます。デフォルトは空です。
  • bigQueryChangelogTablePartitionGranularity: 変更履歴テーブルのパーティショニングの粒度を指定します。設定すると、テーブルはパーティショニングされます。サポートされている値(HOURDAYMONTHYEAR)のいずれかを使用します。デフォルトでは、テーブルはパーティショニングされません。
  • bigQueryChangelogTablePartitionExpirationMs: 変更履歴テーブルのパーティションの有効期限をミリ秒単位で設定します。true に設定すると、指定したミリ秒数より古いパーティションが削除されます。デフォルトでは、有効期限は設定されていません。
  • bigQueryChangelogTableFieldsToIgnore: 指定した場合、作成されず入力されない変更履歴列のカンマ区切りのリスト。サポートされている値(is_gcsource_instancesource_clustersource_tabletiebreakerbig_query_commit_timestamp)のいずれかを使用します。デフォルトでは、すべての列にデータが入力されます。
  • dlqDirectory: デッドレター キューに使用するディレクトリ。処理に失敗したレコードは、このディレクトリに保存されます。デフォルトは、Dataflow ジョブの一時保存場所の下のディレクトリです。ほとんどの場合、デフォルトのパスを使用できます。
  • bigtableChangeStreamMetadataInstanceId: Bigtable 変更ストリーム メタデータのインスタンス ID。デフォルトは空です。
  • bigtableChangeStreamMetadataTableTableId: Bigtable 変更ストリーム コネクタのメタデータ テーブル ID。指定しない場合、パイプライン実行中に Bigtable 変更ストリーム コネクタのメタデータ テーブルが自動的に作成されます。デフォルトは空です。
  • bigtableChangeStreamCharset: Bigtable 変更ストリームの文字セット名。デフォルトは UTF-8 に設定されます。
  • bigtableChangeStreamStartTimestamp: 変更ストリームの読み取りに使用される開始タイムスタンプ(https://tools.ietf.org/html/rfc3339)。たとえば、2022-05-05T07:59:59Z のようにします。デフォルトは、パイプラインの開始時間のタイムスタンプです。
  • bigtableChangeStreamIgnoreColumnFamilies: 無視する列ファミリー名の変更のカンマ区切りのリスト。デフォルトは空です。
  • bigtableChangeStreamIgnoreColumns: 無視する列名の変更のカンマ区切りのリスト。デフォルトは空です。
  • bigtableChangeStreamName: クライアント パイプラインの一意の名前。実行中のパイプラインが停止した時点から処理を再開できます。デフォルトは自動生成された名前です。使用される値については、Dataflow ジョブのログをご覧ください。
  • bigtableChangeStreamResume: true に設定すると、同じ bigtableChangeStreamName 値で実行中のパイプラインが停止した時点から、新しいパイプラインが処理を再開します。指定された bigtableChangeStreamName 値を持つパイプラインが一度も実行されていない場合、新しいパイプラインは開始されません。false に設定すると、新しいパイプラインが開始されます。特定のソースに対して同じ bigtableChangeStreamName 値を持つパイプラインがすでに実行されている場合、新しいパイプラインは開始されません。デフォルトは false です。
  • bigtableReadProjectId: Bigtable プロジェクト ID。デフォルトは Dataflow ジョブのプロジェクトです。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Bigtable change streams to BigQuery template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
bigQueryDataset=BIGQUERY_DESTINATION_DATASET

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • BIGTABLE_INSTANCE_ID: Bigtable インスタンス ID。
  • BIGTABLE_TABLE_ID: Bigtable テーブル ID。
  • BIGTABLE_APPLICATION_PROFILE_ID: Bigtable アプリケーション プロファイル ID。
  • BIGQUERY_DESTINATION_DATASET: BigQuery の宛先データセット名

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET"
    }
  }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • BIGTABLE_INSTANCE_ID: Bigtable インスタンス ID。
  • BIGTABLE_TABLE_ID: Bigtable テーブル ID。
  • BIGTABLE_APPLICATION_PROFILE_ID: Bigtable アプリケーション プロファイル ID。
  • BIGQUERY_DESTINATION_DATASET: BigQuery の宛先データセット名

次のステップ