Bigtable change streams to BigQuery テンプレート

Bigtable change streams to BigQuery テンプレートは、Dataflow を使用して Bigtable データ変更レコードをストリーミングし、BigQuery テーブルに書き込むストリーミング パイプラインです。

Bigtable 変更ストリームを使用すると、テーブルごとにデータ ミューテーションをサブスクライブできます。テーブル変更ストリームをサブスクライブすると、次の制約が適用されます。

  • 変更されたセルと削除オペレーションの記述子のみが返されます。
  • 変更されたセルの新しい値のみが返されます。

データ変更レコードが BigQuery に書き込まれたときに、元の Bigtable commit タイムスタンプの順序と比べると行が順不同で挿入される可能性があります。

永続的なエラーのために BigQuery に書き込めない変更履歴テーブルの行は、人間による確認またはユーザーによる追加の処理のため、Cloud Storage のデッドレター キュー(未処理メッセージ キュー)ディレクトリに永続的に配置されます。

必要な BigQuery テーブルが存在しない場合は、パイプラインによって作成されます。それ以外の場合は、既存の BigQuery テーブルが使用されます。既存の BigQuery テーブルのスキーマには、次の表の列が含まれている必要があります。

新しい BigQuery の各行には、変更ストリームによって Bigtable テーブルの対応行から返された 1 つのデータ変更レコードが含まれます。

BigQuery 出力テーブルのスキーマ

列名 タイプ null 可能性 説明
row_key STRING または BYTES いいえ 変更された行の行キー。writeRowkeyAsBytes パイプライン オプションが true に設定されている場合、列のタイプは BYTES にする必要があります。それ以外の場合は、STRING タイプを使用します。
mod_type STRING いいえ 行ミューテーションのタイプ。SET_CELLDELETE_CELLSDELETE_FAMILY のいずれかの値を使用します。
column_family STRING いいえ 行ミューテーションの影響を受ける列ファミリー。
column STRING はい 行ミューテーションの影響を受ける列修飾子。DELETE_FAMILY ミューテーション タイプの場合は、NULL に設定します。
commit_timestamp TIMESTAMP いいえ Bigtable がミューテーションを適用する時間。
big_query_commit_timestamp TIMESTAMP はい 省略可: BigQuery が行を出力テーブルに書き込む時間を指定します。bigQueryChangelogTableFieldsToIgnore パイプライン オプション値に列名が存在する場合、このフィールドには値が入力されません。
timestamp TIMESTAMP または INT64 はい ミューテーションの影響を受けるセルのタイムスタンプ値。writeNumericTimestamps パイプライン オプションが true に設定されている場合、列のタイプは INT64 にする必要があります。それ以外の場合は、TIMESTAMP タイプを使用します。ミューテーション タイプが DELETE_CELLS および DELETE_FAMILY の場合は、NULL に設定します。
timestamp_from TIMESTAMP または INT64 はい DELETE_CELLS ミューテーションによって削除されたすべてのセルのタイムスタンプ間隔の包括的な開始点を示します。他のミューテーション タイプの場合は、NULL に設定します。
timestamp_to TIMESTAMP または INT64 はい DELETE_CELLS ミューテーションによって削除されたすべてのセルのタイムスタンプ間隔の排他的終了点を示します。他のミューテーション タイプの場合は、NULL に設定します。
is_gc BOOL いいえ 省略可: ミューテーションがガベージ コレクション ポリシーによってトリガーされた場合は、true に設定します。それ以外の場合は false に設定します。bigQueryChangelogTableFieldsToIgnore パイプライン オプション値に列名が存在する場合、このフィールドには値が入力されません。
source_instance STRING いいえ (省略可)ミューテーションの取得元である Bigtable インスタンスの名前を記述します。bigQueryChangelogTableFieldsToIgnore パイプライン オプション値に列名が存在する場合、このフィールドには値が入力されません。
source_cluster STRING いいえ (省略可)ミューテーションの取得元である Bigtable クラスタの名前を記述します。bigQueryChangelogTableFieldsToIgnore パイプライン オプション値に列名が存在する場合、このフィールドには値が入力されません。
source_table STRING いいえ 省略可: ミューテーションが適用される Bigtable テーブルの名前を記述します。複数の Bigtable テーブルが同じ BigQuery テーブルに変更をストリーミングする場合、この列の値が役に立つことがあります。bigQueryChangelogTableFieldsToIgnore パイプライン オプション値に列名が存在する場合、このフィールドには値が入力されません。
tiebreaker INT64 いいえ (省略可)異なる Bigtable クラスタによって 2 つのミューテーションが同時に登録された場合、tiebreaker 値が最も高いミューテーションがソーステーブルに適用されます。tiebreaker 値が小さいミューテーションは破棄されます。bigQueryChangelogTableFieldsToIgnore パイプライン オプション値に列名が存在する場合、このフィールドには値が入力されません。
value STRING または BYTES はい ミューテーションによって設定された新しい値。writeValuesAsBytes パイプライン オプションが true に設定されている場合、列のタイプは BYTES にする必要があります。それ以外の場合は、STRING タイプを使用します。SET_CELL ミューテーションには値が設定されます。他のミューテーション タイプの場合、値は NULL に設定されます。

パイプラインの要件

  • 指定された Bigtable ソース インスタンス。
  • 指定された Bigtable ソーステーブル。テーブルで変更ストリームが有効になっている必要があります。
  • 指定された Bigtable アプリケーション プロファイル。
  • 指定された BigQuery の宛先データセット。

テンプレートのパラメータ

パラメータ 説明
bigtableReadInstanceId ソース Bigtable インスタンス ID。
bigtableReadTableId ソース Bigtable テーブル ID。
bigtableChangeStreamAppProfile Bigtable アプリケーション プロファイル ID。アプリケーション プロファイルでは、単一クラスタ ルーティングを使用し、単一行のトランザクションを許可する必要があります。
bigQueryDataset 宛先 BigQuery テーブルのデータセット名。
writeNumericTimestamps 省略可: Bigtable のタイムスタンプを BigQuery INT64 として書き込みます。true に設定すると、値が INT64 列に書き込まれます。それ以外の場合、値は TIMESTAMP 列に書き込まれます。影響を受ける列: timestamptimestamp_fromtimestamp_to。デフォルトは false です。true に設定すると、時間は Unix エポック(1970 年 1 月 1 日 UTC)を起点とするマイクロ秒単位で測定されます。
writeRowkeyAsBytes 省略可: 行キーを BigQuery BYTES として書き込みます。true に設定すると、行キーは BYTES 列に書き込まれます。それ以外の場合、行キーは STRING 列に書き込まれます。デフォルトは false です。
writeValuesAsBytes 省略可: 値を BigQuery BYTES として書き込みます。true に設定すると、値が BYTES 列に書き込まれます。それ以外の場合、値は STRING 列に書き込まれます。デフォルトは false です。
bigQueryChangelogTableName 省略可: 宛先 BigQuery テーブル名。指定しない場合、値 bigtableReadTableId + "_changelog" が使用されます。
bigQueryProjectId 省略可: BigQuery データセットのプロジェクト ID。デフォルトは Dataflow ジョブのプロジェクトです。
bigtableReadProjectId 省略可: Bigtable プロジェクト ID。デフォルトは Dataflow ジョブのプロジェクトです。
bigtableChangeStreamMetadataInstanceId 省略可: Bigtable 変更ストリーム メタデータのインスタンス ID。
bigtableChangeStreamMetadataTableTableId 省略可: Bigtable 変更ストリームのメタデータのテーブル ID。
bigtableChangeStreamCharset 省略可: 値と列修飾子を読み取るときの Bigtable 変更ストリームの文字セット名
bigtableChangeStreamStartTimestamp (省略可)変更ストリームの読み取りに使用される開始時間のタイムスタンプ(この値を含む)。たとえば、2022-05-05T07:59:59Z のようにします。デフォルトは、パイプラインの開始時間のタイムスタンプです。
bigtableChangeStreamIgnoreColumnFamilies 省略可: 無視する列ファミリー名の変更のカンマ区切りのリスト。
bigtableChangeStreamIgnoreColumns 省略可: 無視する列名の変更のカンマ区切りのリスト。
bigtableChangeStreamName 省略可: クライアント パイプラインの一意の名前。実行中のパイプラインが停止した時点から処理を再開できます。デフォルトは自動生成された名前です。使用される値については、Dataflow ジョブのログをご覧ください。
bigtableChangeStreamResume 省略可: true に設定すると、同じ bigtableChangeStreamName 値で実行中のパイプラインが停止した時点から、新しいパイプラインが処理を再開します。指定された bigtableChangeStreamName 値を持つパイプラインが一度も実行されていない場合、新しいパイプラインは開始されません。false に設定すると、新しいパイプラインが開始されます。指定されたソースに対して同じ bigtableChangeStreamName 値を持つパイプラインがすでに実行されている場合、新しいパイプラインは開始しません。デフォルトは false です。
bigQueryChangelogTableFieldsToIgnore 省略可: 指定した場合、作成されず入力されない変更履歴列のカンマ区切りのリスト。サポートされている値(is_gcsource_instancesource_clustersource_tabletiebreakerbig_query_commit_timestamp)のいずれかを使用します。デフォルトでは、すべての列にデータが入力されます。
bigQueryChangelogTablePartitionExpirationMs 省略可: 変更履歴テーブルのパーティションの有効期限をミリ秒単位で設定します。true に設定すると、指定したミリ秒数より古いパーティションが削除されます。デフォルトでは、有効期限は設定されていません。
bigQueryChangelogTablePartitionGranularity 省略可: 変更履歴テーブルのパーティショニングの粒度を指定します。設定すると、テーブルはパーティショニングされます。サポートされている値(HOURDAYMONTHYEAR)のいずれかを使用します。デフォルトでは、テーブルはパーティショニングされません。
dlqDirectory 省略可: デッドレター キューのディレクトリ。処理に失敗したレコードは、このディレクトリに保存されます。デフォルトは、Dataflow ジョブの一時保存場所の下のディレクトリです。ほとんどの場合、デフォルトのパスを使用できます。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Bigtable change streams to BigQuery template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
bigQueryDataset=BIGQUERY_DESTINATION_DATASET

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • BIGTABLE_INSTANCE_ID: Bigtable インスタンス ID。
  • BIGTABLE_TABLE_ID: Bigtable テーブル ID。
  • BIGTABLE_APPLICATION_PROFILE_ID: Bigtable アプリケーション プロファイル ID。
  • BIGQUERY_DESTINATION_DATASET: BigQuery の宛先データセット名

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET"
    }
  }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • BIGTABLE_INSTANCE_ID: Bigtable インスタンス ID。
  • BIGTABLE_TABLE_ID: Bigtable テーブル ID。
  • BIGTABLE_APPLICATION_PROFILE_ID: Bigtable アプリケーション プロファイル ID。
  • BIGQUERY_DESTINATION_DATASET: BigQuery の宛先データセット名

次のステップ