Pub/Sub to Pub/Sub 模板是一种流处理流水线,可从 Pub/Sub 订阅中读取消息,并将这些消息写入其他 Pub/Sub 主题。该流水线还接受一个可选的消息属性键以及值,该值可用于过滤应写入 Pub/Sub 主题的消息。您可以使用此模板将消息从 Pub/Sub 订阅复制到带有可选消息过滤器的其他 Pub/Sub 主题。
流水线要求
- 源 Pub/Sub 订阅必须已存在才能执行流水线。
- 源 Pub/Sub 订阅必须是拉取订阅。
- 目的地 Pub/Sub 主题必须已存在才能执行此流水线。
模板参数
必需参数
- inputSubscription:要从中读取输入的 Pub/Sub 订阅。例如
projects/your-project-id/subscriptions/your-subscription-name
。 - outputTopic:要将输出写入的 Pub/Sub 主题。例如
projects/your-project-id/topics/your-topic-name
。
可选参数
- filterKey:用于过滤事件的属性键。如果未指定
filterKey
,则不会应用过滤器。 - filterValue:在提供
filterKey
时,用于过滤事件的属性值。默认情况下,系统会使用 nullfilterValue
。
运行模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Pub/Sub to Pub/Sub template。
- 在提供的参数字段中,输入您的参数值。
- 可选:如需从“正好一次”处理切换到“至少一次”流处理模式,请选择至少一次。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_Cloud_PubSub \ --region REGION_NAME \ --staging-location STAGING_LOCATION \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ filterKey=FILTER_KEY,\ filterValue=FILTER_VALUE
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
STAGING_LOCATION
:暂存本地文件的位置(例如gs://your-bucket/staging
)SUBSCRIPTION_NAME
:Pub/Sub 订阅名称TOPIC_NAME
:Pub/Sub 主题名称FILTER_KEY
:用于过滤事件的属性键。如果未指定键,则不会应用过滤器。FILTER_VALUE
:提供事件过滤键时要使用的过滤器属性值。接受有效的 Java 正则表达式字符串作为事件过滤值。如果提供了正则表达式,则应匹配整个表达式以过滤消息。不过滤部分匹配(如子字符串)。默认使用 null 事件过滤值。
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_Cloud_PubSub { "jobName": "JOB_NAME", "environment": { "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME", "filterKey": "FILTER_KEY", "filterValue": "FILTER_VALUE" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
STAGING_LOCATION
:暂存本地文件的位置(例如gs://your-bucket/staging
)SUBSCRIPTION_NAME
:Pub/Sub 订阅名称TOPIC_NAME
:Pub/Sub 主题名称FILTER_KEY
:用于过滤事件的属性键。如果未指定键,则不会应用过滤器。FILTER_VALUE
:提供事件过滤键时要使用的过滤器属性值。接受有效的 Java 正则表达式字符串作为事件过滤值。如果提供了正则表达式,则应匹配整个表达式以过滤消息。不过滤部分匹配(如子字符串)。默认使用 null 事件过滤值。
后续步骤
- 了解 Dataflow 模板。
- 参阅 Google 提供的模板列表。