Spanner change streams to Cloud Storage テンプレートは、Spanner データ変更レコードをストリーミングし、Dataflow Runner v2 を使用して Cloud Storage バケットに書き込むストリーミング パイプラインです。
パイプラインは、タイムスタンプに基づいて Spanner の変更ストリーム レコードをウィンドウにグループ化します。各ウィンドウは、このテンプレートで構成できる期間を表します。タイムスタンプがウィンドウに属するすべてのレコードが、ウィンドウ内に存在することが保証されます。遅延は発生しません。複数の出力シャードを定義することもできます。パイプラインはシャードごと、ウィンドウごとに 1 つの Cloud Storage 出力ファイルを作成します。出力ファイル内では、レコードは順序付けされていません。出力ファイルは、ユーザーの構成に応じて JSON 形式または AVRO 形式で記述できます。
Spanner インスタンスまたは Cloud Storage バケットと同じリージョンから Dataflow ジョブを実行することで、ネットワークのレイテンシやネットワーク転送の費用を最小限に抑えることができます。使用するソース、シンク、ステージング ファイルのロケーションや、一時ファイルのロケーションがジョブのリージョン外である場合、データがリージョンを越えて送信される可能性があります。Dataflow リージョンの詳細をご覧ください。
変更ストリームの詳細については、変更ストリーム Dataflow パイプラインの構築方法とベスト プラクティスをご覧ください。
パイプラインの要件
- パイプラインの実行前に Spanner インスタンスが存在している。
- パイプラインの実行前に Spanner データベースが存在している。
- パイプラインの実行前に Spanner メタデータ インスタンスが存在している。
- パイプラインの実行前に Spanner メタデータ データベースが存在している。
- パイプラインの実行前に Spanner の変更ストリームが存在している。
- パイプラインの実行前に Cloud Storage 出力バケットが存在している。
テンプレートのパラメータ
必須パラメータ
- spannerInstanceId: 変更ストリーム データの読み取り元の Spanner インスタンス ID。
- spannerDatabase: 変更ストリーム データの読み取り元の Spanner データベース。
- spannerMetadataInstanceId: 変更ストリーム コネクタのメタデータ テーブルに使用する Spanner インスタンス ID。
- spannerMetadataDatabase: 変更ストリーム コネクタのメタデータ テーブルに使用する Spanner データベース。
- spannerChangeStreamName: 読み取り元の Spanner 変更ストリームの名前。
- gcsOutputDirectory : 出力ファイルを書き込むパスとファイル名の接頭辞。末尾はスラッシュでなければなりませんDateTime 形式は、日付と時刻のフォーマッタのディレクトリ パスをパースするために使用されます(例: gs://your-bucket/your-path)。
オプション パラメータ
- spannerProjectId: 変更ストリームの読み取り元の Spanner データベースを含む Google Cloud プロジェクトの ID。このプロジェクトは、変更ストリーム コネクタのメタデータ テーブルが作成されるプロジェクトでもあります。このパラメータのデフォルトは、Dataflow パイプラインが動作しているプロジェクトです。
- spannerDatabaseRole: テンプレートの実行時に使用される Spanner データベース ロール。このパラメータは、テンプレートを実行している IAM プリンシパルが、きめ細かいアクセス制御のユーザーである場合にのみ必要です。データベースのロールには、変更ストリームに対する
SELECT
権限と、変更ストリームの読み取り機能に対するEXECUTE
権限が必要です。詳細については、変更ストリームのきめ細かなアクセス制御(https://cloud.google.com/spanner/docs/fgac-change-streams)をご覧ください。 - spannerMetadataTableName: Spanner の変更ストリーム コネクタで使用するメタデータ テーブル名。指定しない場合、パイプラインの実行中に Spanner 変更ストリームのメタデータ テーブルが自動的に作成されます。このパラメータは、既存のパイプラインを更新するときに指定する必要があります。それ以外の場合は、このパラメータを使用しないでください。
- startTimestamp: 変更ストリーム読み取りに使用される開始日時(この値を含む)。形式は Ex-2021-10-12T07:20:50.52Z です。デフォルトは、パイプライン開始時のタイムスタンプ、つまり現在の時刻です。
- endTimestamp: 変更ストリームの読み取りに使用する終了の DateTime(この値を含む)。例: Ex-2021-10-12T07:20:50.52Z。デフォルトは、現在よりも先の無限の時間です。
- spannerHost: テンプレート内で呼び出す Cloud Spanner のエンドポイント。テストでのみ使われます(例: https://spanner.googleapis.com)。デフォルトは https://spanner.googleapis.com です。
- outputFileFormat: 出力 Cloud Storage ファイルの形式。指定できる形式は TEXT と AVRO です。デフォルトは AVRO です。
- windowDuration: ウィンドウ期間は、出力ディレクトリにデータが書き込まれる間隔です。パイプラインのスループットに基づいて期間を構成します。たとえば、スループットを向上させるには、データがメモリに収まるようにウィンドウ サイズを小さくする必要があります。デフォルトは 5m(5 分)、最小は 1s(1 秒)です。使用できる形式は、[int]s(秒に使用、例: 5s)、[int]m(分に使用、例: 12m)、[int]h(時間に使用、例: 2h)です(例: 5m)。
- rpcPriority: Spanner 呼び出しのリクエストの優先度。値は HIGH、MEDIUM、LOW のいずれかにする必要があります。デフォルトは HIGH です。
- outputFilenamePrefix: ウィンドウ処理された各ファイルに配置する接頭辞。(例: output-)。デフォルトは output です。
- numShards: 書き込み時に生成される出力シャードの最大数。シャード数が多いと Cloud Storage への書き込みのスループットが高くなりますが、出力 Cloud Storage ファイルの処理時にシャード全体のデータ集計コストが高くなる可能性があります。デフォルトは 20 です。
テンプレートを実行する
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、[ the Cloud Spanner change streams to Google Cloud Storage template] を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \ --region REGION_NAME \ --parameters \ spannerInstanceId=SPANNER_INSTANCE_ID,\ spannerDatabase=SPANNER_DATABASE,\ spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\ spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\ spannerChangeStreamName=SPANNER_CHANGE_STREAM,\ gcsOutputDirectory=GCS_OUTPUT_DIRECTORY
次のように置き換えます。
JOB_NAME
: 一意の任意のジョブ名VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
REGION_NAME
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)SPANNER_INSTANCE_ID
: Cloud Spanner インスタンス IDSPANNER_DATABASE
: Cloud Spanner データベースSPANNER_METADATA_INSTANCE_ID
: Cloud Spanner メタデータ インスタンス IDSPANNER_METADATA_DATABASE
: Cloud Spanner メタデータ データベースSPANNER_CHANGE_STREAM
: Cloud Spanner 変更ストリームGCS_OUTPUT_DIRECTORY
: 変更ストリームの出力用のファイルの場所
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "spannerInstanceId": "SPANNER_INSTANCE_ID", "spannerDatabase": "SPANNER_DATABASE", "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID", "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE", "spannerChangeStreamName": "SPANNER_CHANGE_STREAM", "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Google Cloud プロジェクトの IDJOB_NAME
: 一意の任意のジョブ名VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
LOCATION
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)SPANNER_INSTANCE_ID
: Cloud Spanner インスタンス IDSPANNER_DATABASE
: Cloud Spanner データベースSPANNER_METADATA_INSTANCE_ID
: Cloud Spanner メタデータ インスタンス IDSPANNER_METADATA_DATABASE
: Cloud Spanner メタデータ データベースSPANNER_CHANGE_STREAM
: Cloud Spanner 変更ストリームGCS_OUTPUT_DIRECTORY
: 変更ストリームの出力用のファイルの場所
次のステップ
- Dataflow テンプレートについて学習する。
- Google 提供のテンプレートのリストを確認する。