Spanner change streams to BigQuery テンプレート

Spanner change streams to BigQuery テンプレートは、Spanner データ変更レコードをストリーミングし、Dataflow Runner V2 を使用して BigQuery テーブルに書き込むストリーミング パイプラインです。

変更ストリームの監視対象列はすべて、Spanner トランザクションによって変更されたかどうかにかかわらず、各 BigQuery テーブル行に含まれます。監視されていない列は BigQuery 行に含まれません。Dataflow のウォーターマークよりも小さい Spanner の変更は、BigQuery テーブルに正常に適用されるか、再試行のためにデッドレター キューに保存されます。BigQuery の行は、元の Spanner commit タイムスタンプの順序と比較して順不同で挿入されます。

必要な BigQuery テーブルが存在しない場合は、パイプラインによって作成されます。それ以外の場合は、既存の BigQuery テーブルが使用されます。既存の BigQuery テーブルのスキーマには、Spanner テーブルの対応する追跡対象列と、ignoreFields オプションによって明示的に無視されない追加のメタデータ列が含まれている必要があります。次のリストのメタデータ フィールドの説明をご覧ください。新しい BigQuery の各行には、変更レコードのタイムスタンプの時点で、Spanner テーブルの対応する行から変更ストリームによって監視されているすべての列が含まれます。

BigQuery テーブルに次のメタデータ フィールドが追加されます。これらのフィールドの詳細については、「変更ストリームのパーティション、レコード、クエリ」のデータ変更レコードをご覧ください。

  • _metadata_spanner_mod_type: Spanner トランザクションの変更タイプ(挿入、更新、削除)。変更ストリームのデータ変更レコードから抽出されます。
  • _metadata_spanner_table_name: Spanner テーブル名このフィールドは、コネクタのメタデータ テーブル名ではありません。
  • _metadata_spanner_commit_timestamp: Spanner の commit タイムスタンプ。変更が commit された時刻です。この値は、変更ストリームのデータ変更レコードから抽出されます。
  • _metadata_spanner_server_transaction_id: 変更が commit された Spanner トランザクションを表す、グローバルに一意の文字列。この値は、変更ストリームのレコードを処理するコンテキストでのみ使用します。Spanner の API では、トランザクション ID と相関はありません。この値は、変更ストリームのデータ変更レコードから抽出されます。
  • _metadata_spanner_record_sequence: Spanner トランザクション内のレコードのシーケンス番号。シーケンス番号は、トランザクション内で一意で、単調に増加することが保証されています(ただし、必ずしも連続ではありません)。この値は、変更ストリームのデータ変更レコードから抽出されます。
  • _metadata_spanner_is_last_record_in_transaction_in_partition: そのレコードが、現在のパーティションにおける Spanner トランザクションの最後のレコードであるかどうかを示します。この値は、変更ストリームのデータ変更レコードから抽出されます。
  • _metadata_spanner_number_of_records_in_transaction: すべての変更ストリーム パーティションの Spanner トランザクションに含まれるデータ変更レコードの数。この値は、変更ストリームのデータ変更レコードから抽出されます。
  • _metadata_spanner_number_of_partitions_in_transaction: Spanner トランザクションのデータ変更レコードを返すパーティションの数。この値は、変更ストリームのデータ変更レコードから抽出されます。
  • _metadata_big_query_commit_timestamp: 行が BigQuery に挿入されたときの commit タイムスタンプ。useStorageWriteApitrue の場合、この列はパイプラインによって変更履歴テーブルに自動的に作成されません。この場合、必要に応じて、この列を変更履歴テーブルに手動で追加する必要があります。

このテンプレートを使用する場合は、次の点に注意してください。

  • このテンプレートを使用すると、既存のテーブルまたは新しいテーブルの新しい列を Spanner から BigQuery に伝播できます。詳細については、トラッキング テーブルまたは列の追加を処理するをご覧ください。
  • OLD_AND_NEW_VALUES 値と NEW_VALUES 値のキャプチャ タイプで、データ変更レコードに UPDATE 変更が含まれている場合、テンプレートはデータ変更レコードの commit タイムスタンプの時点で Spanner に対してステイル読み取りを実行し、変更されていない監視対象の列を取得する必要があります。ステイル読み取りに対してデータベース「version_retention_period」が正しく構成されていることを確認してください。NEW_ROW 値のキャプチャ タイプでは、データ変更レコードが UPDATE リクエストで更新されない列を含む新しい行をすべてキャプチャするため、テンプレートのほうが効率的で、そのテンプレートではステイル読み取りを行う必要がありません。
  • ネットワークのレイテンシとネットワーク転送のコストを最小限に抑えるには、Spanner インスタンスまたは BigQuery テーブルと同じリージョンから Dataflow ジョブを実行します。使用するソース、シンク、ステージング ファイルのロケーションや、一時ファイルのロケーションがジョブのリージョン外である場合、データがリージョンを越えて送信される可能性があります。詳細については、Dataflow のリージョンをご覧ください。
  • このテンプレートは有効な Spanner のデータ型をすべてサポートしていますが、特に次の場合に、BigQuery の型が Spanner の型より精度が高いと、変換中に精度が失われる可能性があります。
    • Spanner JSON 型では、オブジェクトのメンバーの順序は辞書順に並べ替えられますが、BigQuery JSON 型ではこのような保証はありません。
    • Spanner はナノ秒単位の TIMESTAMP 型をサポートしますが、BigQuery はマイクロ秒単位の TIMESTAMP 型のみをサポートします。
  • このテンプレートでは、BigQuery Storage Write API を Exactly-Once モードで使用することはサポートされていません。

変更ストリームの詳細については、変更ストリーム Dataflow パイプラインの構築方法ベスト プラクティスをご覧ください。

パイプラインの要件

  • パイプラインの実行前に Spanner インスタンスが存在している。
  • パイプラインの実行前に Spanner データベースが存在している。
  • パイプラインの実行前に Spanner メタデータ インスタンスが存在している。
  • パイプラインの実行前に Spanner メタデータ データベースが存在している。
  • パイプラインの実行前に Spanner の変更ストリームが存在している。
  • パイプラインの実行前に BigQuery データセットが存在している。

トラッキング テーブルまたは列の追加を処理する

このセクションでは、パイプラインの実行中にトラッキング用 Spanner テーブルと列の追加を処理するためのベスト プラクティスについて説明します。

  • Spanner 変更ストリーム スコープに新しい列を追加する前に、まず BigQuery 変更ログテーブルに列を追加します。追加する列のデータ型は、一致するデータ型で NULLABLE の必要があります。10 分以上待ってから、Spanner で新しい列またはテーブルの作成を続行します。待機せずに新しい列に書き込むと、デッドレター キュー ディレクトリに無効なエラーコードを持つ未処理のレコードが作成される可能性があります。
  • 新しいテーブルを追加するには、まず Spanner データベースにテーブルを追加します。パイプラインが新しいテーブルのレコードを受信すると、テーブルが BigQuery に自動的に作成されます。
  • Spanner データベースに新しい列またはテーブルを追加した後で、新しい列またはテーブルが暗黙的に追跡されていなければ、追跡されるように変更ストリームを修正します。
  • このテンプレートでは、BigQuery からテーブルや列が削除されることはありません。列が Spanner テーブルから削除された場合、BigQuery から列を手動で削除しない限り、Spanner テーブルから列が削除された後に生成されるレコードの BigQuery 変更ログ列に null 値が入力されます。
  • このテンプレートは、列タイプの更新をサポートしていません。Spanner では、STRING 列を BYTES 列に変更、または BYTES 列を STRING 列に変更することはできますが、BigQuery では既存の列のデータ型を変更したり、同じ列名を異なるデータ型で使用することはできません。Spanner 列を削除してから、同じ名前で異なる型の列を再作成すると、データは既存の BigQuery 列に書き込まれますが、型は変更されません。
  • このテンプレートは、列モードの更新をサポートしていません。BigQuery に複製されたメタデータ列は、REQUIRED モードに設定されます。BigQuery に複製された他のすべての列は、Spanner テーブルで NOT NULL として定義されているかどうかにかかわらず、NULLABLE に設定されます。BigQuery で NULLABLE 列を REQUIRED モードに更新することはできません。
  • 実行中のパイプラインで、変更ストリームの値キャプチャ タイプ を変更することはできません。

テンプレートのパラメータ

必須パラメータ

  • spannerInstanceId: 変更ストリームの読み取り元の Spanner インスタンス。
  • spannerDatabase: 変更ストリームの読み取り元の Spanner データベース。
  • spannerMetadataInstanceId: 変更ストリーム コネクタのメタデータ テーブルに使用する Spanner インスタンス。
  • spannerMetadataDatabase: 変更ストリーム コネクタのメタデータ テーブルに使用する Spanner データベース。
  • spannerChangeStreamName: 読み取り元の Spanner 変更ストリームの名前。
  • bigQueryDataset: 変更ストリーム出力の BigQuery データセット。

オプション パラメータ

  • spannerProjectId: 変更ストリームの読み取り元のプロジェクト。この値は、変更ストリーム コネクタのメタデータ テーブルが作成されるプロジェクトでもあります。このパラメータのデフォルト値は、Dataflow パイプラインが動作しているプロジェクトです。
  • spannerDatabaseRole: テンプレートの実行時に使用される Spanner データベース ロール。このパラメータは、テンプレートを実行している IAM プリンシパルが、きめ細かいアクセス制御のユーザーである場合にのみ必要です。データベースのロールには、変更ストリームに対する SELECT 権限と、変更ストリームの読み取り機能に対する EXECUTE 権限が必要です。詳細については、変更ストリームのきめ細かなアクセス制御(https://cloud.google.com/spanner/docs/fgac-change-streams)をご覧ください。
  • spannerMetadataTableName: Spanner の変更ストリーム コネクタで使用するメタデータ テーブル名。指定しない場合、パイプライン フロー中に Spanner 変更ストリーム コネクタのメタデータ テーブルが自動的に作成されます。このパラメータは、既存のパイプラインを更新するときに指定する必要があります。それ以外の場合は、このパラメータを指定しないでください。
  • rpcPriority: Spanner 呼び出しのリクエストの優先度。値は HIGHMEDIUMLOW のいずれかにする必要があります。デフォルト値は HIGH です。
  • spannerHost: テンプレート内で呼び出す Cloud Spanner のエンドポイント。テストでのみ使われます(例: https://batch-spanner.googleapis.com)。
  • startTimestamp: 変更ストリームの読み取りに使用される開始日時(https://datatracker.ietf.org/doc/html/rfc3339)。この値も含みます。例: 2021-10-12T07:20:50.52Zデフォルトは、パイプライン開始時のタイムスタンプ、つまり現在の時刻です。
  • endTimestamp: 変更ストリームの読み取りに使用される終了日時(https://datatracker.ietf.org/doc/html/rfc3339)。この値を含みます。例: 2021-10-12T07:20:50.52Z。デフォルトは、現在よりも先の無限の時間です。
  • bigQueryProjectId: BigQuery プロジェクト。デフォルト値は Dataflow ジョブのプロジェクトです。
  • bigQueryChangelogTableNameTemplate: 変更履歴を含む BigQuery テーブルの名前のテンプレート。デフォルトは {_metadata_spanner_table_name}_changelog です。
  • deadLetterQueueDirectory: 未処理のレコードを保存するパス。デフォルトのパスは、Dataflow ジョブの一時保存場所の下のディレクトリです。通常はデフォルト値で十分です。
  • dlqRetryMinutes: デッドレター キューの再試行間隔(分)。デフォルト値は 10 です。
  • ignoreFields: 無視するフィールドのカンマ区切のリスト(大文字と小文字は区別されます)。これらのフィールドは、監視対象テーブルのフィールド、またはパイプラインによって追加されたメタデータ フィールドです。無視されるフィールドは BigQuery に挿入されません。_metadata_spanner_table_name フィールドを無視すると、bigQueryChangelogTableNameTemplate パラメータも無視されます。デフォルトは空です。
  • disableDlqRetries: DLQ の再試行を無効にするかどうか。デフォルトは false です。
  • useStorageWriteApi: true の場合、パイプラインは BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)を使用します。デフォルト値は false です。詳細については、Storage Write API の使用(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)をご覧ください。
  • useStorageWriteApiAtLeastOnce: Storage Write API を使用する場合は、書き込みセマンティクスを指定します。at-least-once セマンティクス(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)を使用するには、このパラメータを true に設定します。exactly-once セマンティクスを使用するには、パラメータを false に設定します。このパラメータは、useStorageWriteApitrue の場合にのみ適用されます。デフォルト値は false です。
  • numStorageWriteApiStreams: Storage Write API を使用する場合は、書き込みストリームの数を指定します。useStorageWriteApitrue であり、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。デフォルト値は 0 です。
  • storageWriteApiTriggeringFrequencySec: Storage Write API を使用する場合は、トリガーの頻度を秒単位で指定します。useStorageWriteApitrue であり、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Cloud Spanner change streams to BigQuery template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • SPANNER_INSTANCE_ID: Spanner インスタンス ID
  • SPANNER_DATABASE: Spanner データベース
  • SPANNER_METADATA_INSTANCE_ID: Spanner メタデータ インスタンス ID
  • SPANNER_METADATA_DATABASE: Spanner メタデータ データベース
  • SPANNER_CHANGE_STREAM: Spanner 変更ストリーム
  • BIGQUERY_DATASET: 変更ストリーム出力の BigQuery データセット

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • SPANNER_INSTANCE_ID: Spanner インスタンス ID
  • SPANNER_DATABASE: Spanner データベース
  • SPANNER_METADATA_INSTANCE_ID: Spanner メタデータ インスタンス ID
  • SPANNER_METADATA_DATABASE: Spanner メタデータ データベース
  • SPANNER_CHANGE_STREAM: Spanner 変更ストリーム
  • BIGQUERY_DATASET: 変更ストリーム出力の BigQuery データセット

次のステップ