Google 提供了一组开源 Dataflow 模板。
这些 Dataflow 模板可帮助您解决大型数据任务,包括数据导入、数据导出、数据备份、数据恢复和批量 API 操作,所有这些均无需使用专用开发环境。这些模板基于 Apache Beam 构建,并使用 Dataflow 转换数据。
如需了解有关模板的一般信息,请参阅 Dataflow 模板。如需查看 Google 提供的所有模板的列表,请参阅开始使用 Google 提供的模板。本指南介绍了流式模板。
Pub/Sub Subscription to BigQuery
Pub/Sub Subscription to BigQuery 模板是一种流处理流水线,可从 Pub/Sub 订阅读取 JSON 格式的消息并将其写入 BigQuery 表格中。您可以使用该模板作为将 Pub/Sub 数据移动到 BigQuery 的快速解决方案。此模板可从 Pub/Sub 中读取 JSON 格式的消息并将其转换为 BigQuery 元素。
对此流水线的要求:
- Pub/Sub 消息的
data
字段必须使用 JSON 格式(如此 JSON 指南中所述)。例如,您可以将data
字段中值为{"k1":"v1", "k2":"v2"}
格式的消息插入具有名为k1
和k2
的列且数据类型为字符串的 BigQuery 表中。 - 在运行该流水线之前,输出表必须已存在。表架构必须与输入 JSON 对象相匹配。
模板参数
参数 | 说明 |
---|---|
inputSubscription |
要读取的 Pub/Sub 输入订阅,格式为 projects/<project>/subscriptions/<subscription> 。 |
outputTableSpec |
BigQuery 输出表位置,格式为 <my-project>:<my-dataset>.<my-table> |
outputDeadletterTable |
未能到达输出表的消息的 BigQuery 表,格式为 <my-project>:<my-dataset>.<my-table> 。如果该表不存在,则系统会在流水线执行期间创建它。如果未指定此参数,则系统会改用 OUTPUT_TABLE_SPEC_error_records 。 |
javascriptTextTransformGcsPath |
(可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js 。 |
javascriptTextTransformFunctionName |
(可选)
您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。
例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ } ,则函数名称为 myTransform 。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。
|
运行 Pub/Sub Subscription to BigQuery 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Pub/Sub Subscription to BigQuery template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \ --region REGION_NAME \ --staging-location TEMP_LOCATION \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\ outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\ outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
TEMP_LOCATION
:写入临时文件的位置(例如gs://your-bucket/temp
)SUBSCRIPTION_NAME
:您的 Pub/Sub 订阅名称DATASET
:您的 BigQuery 数据集TABLE_NAME
:您的 BigQuery 表名称
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME" }, "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
TEMP_LOCATION
:写入临时文件的位置(例如gs://your-bucket/temp
)SUBSCRIPTION_NAME
:您的 Pub/Sub 订阅名称DATASET
:您的 BigQuery 数据集TABLE_NAME
:您的 BigQuery 表名称
Pub/Sub Topic to BigQuery
Pub/Sub Topic to BigQuery 模板是一种流处理流水线,可从 Cloud Pub/Sub 主题读取 JSON 格式的消息并将其写入 BigQuery 表格中。您可以使用该模板作为将 Pub/Sub 数据移动到 BigQuery 的快速解决方案。此模板可从 Pub/Sub 中读取 JSON 格式的消息并将其转换为 BigQuery 元素。
对此流水线的要求:
- Pub/Sub 消息的
data
字段必须使用 JSON 格式(如此 JSON 指南中所述)。例如,您可以将data
字段中值为{"k1":"v1", "k2":"v2"}
格式的消息插入具有名为k1
和k2
的列且数据类型为字符串的 BigQuery 表中。 - 在运行该流水线之前,输出表必须已存在。表架构必须与输入 JSON 对象相匹配。
模板参数
参数 | 说明 |
---|---|
inputTopic |
要读取的 Cloud Pub/Sub 输入主题,格式为 projects/<project>/topics/<topic> 。 |
outputTableSpec |
BigQuery 输出表位置,格式为 <my-project>:<my-dataset>.<my-table> |
outputDeadletterTable |
无法到达输出表的消息的 BigQuery 表。格式应为 <my-project>:<my-dataset>.<my-table> 。
如果该表不存在,则系统会在流水线执行期间创建它。如果未指定此参数,则系统会改用 <outputTableSpec>_error_records 。 |
javascriptTextTransformGcsPath |
(可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js 。 |
javascriptTextTransformFunctionName |
(可选)
您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。
例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ } ,则函数名称为 myTransform 。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。
|
运行 Cloud Pub/Sub Topic to BigQuery 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Pub/Sub Topic to BigQuery template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \ --region REGION_NAME \ --staging-location TEMP_LOCATION \ --parameters \ inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\ outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
TEMP_LOCATION
:写入临时文件的位置(例如gs://your-bucket/temp
)TOPIC_NAME
:您的 Pub/Sub 主题名称DATASET
:您的 BigQuery 数据集TABLE_NAME
:您的 BigQuery 表名称
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_BigQuery { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": TEMP_LOCATION, "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME", "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
TEMP_LOCATION
:写入临时文件的位置(例如gs://your-bucket/temp
)TOPIC_NAME
:您的 Pub/Sub 主题名称DATASET
:您的 BigQuery 数据集TABLE_NAME
:您的 BigQuery 表名称
Pub/Sub Avro to BigQuery
Pub/Sub Avro to BigQuery 模板是一种流处理流水线,用于将 Pub/Sub 订阅中的 Avro 数据提取到 BigQuery 表中。向 BigQuery 表写入数据时发生的任何错误都会流式传输到 Pub/Sub 未处理的主题。
对此流水线的要求
- 用作输入来源的 Pub/Sub 订阅必须存在。
- Avro 记录的架构文件必须存在于 Cloud Storage 存储空间中。
- 未处理的 Pub/Sub 主题必须存在。
- 用作输出目标的 BigQuery 数据集必须已存在。
模板参数
参数 | 说明 |
---|---|
schemaPath |
Avro 架构文件的 Cloud Storage 位置。例如 gs://path/to/my/schema.avsc 。 |
inputSubscription |
要读取的 Pub/Sub 输入订阅。例如 projects/<project>/subscriptions/<subscription> 。 |
outputTopic |
要用于未处理的记录的 Pub/Sub 主题。例如 projects/<project-id>/topics/<topic-name> 。 |
outputTableSpec |
BigQuery 输出表位置。例如 <my-project>:<my-dataset>.<my-table> 。
根据指定的 createDisposition,系统可能会使用用户提供的 Avro 架构自动创建输出表。 |
writeDisposition |
(可选)BigQuery WriteDisposition。例如 WRITE_APPEND 、WRITE_EMPTY 或 WRITE_TRUNCATE 。默认:WRITE_APPEND |
createDisposition |
(可选)BigQuery CreateDisposition。例如 CREATE_IF_NEEDED 、CREATE_NEVER 。默认:CREATE_IF_NEEDED |
运行 Pub/Sub Avro to BigQuery 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Pub/Sub Avro to BigQuery template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud beta dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
SCHEMA_PATH
:Avro 架构文件的 Cloud Storage 路径(例如gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
:Pub/Sub 输入订阅名称BIGQUERY_TABLE
:BigQuery 输出表名称DEADLETTER_TOPIC
:要用于未处理的队列的 Pub/Sub 主题
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
SCHEMA_PATH
:Avro 架构文件的 Cloud Storage 路径(例如gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
:Pub/Sub 输入订阅名称BIGQUERY_TABLE
:BigQuery 输出表名称DEADLETTER_TOPIC
:要用于未处理的队列的 Pub/Sub 主题
Pub/Sub Proto to BigQuery
Pub/Sub proto to BigQuery 模板是一种流处理流水线,用于将 Pub/Sub 订阅中的 Avro 数据提取到 BigQuery 表中。向 BigQuery 表写入数据时发生的任何错误都会流式传输到 Pub/Sub 未处理的主题。
可以提供 JavaScript 用户定义函数 (UDF) 来转换数据。可以将在执行 UDF 期间发生的错误发送到单独的 Pub/Sub 主题或与 BigQuery 错误相同的未处理主题。
对此流水线的要求:
- 用作输入来源的 Pub/Sub 订阅必须存在。
- Proto 记录的架构文件必须存在于 Cloud Storage 中。
- 输出 Pub/Sub 主题必须存在。
- 用作输出目标的 BigQuery 数据集必须已存在。
- 如果 BigQuery 表存在,则无论
createDisposition
值如何,该表都必须具有与 proto 数据匹配的架构。
模板参数
参数 | 说明 |
---|---|
protoSchemaPath |
独立的 proto 架构文件的 Cloud Storage 位置。例如 gs://path/to/my/file.pb 。
您可以使用 protoc 命令的 --descriptor_set_out 标志生成此文件。--include_imports 标志可确保文件是独立的。 |
fullMessageName |
完整的 proto 消息名称。例如 package.name.MessageName ,其中 package.name 是为 package 语句(而不是 java_package 语句)提供的值。 |
inputSubscription |
要读取的 Pub/Sub 输入订阅。例如 projects/<project>/subscriptions/<subscription> 。 |
outputTopic |
要用于未处理的记录的 Pub/Sub 主题。例如 projects/<project-id>/topics/<topic-name> 。 |
outputTableSpec |
BigQuery 输出表位置。例如 my-project:my_dataset.my_table 。
根据指定的 createDisposition,系统可能会使用输入架构文件自动创建输出表。 |
preserveProtoFieldNames |
(可选)true 用于保留 JSON 中的原始 Proto 字段名称。false 用于使用更多标准 JSON 名称。例如,false 会将 field_name 更改为 fieldName 。(默认:false ) |
bigQueryTableSchemaPath |
(可选)BigQuery 架构路径到 Cloud Storage 路径。例如 gs://path/to/my/schema.json 。如果未提供,则根据 Proto 架构推断架构。 |
javascriptTextTransformGcsPath |
(可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js 。 |
javascriptTextTransformFunctionName |
(可选)
您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。
例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ } ,则函数名称为 myTransform 。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。
|
udfOutputTopic |
(可选)存储 UDF 错误的 Pub/Sub 主题。例如 projects/<project-id>/topics/<topic-name> 。如果未提供,则会将 UDF 错误发送到 outputTopic 所在的主题。 |
writeDisposition |
(可选)BigQuery WriteDisposition 。例如 WRITE_APPEND 、WRITE_EMPTY 或 WRITE_TRUNCATE 。默认值:WRITE_APPEND 。 |
createDisposition |
(可选)BigQuery CreateDisposition 。例如 CREATE_IF_NEEDED 、CREATE_NEVER 。默认值为 CREATE_IF_NEEDED 。 |
运行 Pub/Sub Proto to BigQuery 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Pub/Sub Proto to BigQuery template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud beta dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ fullMessageName=PROTO_MESSAGE_NAME,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=UNPROCESSED_TOPIC
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
SCHEMA_PATH
:Proto 架构文件的 Cloud Storage 路径(例如gs://MyBucket/file.pb
)PROTO_MESSAGE_NAME
:Proto 消息名称(例如package.name.MessageName
)SUBSCRIPTION_NAME
:Pub/Sub 输入订阅名称BIGQUERY_TABLE
:BigQuery 输出表名称UNPROCESSED_TOPIC
:要用于未处理的队列的 Pub/Sub 主题
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "fullMessageName": "PROTO_MESSAGE_NAME", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "UNPROCESSED_TOPIC" } } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
SCHEMA_PATH
:Proto 架构文件的 Cloud Storage 路径(例如gs://MyBucket/file.pb
)PROTO_MESSAGE_NAME
:Proto 消息名称(例如package.name.MessageName
)SUBSCRIPTION_NAME
:Pub/Sub 输入订阅名称BIGQUERY_TABLE
:BigQuery 输出表名称UNPROCESSED_TOPIC
:要用于未处理的队列的 Pub/Sub 主题
Pub/Sub to Pub/Sub
Pub/Sub to Pub/Sub 模板是一种流处理流水线,可从 Pub/Sub 订阅中读取消息,并将这些消息写入其他 Pub/Sub 主题。该流水线还接受一个可选的消息属性键以及值,该值可用于过滤应写入 Pub/Sub 主题的消息。您可以使用此模板将消息从 Pub/Sub 订阅复制到带有可选消息过滤器的其他 Pub/Sub 主题。
对此流水线的要求:
- 源 Pub/Sub 订阅必须已存在才能执行流水线。
- 源 Pub/Sub 订阅必须是拉取订阅。
- 目的地 Pub/Sub 主题必须已存在才能执行此流水线。
模板参数
参数 | 说明 |
---|---|
inputSubscription |
要从中读取输入的 Pub/Sub 订阅,例如 projects/<project-id>/subscriptions/<subscription-name> 。 |
outputTopic |
要将输出写入其中的 Cloud Pub/Sub 主题,例如 projects/<project-id>/topics/<topic-name> 。 |
filterKey |
(可选)根据特性键过滤事件。如果未指定 filterKey ,则不会应用过滤器。 |
filterValue |
(可选)提供了 filterKey 时要使用的过滤器特性值。默认情况下,filterValue 为 null。 |
运行 Pub/Sub to Pub/Sub 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Pub/Sub to Pub/Sub template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub \ --region REGION_NAME \ --staging-location TEMP_LOCATION \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\ outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ filterKey=FILTER_KEY,\ filterValue=FILTER_VALUE
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
TEMP_LOCATION
:写入临时文件的位置(例如gs://your-bucket/temp
)SUBSCRIPTION_NAME
:Pub/Sub 订阅名称TOPIC_NAME
:Pub/Sub 主题名称FILTER_KEY
:用于过滤事件的属性键。如果未指定键,则不会应用过滤器。FILTER_VALUE
:提供事件过滤键时要使用的过滤器属性值。接受有效的 Java 正则表达式字符串作为事件过滤值。如果提供了正则表达式,则应匹配整个表达式以过滤消息。不过滤部分匹配(如子字符串)。默认使用 null 事件过滤值。
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": TEMP_LOCATION, "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME", "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME", "filterKey": "FILTER_KEY", "filterValue": "FILTER_VALUE" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
TEMP_LOCATION
:写入临时文件的位置(例如gs://your-bucket/temp
)SUBSCRIPTION_NAME
:Pub/Sub 订阅名称TOPIC_NAME
:Pub/Sub 主题名称FILTER_KEY
:用于过滤事件的属性键。如果未指定键,则不会应用过滤器。FILTER_VALUE
:提供事件过滤键时要使用的过滤器属性值。接受有效的 Java 正则表达式字符串作为事件过滤值。如果提供了正则表达式,则应匹配整个表达式以过滤消息。不过滤部分匹配(如子字符串)。默认使用 null 事件过滤值。
Pub/Sub to Splunk
Pub/Sub to Splunk 模板是一种流处理流水线,可从 Pub/Sub 订阅中读取消息,并通过 Splunk 的 HTTP Event Collector (HEC) 将消息载荷写入 Splunk。此模板的最常见使用场景是将日志导出到 Splunk。如需查看底层工作流的示例,请参阅使用 Dataflow 将支持生产环境的日志导出部署到 Splunk。
在写入 Splunk 之前,您还可以将 JavaScript 用户定义函数应用于消息载荷。任何未能成功处理的消息都会被转发到 Pub/Sub 未处理主题,以便进一步进行问题排查并重新处理。
要为 HEC 令牌提供额外保护,您还可以传入 Cloud KMS 密钥和使用 Cloud KMS 密钥加密的 base64 编码 HEC 令牌参数。如需详细了解如何对 HEC 令牌参数进行加密,请参阅 Cloud KMS API 加密端点。
对此流水线的要求:
- 源 Pub/Sub 订阅必须已存在才能运行此流水线。
- 在运行此流水线之前,Pub/Sub 未处理的主题必须已存在。
- Splunk HEC 端点必须可从 Dataflow 工作器的网络访问。
- Splunk HEC 令牌必须已生成并且可用。
模板参数
参数 | 说明 |
---|---|
inputSubscription |
要从中读取输入的 Pub/Sub 订阅,例如 projects/<project-id>/subscriptions/<subscription-name> 。 |
token |
(可选)Splunk HEC 身份验证令牌。如果 tokenSource 设置为 PlaINTEXT 或 KMS,则必须提供。 |
url |
Splunk HEC 网址,必须可从运行流水线的 VPC 路由。例如,https://splunk-hec-host:8088。 |
outputDeadletterTopic |
用于转发无法递送的消息的 Pub/Sub 主题,例如 projects/<project-id>/topics/<topic-name> 。 |
javascriptTextTransformGcsPath |
(可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js 。 |
javascriptTextTransformFunctionName |
(可选)
您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。
例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ } ,则函数名称为 myTransform 。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。
|
batchCount |
(可选)向 Splunk 发送多个事件的批次大小。默认值为 1(无批处理)。 |
parallelism |
(可选)最大并行请求数。默认值为 1(无并行)。 |
disableCertificateValidation |
(可选)停用 SSL 证书验证。默认为 false(已启用验证)。 如果为 true,则不验证证书(所有证书均受信任),并且忽略“rootCaCertificatePath”参数。 |
includePubsubMessage |
(可选)在载荷中包含完整的 Pub/Sub 消息。默认值为 false(只有数据元素包含在载荷中)。 |
tokenSource |
令牌的来源。PLAINTEXT、KMS 或 SECRET_MANAGER 之一。如果使用了 Secret Manager,则必须提供此参数。如果 tokenSource 设置为 KMS,则必须提供 tokenKMSEncryptionKey 和加密的 token 。如果 tokenSource 设置为 SECRET_MANAGER,则必须提供 tokenSecretId 。如果 tokenSource 设置为 DLPINTEXT,则必须提供 token 。 |
tokenKMSEncryptionKey |
(可选)用于解密 HEC 令牌字符串的 Cloud KMS 密钥。 如果 tokenSource 设置为 KMS,则必须提供此参数。如果提供了 Cloud KMS 密钥,则必须以加密方式传递 HEC 令牌字符串。 |
tokenSecretId |
(可选)令牌的 Secret Manager Secret ID。如果 tokenSource 设置为 SECRET_MANAGER,则必须提供此参数。应采用以下格式:projects/<project-id>/secrets/<secret-name>/versions/<secret-version> 。 |
rootCaCertificatePath |
(可选)Cloud Storage 中根 CA 证书的完整网址。例如 gs://mybucket/mycerts/privateCA.crt 。Cloud Storage 中提供的证书必须采用 DER 编码,并且可能以二进制或可打印 (Base64) 编码提供。如果证书是使用 Base64 编码提供的,则它必须以 -----BEGIN CERTIFICATE----- 开头为界,并且必须以 -----END CERTIFICATE----- 结尾为界。如果提供此参数,系统会提取此私有 CA 证书文件并将其添加到 Dataflow 工作器的信任库,以便验证 Splunk HEC 端点的 SSL 证书。如果未提供此参数,则使用默认信任库。 |
enableBatchLogs |
(可选)指定是否应为写入 Splunk 的批次启用日志。默认值:true 。 |
enableGzipHttpCompression |
(可选)指定是否应压缩发送到 Splunk HEC 的 HTTP 请求(gzip 内容编码)。默认值:true 。 |
运行 Pub/Sub to Splunk 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Pub/Sub to Splunk template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk \ --region REGION_NAME \ --staging-location TEMP_LOCATION \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\ token=TOKEN,\ url=URL,\ outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ batchCount=BATCH_COUNT,\ parallelism=PARALLELISM,\ disableCertificateValidation=DISABLE_VALIDATION,\ rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
TEMP_LOCATION
:写入临时文件的位置(例如gs://your-bucket/temp
)INPUT_SUBSCRIPTION_NAME
:Pub/Sub 订阅名称TOKEN
:Splunk 的 HTTP Event Collector 令牌URL
:Splunk 的 HTTP Event Collector 的网址路径(例如https://splunk-hec-host:8088
)DEADLETTER_TOPIC_NAME
:Pub/Sub 主题名称JAVASCRIPT_FUNCTION
: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称例如,如果您的 JavaScript 函数代码为
myTransform(inJson) { /*...do stuff...*/ }
,则函数名称为myTransform
。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。PATH_TO_JAVASCRIPT_UDF_FILE
:.js
文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如gs://my-bucket/my-udfs/my_file.js
BATCH_COUNT
:用于向 Splunk 发送多个事件的批次大小PARALLELISM
:用于向 Splunk 发送事件的并行请求数DISABLE_VALIDATION
:如果要停用 SSL 证书验证则为true
ROOT_CA_CERTIFICATE_PATH
:Cloud Storage 中根 CA 证书的路径(例如gs://your-bucket/privateCA.crt
)
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "gs://your-bucket/temp", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME", "token": "TOKEN", "url": "URL", "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "batchCount": "BATCH_COUNT", "parallelism": "PARALLELISM", "disableCertificateValidation": "DISABLE_VALIDATION", "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
TEMP_LOCATION
:写入临时文件的位置(例如gs://your-bucket/temp
)INPUT_SUBSCRIPTION_NAME
:Pub/Sub 订阅名称TOKEN
:Splunk 的 HTTP Event Collector 令牌URL
:Splunk 的 HTTP Event Collector 的网址路径(例如https://splunk-hec-host:8088
)DEADLETTER_TOPIC_NAME
:Pub/Sub 主题名称JAVASCRIPT_FUNCTION
: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称例如,如果您的 JavaScript 函数代码为
myTransform(inJson) { /*...do stuff...*/ }
,则函数名称为myTransform
。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。PATH_TO_JAVASCRIPT_UDF_FILE
:.js
文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如gs://my-bucket/my-udfs/my_file.js
BATCH_COUNT
:用于向 Splunk 发送多个事件的批次大小PARALLELISM
:用于向 Splunk 发送事件的并行请求数DISABLE_VALIDATION
:如果要停用 SSL 证书验证则为true
ROOT_CA_CERTIFICATE_PATH
:Cloud Storage 中根 CA 证书的路径(例如gs://your-bucket/privateCA.crt
)
Pub/Sub to Avro Files on Cloud Storage
Pub/Sub to Avro files on Cloud Storage 模板是一个流处理流水线,可从 Pub/Sub 主题中读取数据,并将 Avro 文件写入指定的 Cloud Storage 存储桶。
对此流水线的要求:
- Pub/Sub 输入主题必须已存在才能执行此流水线。
模板参数
参数 | 说明 |
---|---|
inputTopic |
要订阅用来处理消息的 Pub/Sub 主题。主题名称必须采用 projects/<project-id>/topics/<topic-name> 格式。 |
outputDirectory |
要用于归档输出 Avro 文件的输出目录。末尾必须包含 / 。例如:gs://example-bucket/example-directory/ 。 |
avroTempDirectory |
临时 Avro 文件的目录。末尾必须包含 / 。例如:gs://example-bucket/example-directory/ 。 |
outputFilenamePrefix |
(可选)Avro 文件的输出文件名前缀。 |
outputFilenameSuffix |
(可选)Avro 文件的输出文件名后缀。 |
outputShardTemplate |
(可选)输出文件的分片模板。它被指定为字母 S 或 N 的重复序列。例如 SSS-NNN 。这些字母会分别替换成分片编号或分片总数。如果未指定此参数,则默认模板格式为 W-P-SS-of-NN 。 |
运行 Pub/Sub to Cloud Storage Avro 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Pub/Sub to Avro Files on Cloud Storage template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro \ --region REGION_NAME \ --staging-location TEMP_LOCATION \ --parameters \ inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ outputDirectory=gs://BUCKET_NAME/output/,\ outputFilenamePrefix=FILENAME_PREFIX,\ outputFilenameSuffix=FILENAME_SUFFIX,\ outputShardTemplate=SHARD_TEMPLATE,\ avroTempDirectory=gs://BUCKET_NAME/temp/
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
TEMP_LOCATION
:写入临时文件的位置;例如gs://your-bucket/temp
TOPIC_NAME
:Pub/Sub 主题名称BUCKET_NAME
- Cloud Storage 存储桶的名称。FILENAME_PREFIX
:首选输出文件名前缀FILENAME_SUFFIX
:首选输出文件名后缀SHARD_TEMPLATE
:首选输出分片模板
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": TEMP_LOCATION, "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME", "outputDirectory": "gs://BUCKET_NAME/output/", "avroTempDirectory": "gs://BUCKET_NAME/temp/", "outputFilenamePrefix": "FILENAME_PREFIX", "outputFilenameSuffix": "FILENAME_SUFFIX", "outputShardTemplate": "SHARD_TEMPLATE" } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
TEMP_LOCATION
:写入临时文件的位置;例如gs://your-bucket/temp
TOPIC_NAME
:Pub/Sub 主题名称BUCKET_NAME
- Cloud Storage 存储桶的名称。FILENAME_PREFIX
:首选输出文件名前缀FILENAME_SUFFIX
:首选输出文件名后缀SHARD_TEMPLATE
:首选输出分片模板
Pub/Sub Topic to Text Files on Cloud Storage
Pub/Sub to Cloud Storage Text 模板是一种流处理流水线,可从 Pub/Sub 主题读取记录并将其保存为一系列文本格式的 Cloud Storage 文件。使用此模板,您可以快速地保存 Pub/Sub 中的数据以留待将来使用。默认情况下,此模板每 5 分钟生成一个新文件。
对此流水线的要求:
- Pub/Sub 主题必须已存在才能执行此流水线。
- 发布到主题的消息必须采用文本格式。
- 发布到主题的消息不得包含任何换行符。请注意,每条 Pub/Sub 消息在输出文件中均会保存为一行。
模板参数
参数 | 说明 |
---|---|
inputTopic |
要从中读取输入的 Pub/Sub 主题。主题名称应采用 projects/<project-id>/topics/<topic-name> 格式。 |
outputDirectory |
用于写入输出文件的路径和文件名前缀,例如 gs://bucket-name/path/ 。该值必须以斜杠结尾。 |
outputFilenamePrefix |
要在各窗口文件上放置的前缀。例如 output- 。 |
outputFilenameSuffix |
要放置于每个窗口化文件上的后缀,通常是文件扩展名,例如 .txt 或 .csv 。 |
outputShardTemplate |
分片式模板定义每个窗口文件的动态部分。默认情况下,该管道使用单一碎片输出到各窗口内的文件系统。这意味着每个窗口的所有数据都会输出到单个文件中。outputShardTemplate 默认为 W-P-SS-of-NN ,其中 W 是窗口日期范围,P 是窗格信息,S 是分片编号,而 N 是分片数。对于单个文件,outputShardTemplate 的 SS-of-NN 部分为 00-of-01 。 |
运行 Pub/Sub Topic to Text Files on Cloud Storage 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Pub/Sub to Text Files on Cloud Storage template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text \ --region REGION_NAME \ --staging-location TEMP_LOCATION \ --parameters \ inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\ outputDirectory=gs://BUCKET_NAME/output/,\ outputFilenamePrefix=output-,\ outputFilenameSuffix=.txt
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
TEMP_LOCATION
:写入临时文件的位置(例如gs://your-bucket/temp
)TOPIC_NAME
:您的 Pub/Sub 主题名称BUCKET_NAME
- Cloud Storage 存储桶的名称。
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text { "jobName": "JOB_NAME", "environment": { "bypassTempDirValidation": false, "tempLocation": "TEMP_LOCATION", "ipConfiguration": "WORKER_IP_UNSPECIFIED", "additionalExperiments": [] }, "parameters": { "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME" "outputDirectory": "gs://BUCKET_NAME/output/", "outputFilenamePrefix": "output-", "outputFilenameSuffix": ".txt", } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
TEMP_LOCATION
:写入临时文件的位置(例如gs://your-bucket/temp
)TOPIC_NAME
:您的 Pub/Sub 主题名称BUCKET_NAME
- Cloud Storage 存储桶的名称。
Pub/Sub Topic or Subscription to Text Files on Cloud Storage
Pub/Sub Topic or Subscription to Cloud Storage Text 模板是一种流处理流水线,可从 Pub/Sub 读取记录并将其保存为一系列文本格式的 Cloud Storage 文件。使用此模板,您可以快速地保存 Pub/Sub 中的数据以留待将来使用。默认情况下,此模板每 5 分钟生成一个新文件。
对此流水线的要求:
- Pub/Sub 主题或订阅必须已存在才能执行此流水线。
- 发布到主题的消息必须采用文本格式。
- 发布到主题的消息不得包含任何换行符。请注意,每条 Pub/Sub 消息在输出文件中均会保存为一行。
模板参数
参数 | 说明 |
---|---|
inputTopic |
要从中读取输入的 Pub/Sub 主题。主题名称应采用 projects/<project-id>/topics/<topic-name> 格式。 如果提供此参数,则不应提供 inputSubscription 。 |
inputSubscription |
要从中读取输入的 Pub/Sub 订阅。订阅名称应采用 projects/<project-id>/subscription/<subscription-name> 格式。如果提供此参数,则不应提供 inputTopic 。 |
outputDirectory |
用于写入输出文件的路径和文件名前缀,例如 gs://bucket-name/path/ 。该值必须以斜杠结尾。 |
outputFilenamePrefix |
要在各窗口文件上放置的前缀。例如 output- 。 |
outputFilenameSuffix |
要放置于每个窗口化文件上的后缀,通常是文件扩展名,例如 .txt 或 .csv 。 |
outputShardTemplate |
分片式模板定义每个窗口文件的动态部分。默认情况下,该管道使用单一碎片输出到各窗口内的文件系统。这意味着每个窗口的所有数据都会输出到单个文件中。outputShardTemplate 默认为 W-P-SS-of-NN ,其中 W 是窗口日期范围,P 是窗格信息,S 是分片编号,而 N 是分片数。对于单个文件,outputShardTemplate 的 SS-of-NN 部分为 00-of-01 。 |
windowDuration |
(可选)窗口时长是将数据写入输出目录的时间间隔。请根据流水线的吞吐量配置时长。例如,较高的吞吐量可能需要较短的窗口时长,以便数据适应内存。默认为 5 分钟,最短可为 1 秒。允许的格式如下:[int]s(表示数秒,例如 5s)、[int]m(表示数分钟,例如 12m)、[int]h(表示数小时,例如 2h)。 |
运行 Pub/Sub Topic or Subscription to Text Files on Cloud Storage 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud beta dataflow flex-template jobs run JOB_NAME \ --project=YOUR_PROJECT_ID \ --region REGION_NAME \ --template-file-gcs-location gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \ --parameters \ inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\ outputDirectory=gs://BUCKET_NAME/output/,\ outputFilenamePrefix=output-,\ outputFilenameSuffix=.txt
替换以下内容:
JOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
SUBSCRIPTION_NAME
:您的 Pub/Sub 订阅名称BUCKET_NAME
:Cloud Storage 存储桶的名称
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME" "outputDirectory": "gs://BUCKET_NAME/output/", "outputFilenamePrefix": "output-", "outputFilenameSuffix": ".txt", }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex", } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
SUBSCRIPTION_NAME
:您的 Pub/Sub 订阅名称BUCKET_NAME
:Cloud Storage 存储桶的名称
Pub/Sub to MongoDB
Pub/Sub to MongoDB 模板是一种流处理流水线,可从 Pub/Sub 订阅读取 JSON 编码的消息并将其以文档的形式写入 MongoDB。如果需要,此流水线还能支持额外的转换,只需通过 JavaScript 用户定义的函数 (UDF) 包含这些转换即可。因架构不匹配、JOSN 格式不正确或执行转换时发生的所有错误都会记录在未处理消息的 BigQuery 表中,并随附输入消息。如果在执行之前不存在任何未处理记录的表,则流水线会自动创建该表。
对此流水线的要求:
- Pub/Sub 订阅必须存在,并且消息必须采用有效的 JSON 格式进行编码。
- MongoDB 集群必须存在,并且应该可通过 Dataflow 工作器机器访问。
模板参数
参数 | 说明 |
---|---|
inputSubscription |
Pub/Sub 订阅的名称。例如:
|
mongoDBUri |
以英文逗号分隔的 MongoDB 服务器列表。例如:192.285.234.12:27017,192.287.123.11:27017 |
database |
存储集合的 MongoDB 数据库。例如:my-db 。 |
collection |
MongoDB 数据库中集合的名称。例如:my-collection 。 |
deadletterTable |
存储失败消息(架构不匹配、JSON 格式错误)的 BigQuery 表。例如:project-id:dataset-name.table-name 。 |
javascriptTextTransformGcsPath |
(可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js 。 |
javascriptTextTransformFunctionName |
(可选)
您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。
例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ } ,则函数名称为 myTransform 。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。
|
batchSize |
(可选)用于将文档批量插入 MongoDB 的批次大小。默认值:1000 。 |
batchSizeBytes |
(可选)批次大小(以字节为单位)。默认值:5242880 。 |
maxConnectionIdleTime |
(可选)在出现连接超时之前所允许的空闲时间上限(以秒为单位)。默认值:60000 。 |
sslEnabled |
(可选)用于指示与 MongoDB 的连接是否启用了 SSL 的布尔值。默认值:true 。 |
ignoreSSLCertificate |
(可选)用于指示是否应忽略 SSL 证书的布尔值。默认值:true 。 |
withOrdered |
(可选)允许依次批量插入 MongoDB 的布尔值。默认值:true 。 |
withSSLInvalidHostNameAllowed |
(可选)用于指示是否允许 SSL 连接使用无效主机名的布尔值。默认值:true 。 |
运行 Pub/Sub to MongoDB 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Pub/Sub to MongoDB template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \ --parameters \ inputSubscription=INPUT_SUBSCRIPTION,\ mongoDBUri=MONGODB_URI,\ database=DATABASE, collection=COLLECTION, deadletterTable=UNPROCESSED_TABLE
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDREGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
INPUT_SUBSCRIPTION
:Pub/Sub 订阅(例如
)projects/my-project-id/subscriptions/my-subscription-id
MONGODB_URI
:MongoDB 服务器地址(例如192.285.234.12:27017,192.287.123.11:27017
)DATABASE
:MongoDB 数据库名称(例如users
)COLLECTION
:MongoDB 集合名称(例如profiles
)UNPROCESSED_TABLE
:BigQuery 表名称(例如your-project:your-dataset.your-table-name
)
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "INPUT_SUBSCRIPTION", "mongoDBUri": "MONGODB_URI", "database": "DATABASE", "collection": "COLLECTION", "deadletterTable": "UNPROCESSED_TABLE" }, "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB", } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDLOCATION
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
JOB_NAME
:您选择的唯一性作业名称VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
INPUT_SUBSCRIPTION
:Pub/Sub 订阅(例如
)projects/my-project-id/subscriptions/my-subscription-id
MONGODB_URI
:MongoDB 服务器地址(例如192.285.234.12:27017,192.287.123.11:27017
)DATABASE
:MongoDB 数据库名称(例如users
)COLLECTION
:MongoDB 集合名称(例如profiles
)UNPROCESSED_TABLE
:BigQuery 表名称(例如your-project:your-dataset.your-table-name
)
Pub/Sub to Elasticsearch
Pub/Sub to Elasticsearch 模板是一种流处理流水线,可从 Pub/Sub 订阅读取消息、执行用户定义的函数 (UDF) 并将其作为文档写入 Elasticsearch。Dataflow 模板使用 Elasticsearch 的数据流功能跨多个索引存储时间序列数据,同时为请求提供单个命名资源。数据流非常适合存储在 Pub/Sub 中的日志、指标、跟踪记录和其他持续生成的数据。
对此流水线的要求
- 来源 Pub/Sub 订阅必须存在,并且消息必须采用有效的 JSON 格式进行编码。
- GCP 实例上或 Elastic Cloud 上可公开访问的 Elasticsearch 主机,用于 Elasticsearch 7.0 或更高版本。如需了解详情,请参阅适用于 Elastic 的 Google Cloud 集成。
- 用于错误输出的 Pub/Sub 主题。
模板参数
参数 | 说明 |
---|---|
inputSubscription |
要使用的 Pub/Sub 订阅。该名称应采用 projects/<project-id>/subscriptions/<subscription-name> 格式。 |
connectionUrl |
Elasticsearch 网址,格式为 https://hostname:[port] 或指定 CloudID(如果使用 Elastic Cloud)。 |
apiKey |
用于身份验证的 Base64 编码 API 密钥。 |
errorOutputTopic |
Pub/Sub 输出主题,用于发布失败的记录,格式为 projects/<project-id>/topics/<topic-name> |
dataset |
(可选)通过 Pub/Sub 发送的日志类型,我们为其提供了开箱即用的信息中心。已知日志类型值为 audit、vpcflow 和 firewall。默认值:pubsub 。 |
namespace |
(可选)任意分组,例如环境(dev、prod 或 qa)、团队或战略性业务部门。默认值:default 。 |
batchSize |
(可选)文档数量中的批次大小。默认值:1000 。 |
batchSizeBytes |
(可选)批次大小(以字节为单位)。默认值:5242880 (5mb)。 |
maxRetryAttempts |
(可选)尝试次数上限,必须大于 0。默认值:no retries 。 |
maxRetryDuration |
(可选)重试时长上限(以毫秒为单位),必须大于 0。默认值:no retries 。 |
javascriptTextTransformGcsPath |
(可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js 。 |
javascriptTextTransformFunctionName |
(可选)
您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。
例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ } ,则函数名称为 myTransform 。如需查看 JavaScript UDF 示例,请参阅 UDF 示例。
|
propertyAsIndex |
(可选)要编入索引的文档中的一个属性,其值将指定批量请求要包含在文档中的 _index 元数据(优先于 _index UDF)。默认值:none。 |
propertyAsId |
(可选)要编入索引的文档中的一个属性,其值将指定批量请求要包含在文档中的 _id 元数据(优先于 _id UDF)。默认值:none。 |
javaScriptIndexFnGcsPath |
(可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _index 元数据。默认值:none。 |
javaScriptIndexFnName |
(可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _index 元数据。默认值:none。 |
javaScriptIdFnGcsPath |
(可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _id 元数据。默认值:none。 |
javaScriptIdFnName |
(可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _id 元数据。默认值:none。 |
javaScriptTypeFnGcsPath |
(可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _type 元数据。默认值:none。 |
javaScriptTypeFnName |
(可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _type 元数据。默认值:none。 |
javaScriptIsDeleteFnGcsPath |
(可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将确定是否应删除文档,而不是插入或更新文档。该函数应返回字符串值 "true" 或 "false" 。默认值:none。 |
javaScriptIsDeleteFnName |
(可选)函数的 UDF JavaScript 函数名称,该函数将确定是否应删除文档,而不是插入或更新文档。该函数应返回字符串值 "true" 或 "false" 。默认值:none。 |
usePartialUpdate |
(可选)是否在 Elasticsearch 请求中使用部分更新(更新而不是创建或索引,允许部分文档)。默认值:false 。 |
bulkInsertMethod |
(可选)在 Elasticsearch 批量请求中使用 INDEX (索引,允许执行更新插入操作)还是 CREATE (创建,会对重复 _id 报错)。默认值:CREATE 。 |
运行 Pub/Sub to Elasticsearch 模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Pub/Sub to Elasticsearch template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud beta dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch \ --parameters \ inputSubscription=SUBSCRIPTION_NAME,\ connectionUrl=CONNECTION_URL,\ dataset=DATASET,\ namespace=NAMESPACE,\ apiKey=APIKEY,\ errorOutputTopic=ERROR_OUTPUT_TOPIC
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域端点,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用- 版本名称(如
2021-09-20-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (