Spanner change streams to Pub/Sub テンプレート

Spanner change streams to Pub/Sub テンプレートは、Spanner データ変更レコードをストリーミングし、Dataflow Runner V2 を使用して Pub/Sub トピックに書き込むストリーミング パイプラインです。

データを新しい Pub/Sub トピックに出力するには、まずトピックを作成する必要があります。作成後、Pub/Sub はサブスクリプションを自動的に生成して新しいトピックに接続します。存在しない Pub/Sub トピックにデータを出力しようとすると、Dataflow パイプラインは例外をスローし、パイプラインは継続的に接続しようとして停止します。

必要な Pub/Sub トピックがすでに存在する場合、そのトピックにデータを出力できます。

詳細については、変更ストリームについてDataflow で変更ストリームの接続を構築する変更ストリームのベスト プラクティスをご覧ください。

パイプラインの要件

  • パイプラインの実行前に、Spanner インスタンスが存在している。
  • パイプラインの実行前に Spanner データベースが存在している。
  • パイプラインの実行前に Spanner メタデータ インスタンスが存在している。
  • パイプラインの実行前に Spanner メタデータ データベースが存在している。
  • パイプラインの実行前に Spanner の変更ストリームが存在している。
  • Pub/Sub トピックは、パイプラインを実行する前に存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
spannerInstanceId 変更ストリームの読み取り元の Spanner インスタンス。
spannerDatabase 変更ストリームの読み取り元の Spanner データベース。
spannerDatabaseRole (省略可)テンプレートの実行時に使用される Spanner データベース ロール。このパラメータは、テンプレートを実行している IAM プリンシパルが、きめ細かいアクセス制御のユーザーである場合にのみ必要です。データベースのロールには、変更ストリームに対する SELECT 権限と、変更ストリームの読み取り機能に対する EXECUTE 権限が必要です。詳細については、変更ストリームのきめ細かなアクセス制御をご覧ください。
spannerMetadataInstanceId 変更ストリーム コネクタのメタデータ テーブルに使用する Spanner インスタンス。
spannerMetadataDatabase 変更ストリーム コネクタのメタデータ テーブルに使用する Spanner データベース。
spannerChangeStreamName 読み取り元の Spanner 変更ストリームの名前。
pubsubTopic 変更ストリーム出力の Pub/Sub トピック。
spannerProjectId (省略可)変更ストリームの読み取り元のプロジェクト。これは、変更ストリーム コネクタのメタデータ テーブルが作成されるプロジェクトでもあります。このパラメータのデフォルトは、Dataflow パイプラインが動作しているプロジェクトです。
spannerMetadataTableName (省略可)使用する Spanner 変更ストリーム メタデータのテーブル名。指定しない場合、パイプライン フローの変更中に Spanner によってストリーム コネクタのメタデータ テーブルが自動的に作成されます。このパラメータは、既存のパイプラインを更新するときに指定する必要があります。このパラメータは他のケースでは使用しないでください。
rpcPriority (省略可)Spanner 呼び出しのリクエストの優先度。値は、高、中、低のいずれかである必要があります。(デフォルトは高)
startTimestamp (省略可)変更ストリームの読み取りに使用する開始の DateTime(両端を含む)。例: ex-2021-10-12T07:20:50.52Z。デフォルトは、パイプライン開始時のタイムスタンプ、つまり現在の時刻です。
endTimestamp (省略可)変更ストリームの読み取りに使用する終了の DateTime(両端を含む)。例: ex-2021-10-12T07:20:50.52Z。デフォルトは、現在よりも先の無限の時間です。
outputFileFormat (省略可)出力の形式。出力は多くの PubsubMessage でラップされ、Pub/Sub トピックに送信されます。許可されている形式は JSON と AVRO です。デフォルトは JSON です。
pubsubAPI (省略可)パイプラインの実装に使用される Pub/Sub API。許可される API は pubsubionative_client です。秒間クエリ数(QPS)が少ない場合、native_client はレイテンシが低くなります。QPS が大きい場合、pubsubio はより優れた安定したパフォーマンスを提供します。デフォルト値は pubsubio です。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Cloud Spanner change streams to Pub/Sub template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • SPANNER_INSTANCE_ID: Spanner インスタンス ID
  • SPANNER_DATABASE: Spanner データベース
  • SPANNER_METADATA_INSTANCE_ID: Spanner メタデータ インスタンス ID
  • SPANNER_METADATA_DATABASE: Spanner メタデータ データベース
  • SPANNER_CHANGE_STREAM: Spanner 変更ストリーム
  • PUBSUB_TOPIC: 変更ストリーム出力の Pub/Sub トピック

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_PubSub",
    }
  }
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • SPANNER_INSTANCE_ID: Spanner インスタンス ID
  • SPANNER_DATABASE: Spanner データベース
  • SPANNER_METADATA_INSTANCE_ID: Spanner メタデータ インスタンス ID
  • SPANNER_METADATA_DATABASE: Spanner メタデータ データベース
  • SPANNER_CHANGE_STREAM: Spanner 変更ストリーム
  • PUBSUB_TOPIC: 変更ストリーム出力の Pub/Sub トピック

次のステップ