Spanner change streams to Cloud Storage テンプレート

Spanner change streams to Cloud Storage テンプレートは、Spanner データ変更レコードをストリーミングし、Dataflow Runner v2 を使用して Cloud Storage バケットに書き込むストリーミング パイプラインです。

パイプラインは、タイムスタンプに基づいて Spanner の変更ストリーム レコードをウィンドウにグループ化します。各ウィンドウは、このテンプレートで構成できる期間を表します。タイムスタンプがウィンドウに属するすべてのレコードが、ウィンドウ内に存在することが保証されます。遅延は発生しません。複数の出力シャードを定義することもできます。パイプラインはシャードごと、ウィンドウごとに 1 つの Cloud Storage 出力ファイルを作成します。出力ファイル内では、レコードは順序付けされていません。出力ファイルは、ユーザーの構成に応じて JSON 形式または AVRO 形式で記述できます。

Spanner インスタンスまたは Cloud Storage バケットと同じリージョンから Dataflow ジョブを実行することで、ネットワークのレイテンシやネットワーク転送の費用を最小限に抑えることができます。使用するソース、シンク、ステージング ファイルのロケーションや、一時ファイルのロケーションがジョブのリージョン外である場合、データがリージョンを越えて送信される可能性があります。Dataflow リージョンの詳細をご覧ください。

変更ストリームの詳細については、変更ストリーム Dataflow パイプラインの構築方法ベスト プラクティスをご覧ください。

パイプラインの要件

  • パイプラインの実行前に Spanner インスタンスが存在している。
  • パイプラインの実行前に Spanner データベースが存在している。
  • パイプラインの実行前に Spanner メタデータ インスタンスが存在している。
  • パイプラインの実行前に Spanner メタデータ データベースが存在している。
  • パイプラインの実行前に Spanner の変更ストリームが存在している。
  • パイプラインの実行前に Cloud Storage 出力バケットが存在している。

テンプレートのパラメータ

パラメータ 説明
spannerInstanceId 変更ストリーム データの読み取り元の Spanner インスタンス ID。
spannerDatabase 変更ストリーム データの読み取り元の Spanner データベース。
spannerDatabaseRole (省略可)テンプレートの実行時に使用される Spanner データベース ロール。このパラメータは、テンプレートを実行している IAM プリンシパルが、きめ細かいアクセス制御のユーザーである場合にのみ必要です。データベースのロールには、変更ストリームに対する SELECT 権限と、変更ストリームの読み取り機能に対する EXECUTE 権限が必要です。詳細については、変更ストリームのきめ細かなアクセス制御をご覧ください。
spannerMetadataInstanceId 変更ストリーム コネクタのメタデータ テーブルに使用する Spanner インスタンス ID。
spannerMetadataDatabase 変更ストリーム コネクタのメタデータ テーブルに使用する Spanner データベース。
spannerChangeStreamName 読み取り元の Spanner 変更ストリームの名前。
gcsOutputDirectory 変更ストリームのファイルの場所は、Cloud Storage に「gs://${BUCKET}/${ROOT_PATH}/」の形式で出力されます。
outputFilenamePrefix (省略可)書き込み先ファイルのファイル名の接頭辞。ファイルの接頭辞はデフォルトで「output」に設定されています。
spannerProjectId (省略可)変更ストリームの読み取り元のプロジェクト。これは、変更ストリーム コネクタのメタデータ テーブルが作成されるプロジェクトでもあります。このパラメータのデフォルトは、Dataflow パイプラインが動作しているプロジェクトです。
startTimestamp (省略可)変更ストリームの読み取りに使用する開始の DateTime(両端を含む)。例: 2021-10-12T07:20:50.52Zデフォルトは、パイプライン開始時のタイムスタンプ、つまり現在の時刻です。
endTimestamp (省略可)変更ストリームの読み取りに使用する終了の DateTime(両端を含む)。例: 2021-10-12T07:20:50.52Zデフォルトは、現在よりも先の無限の時間です。
outputFileFormat (省略可)出力 Cloud Storage ファイルの形式。使用可能な形式は TEXT、AVRO です。デフォルトは AVRO です。
windowDuration (省略可)ウィンドウ期間は、出力ディレクトリにデータが書き込まれる間隔です。パイプラインのスループットに基づいて期間を構成します。たとえば、スループットを向上させるには、データがメモリに収まるようにウィンドウ サイズを小さくする必要があります。デフォルトは 5 分、最小は 1 秒です。使用できる形式は、[int]s(秒に使用、例: 5s)、[int]m(分に使用、例: 12m)、[int]h(時間に使用、例: 2h)です。
rpcPriority (省略可)Spanner 呼び出しのリクエストの優先度。値は、高、中、低のいずれかである必要があります。(デフォルトは高)
numShards (省略可)書き込み時に生成される出力シャードの最大数。デフォルト値は 20 です。シャード数が多いと Cloud Storage への書き込みのスループットが高くなりますが、出力 Cloud Storage ファイルの処理時にシャード全体のデータ集計コストが高くなる可能性があります。
spannerMetadataTableName (省略可)使用する Spanner 変更ストリーム メタデータのテーブル名。指定しない場合、パイプライン フロー中に Spanner 変更ストリームのメタデータ テーブルが自動的に作成されます。このパラメータは、既存のパイプラインを更新するときに指定する必要があります。それ以外の場合は指定しないでください。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、the Cloud Spanner change streams to Google Cloud Storage template を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • SPANNER_INSTANCE_ID: Cloud Spanner インスタンス ID
  • SPANNER_DATABASE: Cloud Spanner データベース
  • SPANNER_METADATA_INSTANCE_ID: Cloud Spanner メタデータ インスタンス ID
  • SPANNER_METADATA_DATABASE: Cloud Spanner メタデータ データベース
  • SPANNER_CHANGE_STREAM: Cloud Spanner 変更ストリーム
  • GCS_OUTPUT_DIRECTORY: 変更ストリームの出力用のファイルの場所

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage",
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • SPANNER_INSTANCE_ID: Cloud Spanner インスタンス ID
  • SPANNER_DATABASE: Cloud Spanner データベース
  • SPANNER_METADATA_INSTANCE_ID: Cloud Spanner メタデータ インスタンス ID
  • SPANNER_METADATA_DATABASE: Cloud Spanner メタデータ データベース
  • SPANNER_CHANGE_STREAM: Cloud Spanner 変更ストリーム
  • GCS_OUTPUT_DIRECTORY: 変更ストリームの出力用のファイルの場所

次のステップ