Datastream to Spanner テンプレート

Datastream to Spanner テンプレートは、Cloud Storage バケットから Datastream イベントを読み取り、Spanner データベースに書き込むストリーミング パイプラインです。これは、Datastream ソースから Spanner へのデータ移行を目的としています。

テンプレートの実行前に、移行に必要なすべてのテーブルが移行先の Spanner データベースに存在している必要があります。したがって、データ移行に先立ち、ソース データベースから移行先 Spanner へのスキーマの移行が完了している必要があります。移行前にテーブルにデータが存在する可能性があります。このテンプレートでは、Datastream スキーマの変更が Spanner データベースに伝播されません。

データの整合性が保証されるのは、すべてのデータが Spanner に書き込まれ、移行が終了したときだけです。Spanner に書き込まれる各レコードの順序指定の情報を保存するために、このテンプレートは Spanner データベース内の各テーブルに対して追加のテーブル(シャドウ テーブルと呼ばれる)を作成します。これは、移行終了時の整合性を確保するために使用されます。シャドウ テーブルは移行後に削除されないため、移行終了時の検証に使用できます。

オペレーション中に発生したエラー(スキーマの不一致、不正な形式の JSON ファイル、変換の実行によるエラーなど)は、エラーキューに記録されます。エラーキューは、エラーが発生したすべての Datastream イベントと、エラーの理由をテキスト形式で保存する Cloud Storage フォルダです。エラーは一時的な場合も永続的な場合もあり、エラーキューの適切な Cloud Storage フォルダに保存されます。一時的なエラーは自動的に再試行されますが、永続的なエラーは再試行されません。永続的なエラーが発生した場合は、テンプレートの実行中に変更イベントを修正し、再試行可能なバケットに移動することもできます。

パイプラインの要件

  • ステータスが [実行中] または [開始されていません] の Datastream ストリーム。
  • Datastream イベントが複製される Cloud Storage バケット。
  • 既存のテーブルを含む Spanner データベース。テーブルは空でも、データを含んでいてもかまいません。

テンプレートのパラメータ

必須パラメータ

  • instanceId: 変更が複製される Spanner インスタンス。
  • databaseId: 変更が複製される Spanner データベース。

オプション パラメータ

  • inputFilePattern: 複製する Datastream ファイルが含まれる Cloud Storage ファイルの場所。通常、これはストリームのルートパスです。この機能のサポートは無効になりました。
  • inputFileFormat: Datastream によって生成された出力ファイルの形式。例: avro,json。デフォルトは avro です。
  • sessionFilePath: HarbourBridge のマッピング情報が含まれる Cloud Storage 内のセッション ファイルのパス。
  • projectId: Spanner プロジェクト ID。
  • spannerHost: テンプレート内で呼び出す Cloud Spanner のエンドポイント(例: https://batch-spanner.googleapis.com)。デフォルトは https://batch-spanner.googleapis.com です。
  • gcsPubSubSubscription: Cloud Storage 通知ポリシーで使用されている Pub/Sub サブスクリプション。名前は projects/ の形式にします。
  • streamName: スキーマ情報とソースタイプについてポーリングするストリームの名前またはテンプレート。
  • shadowTablePrefix: シャドウ テーブルの名前に使用される接頭辞。デフォルトは shadow_ です。
  • shouldCreateShadowTables: Cloud Spanner データベースでシャドウ テーブルを作成する必要があるかどうかを示すフラグ。デフォルトは true です。
  • rfcStartDateTime: Cloud Storage(https://tools.ietf.org/html/rfc3339)からのフェッチに使用される開始日時。デフォルトは 1970-01-01T00:00:00.00Z です。
  • fileReadConcurrency: 同時に読み取る DataStream ファイルの数。デフォルトは 30 です。
  • deadLetterQueueDirectory: エラーキューの出力の保存に使用されるファイルパス。デフォルトのファイルパスは、Dataflow ジョブの一時保存場所の下のディレクトリです。
  • dlqRetryMinutes: デッドレター キューの再試行間隔(分)。デフォルトは 10 です。
  • dlqMaxRetryCount: デッドレター キューで一時的なエラーが発生した場合に再試行できる最大回数。デフォルトは 500 です。
  • dataStreamRootUrl: Datastream API のルート URL。デフォルトは https://datastream.googleapis.com/ です。
  • datastreamSourceType: Datastream が接続するソース データベースのタイプ。例: mysql/oracle。実行中の Datastream なしでテストする場合に設定する必要があります。
  • roundJsonDecimals: JSON 列内の小数値を、精度の低下なしで格納できる数値に丸めるかどうかを指定するフラグ。デフォルトは false です。
  • runMode: 実行モードのタイプ。値は regular(通常)または retryDLQ(デッドレター キューを使用して再試行)です。デフォルトは regular です。
  • transformationContextFilePath: 移行中に実行される変換でデータの入力に使用される、Cloud Storage 内の変換コンテキスト ファイル パス。例: 行の移行元のデータベースを識別するデータベース名のシャード ID。
  • directoryWatchDurationInMinutes: パイプラインが GCS 内のディレクトリをポーリングし続ける期間。Datastream の出力ファイルは、イベントのタイムスタンプを分単位でグループ化したディレクトリ構造で配置されます。このパラメータは、イベントがソース データベースで発生してから Datastream によって GCS に書き込まれるまでの最大遅延時間(99.9 パーセンタイル = 10 分)とほぼ等しい必要があります。デフォルトは 10 です。
  • spannerPriority: Cloud Spanner 呼び出しのリクエストの優先度。値は HIGH、MEDIUM、LOW のいずれかである必要があります。デフォルトは HIGH です。
  • dlqGcsPubSubSubscription: 通常モードでの実行時にデッドレター キュー再試行ディレクトリ用の Cloud Storage 通知ポリシーで使用されている Pub/Sub サブスクリプション。名前は projects/ の形式にします。
  • transformationJarPath: 前方移行でレコードを処理するカスタム変換ロジックが含まれる、Cloud Storage 内のカスタム jar の場所。デフォルトは空です。
  • transformationClassName: カスタム変換ロジックが含まれる完全修飾クラス名。transformationJarPath が指定されている場合は必須フィールドです。デフォルトは空です。
  • transformationCustomParameters: カスタム変換クラスに渡すカスタム パラメータが含まれる文字列。デフォルトは空です。
  • filteredEventsDirectory: カスタム変換によってフィルタされたイベントを保存するファイルパス。デフォルトは、Dataflow ジョブの一時保存場所の下のディレクトリです。ほとんどの場合は、デフォルト値のまま使用できます。
  • shardingContextFilePath: Cloud Storage 内のシャーディング コンテキスト ファイルパスは、ソースシャードごとに Spanner データベースにシャード ID を入力するために使用されます。形式は Map<stream_name, Map<db_name, shard_id>> です。
  • tableOverrides: ソースから Spanner へのテーブル名のオーバーライド。次の形式で記述されます。[{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}] この例では、Singers テーブルを Vocalists に、Albums テーブルを Records にマッピングしています(例: [{Singers, Vocalists}, {Albums, Records}])。デフォルトは空です。
  • columnOverrides: ソースから Spanner への列名のオーバーライドです。次のような形式で記述されます。[{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}] SourceTableName は、ソースペアと Spanner ペアの両方で同じである必要があります。テーブル名をオーバーライドするには、tableOverrides を使用します。この例では Singers テーブルと Albums テーブルそれぞれで、SingerName を TalentName に、AlbumName を RecordName にマッピングしています(例: [{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}])。デフォルトは空です。
  • schemaOverridesFilePath: ソースから Spanner へのテーブルと列名のオーバーライドを指定するファイル。デフォルトは空です。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Cloud Datastream to Spanner template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • GCS_FILE_PATH: Datastream イベントの保存に使用される Cloud Storage パス。例: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: Spanner インスタンス。
  • CLOUDSPANNER_DATABASE: Spanner データベース。
  • DLQ: エラーキュー ディレクトリの Cloud Storage パス。

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • GCS_FILE_PATH: Datastream イベントの保存に使用される Cloud Storage パス。例: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: Spanner インスタンス。
  • CLOUDSPANNER_DATABASE: Spanner データベース。
  • DLQ: エラーキュー ディレクトリの Cloud Storage パス。

次のステップ