Spanner Change Streams to Source Database テンプレート

ストリーミング パイプライン。Spanner 変更ストリームからデータを読み取り、ソースに書き込みます。

テンプレートのパラメータ

パラメータ 説明
changeStreamName パイプラインが読み取る Spanner 変更ストリームの名前。
instanceId 変更ストリームが存在する Spanner インスタンスの名前。
databaseId 変更ストリームがモニタリングする Spanner データベースの名前。
spannerProjectId Spanner プロジェクトの名前。
metadataInstance コネクタが変更ストリーム API データの使用量を制御するために使用するメタデータを保存するインスタンス。
metadataDatabase コネクタが変更ストリーム API データの使用量を制御するために使用するメタデータを保存するデータベース。
sourceShardsFilePath ソース シャードの接続プロファイル情報を含む Cloud Storage ファイルのパス。
startTimestamp 省略可: 変更の読み取りの開始タイムスタンプ。デフォルトは空です。
endTimestamp 省略可: 変更の読み取りの終了タイムスタンプ。タイムスタンプが指定されていない場合、無期限に読み取られます。デフォルトは空です。
shadowTablePrefix 省略可。シャドウ テーブルの名前に使用される接頭辞。デフォルト: shadow_
sessionFilePath 省略可: HarbourBridge のマッピング情報が含まれる Cloud Storage 内のセッションパス。
filtrationMode 省略可: フィルタリング モード。条件に基づいて特定のレコードを削除する方法を指定する。サポートされているモードは、none(何もフィルタしない)、forward_migration(転送移行パイプラインを使用して書き込まれたレコードをフィルタする)です。デフォルトは forward_migration です。
shardingCustomJarPath 省略可: シャード ID を取得するためのカスタマイズ ロジックを含む、Cloud Storage 内のカスタム JAR ファイルの場所。このパラメータを設定する場合は、shardingCustomJarPath パラメータを設定します。デフォルトは空です。
shardingCustomClassName 省略可: カスタム シャード ID の実装が含まれる完全修飾クラス名。shardingCustomJarPath を指定する場合、このパラメータは必須です。デフォルトは空です。
shardingCustomParameters 省略可: カスタム シャーディング クラスに渡すカスタム パラメータが含まれる文字列。デフォルトは空です。
sourceDbTimezoneOffset 省略可: ソース データベースの UTC からのタイムゾーン オフセット。値の例: +10:00。デフォルト: +00:00
dlqGcsPubSubSubscription 省略可: 通常モードでの実行時にデッドレター キュー再試行ディレクトリ用の Cloud Storage 通知ポリシーで使用されている Pub/Sub サブスクリプション。名前は、projects/<project-id>/subscriptions/<subscription-name> の形式にする必要があります。設定すると、deadLetterQueueDirectory と dlqRetryMinutes は無視されます。
skipDirectoryName 省略可: リバース レプリケーションからスキップされたレコードがこのディレクトリに書き込まれます。デフォルトのディレクトリ名は skip です。
maxShardConnections 省略可: 特定のシャードが受け入れることができる接続の最大数。デフォルト: 10000
deadLetterQueueDirectory 省略可: エラーキューの出力の保存に使用されるパス。デフォルトのパスは、Dataflow ジョブの一時保存場所の下のディレクトリです。
dlqMaxRetryCount 省略可: デッドレター キューで一時的なエラーが発生した場合に再試行できる最大回数。デフォルトは 500 です。
runMode 省略可: 実行モードのタイプ。サポートされる値: regularretryDLQ。デフォルト: regularretryDLQ を指定すると、重大なデッドレター キュー レコードのみが再試行されます。
dlqRetryMinutes 省略可: デッドレター キューの再試行間隔の分数。デフォルトは 10 です。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Spanner Change Streams to Source Database template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud CLI

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_to_SourceDb \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       changeStreamName=CHANGE_STREAM_NAME,\
       instanceId=INSTANCE_ID,\
       databaseId=DATABASE_ID,\
       spannerProjectId=SPANNER_PROJECT_ID,\
       metadataInstance=METADATA_INSTANCE,\
       metadataDatabase=METADATA_DATABASE,\
       sourceShardsFilePath=SOURCE_SHARDS_FILE_PATH,\

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • CHANGE_STREAM_NAME: 読み取る変更ストリームの名前
  • INSTANCE_ID: Cloud Spanner インスタンス ID。
  • DATABASE_ID: Cloud Spanner データベース ID。
  • SPANNER_PROJECT_ID: Cloud Spanner プロジェクト ID。
  • METADATA_INSTANCE: changestream から読み取るときにメタデータを保存する Cloud Spanner インスタンス
  • METADATA_DATABASE: 変更ストリームから読み取るときにメタデータを保存する Cloud Spanner データベース
  • SOURCE_SHARDS_FILE_PATH: ソースシャードの詳細を含む GCS ファイルのパス

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "changeStreamName": "CHANGE_STREAM_NAME",
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "spannerProjectId": "SPANNER_PROJECT_ID",
       "metadataInstance": "METADATA_INSTANCE",
       "metadataDatabase": "METADATA_DATABASE",
       "sourceShardsFilePath": "SOURCE_SHARDS_FILE_PATH",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_to_SourceDb",
     "environment": { "maxWorkers": "10" }
  }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • CHANGE_STREAM_NAME: 読み取る変更ストリームの名前
  • INSTANCE_ID: Cloud Spanner インスタンス ID。
  • DATABASE_ID: Cloud Spanner データベース ID。
  • SPANNER_PROJECT_ID: Cloud Spanner プロジェクト ID。
  • METADATA_INSTANCE: changestream から読み取るときにメタデータを保存する Cloud Spanner インスタンス
  • METADATA_DATABASE: 変更ストリームから読み取るときにメタデータを保存する Cloud Spanner データベース
  • SOURCE_SHARDS_FILE_PATH: ソースシャードの詳細を含む GCS ファイルのパス