Spanner change streams to BigQuery テンプレート

Spanner change streams to BigQuery テンプレートは、Spanner データ変更レコードをストリーミングし、Dataflow Runner V2 を使用して BigQuery テーブルに書き込むストリーミング パイプラインです。

変更ストリームの監視対象列はすべて、Spanner トランザクションによって変更されたかどうかにかかわらず、各 BigQuery テーブル行に含まれます。監視されていない列は BigQuery 行に含まれません。Dataflow のウォーターマークよりも小さい Spanner の変更は、BigQuery テーブルに正常に適用されるか、再試行のためにデッドレター キューに保存されます。BigQuery の行は、元の Spanner commit タイムスタンプの順序と比較して順不同で挿入されます。

必要な BigQuery テーブルが存在しない場合は、パイプラインによって作成されます。それ以外の場合は、既存の BigQuery テーブルが使用されます。既存の BigQuery テーブルのスキーマには、Spanner テーブルの対応する追跡対象列と、ignoreFields オプションによって明示的に無視されない追加のメタデータ列が含まれている必要があります。次のリストのメタデータ フィールドの説明をご覧ください。新しい BigQuery の各行には、変更レコードのタイムスタンプの時点で、Spanner テーブルの対応する行から変更ストリームによって監視されているすべての列が含まれます。

BigQuery テーブルに次のメタデータ フィールドが追加されます。これらのフィールドの詳細については、「変更ストリームのパーティション、レコード、クエリ」のデータ変更レコードをご覧ください。

このテンプレートを使用する場合は、次の点に注意してください。

  • このテンプレートは、スキーマの変更を Spanner から BigQuery に伝播しません。Spanner でスキーマの変更を行うとパイプラインが壊れる可能性が高いため、スキーマの変更後にパイプラインを再作成する必要が生じることがあります。
  • OLD_AND_NEW_VALUES 値と NEW_VALUES 値のキャプチャ タイプで、データ変更レコードに UPDATE 変更が含まれている場合、テンプレートはデータ変更レコードの commit タイムスタンプの時点で Spanner に対してステイル読み取りを実行し、変更されていない監視対象の列を取得する必要があります。ステイル読み取りに対してデータベース「version_retention_period」が正しく構成されていることを確認してください。NEW_ROW 値のキャプチャ タイプでは、データ変更レコードが UPDATE リクエストで更新されない列を含む新しい行をすべてキャプチャするため、テンプレートのほうが効率的で、そのテンプレートではステイル読み取りを行う必要がありません。
  • ネットワークのレイテンシとネットワーク転送のコストを最小限に抑えるには、Spanner インスタンスまたは BigQuery テーブルと同じリージョンから Dataflow ジョブを実行します。使用するソース、シンク、ステージング ファイルのロケーションや、一時ファイルのロケーションがジョブのリージョン外である場合、データがリージョンを越えて送信される可能性があります。詳細については、Dataflow のリージョンをご覧ください。
  • このテンプレートは有効な Spanner のデータ型をすべてサポートしていますが、特に次の場合に、BigQuery の型が Spanner の型より精度が高いと、変換中に精度が失われる可能性があります。
    • Spanner JSON 型では、オブジェクトのメンバーの順序は辞書順に並べ替えられますが、BigQuery JSON 型ではこのような保証はありません。
    • Spanner はナノ秒単位の TIMESTAMP 型をサポートしますが、BigQuery はマイクロ秒単位の TIMESTAMP 型のみをサポートします。
  • このテンプレートでは、BigQuery Storage Write API を Exactly-Once モードで使用することはサポートされていません。

変更ストリームの詳細については、変更ストリーム Dataflow パイプラインの構築方法ベスト プラクティスをご覧ください。

パイプラインの要件

  • パイプラインの実行前に Spanner インスタンスが存在している。
  • パイプラインの実行前に Spanner データベースが存在している。
  • パイプラインの実行前に Spanner メタデータ インスタンスが存在している。
  • パイプラインの実行前に Spanner メタデータ データベースが存在している。
  • パイプラインの実行前に Spanner の変更ストリームが存在している。
  • パイプラインの実行前に BigQuery データセットが存在している必要があります。

テンプレートのパラメータ

必須パラメータ

  • spannerInstanceId: 変更ストリームの読み取り元の Spanner インスタンス。
  • spannerDatabase: 変更ストリームの読み取り元の Spanner データベース。
  • spannerMetadataInstanceId: 変更ストリーム コネクタのメタデータ テーブルに使用する Spanner インスタンス。
  • spannerMetadataDatabase: 変更ストリーム コネクタのメタデータ テーブルに使用する Spanner データベース。
  • spannerChangeStreamName: 読み取り元の Spanner 変更ストリームの名前。
  • bigQueryDataset: 変更ストリーム出力の BigQuery データセット。

オプション パラメータ

  • spannerProjectId: 変更ストリームの読み取り元のプロジェクト。この値は、変更ストリーム コネクタのメタデータ テーブルが作成されるプロジェクトでもあります。このパラメータのデフォルト値は、Dataflow パイプラインが動作しているプロジェクトです。
  • spannerDatabaseRole: テンプレートの実行時に使用される Spanner データベース ロール。このパラメータは、テンプレートを実行している IAM プリンシパルが、きめ細かいアクセス制御のユーザーである場合にのみ必要です。データベースのロールには、変更ストリームに対する SELECT 権限と、変更ストリームの読み取り機能に対する EXECUTE 権限が必要です。詳細については、変更ストリームのきめ細かなアクセス制御(https://cloud.google.com/spanner/docs/fgac-change-streams)をご覧ください。
  • spannerMetadataTableName: Spanner の変更ストリーム コネクタで使用するメタデータ テーブル名。指定しない場合、パイプライン フロー中に Spanner 変更ストリーム コネクタのメタデータ テーブルが自動的に作成されます。このパラメータは、既存のパイプラインを更新するときに指定する必要があります。それ以外の場合は、このパラメータを指定しないでください。
  • rpcPriority: Spanner 呼び出しのリクエストの優先度。値は HIGHMEDIUMLOW のいずれかにする必要があります。デフォルト値は HIGH です。
  • spannerHost: テンプレート内で呼び出す Cloud Spanner のエンドポイント。テストでのみ使われます(例: https://batch-spanner.googleapis.com)。
  • startTimestamp: 変更ストリームの読み取りに使用される開始日時(https://datatracker.ietf.org/doc/html/rfc3339)。この値も含みます。例: 2021-10-12T07:20:50.52Zデフォルトは、パイプライン開始時のタイムスタンプ、つまり現在の時刻です。
  • endTimestamp: 変更ストリームの読み取りに使用される終了日時(https://datatracker.ietf.org/doc/html/rfc3339)。この値を含みます。例: 2021-10-12T07:20:50.52Z。デフォルトは、現在よりも先の無限の時間です。
  • bigQueryProjectId: BigQuery プロジェクト。デフォルト値は Dataflow ジョブのプロジェクトです。
  • bigQueryChangelogTableNameTemplate: 変更履歴を含む BigQuery テーブルの名前のテンプレート。デフォルトは {_metadata_spanner_table_name}_changelog です。
  • deadLetterQueueDirectory: 未処理のレコードを保存するパス。デフォルトのパスは、Dataflow ジョブの一時保存場所の下のディレクトリです。通常はデフォルト値で十分です。
  • dlqRetryMinutes: デッドレター キューの再試行間隔(分)。デフォルト値は 10 です。
  • ignoreFields: 無視するフィールドのカンマ区切のリスト(大文字と小文字は区別されます)。これらのフィールドは、監視対象テーブルのフィールド、またはパイプラインによって追加されたメタデータ フィールドです。無視されるフィールドは BigQuery に挿入されません。_metadata_spanner_table_name フィールドを無視すると、bigQueryChangelogTableNameTemplate パラメータも無視されます。デフォルトは空です。
  • disableDlqRetries: DLQ の再試行を無効にするかどうか。デフォルトは false です。
  • useStorageWriteApi: true の場合、パイプラインは BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)を使用します。デフォルト値は false です。詳細については、Storage Write API の使用(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)をご覧ください。
  • useStorageWriteApiAtLeastOnce: Storage Write API を使用する場合は、書き込みセマンティクスを指定します。at-least-once セマンティクス(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)を使用するには、このパラメータを true に設定します。exactly-once セマンティクスを使用するには、パラメータを false に設定します。このパラメータは、useStorageWriteApitrue の場合にのみ適用されます。デフォルト値は false です。
  • numStorageWriteApiStreams: Storage Write API を使用する場合は、書き込みストリームの数を指定します。useStorageWriteApitrue であり、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。デフォルト値は 0 です。
  • storageWriteApiTriggeringFrequencySec: Storage Write API を使用する場合は、トリガーの頻度を秒単位で指定します。useStorageWriteApitrue であり、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Cloud Spanner change streams to BigQuery template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • SPANNER_INSTANCE_ID: Spanner インスタンス ID
  • SPANNER_DATABASE: Spanner データベース
  • SPANNER_METADATA_INSTANCE_ID: Spanner メタデータ インスタンス ID
  • SPANNER_METADATA_DATABASE: Spanner メタデータ データベース
  • SPANNER_CHANGE_STREAM: Spanner 変更ストリーム
  • BIGQUERY_DATASET: 変更ストリーム出力の BigQuery データセット

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • SPANNER_INSTANCE_ID: Spanner インスタンス ID
  • SPANNER_DATABASE: Spanner データベース
  • SPANNER_METADATA_INSTANCE_ID: Spanner メタデータ インスタンス ID
  • SPANNER_METADATA_DATABASE: Spanner メタデータ データベース
  • SPANNER_CHANGE_STREAM: Spanner 変更ストリーム
  • BIGQUERY_DATASET: 変更ストリーム出力の BigQuery データセット

次のステップ