Spanner change streams to BigQuery テンプレート

Spanner change streams to BigQuery テンプレートは、Spanner データ変更レコードをストリーミングし、Dataflow Runner V2 を使用して BigQuery テーブルに書き込むストリーミング パイプラインです。

変更ストリームの監視対象列はすべて、Spanner トランザクションによって変更されたかどうかにかかわらず、各 BigQuery テーブル行に含まれます。監視されていない列は BigQuery 行に含まれません。Dataflow のウォーターマークよりも小さい Spanner の変更は、BigQuery テーブルに正常に適用されるか、再試行のためにデッドレター キューに保存されます。BigQuery の行は、元の Spanner commit タイムスタンプの順序と比較して順不同で挿入されます。

必要な BigQuery テーブルが存在しない場合は、パイプラインによって作成されます。それ以外の場合は、既存の BigQuery テーブルが使用されます。既存の BigQuery テーブルのスキーマには、Spanner テーブルの対応する追跡対象列と、ignoreFields オプションによって明示的に無視されない追加のメタデータ列が含まれている必要があります。次のリストのメタデータ フィールドの説明をご覧ください。新しい BigQuery の各行には、変更レコードのタイムスタンプの時点で、Spanner テーブルの対応する行から変更ストリームによって監視されているすべての列が含まれます。

BigQuery テーブルに次のメタデータ フィールドが追加されます。これらのフィールドの詳細については、「変更ストリームのパーティション、レコード、クエリ」のデータ変更レコードをご覧ください。

  • _metadata_spanner_mod_type: Spanner トランザクションの変更タイプ(挿入、更新、削除)。変更ストリームのデータ変更レコードから抽出されます。
  • _metadata_spanner_table_name: Spanner テーブル名このフィールドは、コネクタのメタデータ テーブル名ではありません。
  • _metadata_spanner_commit_timestamp: Spanner の commit タイムスタンプ。変更が commit された時刻です。この値は、変更ストリームのデータ変更レコードから抽出されます。
  • _metadata_spanner_server_transaction_id: 変更が commit された Spanner トランザクションを表す、グローバルに一意の文字列。この値は、変更ストリームのレコードを処理するコンテキストでのみ使用します。Spanner の API では、トランザクション ID と相関はありません。この値は、変更ストリームのデータ変更レコードから抽出されます。
  • _metadata_spanner_record_sequence: Spanner トランザクション内のレコードのシーケンス番号。シーケンス番号は、トランザクション内で一意で、単調に増加することが保証されています(ただし、必ずしも連続ではありません)。この値は、変更ストリームのデータ変更レコードから抽出されます。
  • _metadata_spanner_is_last_record_in_transaction_in_partition: そのレコードが、現在のパーティションにおける Spanner トランザクションの最後のレコードであるかどうかを示します。この値は、変更ストリームのデータ変更レコードから抽出されます。
  • _metadata_spanner_number_of_records_in_transaction: すべての変更ストリーム パーティションの Spanner トランザクションに含まれるデータ変更レコードの数。この値は、変更ストリームのデータ変更レコードから抽出されます。
  • _metadata_spanner_number_of_partitions_in_transaction: Spanner トランザクションのデータ変更レコードを返すパーティションの数。この値は、変更ストリームのデータ変更レコードから抽出されます。
  • _metadata_big_query_commit_timestamp: 行が BigQuery に挿入されたときの commit タイムスタンプ。useStorageWriteApitrue の場合、この列はパイプラインによって変更履歴テーブルに自動的に作成されません。この場合、必要に応じて、この列を変更履歴テーブルに手動で追加する必要があります。

このテンプレートを使用する場合は、次の点に注意してください。

  • このテンプレートを使用すると、既存のテーブルまたは新しいテーブルの新しい列を Spanner から BigQuery に伝播できます。詳細については、トラッキング テーブルまたは列の追加を処理するをご覧ください。
  • OLD_AND_NEW_VALUES 値と NEW_VALUES 値のキャプチャ タイプで、データ変更レコードに UPDATE 変更が含まれている場合、テンプレートはデータ変更レコードの commit タイムスタンプの時点で Spanner に対してステイル読み取りを実行し、変更されていない監視対象の列を取得する必要があります。ステイル読み取りに対してデータベース「version_retention_period」が正しく構成されていることを確認してください。NEW_ROW 値のキャプチャ タイプでは、データ変更レコードが UPDATE リクエストで更新されない列を含む新しい行をすべてキャプチャするため、テンプレートのほうが効率的で、そのテンプレートではステイル読み取りを行う必要がありません。
  • ネットワークのレイテンシとネットワーク転送のコストを最小限に抑えるには、Spanner インスタンスまたは BigQuery テーブルと同じリージョンから Dataflow ジョブを実行します。使用するソース、シンク、ステージング ファイルのロケーションや、一時ファイルのロケーションがジョブのリージョン外である場合、データがリージョンを越えて送信される可能性があります。詳細については、Dataflow のリージョンをご覧ください。
  • このテンプレートは有効な Spanner のデータ型をすべてサポートしていますが、特に次の場合に、BigQuery の型が Spanner の型より精度が高いと、変換中に精度が失われる可能性があります。
    • Spanner JSON 型では、オブジェクトのメンバーの順序は辞書順に並べ替えられますが、BigQuery JSON 型ではこのような保証はありません。
    • Spanner はナノ秒単位の TIMESTAMP 型をサポートしますが、BigQuery はマイクロ秒単位の TIMESTAMP 型のみをサポートします。
  • このテンプレートでは、BigQuery Storage Write API を Exactly-Once モードで使用することはサポートされていません。

変更ストリームの詳細については、変更ストリーム Dataflow パイプラインの構築方法ベスト プラクティスをご覧ください。

パイプラインの要件

  • パイプラインの実行前に Spanner インスタンスが存在している。
  • パイプラインの実行前に Spanner データベースが存在している。
  • パイプラインの実行前に Spanner メタデータ インスタンスが存在している。
  • パイプラインの実行前に Spanner メタデータ データベースが存在している。
  • パイプラインの実行前に Spanner の変更ストリームが存在している。
  • パイプラインの実行前に BigQuery データセットが存在している。

トラッキング テーブルまたは列の追加を処理する

このセクションでは、パイプラインの実行中にトラッキング用 Spanner テーブルと列の追加を処理するためのベスト プラクティスについて説明します。

  • Spanner 変更ストリーム スコープに新しい列を追加する前に、まず BigQuery 変更ログテーブルに列を追加します。追加する列のデータ型は、一致するデータ型で NULLABLE の必要があります。10 分以上待ってから、Spanner で新しい列またはテーブルの作成を続行します。待機せずに新しい列に書き込むと、デッドレター キュー ディレクトリに無効なエラーコードを持つ未処理のレコードが作成される可能性があります。
  • 新しいテーブルを追加するには、まず Spanner データベースにテーブルを追加します。パイプラインが新しいテーブルのレコードを受信すると、テーブルが BigQuery に自動的に作成されます。
  • Spanner データベースに新しい列またはテーブルを追加した後で、新しい列またはテーブルが暗黙的に追跡されていなければ、追跡されるように変更ストリームを修正します。
  • このテンプレートでは、BigQuery からテーブルや列が削除されることはありません。列が Spanner テーブルから削除された場合、BigQuery から列を手動で削除しない限り、Spanner テーブルから列が削除された後に生成されるレコードの BigQuery 変更ログ列に null 値が入力されます。
  • このテンプレートは、列タイプの更新をサポートしていません。Spanner では、STRING 列を BYTES 列に変更、または BYTES 列を STRING 列に変更することはできますが、BigQuery では既存の列のデータ型を変更したり、同じ列名を異なるデータ型で使用することはできません。Spanner 列を削除してから、同じ名前で異なる型の列を再作成すると、データは既存の BigQuery 列に書き込まれますが、型は変更されません。
  • このテンプレートは、列モードの更新をサポートしていません。BigQuery に複製されたメタデータ列は、REQUIRED モードに設定されます。BigQuery に複製された他のすべての列は、Spanner テーブルで NOT NULL として定義されているかどうかにかかわらず、NULLABLE に設定されます。BigQuery で NULLABLE 列を REQUIRED モードに更新することはできません。
  • 実行中のパイプラインで、変更ストリームの値キャプチャ タイプ を変更することはできません。

テンプレートのパラメータ

必須パラメータ

  • spannerInstanceId: 変更ストリームの読み取り元の Spanner インスタンス。
  • spannerDatabase: 変更ストリームの読み取り元の Spanner データベース。
  • spannerMetadataInstanceId: 変更ストリーム コネクタのメタデータ テーブルに使用する Spanner インスタンス。
  • spannerMetadataDatabase: 変更ストリーム コネクタのメタデータ テーブルに使用する Spanner データベース。
  • spannerChangeStreamName: 読み取り元の Spanner 変更ストリームの名前。
  • bigQueryDataset: 変更ストリーム出力の BigQuery データセット。

オプション パラメータ

  • spannerProjectId: 変更ストリームの読み取り元のプロジェクト。この値は、変更ストリーム コネクタのメタデータ テーブルが作成されるプロジェクトでもあります。このパラメータのデフォルト値は、Dataflow パイプラインが動作しているプロジェクトです。
  • spannerDatabaseRole: テンプレートの実行時に使用される Spanner データベース ロール。このパラメータは、テンプレートを実行している IAM プリンシパルが、きめ細かいアクセス制御のユーザーである場合にのみ必要です。データベースのロールには、変更ストリームに対する SELECT 権限と、変更ストリームの読み取り機能に対する EXECUTE 権限が必要です。詳細については、変更ストリームのきめ細かなアクセス制御(https://cloud.google.com/spanner/docs/fgac-change-streams)をご覧ください。
  • spannerMetadataTableName: Spanner の変更ストリーム コネクタで使用するメタデータ テーブル名。指定しない場合、パイプライン フロー中に Spanner 変更ストリーム コネクタのメタデータ テーブルが自動的に作成されます。このパラメータは、既存のパイプラインを更新するときに指定する必要があります。それ以外の場合は、このパラメータを指定しないでください。
  • rpcPriority: Spanner 呼び出しのリクエストの優先度。値は HIGHMEDIUMLOW のいずれかにする必要があります。デフォルト値は HIGH です。
  • spannerHost: テンプレート内で呼び出す Cloud Spanner のエンドポイント。テストでのみ使われます(例: https://batch-spanner.googleapis.com)。
  • startTimestamp: 変更ストリームの読み取りに使用される開始日時(https://datatracker.ietf.org/doc/html/rfc3339)。この値も含みます。例: 2021-10-12T07:20:50.52Zデフォルトは、パイプライン開始時のタイムスタンプ、つまり現在の時刻です。
  • endTimestamp: 変更ストリームの読み取りに使用される終了日時(https://datatracker.ietf.org/doc/html/rfc3339)。この値を含みます。例: 2021-10-12T07:20:50.52Z。デフォルトは、現在よりも先の無限の時間です。
  • bigQueryProjectId: BigQuery プロジェクト。デフォルト値は Dataflow ジョブのプロジェクトです。
  • bigQueryChangelogTableNameTemplate: 変更履歴を含む BigQuery テーブルの名前のテンプレート。デフォルトは {_metadata_spanner_table_name}_changelog です。
  • deadLetterQueueDirectory: 未処理のレコードを保存するパス。デフォルトのパスは、Dataflow ジョブの一時保存場所の下のディレクトリです。通常はデフォルト値で十分です。
  • dlqRetryMinutes: デッドレター キューの再試行間隔(分)。デフォルト値は 10 です。
  • ignoreFields: 無視するフィールドのカンマ区切のリスト(大文字と小文字は区別されます)。これらのフィールドは、監視対象テーブルのフィールド、またはパイプラインによって追加されたメタデータ フィールドです。無視されるフィールドは BigQuery に挿入されません。_metadata_spanner_table_name フィールドを無視すると、bigQueryChangelogTableNameTemplate パラメータも無視されます。デフォルトは空です。
  • disableDlqRetries: DLQ の再試行を無効にするかどうか。デフォルトは false です。
  • useStorageWriteApi: true の場合、パイプラインは BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)を使用します。デフォルト値は false です。詳細については、Storage Write API の使用(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)をご覧ください。
  • useStorageWriteApiAtLeastOnce: Storage Write API を使用する場合は、書き込みセマンティクスを指定します。at-least-once セマンティクス(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)を使用するには、このパラメータを true に設定します。exactly-once セマンティクスを使用するには、パラメータを false に設定します。このパラメータは、useStorageWriteApitrue の場合にのみ適用されます。デフォルト値は false です。
  • numStorageWriteApiStreams: Storage Write API を使用する場合は、書き込みストリームの数を指定します。useStorageWriteApitrue であり、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。デフォルト値は 0 です。
  • storageWriteApiTriggeringFrequencySec: Storage Write API を使用する場合は、トリガーの頻度を秒単位で指定します。useStorageWriteApitrue であり、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。

テンプレートを実行する

コンソールgcloudAPI
  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Cloud Spanner change streams to BigQuery template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • SPANNER_INSTANCE_ID: Spanner インスタンス ID
  • SPANNER_DATABASE: Spanner データベース
  • SPANNER_METADATA_INSTANCE_ID: Spanner メタデータ インスタンス ID
  • SPANNER_METADATA_DATABASE: Spanner メタデータ データベース
  • SPANNER_CHANGE_STREAM: Spanner 変更ストリーム
  • BIGQUERY_DATASET: 変更ストリーム出力の BigQuery データセット

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • SPANNER_INSTANCE_ID: Spanner インスタンス ID
  • SPANNER_DATABASE: Spanner データベース
  • SPANNER_METADATA_INSTANCE_ID: Spanner メタデータ インスタンス ID
  • SPANNER_METADATA_DATABASE: Spanner メタデータ データベース
  • SPANNER_CHANGE_STREAM: Spanner 変更ストリーム
  • BIGQUERY_DATASET: 変更ストリーム出力の BigQuery データセット
Java
/*
 * Copyright (C) 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery;

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.Timestamp;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.cdc.dlq.StringDeadLetterQueueSanitizer;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SpannerChangeStreamsToBigQueryOptions;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.model.Mod;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.model.ModColumnType;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.BigQueryUtils;
import com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.schemautils.OptionsUtils;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// TODO(haikuo-google): Add integration test.
// TODO(haikuo-google): Add README.
// TODO(haikuo-google): Add stackdriver metrics.
// TODO(haikuo-google): Ideally side input should be used to store schema information and shared
// accross DoFns, but since side input fix is not yet deployed at the moment, we read schema
// information in the beginning of the DoFn as a work around. We should use side input instead when
// it's available.
// TODO(haikuo-google): Test the case where tables or columns are added while the pipeline is
// running.
/**
 * This pipeline ingests {@link DataChangeRecord} from Spanner change stream. The {@link
 * DataChangeRecord} is then broken into {@link Mod}, which converted into {@link TableRow} and
 * inserted into BigQuery table.
 */
@Template(
    name = "Spanner_Change_Streams_to_BigQuery",
    category = TemplateCategory.STREAMING,
    displayName = "Cloud Spanner change streams to BigQuery",
    description = {
      "The Cloud Spanner change streams to BigQuery template is a streaming pipeline that streams"
          + " Cloud Spanner data change records and writes them into BigQuery tables using Dataflow"
          + " Runner V2.\n",
      "All change stream watched columns are included in each BigQuery table row, regardless of"
          + " whether they are modified by a Cloud Spanner transaction. Columns not watched are not"
          + " included in the BigQuery row. Any Cloud Spanner change less than the Dataflow"
          + " watermark are either successfully applied to the BigQuery tables or are stored in the"
          + " dead-letter queue for retry. BigQuery rows are inserted out of order compared to the"
          + " original Cloud Spanner commit timestamp ordering.\n",
      "If the necessary BigQuery tables don't exist, the pipeline creates them. Otherwise, existing"
          + " BigQuery tables are used. The schema of existing BigQuery tables must contain the"
          + " corresponding tracked columns of the Cloud Spanner tables and any additional metadata"
          + " columns that are not ignored explicitly by the ignoreFields option. See the"
          + " description of the metadata fields in the following list. Each new BigQuery row"
          + " includes all columns watched by the change stream from its corresponding row in your"
          + " Cloud Spanner table at the change record's timestamp.\n",
      "The following metadata fields are added to BigQuery tables. For more details about these"
          + " fields, see Data change records in \"Change streams partitions, records, and"
          + " queries.\"\n"
          + "- _metadata_spanner_mod_type: The modification type (insert, update, or delete) of the"
          + " Cloud Spanner transaction. Extracted from change stream data change record.\n"
          + "- _metadata_spanner_table_name: The Cloud Spanner table name. Note this field is not"
          + " the metadata table name of the connector.\n"
          + "- _metadata_spanner_commit_timestamp: The Spanner commit timestamp, which is the time"
          + " when a change is committed. Extracted from change stream data change record.\n"
          + "- _metadata_spanner_server_transaction_id: A globally unique string that represents"
          + " the Spanner transaction in which the change was committed. Only use this value in the"
          + " context of processing change stream records. It isn't correlated with the transaction"
          + " ID in Spanner's API. Extracted from change stream data change record.\n"
          + "- _metadata_spanner_record_sequence: The sequence number for the record within the"
          + " Spanner transaction. Sequence numbers are guaranteed to be unique and monotonically"
          + " increasing (but not necessarily contiguous) within a transaction. Extracted from"
          + " change stream data change record.\n"
          + "- _metadata_spanner_is_last_record_in_transaction_in_partition: Indicates whether the"
          + " record is the last record for a Spanner transaction in the current partition."
          + " Extracted from change stream data change record.\n"
          + "- _metadata_spanner_number_of_records_in_transaction: The number of data change"
          + " records that are part of the Spanner transaction across all change stream partitions."
          + " Extracted from change stream data change record.\n"
          + "- _metadata_spanner_number_of_partitions_in_transaction: The number of partitions that"
          + " return data change records for the Spanner transaction. Extracted from change stream"
          + " data change record.\n"
          + "- _metadata_big_query_commit_timestamp: The commit timestamp of when the row is"
          + " inserted into BigQuery.\n",
      "Notes:\n"
          + "- This template does not propagate schema changes from Cloud Spanner to BigQuery."
          + " Because performing a schema change in Cloud Spanner is likely going to break the"
          + " pipeline, you might need to recreate the pipeline after the schema change.\n"
          + "- For OLD_AND_NEW_VALUES and NEW_VALUES value capture types, when the data change"
          + " record contains an UPDATE change, the template needs to do a stale read to Cloud"
          + " Spanner at the commit timestamp of the data change record to retrieve the unchanged"
          + " but watched columns. Make sure to configure your database 'version_retention_period'"
          + " properly for the stale read. For the NEW_ROW value capture type, the template is more"
          + " efficient, because the data change record captures the full new row including columns"
          + " that are not updated in UPDATEs, and the template does not need to do a stale read.\n"
          + "- You can minimize network latency and network transport costs by running the Dataflow"
          + " job from the same region as your Cloud Spanner instance or BigQuery tables. If you"
          + " use sources, sinks, staging file locations, or temporary file locations that are"
          + " located outside of your job's region, your data might be sent across regions. See"
          + " more about Dataflow regional endpoints.\n"
          + "- This template supports all valid Cloud Spanner data types, but if the BigQuery type"
          + " is more precise than the Cloud Spanner type, precision loss might occur during the"
          + " transformation. Specifically:\n"
          + "  - For Cloud Spanner JSON type, the order of the members of an object is"
          + " lexicographically ordered, but there is no such guarantee for BigQuery JSON type.\n"
          + "  - Cloud Spanner supports nanoseconds TIMESTAMP type, BigQuery only supports"
          + " microseconds TIMESTAMP type.\n",
      "Learn more about <a href=\"https://cloud.google.com/spanner/docs/change-streams\">change"
          + " streams</a>, <a"
          + " href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow\">how to"
          + " build change streams Dataflow pipelines</a>, and <a"
          + " href=\"https://cloud.google.com/spanner/docs/change-streams/use-dataflow#best_practices\">best"
          + " practices</a>."
    },
    optionsClass = SpannerChangeStreamsToBigQueryOptions.class,
    flexContainerName = "spanner-changestreams-to-bigquery",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-change-streams-to-bigquery",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Cloud Spanner instance must exist prior to running the pipeline.",
      "The Cloud Spanner database must exist prior to running the pipeline.",
      "The Cloud Spanner metadata instance must exist prior to running the pipeline.",
      "The Cloud Spanner metadata database must exist prior to running the pipeline.",
      "The Cloud Spanner change stream must exist prior to running the pipeline.",
      "The BigQuery dataset must exist prior to running the pipeline."
    },
    streaming = true,
    supportsExactlyOnce = true,
    supportsAtLeastOnce = true)
public final class SpannerChangeStreamsToBigQuery {

  /** String/String Coder for {@link FailsafeElement}. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  private static final Logger LOG = LoggerFactory.getLogger(SpannerChangeStreamsToBigQuery.class);

  // Max number of deadletter queue retries.
  private static final int DLQ_MAX_RETRIES = 5;

  private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2";

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    LOG.info("Starting to replicate change records from Spanner change streams to BigQuery");

    SpannerChangeStreamsToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(SpannerChangeStreamsToBigQueryOptions.class);

    run(options);
  }

  private static void validateOptions(SpannerChangeStreamsToBigQueryOptions options) {
    if (options.getDlqRetryMinutes() <= 0) {
      throw new IllegalArgumentException("dlqRetryMinutes must be positive.");
    }
    if (options
        .getBigQueryChangelogTableNameTemplate()
        .equals(BigQueryUtils.BQ_CHANGELOG_FIELD_NAME_TABLE_NAME)) {
      throw new IllegalArgumentException(
          String.format(
              "bigQueryChangelogTableNameTemplate cannot be set to '{%s}'. This value is reserved"
                  + " for the Cloud Spanner table name.",
              BigQueryUtils.BQ_CHANGELOG_FIELD_NAME_TABLE_NAME));
    }

    BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
  }

  private static void setOptions(SpannerChangeStreamsToBigQueryOptions options) {
    options.setStreaming(true);
    options.setEnableStreamingEngine(true);

    // Add use_runner_v2 to the experiments option, since change streams connector is only supported
    // on Dataflow runner v2.
    List<String> experiments = options.getExperiments();
    if (experiments == null) {
      experiments = new ArrayList<>();
    }
    if (!experiments.contains(USE_RUNNER_V2_EXPERIMENT)) {
      experiments.add(USE_RUNNER_V2_EXPERIMENT);
    }
    options.setExperiments(experiments);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(SpannerChangeStreamsToBigQueryOptions options) {
    setOptions(options);
    validateOptions(options);

    /**
     * Stages: 1) Read {@link DataChangeRecord} from change stream. 2) Create {@link
     * FailsafeElement} of {@link Mod} JSON and merge from: - {@link DataChangeRecord}. - GCS Dead
     * letter queue. 3) Convert {@link Mod} JSON into {@link TableRow} by reading from Spanner at
     * commit timestamp. 4) Append {@link TableRow} to BigQuery. 5) Write Failures from 2), 3) and
     * 4) to GCS dead letter queue.
     */
    Pipeline pipeline = Pipeline.create(options);
    DeadLetterQueueManager dlqManager = buildDlqManager(options);
    String spannerProjectId = OptionsUtils.getSpannerProjectId(options);

    String dlqDirectory = dlqManager.getRetryDlqDirectoryWithDateTime();
    String tempDlqDirectory = dlqManager.getRetryDlqDirectory() + "tmp/";

    /**
     * There are two types of errors that can occur in this pipeline:
     *
     * <p>1) Error originating from modJsonStringToTableRow. Errors here are either due to pk values
     * missing, a spanner table / column missing in the in-memory map, or some Spanner read error
     * happening in readSpannerRow. We already retry the Spanner read error inline 3 times. Th other
     * types of errors are more likely to be un-retriable.
     *
     * <p>2) Error originating from BigQueryIO.write. BigQuery storage write API already retries all
     * transient errors and outputs more permanent errors.
     *
     * <p>As a result, it is reasonable to write all errors happening in the pipeline directly into
     * the permanent DLQ, since most of the errors are likely to be non-transient.
     */
    if (options.getDisableDlqRetries()) {
      LOG.info(
          "Disabling retries for the DLQ, directly writing into severe DLQ: {}",
          dlqManager.getSevereDlqDirectoryWithDateTime());
      dlqDirectory = dlqManager.getSevereDlqDirectoryWithDateTime();
      tempDlqDirectory = dlqManager.getSevereDlqDirectory() + "tmp/";
    }

    // Retrieve and parse the startTimestamp and endTimestamp.
    Timestamp startTimestamp =
        options.getStartTimestamp().isEmpty()
            ? Timestamp.now()
            : Timestamp.parseTimestamp(options.getStartTimestamp());
    Timestamp endTimestamp =
        options.getEndTimestamp().isEmpty()
            ? Timestamp.MAX_VALUE
            : Timestamp.parseTimestamp(options.getEndTimestamp());

    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
            .withProjectId(spannerProjectId)
            .withInstanceId(options.getSpannerInstanceId())
            .withDatabaseId(options.getSpannerDatabase())
            .withRpcPriority(options.getRpcPriority());
    // Propagate database role for fine-grained access control on change stream.
    if (options.getSpannerDatabaseRole() != null) {
      spannerConfig =
          spannerConfig.withDatabaseRole(
              ValueProvider.StaticValueProvider.of(options.getSpannerDatabaseRole()));
    }

    SpannerIO.ReadChangeStream readChangeStream =
        SpannerIO.readChangeStream()
            .withSpannerConfig(spannerConfig)
            .withMetadataInstance(options.getSpannerMetadataInstanceId())
            .withMetadataDatabase(options.getSpannerMetadataDatabase())
            .withChangeStreamName(options.getSpannerChangeStreamName())
            .withInclusiveStartAt(startTimestamp)
            .withInclusiveEndAt(endTimestamp)
            .withRpcPriority(options.getRpcPriority());

    String spannerMetadataTableName = options.getSpannerMetadataTableName();
    if (spannerMetadataTableName != null) {
      readChangeStream = readChangeStream.withMetadataTable(spannerMetadataTableName);
    }

    PCollection<DataChangeRecord> dataChangeRecord =
        pipeline
            .apply("Read from Spanner Change Streams", readChangeStream)
            .apply("Reshuffle DataChangeRecord", Reshuffle.viaRandomKey());

    PCollection<FailsafeElement<String, String>> sourceFailsafeModJson =
        dataChangeRecord
            .apply("DataChangeRecord To Mod JSON", ParDo.of(new DataChangeRecordToModJsonFn()))
            .apply(
                "Wrap Mod JSON In FailsafeElement",
                ParDo.of(
                    new DoFn<String, FailsafeElement<String, String>>() {
                      @ProcessElement
                      public void process(
                          @Element String input,
                          OutputReceiver<FailsafeElement<String, String>> receiver) {
                        receiver.output(FailsafeElement.of(input, input));
                      }
                    }))
            .setCoder(FAILSAFE_ELEMENT_CODER);

    PCollectionTuple dlqModJson =
        dlqManager.getReconsumerDataTransform(
            pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes())));
    PCollection<FailsafeElement<String, String>> retryableDlqFailsafeModJson =
        dlqModJson.get(DeadLetterQueueManager.RETRYABLE_ERRORS).setCoder(FAILSAFE_ELEMENT_CODER);

    PCollection<FailsafeElement<String, String>> failsafeModJson =
        PCollectionList.of(sourceFailsafeModJson)
            .and(retryableDlqFailsafeModJson)
            .apply("Merge Source And DLQ Mod JSON", Flatten.pCollections());

    ImmutableSet.Builder<String> ignoreFieldsBuilder = ImmutableSet.builder();
    for (String ignoreField : options.getIgnoreFields().split(",")) {
      ignoreFieldsBuilder.add(ignoreField);
    }
    ImmutableSet<String> ignoreFields = ignoreFieldsBuilder.build();
    FailsafeModJsonToTableRowTransformer.FailsafeModJsonToTableRowOptions
        failsafeModJsonToTableRowOptions =
            FailsafeModJsonToTableRowTransformer.FailsafeModJsonToTableRowOptions.builder()
                .setSpannerConfig(spannerConfig)
                .setSpannerChangeStream(options.getSpannerChangeStreamName())
                .setIgnoreFields(ignoreFields)
                .setCoder(FAILSAFE_ELEMENT_CODER)
                .setUseStorageWriteApi(options.getUseStorageWriteApi())
                .build();
    FailsafeModJsonToTableRowTransformer.FailsafeModJsonToTableRow failsafeModJsonToTableRow =
        new FailsafeModJsonToTableRowTransformer.FailsafeModJsonToTableRow(
            failsafeModJsonToTableRowOptions);

    PCollectionTuple tableRowTuple =
        failsafeModJson.apply("Mod JSON To TableRow", failsafeModJsonToTableRow);
    // If users pass in the full BigQuery dataset ID (projectId.datasetName), extract the dataset
    // name for the setBigQueryDataset parameter.
    List<String> results = OptionsUtils.processBigQueryProjectAndDataset(options);
    String bigqueryProject = results.get(0);
    String bigqueryDataset = results.get(1);

    BigQueryDynamicDestinations.BigQueryDynamicDestinationsOptions
        bigQueryDynamicDestinationsOptions =
            BigQueryDynamicDestinations.BigQueryDynamicDestinationsOptions.builder()
                .setSpannerConfig(spannerConfig)
                .setChangeStreamName(options.getSpannerChangeStreamName())
                .setIgnoreFields(ignoreFields)
                .setBigQueryProject(bigqueryProject)
                .setBigQueryDataset(bigqueryDataset)
                .setBigQueryTableTemplate(options.getBigQueryChangelogTableNameTemplate())
                .setUseStorageWriteApi(options.getUseStorageWriteApi())
                .build();
    WriteResult writeResult;
    if (!options.getUseStorageWriteApi()) {
      writeResult =
          tableRowTuple
              .get(failsafeModJsonToTableRow.transformOut)
              .apply(
                  "Write To BigQuery",
                  BigQueryIO.<TableRow>write()
                      .to(BigQueryDynamicDestinations.of(bigQueryDynamicDestinationsOptions))
                      .withFormatFunction(element -> removeIntermediateMetadataFields(element))
                      .withFormatRecordOnFailureFunction(element -> element)
                      .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                      .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
                      .withExtendedErrorInfo()
                      .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
    } else {
      writeResult =
          tableRowTuple
              .get(failsafeModJsonToTableRow.transformOut)
              .apply(
                  "Write To BigQuery",
                  BigQueryIO.<TableRow>write()
                      .to(BigQueryDynamicDestinations.of(bigQueryDynamicDestinationsOptions))
                      .withFormatFunction(element -> removeIntermediateMetadataFields(element))
                      .withFormatRecordOnFailureFunction(element -> element)
                      .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                      .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
                      .ignoreUnknownValues()
                      .withAutoSchemaUpdate(true) // only supported when using STORAGE_WRITE_API or
                      // STORAGE_API_AT_LEAST_ONCE.
                      .withExtendedErrorInfo()
                      .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
    }

    PCollection<String> transformDlqJson =
        tableRowTuple
            .get(failsafeModJsonToTableRow.transformDeadLetterOut)
            .apply(
                "Failed Mod JSON During Table Row Transformation",
                MapElements.via(new StringDeadLetterQueueSanitizer()));

    PCollection<String> bqWriteDlqJson =
        BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
            .apply(
                "Failed Mod JSON During BigQuery Writes",
                MapElements.via(new BigQueryDeadLetterQueueSanitizer()));

    PCollectionList.of(transformDlqJson)
        // Generally BigQueryIO storage write retries transient errors, and only more
        // persistent errors make it into DLQ.
        .and(bqWriteDlqJson)
        .apply("Merge Failed Mod JSON From Transform And BigQuery", Flatten.pCollections())
        .apply(
            "Write Failed Mod JSON To DLQ",
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqDirectory)
                .withTmpDirectory(tempDlqDirectory)
                .setIncludePaneInfo(true)
                .build());

    PCollection<FailsafeElement<String, String>> nonRetryableDlqModJsonFailsafe =
        dlqModJson.get(DeadLetterQueueManager.PERMANENT_ERRORS).setCoder(FAILSAFE_ELEMENT_CODER);

    nonRetryableDlqModJsonFailsafe
        .apply(
            "Write Mod JSON With Non-retryable Error To DLQ",
            MapElements.via(new StringDeadLetterQueueSanitizer()))
        .setCoder(StringUtf8Coder.of())
        .apply(
            DLQWriteTransform.WriteDLQ.newBuilder()
                .withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
                .withTmpDirectory(dlqManager.getSevereDlqDirectory() + "tmp/")
                .setIncludePaneInfo(true)
                .build());

    return pipeline.run();
  }

  private static DeadLetterQueueManager buildDlqManager(
      SpannerChangeStreamsToBigQueryOptions options) {
    String tempLocation =
        options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
            ? options.as(DataflowPipelineOptions.class).getTempLocation()
            : options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
    String dlqDirectory =
        options.getDeadLetterQueueDirectory().isEmpty()
            ? tempLocation + "dlq/"
            : options.getDeadLetterQueueDirectory();

    LOG.info("Dead letter queue directory: {}", dlqDirectory);
    return DeadLetterQueueManager.create(dlqDirectory, DLQ_MAX_RETRIES);
  }

  /**
   * Remove the following intermediate metadata fields that are not user data from {@link TableRow}:
   * _metadata_error, _metadata_retry_count, _metadata_spanner_original_payload_json.
   */
  private static TableRow removeIntermediateMetadataFields(TableRow tableRow) {
    TableRow cleanTableRow = tableRow.clone();
    Set<String> rowKeys = tableRow.keySet();
    Set<String> metadataFields = BigQueryUtils.getBigQueryIntermediateMetadataFieldNames();

    for (String rowKey : rowKeys) {
      if (metadataFields.contains(rowKey)) {
        cleanTableRow.remove(rowKey);
      } else if (rowKeys.contains("_type_" + rowKey)) {
        cleanTableRow.remove("_type_" + rowKey);
      }
    }

    return cleanTableRow;
  }

  /**
   * DoFn that converts a {@link DataChangeRecord} to multiple {@link Mod} in serialized JSON
   * format.
   */
  static class DataChangeRecordToModJsonFn extends DoFn<DataChangeRecord, String> {

    @ProcessElement
    public void process(@Element DataChangeRecord input, OutputReceiver<String> receiver) {
      for (org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod changeStreamsMod :
          input.getMods()) {
        Mod mod =
            new Mod(
                changeStreamsMod.getKeysJson(),
                changeStreamsMod.getNewValuesJson(),
                input.getCommitTimestamp(),
                input.getServerTransactionId(),
                input.isLastRecordInTransactionInPartition(),
                input.getRecordSequence(),
                input.getTableName(),
                input.getRowType().stream().map(ModColumnType::new).collect(Collectors.toList()),
                input.getModType(),
                input.getValueCaptureType(),
                input.getNumberOfRecordsInTransaction(),
                input.getNumberOfPartitionsInTransaction());

        String modJsonString;

        try {
          modJsonString = mod.toJson();
        } catch (IOException e) {
          // Ignore exception and print bad format.
          modJsonString = String.format("\"%s\"", input);
        }
        receiver.output(modJsonString);
      }
    }
  }
}

次のステップ