Spanner change streams to Pub/Sub 模板

Spanner change streams to the Pub/Sub 模板是一种流处理流水线,可使用 Dataflow Runner V2 流式传输 Spanner 数据更改记录并将其写入 Pub/Sub 主题。

如需将数据输出到新的 Pub/Sub 主题,您需要先创建主题。创建后,Pub/Sub 会自动生成订阅并将其附加到新主题。如果您尝试将数据输出到不存在的 Pub/Sub 主题,则 Dataflow 流水线会抛出异常,并且流水线会在不断尝试建立连接时卡住。

如果存在必要的 Pub/Sub 主题,则可以将数据输出到该主题。

如需了解详情,请参阅关于变更数据流使用 Dataflow 构建变更数据流连接变更数据流最佳实践

流水线要求

  • 在运行流水线之前,Spanner 实例必须已存在。
  • 在运行流水线之前,Spanner 数据库必须已存在。
  • 在运行流水线之前,Spanner 元数据实例必须已存在。
  • 在运行流水线之前,Spanner 元数据数据库必须已存在。
  • 在运行流水线之前,Spanner 变更数据流必须已存在。
  • 在运行此流水线之前,Pub/Sub 主题必须已存在。

模板参数

参数 说明
spannerInstanceId 要从中读取变更数据流的 Spanner 实例。
spannerDatabase 要从中读取变更数据流的 Spanner 数据库。
spannerDatabaseRole (可选)运行模板时使用的 Spanner 数据库角色。仅当运行模板的 IAM 主账号是精细访问权限控制用户时,才需要此参数。数据库角色必须拥有变更数据流的 SELECT 特权和变更数据流的读取函数的 EXECUTE 特权。如需了解详情,请参阅变更数据流的精细访问权限控制
spannerMetadataInstanceId 要用于变更数据流连接器元数据表的 Spanner 实例。
spannerMetadataDatabase 要用于变更数据流连接器元数据表的 Spanner 数据库。
spannerChangeStreamName 要从中读取数据的 Spanner 变更数据流的名称。
pubsubTopic 变更数据流输出的 Pub/Sub 主题。
spannerProjectId (可选)从中读取变更数据流的项目。这也是创建变更数据流连接器元数据表的项目。此参数的默认项目是 Dataflow 流水线在其中运行的项目。
spannerMetadataTableName (可选)要使用的 Spanner 变更数据流连接器元数据表名称。如果未提供,Spanner 会在流水线流更改期间自动创建数据流连接器元数据表。更新现有流水线时,您必须提供此参数。请勿将此参数用于其他情况。
rpcPriority (可选)Spanner 调用的请求优先级。该值必须为 [HIGH,MEDIUM,LOW] 之一。(默认值:HIGH)
startTimestamp (可选)要用于读取变更数据流的开始 DateTime(含边界值)。例如 ex-2021-10-12T07:20:50.52Z。默认为流水线启动时的时间戳,即当前时间。
endTimestamp (可选)要用于读取变更数据流的结束 DateTime(含边界值)。例如 ex-2021-10-12T07:20:50.52Z。默认为未来的无限时间。
outputFileFormat (可选)输出的格式。输出会封装在许多 PubsubMessage 中,并发送到 Pub/Sub 主题。允许的格式为 JSON 和 AVRO。默认值为 JSON。
pubsubAPI (可选)用于实现流水线的 Pub/Sub API。允许的 API 包括 pubsubionative_client。对于少量每秒查询次数 (QPS),native_client 的延迟时间较短。对于 QPS 而言,pubsubio 可提供更好且更稳定的性能。默认值为 pubsubio

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Spanner change streams to Pub/Sub template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

请替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • SPANNER_INSTANCE_ID:Spanner 实例 ID
  • SPANNER_DATABASE:Spanner 数据库
  • SPANNER_METADATA_INSTANCE_ID:Spanner 元数据实例 ID
  • SPANNER_METADATA_DATABASE:Spanner 元数据数据库
  • SPANNER_CHANGE_STREAM:Spanner 变更数据流
  • PUBSUB_TOPIC:变更数据流输出的 Pub/Sub 主题

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_PubSub",
    }
  }
  

请替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • SPANNER_INSTANCE_ID:Spanner 实例 ID
  • SPANNER_DATABASE:Spanner 数据库
  • SPANNER_METADATA_INSTANCE_ID:Spanner 元数据实例 ID
  • SPANNER_METADATA_DATABASE:Spanner 元数据数据库
  • SPANNER_CHANGE_STREAM:Spanner 变更数据流
  • PUBSUB_TOPIC:变更数据流输出的 Pub/Sub 主题

后续步骤