Spanner change streams to Pub/Sub 模板

Spanner change streams to the Pub/Sub 模板是一种流处理流水线,可使用 Dataflow Runner V2 流式传输 Spanner 数据更改记录并将其写入 Pub/Sub 主题。

如需将数据输出到新的 Pub/Sub 主题,您需要先创建主题。创建后,Pub/Sub 会自动生成订阅并将其附加到新主题。如果您尝试将数据输出到不存在的 Pub/Sub 主题,则 Dataflow 流水线会抛出异常,并且流水线会在不断尝试建立连接时卡住。

如果存在必要的 Pub/Sub 主题,则可以将数据输出到该主题。

如需了解详情,请参阅关于变更数据流使用 Dataflow 构建变更数据流连接变更数据流最佳实践

流水线要求

  • 在运行流水线之前,Spanner 实例必须已存在。
  • 在运行流水线之前,Spanner 数据库必须已存在。
  • 在运行流水线之前,Spanner 元数据实例必须已存在。
  • 在运行流水线之前,Spanner 元数据数据库必须已存在。
  • 在运行流水线之前,Spanner 变更数据流必须已存在。
  • 在运行此流水线之前,Pub/Sub 主题必须已存在。

模板参数

必需参数

  • spannerInstanceId:要从中读取变更数据流的 Spanner 实例。
  • spannerDatabase:要从中读取变更数据流的 Spanner 数据库。
  • spannerMetadataInstanceId:要用于变更数据流连接器元数据表的 Spanner 实例。
  • spannerMetadataDatabase:要用于变更数据流连接器元数据表的 Spanner 数据库。
  • spannerChangeStreamName:要从中读取数据的 Spanner 变更数据流的名称。
  • pubsubTopic:变更数据流输出的 Pub/Sub 主题。

可选参数

  • spannerProjectId:读取变更数据流的项目。此项目也是创建变更数据流连接器元数据表的项目。此参数的默认项目是 Dataflow 流水线在其中运行的项目。
  • spannerDatabaseRole:运行模板时使用的 Spanner 数据库角色。仅当运行模板的 IAM 主账号是精细访问权限控制用户时,才需要此参数。数据库角色必须拥有变更数据流的 SELECT 特权和变更数据流的读取函数的 EXECUTE 特权。如需了解详情,请参阅变更数据流的精细访问权限控制 (https://cloud.google.com/spanner/docs/fgac-change-streams)。
  • spannerMetadataTableName:要使用的 Spanner 变更数据流连接器元数据表名称。如果未提供,Spanner 会在流水线流更改期间自动创建数据流连接器元数据表。更新现有流水线时,您必须提供此参数。 请勿将此参数用于其他情况。
  • startTimestamp:用于读取变更数据流的起始日期时间 (https://tools.ietf.org/html/rfc3339)(含边界值)。例如 ex- 2021-10-12T07:20:50.52Z。默认为流水线启动时的时间戳,即当前时间。
  • endTimestamp:用于读取变更数据流的结束日期时间 (https://tools.ietf.org/html/rfc3339)(含边界值)。例如 ex- 2021-10-12T07:20:50.52Z。默认为未来的无限时间。
  • spannerHost:要在模板中调用的 Cloud Spanner 端点。仅用于测试。例如 https://spanner.googleapis.com。默认值为 https://spanner.googleapis.com
  • outputDataFormat:输出的格式。输出会封装在许多 PubsubMessage 中,并发送到 Pub/Sub 主题。允许的格式为 JSON 和 AVRO。默认值为 JSON。
  • pubsubAPI:用于实现流水线的 Pub/Sub API。允许的 API 包括 pubsubionative_client。对于少量每秒查询次数 (QPS),native_client 的延迟时间较短。对于 QPS 而言,pubsubio 可提供更好且更稳定的性能。默认值为 pubsubio
  • pubsubProjectId:Pub/Sub 主题的项目。此参数的默认项目是 Dataflow 流水线在其中运行的项目。
  • rpcPriority:Spanner 调用的请求优先级。允许的值包括“高”“中”和“低”。默认值为 HIGH。
  • includeSpannerSource:是否在输出消息数据中包含要从中读取变更数据流的 Spanner 数据库 ID 和实例 ID。默认值为:false。
  • outputMessageMetadata:输出 Pub/Sub 消息中自定义字段 outputMessageMetadata 的字符串值。默认为空,只有当此值不为空时,系统才会填充 outputMessageMetadata 字段。在此处输入值时,请转义所有特殊字符(例如双引号)。

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Spanner change streams to Pub/Sub template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • SPANNER_INSTANCE_ID:Spanner 实例 ID
  • SPANNER_DATABASE:Spanner 数据库
  • SPANNER_METADATA_INSTANCE_ID:Spanner 元数据实例 ID
  • SPANNER_METADATA_DATABASE:Spanner 元数据数据库
  • SPANNER_CHANGE_STREAM:Spanner 变更数据流
  • PUBSUB_TOPIC:变更数据流输出的 Pub/Sub 主题

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_PubSub",
    }
  }
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • SPANNER_INSTANCE_ID:Spanner 实例 ID
  • SPANNER_DATABASE:Spanner 数据库
  • SPANNER_METADATA_INSTANCE_ID:Spanner 元数据实例 ID
  • SPANNER_METADATA_DATABASE:Spanner 元数据数据库
  • SPANNER_CHANGE_STREAM:Spanner 变更数据流
  • PUBSUB_TOPIC:变更数据流输出的 Pub/Sub 主题

后续步骤