Spanner change streams to BigQuery テンプレート

Spanner change streams to BigQuery テンプレートは、Spanner データ変更レコードをストリーミングし、Dataflow Runner V2 を使用して BigQuery テーブルに書き込むストリーミング パイプラインです。

変更ストリームの監視対象列はすべて、Spanner トランザクションによって変更されたかどうかにかかわらず、各 BigQuery テーブル行に含まれます。監視されていない列は BigQuery 行に含まれません。Dataflow のウォーターマークよりも小さい Spanner の変更は、BigQuery テーブルに正常に適用されるか、再試行のためにデッドレター キューに保存されます。BigQuery の行は、元の Spanner commit タイムスタンプの順序と比較して順不同で挿入されます。

必要な BigQuery テーブルが存在しない場合は、パイプラインによって作成されます。それ以外の場合は、既存の BigQuery テーブルが使用されます。既存の BigQuery テーブルのスキーマには、Spanner テーブルの対応する追跡対象列と、ignoreFields オプションによって明示的に無視されない追加のメタデータ列が含まれている必要があります。次のリストのメタデータ フィールドの説明をご覧ください。新しい BigQuery の各行には、変更レコードのタイムスタンプの時点で、Spanner テーブルの対応する行から変更ストリームによって監視されているすべての列が含まれます。

BigQuery テーブルに次のメタデータ フィールドが追加されます。これらのフィールドの詳細については、「変更ストリームのパーティション、レコード、クエリ」のデータ変更レコードをご覧ください。

このテンプレートを使用する場合は、次の点に注意してください。

  • このテンプレートは、スキーマの変更を Spanner から BigQuery に伝播しません。Spanner でスキーマの変更を行うとパイプラインが壊れる可能性が高いため、スキーマの変更後にパイプラインを再作成する必要が生じることがあります。
  • OLD_AND_NEW_VALUES 値と NEW_VALUES 値のキャプチャ タイプで、データ変更レコードに UPDATE 変更が含まれている場合、テンプレートはデータ変更レコードの commit タイムスタンプの時点で Spanner に対してステイル読み取りを実行し、変更されていない監視対象の列を取得する必要があります。ステイル読み取りに対してデータベース「version_retention_period」が正しく構成されていることを確認してください。NEW_ROW 値のキャプチャ タイプでは、データ変更レコードが UPDATE リクエストで更新されない列を含む新しい行をすべてキャプチャするため、テンプレートのほうが効率的で、そのテンプレートではステイル読み取りを行う必要がありません。
  • ネットワークのレイテンシとネットワーク転送のコストを最小限に抑えるには、Spanner インスタンスまたは BigQuery テーブルと同じリージョンから Dataflow ジョブを実行します。使用するソース、シンク、ステージング ファイルのロケーションや、一時ファイルのロケーションがジョブのリージョン外である場合、データがリージョンを越えて送信される可能性があります。詳細については、Dataflow のリージョンをご覧ください。
  • このテンプレートは有効な Spanner のデータ型をすべてサポートしていますが、特に次の場合に、BigQuery の型が Spanner の型より精度が高いと、変換中に精度が失われる可能性があります。
    • Spanner JSON 型では、オブジェクトのメンバーの順序は辞書順に並べ替えられますが、BigQuery JSON 型ではこのような保証はありません。
    • Spanner はナノ秒単位の TIMESTAMP 型をサポートしますが、BigQuery はマイクロ秒単位の TIMESTAMP 型のみをサポートします。
  • このテンプレートでは、BigQuery Storage Write API を Exactly-Once モードで使用することはサポートされていません。

変更ストリームの詳細については、変更ストリーム Dataflow パイプラインの構築方法ベスト プラクティスをご覧ください。

パイプラインの要件

  • パイプラインの実行前に Spanner インスタンスが存在している。
  • パイプラインの実行前に Spanner データベースが存在している。
  • パイプラインの実行前に Spanner メタデータ インスタンスが存在している。
  • パイプラインの実行前に Spanner メタデータ データベースが存在している。
  • パイプラインの実行前に Spanner の変更ストリームが存在している。
  • パイプラインの実行前に BigQuery データセットが存在している必要があります。

テンプレートのパラメータ

必須パラメータ

  • spannerInstanceId: 変更ストリームの読み取り元の Spanner インスタンス。
  • spannerDatabase: 変更ストリームの読み取り元の Spanner データベース。
  • spannerMetadataInstanceId: 変更ストリーム コネクタのメタデータ テーブルに使用する Spanner インスタンス。
  • spannerMetadataDatabase: 変更ストリーム コネクタのメタデータ テーブルに使用する Spanner データベース。データベース内のすべてのテーブルを追跡する変更ストリームでは、メタデータ テーブルを別のデータベースに配置することをおすすめします。
  • spannerChangeStreamName: 読み取り元の Spanner 変更ストリームの名前。
  • bigQueryDataset: 変更ストリーム出力の BigQuery データセット。dataSetName と完全な dataSetId(例: bigQueryProjectId.dataSetName)の両方が使用できます。

オプション パラメータ

  • spannerProjectId: 変更ストリームの読み取り元のプロジェクト。このパラメータのデフォルトは、Dataflow パイプラインが動作しているプロジェクトです。
  • spannerDatabaseRole: 変更ストリームから読み取る際にユーザーが想定するデータベース ロール。このデータベース ロールには、変更ストリームからの読み取るための権限が必要です。データベース ロールが指定されていない場合、ユーザーにはデータベースから読み取るための IAM 権限が必要です。
  • spannerMetadataTableName: Cloud Spanner の変更ストリーム コネクタで使用するメタデータ テーブル名。指定しない場合、Cloud Spanner の変更ストリーム コネクタで使用するメタデータ テーブルはパイプライン フロー中に自動的に作成されます。このパラメータは、既存のパイプラインを更新するときに指定する必要があります。それ以外の場合は指定しないでください。
  • rpcPriority: Cloud Spanner 呼び出しのリクエストの優先度。値は HIGH、MEDIUM、LOW のいずれかである必要があります。デフォルトは HIGH です。
  • spannerHost: テンプレート内で呼び出す Cloud Spanner のエンドポイント。テストでのみ使われます(例: https://batch-spanner.googleapis.com)。
  • startTimestamp: 変更ストリームの読み取りに使用される開始日時(この値を含む)(https://tools.ietf.org/html/rfc3339)。たとえば、2022-05-05T07:59:59Z のように指定します。デフォルトは、パイプライン開始時のタイムスタンプです。
  • endTimestamp: 変更ストリームの読み取りに使用される終了日時(この値を含む)(https://tools.ietf.org/html/rfc3339)。たとえば、2022-05-05T07:59:59Z のように指定します。デフォルトは、現在よりも先の無限の時間です。
  • bigQueryProjectId: BigQuery プロジェクト。デフォルトは Dataflow ジョブのプロジェクトです。
  • bigQueryChangelogTableNameTemplate: 変更履歴を含む BigQuery テーブル名のテンプレート。デフォルトは {_metadata_spanner_table_name}_changelog です。
  • deadLetterQueueDirectory: 処理されなかった理由とともに、未処理のレコードを保存するファイルパス。デフォルトは、Dataflow ジョブの一時保存場所の下のディレクトリです。ほとんどの場合は、デフォルト値のまま使用できます。
  • dlqRetryMinutes: デッドレター キューの再試行間隔(分)。デフォルトは 10 です。
  • ignoreFields: 無視するフィールドのカンマ区切りリスト。追跡テーブルのフィールド、またはメタデータ フィールド(_metadata_spanner_mod_type、_metadata_spanner_table_name、_metadata_spanner_commit_timestamp、_metadata_spanner_server_transaction_id、_metadata_spanner_record_sequence、_metadata_spanner_is_last_record_in_transaction_in_partition、_metadata_spanner_number_of_records_in_transaction、_metadata_spanner_number_of_partitions_in_transaction、_metadata_big_query_commit_timestamp)です。デフォルトは空です。
  • disableDlqRetries: DLQ の再試行を無効にするかどうか。デフォルトは false です。
  • useStorageWriteApi: true の場合、パイプラインは BigQuery にデータを書き込むときに Storage Write API を使用します(https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api を参照)。デフォルト値は false です。Storage Write API を 1 回限りモードで使用する場合は、BigQuery Storage Write API のストリーム数と BigQuery Storage Write API のトリガー頻度(秒単位)のパラメータを設定する必要があります。Dataflow の 1 回以上モードを有効にするか、useStorageWriteApiAtLeastOnce パラメータを true に設定すると、ストリーム数やトリガー頻度を設定する必要はありません。
  • useStorageWriteApiAtLeastOnce: このパラメータは、BigQuery Storage Write API の使用が有効になっている場合にのみ有効になります。有効になっている場合は、Storage Write API に 1 回以上のセマンティクスが使用され、有効でなければ 1 回限りのセマンティクスが使用されます。デフォルトは false です。
  • numStorageWriteApiStreams: ストリームの数は、BigQueryIO の Write 変換の並列性を定義し、パイプラインで使用される Storage Write API のストリームの数にほぼ対応します。推奨値については、https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api をご覧ください。デフォルト値は 0 です。
  • storageWriteApiTriggeringFrequencySec: トリガーの頻度は、データが BigQuery でクエリできるようになるまでの時間を決定します。推奨値については、https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api をご覧ください。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Cloud Spanner change streams to BigQuery template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • SPANNER_INSTANCE_ID: Spanner インスタンス ID
  • SPANNER_DATABASE: Spanner データベース
  • SPANNER_METADATA_INSTANCE_ID: Spanner メタデータ インスタンス ID
  • SPANNER_METADATA_DATABASE: Spanner メタデータ データベース
  • SPANNER_CHANGE_STREAM: Spanner 変更ストリーム
  • BIGQUERY_DATASET: 変更ストリーム出力の BigQuery データセット

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • SPANNER_INSTANCE_ID: Spanner インスタンス ID
  • SPANNER_DATABASE: Spanner データベース
  • SPANNER_METADATA_INSTANCE_ID: Spanner メタデータ インスタンス ID
  • SPANNER_METADATA_DATABASE: Spanner メタデータ データベース
  • SPANNER_CHANGE_STREAM: Spanner 変更ストリーム
  • BIGQUERY_DATASET: 変更ストリーム出力の BigQuery データセット

次のステップ