Spanner change streams to Pub/Sub テンプレート

Spanner change streams to Pub/Sub テンプレートは、Spanner データ変更レコードをストリーミングし、Dataflow Runner V2 を使用して Pub/Sub トピックに書き込むストリーミング パイプラインです。

データを新しい Pub/Sub トピックに出力するには、まずトピックを作成する必要があります。作成後、Pub/Sub はサブスクリプションを自動的に生成して新しいトピックに接続します。存在しない Pub/Sub トピックにデータを出力しようとすると、Dataflow パイプラインは例外をスローし、パイプラインは継続的に接続しようとして停止します。

必要な Pub/Sub トピックがすでに存在する場合、そのトピックにデータを出力できます。

詳細については、変更ストリームについてDataflow で変更ストリームの接続を構築する変更ストリームのベスト プラクティスをご覧ください。

パイプラインの要件

  • パイプラインの実行前に、Spanner インスタンスが存在している。
  • パイプラインの実行前に Spanner データベースが存在している。
  • パイプラインの実行前に Spanner メタデータ インスタンスが存在している。
  • パイプラインの実行前に Spanner メタデータ データベースが存在している。
  • パイプラインの実行前に Spanner の変更ストリームが存在している。
  • Pub/Sub トピックは、パイプラインを実行する前に存在している必要があります。

テンプレートのパラメータ

必須パラメータ

  • spannerInstanceId: 変更ストリームの読み取り元の Spanner インスタンス。
  • spannerDatabase: 変更ストリームの読み取り元の Spanner データベース。
  • spannerMetadataInstanceId: 変更ストリーム コネクタのメタデータ テーブルに使用する Spanner インスタンス。
  • spannerMetadataDatabase: 変更ストリーム コネクタのメタデータ テーブルに使用する Spanner データベース。
  • spannerChangeStreamName: 読み取り元の Spanner 変更ストリームの名前。
  • pubsubTopic: 変更ストリーム出力の Pub/Sub トピック

オプション パラメータ

  • spannerProjectId: 変更ストリームの読み取り元のプロジェクト。このプロジェクトは、変更ストリーム コネクタのメタデータ テーブルが作成されるプロジェクトでもあります。このパラメータのデフォルトは、Dataflow パイプラインが動作しているプロジェクトです。
  • spannerDatabaseRole: テンプレートの実行時に使用される Spanner データベース ロール。このパラメータは、テンプレートを実行している IAM プリンシパルが、きめ細かいアクセス制御のユーザーである場合にのみ必要です。データベースのロールには、変更ストリームに対する SELECT 権限と、変更ストリームの読み取り機能に対する EXECUTE 権限が必要です。詳細については、変更ストリームのきめ細かなアクセス制御(https://cloud.google.com/spanner/docs/fgac-change-streams)をご覧ください。
  • spannerMetadataTableName: Spanner の変更ストリーム コネクタで使用するメタデータ テーブル名。指定しない場合、パイプライン フローの変更中に Spanner によってストリーム コネクタのメタデータ テーブルが自動的に作成されます。このパラメータは、既存のパイプラインを更新するときに指定する必要があります。このパラメータは他のケースでは使用しないでください。
  • startTimestamp: 変更ストリームの読み取りに使用される開始日時(https://tools.ietf.org/html/rfc3339)。この値も含みます。例: ex-2021-10-12T07:20:50.52Z。デフォルトは、パイプライン開始時のタイムスタンプ、つまり現在の時刻です。
  • endTimestamp: 変更ストリームの読み取りに使用される終了日時(https://tools.ietf.org/html/rfc3339)。この値も含みます。例: ex-2021-10-12T07:20:50.52Z。デフォルトは、現在よりも先の無限の時間です。
  • spannerHost: テンプレート内で呼び出す Cloud Spanner のエンドポイント。テストでのみ使われます(例: https://spanner.googleapis.com)。デフォルトは https://spanner.googleapis.com です。
  • outputDataFormat: 出力の形式。出力は多くの PubsubMessage でラップされ、Pub/Sub トピックに送信されます。許可されている形式は JSON と AVRO です。デフォルトは JSON です。
  • pubsubAPI: パイプラインの実装に使用される Pub/Sub API。許可される API は pubsubionative_client です。秒間クエリ数(QPS)が少ない場合、native_client はレイテンシが低くなります。QPS が大きい場合、pubsubio はより優れた安定したパフォーマンスを提供します。デフォルトは pubsubio です。
  • pubsubProjectId: Pub/Sub トピックのプロジェクト。このパラメータのデフォルトは、Dataflow パイプラインが動作しているプロジェクトです。
  • rpcPriority: Spanner 呼び出しのリクエストの優先度。使用できる値は、HIGH、MEDIUM、LOW です。デフォルトは HIGH です。
  • includeSpannerSource : 変更ストリームの読み取り元の Spanner データベース ID とインスタンス ID を出力メッセージ データに含めるかどうか。デフォルトは false です。
  • outputMessageMetadata : 出力 Pub/Sub メッセージのカスタム フィールド outputMessageMetadata の文字列値。デフォルトは空で、この値が空でない場合のみ、フィールド outputMessageMetadata に値が入力されます。値を入力する際は、特殊文字をエスケープしてください(例: 二重引用符)。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Cloud Spanner change streams to Pub/Sub template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • SPANNER_INSTANCE_ID: Spanner インスタンス ID
  • SPANNER_DATABASE: Spanner データベース
  • SPANNER_METADATA_INSTANCE_ID: Spanner メタデータ インスタンス ID
  • SPANNER_METADATA_DATABASE: Spanner メタデータ データベース
  • SPANNER_CHANGE_STREAM: Spanner 変更ストリーム
  • PUBSUB_TOPIC: 変更ストリーム出力の Pub/Sub トピック

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_PubSub",
    }
  }
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • SPANNER_INSTANCE_ID: Spanner インスタンス ID
  • SPANNER_DATABASE: Spanner データベース
  • SPANNER_METADATA_INSTANCE_ID: Spanner メタデータ インスタンス ID
  • SPANNER_METADATA_DATABASE: Spanner メタデータ データベース
  • SPANNER_CHANGE_STREAM: Spanner 変更ストリーム
  • PUBSUB_TOPIC: 変更ストリーム出力の Pub/Sub トピック

次のステップ