Spanner change streams to the Pub/Sub 模板是一种流处理流水线,可使用 Dataflow Runner V2 流式传输 Spanner 数据更改记录并将其写入 Pub/Sub 主题。
如需将数据输出到新的 Pub/Sub 主题,您需要先创建主题。创建后,Pub/Sub 会自动生成订阅并将其附加到新主题。如果您尝试将数据输出到不存在的 Pub/Sub 主题,则 Dataflow 流水线会抛出异常,并且流水线会在不断尝试建立连接时卡住。
如果存在必要的 Pub/Sub 主题,则可以将数据输出到该主题。
如需了解详情,请参阅关于变更数据流、使用 Dataflow 构建变更数据流连接和变更数据流最佳实践。
流水线要求
- 在运行流水线之前,Spanner 实例必须已存在。
- 在运行流水线之前,Spanner 数据库必须已存在。
- 在运行流水线之前,Spanner 元数据实例必须已存在。
- 在运行流水线之前,Spanner 元数据数据库必须已存在。
- 在运行流水线之前,Spanner 变更数据流必须已存在。
- 在运行此流水线之前,Pub/Sub 主题必须已存在。
模板参数
必需参数
- spannerInstanceId:要从中读取变更数据流的 Spanner 实例。
- spannerDatabase:要从中读取变更数据流的 Spanner 数据库。
- spannerMetadataInstanceId:要用于变更数据流连接器元数据表的 Spanner 实例。
- spannerMetadataDatabase:要用于变更数据流连接器元数据表的 Spanner 数据库。
- spannerChangeStreamName:要从中读取数据的 Spanner 变更数据流的名称。
- pubsubTopic:变更数据流输出的 Pub/Sub 主题。
可选参数
- spannerProjectId:读取变更数据流的项目。此项目也是创建变更数据流连接器元数据表的项目。此参数的默认项目是 Dataflow 流水线在其中运行的项目。
- spannerDatabaseRole:运行模板时使用的 Spanner 数据库角色。仅当运行模板的 IAM 主账号是精细访问权限控制用户时,才需要此参数。数据库角色必须拥有变更数据流的
SELECT
特权和变更数据流的读取函数的EXECUTE
特权。如需了解详情,请参阅变更数据流的精细访问权限控制 (https://cloud.google.com/spanner/docs/fgac-change-streams)。 - spannerMetadataTableName:要使用的 Spanner 变更数据流连接器元数据表名称。如果未提供,Spanner 会在流水线流更改期间自动创建数据流连接器元数据表。更新现有流水线时,您必须提供此参数。 请勿将此参数用于其他情况。
- startTimestamp:用于读取变更数据流的起始日期时间 (https://tools.ietf.org/html/rfc3339)(含边界值)。例如 ex- 2021-10-12T07:20:50.52Z。默认为流水线启动时的时间戳,即当前时间。
- endTimestamp:用于读取变更数据流的结束日期时间 (https://tools.ietf.org/html/rfc3339)(含边界值)。例如 ex- 2021-10-12T07:20:50.52Z。默认为未来的无限时间。
- spannerHost:要在模板中调用的 Cloud Spanner 端点。仅用于测试。例如
https://spanner.googleapis.com
。默认值为 https://spanner.googleapis.com。 - outputDataFormat:输出的格式。输出会封装在许多 PubsubMessage 中,并发送到 Pub/Sub 主题。允许的格式为 JSON 和 AVRO。默认值为 JSON。
- pubsubAPI:用于实现流水线的 Pub/Sub API。允许的 API 包括
pubsubio
和native_client
。对于少量每秒查询次数 (QPS),native_client
的延迟时间较短。对于 QPS 而言,pubsubio
可提供更好且更稳定的性能。默认值为pubsubio
。 - pubsubProjectId:Pub/Sub 主题的项目。此参数的默认项目是 Dataflow 流水线在其中运行的项目。
- rpcPriority:Spanner 调用的请求优先级。允许的值包括“高”“中”和“低”。默认值为 HIGH。
- includeSpannerSource:是否在输出消息数据中包含要从中读取变更数据流的 Spanner 数据库 ID 和实例 ID。默认值为:false。
- outputMessageMetadata:输出 Pub/Sub 消息中自定义字段 outputMessageMetadata 的字符串值。默认为空,只有当此值不为空时,系统才会填充 outputMessageMetadata 字段。在此处输入值时,请转义所有特殊字符(例如双引号)。
运行模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Cloud Spanner change streams to Pub/Sub template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_PubSub \ --region REGION_NAME \ --parameters \ spannerInstanceId=SPANNER_INSTANCE_ID,\ spannerDatabase=SPANNER_DATABASE,\ spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\ spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\ spannerChangeStreamName=SPANNER_CHANGE_STREAM,\ pubsubTopic=PUBSUB_TOPIC
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
REGION_NAME
:要在其中部署 Dataflow 作业的区域,例如us-central1
SPANNER_INSTANCE_ID
:Spanner 实例 IDSPANNER_DATABASE
:Spanner 数据库SPANNER_METADATA_INSTANCE_ID
:Spanner 元数据实例 IDSPANNER_METADATA_DATABASE
:Spanner 元数据数据库SPANNER_CHANGE_STREAM
:Spanner 变更数据流PUBSUB_TOPIC
:变更数据流输出的 Pub/Sub 主题
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "spannerInstanceId": "SPANNER_INSTANCE_ID", "spannerDatabase": "SPANNER_DATABASE", "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID", "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE", "spannerChangeStreamName": "SPANNER_CHANGE_STREAM", "pubsubTopic": "PUBSUB_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_PubSub", } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 IDJOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
LOCATION
:要在其中部署 Dataflow 作业的区域,例如us-central1
SPANNER_INSTANCE_ID
:Spanner 实例 IDSPANNER_DATABASE
:Spanner 数据库SPANNER_METADATA_INSTANCE_ID
:Spanner 元数据实例 IDSPANNER_METADATA_DATABASE
:Spanner 元数据数据库SPANNER_CHANGE_STREAM
:Spanner 变更数据流PUBSUB_TOPIC
:变更数据流输出的 Pub/Sub 主题
后续步骤
- 了解 Dataflow 模板。
- 参阅 Google 提供的模板列表。