Google 提供的 Dataflow 流处理模板

使用集合让一切井井有条 根据您的偏好保存内容并对其进行分类。

Google 提供了一组开源 Dataflow 模板。

这些 Dataflow 模板可帮助您解决大型数据任务,包括数据导入、数据导出、数据备份、数据恢复和批量 API 操作,所有这些均无需使用专用开发环境。这些模板基于 Apache Beam 构建,并使用 Dataflow 转换数据。

如需了解有关模板的一般信息,请参阅 Dataflow 模板。如需查看 Google 提供的所有模板的列表,请参阅开始使用 Google 提供的模板

本指南介绍了流式模板。

Pub/Sub Subscription to BigQuery

Pub/Sub Subscription to BigQuery 模板是一种流处理流水线,可从 Pub/Sub 订阅读取 JSON 格式的消息并将其写入 BigQuery 表格中。您可以使用该模板作为将 Pub/Sub 数据移动到 BigQuery 的快速解决方案。此模板可从 Pub/Sub 中读取 JSON 格式的消息并将其转换为 BigQuery 元素。

对此流水线的要求

  • Pub/Sub 消息的 data 字段必须使用 JSON 格式(如此 JSON 指南中所述)。例如,您可以将 data 字段中值为 {"k1":"v1", "k2":"v2"} 格式的消息插入具有名为 k1k2 的列且数据类型为字符串的 BigQuery 表中。
  • 在运行该流水线之前,输出表必须已存在。表架构必须与输入 JSON 对象相匹配。

模板参数

参数 说明
inputSubscription 要读取的 Pub/Sub 输入订阅,格式为 projects/<project>/subscriptions/<subscription>
outputTableSpec BigQuery 输出表位置,格式为 <my-project>:<my-dataset>.<my-table>
outputDeadletterTable 未能到达输出表的消息的 BigQuery 表,格式为 <my-project>:<my-dataset>.<my-table>。如果该表不存在,则系统会在流水线执行期间创建它。如果未指定此参数,则系统会改用 OUTPUT_TABLE_SPEC_error_records
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

运行 Pub/Sub Subscription to BigQuery 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub Subscription to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • SUBSCRIPTION_NAME:您的 Pub/Sub 订阅名称
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • SUBSCRIPTION_NAME:您的 Pub/Sub 订阅名称
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称

Pub/Sub Topic to BigQuery

Pub/Sub Topic to BigQuery 模板是一种流处理流水线,可从 Cloud Pub/Sub 主题读取 JSON 格式的消息并将其写入 BigQuery 表格中。您可以使用该模板作为将 Pub/Sub 数据移动到 BigQuery 的快速解决方案。此模板可从 Pub/Sub 中读取 JSON 格式的消息并将其转换为 BigQuery 元素。

对此流水线的要求

  • Pub/Sub 消息的 data 字段必须使用 JSON 格式(如此 JSON 指南中所述)。例如,您可以将 data 字段中值为 {"k1":"v1", "k2":"v2"} 格式的消息插入具有名为 k1k2 的列且数据类型为字符串的 BigQuery 表中。
  • 在运行该流水线之前,输出表必须已存在。表架构必须与输入 JSON 对象相匹配。

模板参数

参数 说明
inputTopic 要读取的 Cloud Pub/Sub 输入主题,格式为 projects/<project>/topics/<topic>
outputTableSpec BigQuery 输出表位置,格式为 <my-project>:<my-dataset>.<my-table>
outputDeadletterTable 无法到达输出表的消息的 BigQuery 表。格式应为 <my-project>:<my-dataset>.<my-table>。 如果该表不存在,则系统会在流水线执行期间创建它。如果未指定此参数,则系统会改用 <outputTableSpec>_error_records
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

运行 Cloud Pub/Sub Topic to BigQuery 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub Topic to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称

Pub/Sub Avro to BigQuery

Pub/Sub Avro to BigQuery 模板是一种流处理流水线,用于将 Pub/Sub 订阅中的 Avro 数据提取到 BigQuery 表中。向 BigQuery 表写入数据时发生的任何错误都会流式传输到 Pub/Sub 未处理的主题。

对此流水线的要求

  • 用作输入来源的 Pub/Sub 订阅必须存在。
  • Avro 记录的架构文件必须存在于 Cloud Storage 存储空间中。
  • 未处理的 Pub/Sub 主题必须存在。
  • 用作输出目标的 BigQuery 数据集必须已存在。

模板参数

参数 说明
schemaPath Avro 架构文件的 Cloud Storage 位置。例如 gs://path/to/my/schema.avsc
inputSubscription 要读取的 Pub/Sub 输入订阅。例如 projects/<project>/subscriptions/<subscription>
outputTopic 要用于未处理的记录的 Pub/Sub 主题。例如 projects/<project-id>/topics/<topic-name>
outputTableSpec BigQuery 输出表位置。例如 <my-project>:<my-dataset>.<my-table>。 根据指定的 createDisposition,系统可能会使用用户提供的 Avro 架构自动创建输出表。
writeDisposition (可选)BigQuery WriteDisposition。例如 WRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE。默认:WRITE_APPEND
createDisposition (可选)BigQuery CreateDisposition。例如 CREATE_IF_NEEDEDCREATE_NEVER。默认:CREATE_IF_NEEDED

运行 Pub/Sub Avro to BigQuery 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub Avro to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • SCHEMA_PATH:Avro 架构文件的 Cloud Storage 路径(例如 gs://MyBucket/file.avsc
  • SUBSCRIPTION_NAME:Pub/Sub 输入订阅名称
  • BIGQUERY_TABLE:BigQuery 输出表名称
  • DEADLETTER_TOPIC:要用于未处理的队列的 Pub/Sub 主题

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • SCHEMA_PATH:Avro 架构文件的 Cloud Storage 路径(例如 gs://MyBucket/file.avsc
  • SUBSCRIPTION_NAME:Pub/Sub 输入订阅名称
  • BIGQUERY_TABLE:BigQuery 输出表名称
  • DEADLETTER_TOPIC:要用于未处理的队列的 Pub/Sub 主题

Pub/Sub Proto to BigQuery

Pub/Sub proto to BigQuery 模板是一种流处理流水线,用于将 Pub/Sub 订阅中的 Avro 数据提取到 BigQuery 表中。向 BigQuery 表写入数据时发生的任何错误都会流式传输到 Pub/Sub 未处理的主题。

可以提供 JavaScript 用户定义函数 (UDF) 来转换数据。可以将在执行 UDF 期间发生的错误发送到单独的 Pub/Sub 主题或与 BigQuery 错误相同的未处理主题。

对此流水线的要求

  • 用作输入来源的 Pub/Sub 订阅必须存在。
  • Proto 记录的架构文件必须存在于 Cloud Storage 中。
  • 输出 Pub/Sub 主题必须存在。
  • 用作输出目标的 BigQuery 数据集必须已存在。
  • 如果 BigQuery 表存在,则无论 createDisposition 值如何,该表都必须具有与 proto 数据匹配的架构。

模板参数

参数 说明
protoSchemaPath 独立的 proto 架构文件的 Cloud Storage 位置。例如 gs://path/to/my/file.pb。 您可以使用 protoc 命令的 --descriptor_set_out 标志生成此文件。--include_imports 标志可确保文件是独立的。
fullMessageName 完整的 proto 消息名称。例如 package.name.MessageName,其中 package.name 是为 package 语句(而不是 java_package 语句)提供的值。
inputSubscription 要读取的 Pub/Sub 输入订阅。例如 projects/<project>/subscriptions/<subscription>
outputTopic 要用于未处理的记录的 Pub/Sub 主题。例如 projects/<project-id>/topics/<topic-name>
outputTableSpec BigQuery 输出表位置。例如 my-project:my_dataset.my_table。 根据指定的 createDisposition,系统可能会使用输入架构文件自动创建输出表。
preserveProtoFieldNames (可选)true 用于保留 JSON 中的原始 Proto 字段名称。false 用于使用更多标准 JSON 名称。例如,false 会将 field_name 更改为 fieldName。(默认:false
bigQueryTableSchemaPath (可选)BigQuery 架构路径到 Cloud Storage 路径。例如 gs://path/to/my/schema.json。如果未提供,则根据 Proto 架构推断架构。
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
udfOutputTopic (可选)存储 UDF 错误的 Pub/Sub 主题。例如 projects/<project-id>/topics/<topic-name>。如果未提供,则会将 UDF 错误发送到 outputTopic 所在的主题。
writeDisposition (可选)BigQuery WriteDisposition。例如 WRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE。默认值:WRITE_APPEND
createDisposition (可选)BigQuery CreateDisposition。例如 CREATE_IF_NEEDEDCREATE_NEVER。默认值为 CREATE_IF_NEEDED

运行 Pub/Sub Proto to BigQuery 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub Proto to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
fullMessageName=PROTO_MESSAGE_NAME,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=UNPROCESSED_TOPIC
  

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • SCHEMA_PATH:Proto 架构文件的 Cloud Storage 路径(例如 gs://MyBucket/file.pb
  • PROTO_MESSAGE_NAME:Proto 消息名称(例如 package.name.MessageName
  • SUBSCRIPTION_NAME:Pub/Sub 输入订阅名称
  • BIGQUERY_TABLE:BigQuery 输出表名称
  • UNPROCESSED_TOPIC:要用于未处理的队列的 Pub/Sub 主题

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "fullMessageName": "PROTO_MESSAGE_NAME",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "UNPROCESSED_TOPIC"
      }
   }
}
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • SCHEMA_PATH:Proto 架构文件的 Cloud Storage 路径(例如 gs://MyBucket/file.pb
  • PROTO_MESSAGE_NAME:Proto 消息名称(例如 package.name.MessageName
  • SUBSCRIPTION_NAME:Pub/Sub 输入订阅名称
  • BIGQUERY_TABLE:BigQuery 输出表名称
  • UNPROCESSED_TOPIC:要用于未处理的队列的 Pub/Sub 主题

Pub/Sub to Pub/Sub

Pub/Sub to Pub/Sub 模板是一种流处理流水线,可从 Pub/Sub 订阅中读取消息,并将这些消息写入其他 Pub/Sub 主题。该流水线还接受一个可选的消息属性键以及值,该值可用于过滤应写入 Pub/Sub 主题的消息。您可以使用此模板将消息从 Pub/Sub 订阅复制到带有可选消息过滤器的其他 Pub/Sub 主题。

对此流水线的要求

  • 源 Pub/Sub 订阅必须已存在才能执行流水线。
  • 源 Pub/Sub 订阅必须是拉取订阅
  • 目的地 Pub/Sub 主题必须已存在才能执行此流水线。

模板参数

参数 说明
inputSubscription 要从中读取输入的 Pub/Sub 订阅,例如 projects/<project-id>/subscriptions/<subscription-name>
outputTopic 要将输出写入其中的 Cloud Pub/Sub 主题,例如 projects/<project-id>/topics/<topic-name>
filterKey (可选)根据特性键过滤事件。如果未指定 filterKey,则不会应用过滤器。
filterValue (可选)提供了 filterKey 时要使用的过滤器特性值。默认情况下,filterValue 为 null。

运行 Pub/Sub to Pub/Sub 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to Pub/Sub template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • TOPIC_NAME:Pub/Sub 主题名称
  • FILTER_KEY:用于过滤事件的属性键。如果未指定键,则不会应用过滤器。
  • FILTER_VALUE:提供事件过滤键时要使用的过滤器属性值。接受有效的 Java 正则表达式字符串作为事件过滤值。如果提供了正则表达式,则应匹配整个表达式以过滤消息。不过滤部分匹配(如子字符串)。默认使用 null 事件过滤值。

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • TOPIC_NAME:Pub/Sub 主题名称
  • FILTER_KEY:用于过滤事件的属性键。如果未指定键,则不会应用过滤器。
  • FILTER_VALUE:提供事件过滤键时要使用的过滤器属性值。接受有效的 Java 正则表达式字符串作为事件过滤值。如果提供了正则表达式,则应匹配整个表达式以过滤消息。不过滤部分匹配(如子字符串)。默认使用 null 事件过滤值。

Pub/Sub to Splunk

Pub/Sub to Splunk 模板是一种流处理流水线,可从 Pub/Sub 订阅中读取消息,并通过 Splunk 的 HTTP Event Collector (HEC) 将消息载荷写入 Splunk。此模板的最常见使用场景是将日志导出到 Splunk。如需查看底层工作流的示例,请参阅使用 Dataflow 将支持生产环境的日志导出部署到 Splunk

在写入 Splunk 之前,您还可以将 JavaScript 用户定义函数应用于消息载荷。任何未能成功处理的消息都会被转发到 Pub/Sub 未处理主题,以便进一步进行问题排查并重新处理。

要为 HEC 令牌提供额外保护,您还可以传入 Cloud KMS 密钥和使用 Cloud KMS 密钥加密的 base64 编码 HEC 令牌参数。如需详细了解如何对 HEC 令牌参数进行加密,请参阅 Cloud KMS API 加密端点

对此流水线的要求

  • 源 Pub/Sub 订阅必须已存在才能运行此流水线。
  • 在运行此流水线之前,Pub/Sub 未处理的主题必须已存在。
  • Splunk HEC 端点必须可从 Dataflow 工作器的网络访问。
  • Splunk HEC 令牌必须已生成并且可用。

模板参数

参数 说明
inputSubscription 要从中读取输入的 Pub/Sub 订阅,例如 projects/<project-id>/subscriptions/<subscription-name>
token (可选)Splunk HEC 身份验证令牌。如果 tokenSource 设置为 PlaINTEXT 或 KMS,则必须提供。
url Splunk HEC 网址,必须可从运行流水线的 VPC 路由。例如,https://splunk-hec-host:8088。
outputDeadletterTopic 用于转发无法递送的消息的 Pub/Sub 主题,例如 projects/<project-id>/topics/<topic-name>
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
batchCount (可选)向 Splunk 发送多个事件的批次大小。默认值为 1(无批处理)。
parallelism (可选)最大并行请求数。默认值为 1(无并行)。
disableCertificateValidation (可选)停用 SSL 证书验证。默认为 false(已启用验证)。 如果为 true,则不验证证书(所有证书均受信任),并且忽略“rootCaCertificatePath”参数。
includePubsubMessage (可选)在载荷中包含完整的 Pub/Sub 消息。默认值为 false(只有数据元素包含在载荷中)。
tokenSource 令牌的来源。PLAINTEXT、KMS 或 SECRET_MANAGER 之一。如果使用了 Secret Manager,则必须提供此参数。如果 tokenSource 设置为 KMS,则必须提供 tokenKMSEncryptionKey 和加密的 token。如果 tokenSource 设置为 SECRET_MANAGER,则必须提供 tokenSecretId。如果 tokenSource 设置为 DLPINTEXT,则必须提供 token
tokenKMSEncryptionKey (可选)用于解密 HEC 令牌字符串的 Cloud KMS 密钥。 如果 tokenSource 设置为 KMS,则必须提供此参数。如果提供了 Cloud KMS 密钥,则必须以加密方式传递 HEC 令牌字符串。
tokenSecretId (可选)令牌的 Secret Manager Secret ID。如果 tokenSource 设置为 SECRET_MANAGER,则必须提供此参数。应采用以下格式:projects/<project-id>/secrets/<secret-name>/versions/<secret-version>
rootCaCertificatePath (可选)Cloud Storage 中根 CA 证书的完整网址。例如 gs://mybucket/mycerts/privateCA.crt。Cloud Storage 中提供的证书必须采用 DER 编码,并且可能以二进制或可打印 (Base64) 编码提供。如果证书是使用 Base64 编码提供的,则它必须以 -----BEGIN CERTIFICATE----- 开头为界,并且必须以 -----END CERTIFICATE----- 结尾为界。如果提供此参数,系统会提取此私有 CA 证书文件并将其添加到 Dataflow 工作器的信任库,以便验证 Splunk HEC 端点的 SSL 证书。如果未提供此参数,则使用默认信任库。
enableBatchLogs (可选)指定是否应为写入 Splunk 的批次启用日志。默认值:true
enableGzipHttpCompression (可选)指定是否应压缩发送到 Splunk HEC 的 HTTP 请求(gzip 内容编码)。默认值:true

运行 Pub/Sub to Splunk 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to Splunk template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION,\
rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • INPUT_SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • TOKEN:Splunk 的 HTTP Event Collector 令牌
  • URL:Splunk 的 HTTP Event Collector 的网址路径(例如 https://splunk-hec-host:8088
  • DEADLETTER_TOPIC_NAME:Pub/Sub 主题名称
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT:用于向 Splunk 发送多个事件的批次大小
  • PARALLELISM:用于向 Splunk 发送事件的并行请求数
  • DISABLE_VALIDATION:如果要停用 SSL 证书验证则为 true
  • ROOT_CA_CERTIFICATE_PATH:Cloud Storage 中根 CA 证书的路径(例如 gs://your-bucket/privateCA.crt

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION",
       "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • INPUT_SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • TOKEN:Splunk 的 HTTP Event Collector 令牌
  • URL:Splunk 的 HTTP Event Collector 的网址路径(例如 https://splunk-hec-host:8088
  • DEADLETTER_TOPIC_NAME:Pub/Sub 主题名称
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT:用于向 Splunk 发送多个事件的批次大小
  • PARALLELISM:用于向 Splunk 发送事件的并行请求数
  • DISABLE_VALIDATION:如果要停用 SSL 证书验证则为 true
  • ROOT_CA_CERTIFICATE_PATH:Cloud Storage 中根 CA 证书的路径(例如 gs://your-bucket/privateCA.crt

Pub/Sub to Avro Files on Cloud Storage

Pub/Sub to Avro files on Cloud Storage 模板是一个流处理流水线,可从 Pub/Sub 主题中读取数据,并将 Avro 文件写入指定的 Cloud Storage 存储桶。

对此流水线的要求

  • Pub/Sub 输入主题必须已存在才能执行此流水线。

模板参数

参数 说明
inputTopic 要订阅用来处理消息的 Pub/Sub 主题。主题名称必须采用 projects/<project-id>/topics/<topic-name> 格式。
outputDirectory 要用于归档输出 Avro 文件的输出目录。末尾必须包含 /。例如:gs://example-bucket/example-directory/
avroTempDirectory 临时 Avro 文件的目录。末尾必须包含 /。例如:gs://example-bucket/example-directory/
outputFilenamePrefix (可选)Avro 文件的输出文件名前缀。
outputFilenameSuffix (可选)Avro 文件的输出文件名后缀。
outputShardTemplate (可选)输出文件的分片模板。它被指定为字母 SN 的重复序列。例如 SSS-NNN。这些字母会分别替换成分片编号或分片总数。如果未指定此参数,则默认模板格式为 W-P-SS-of-NN

运行 Pub/Sub to Cloud Storage Avro 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to Avro Files on Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置;例如 gs://your-bucket/temp
  • TOPIC_NAME:Pub/Sub 主题名称
  • BUCKET_NAME - Cloud Storage 存储桶的名称。
  • FILENAME_PREFIX:首选输出文件名前缀
  • FILENAME_SUFFIX:首选输出文件名后缀
  • SHARD_TEMPLATE:首选输出分片模板

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置;例如 gs://your-bucket/temp
  • TOPIC_NAME:Pub/Sub 主题名称
  • BUCKET_NAME - Cloud Storage 存储桶的名称。
  • FILENAME_PREFIX:首选输出文件名前缀
  • FILENAME_SUFFIX:首选输出文件名后缀
  • SHARD_TEMPLATE:首选输出分片模板

Pub/Sub Topic to Text Files on Cloud Storage

Pub/Sub to Cloud Storage Text 模板是一种流处理流水线,可从 Pub/Sub 主题读取记录并将其保存为一系列文本格式的 Cloud Storage 文件。使用此模板,您可以快速地保存 Pub/Sub 中的数据以留待将来使用。默认情况下,此模板每 5 分钟生成一个新文件。

对此流水线的要求

  • Pub/Sub 主题必须已存在才能执行此流水线。
  • 发布到主题的消息必须采用文本格式。
  • 发布到主题的消息不得包含任何换行符。请注意,每条 Pub/Sub 消息在输出文件中均会保存为一行。

模板参数

参数 说明
inputTopic 要从中读取输入的 Pub/Sub 主题。主题名称应采用 projects/<project-id>/topics/<topic-name> 格式。
outputDirectory 用于写入输出文件的路径和文件名前缀,例如 gs://bucket-name/path/。该值必须以斜杠结尾。
outputFilenamePrefix 要在各窗口文件上放置的前缀。例如 output-
outputFilenameSuffix 要放置于每个窗口化文件上的后缀,通常是文件扩展名,例如 .txt.csv
outputShardTemplate 分片式模板定义每个窗口文件的动态部分。默认情况下,该管道使用单一碎片输出到各窗口内的文件系统。这意味着每个窗口的所有数据都会输出到单个文件中。outputShardTemplate 默认为 W-P-SS-of-NN,其中 W 是窗口日期范围,P 是窗格信息,S 是分片编号,而 N 是分片数。对于单个文件,outputShardTemplateSS-of-NN 部分为 00-of-01

运行 Pub/Sub Topic to Text Files on Cloud Storage 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to Text Files on Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME - Cloud Storage 存储桶的名称。

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME - Cloud Storage 存储桶的名称。

Pub/Sub Topic or Subscription to Text Files on Cloud Storage

Pub/Sub Topic or Subscription to Cloud Storage Text 模板是一种流处理流水线,可从 Pub/Sub 读取记录并将其保存为一系列文本格式的 Cloud Storage 文件。使用此模板,您可以快速地保存 Pub/Sub 中的数据以留待将来使用。默认情况下,此模板每 5 分钟生成一个新文件。

对此流水线的要求

  • Pub/Sub 主题或订阅必须已存在才能执行此流水线。
  • 发布到主题的消息必须采用文本格式。
  • 发布到主题的消息不得包含任何换行符。请注意,每条 Pub/Sub 消息在输出文件中均会保存为一行。

模板参数

参数 说明
inputTopic 要从中读取输入的 Pub/Sub 主题。主题名称应采用 projects/<project-id>/topics/<topic-name> 格式。 如果提供此参数,则不应提供 inputSubscription
inputSubscription 要从中读取输入的 Pub/Sub 订阅。订阅名称应采用 projects/<project-id>/subscription/<subscription-name> 格式。如果提供此参数,则不应提供 inputTopic
outputDirectory 用于写入输出文件的路径和文件名前缀,例如 gs://bucket-name/path/。该值必须以斜杠结尾。
outputFilenamePrefix 要在各窗口文件上放置的前缀。例如 output-
outputFilenameSuffix 要放置于每个窗口化文件上的后缀,通常是文件扩展名,例如 .txt.csv
outputShardTemplate 分片式模板定义每个窗口文件的动态部分。默认情况下,该管道使用单一碎片输出到各窗口内的文件系统。这意味着每个窗口的所有数据都会输出到单个文件中。outputShardTemplate 默认为 W-P-SS-of-NN,其中 W 是窗口日期范围,P 是窗格信息,S 是分片编号,而 N 是分片数。对于单个文件,outputShardTemplateSS-of-NN 部分为 00-of-01
windowDuration (可选)窗口时长是将数据写入输出目录的时间间隔。请根据流水线的吞吐量配置时长。例如,较高的吞吐量可能需要较短的窗口时长,以便数据适应内存。默认为 5 分钟,最短可为 1 秒。允许的格式如下:[int]s(表示数秒,例如 5s)、[int]m(表示数分钟,例如 12m)、[int]h(表示数小时,例如 2h)。

运行 Pub/Sub Topic or Subscription to Text Files on Cloud Storage 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template jobs run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • SUBSCRIPTION_NAME:您的 Pub/Sub 订阅名称
  • BUCKET_NAME:Cloud Storage 存储桶的名称

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
    },
    "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex",
  }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • SUBSCRIPTION_NAME:您的 Pub/Sub 订阅名称
  • BUCKET_NAME:Cloud Storage 存储桶的名称

Pub/Sub to MongoDB

Pub/Sub to MongoDB 模板是一种流处理流水线,可从 Pub/Sub 订阅读取 JSON 编码的消息并将其以文档的形式写入 MongoDB。如果需要,此流水线还能支持额外的转换,只需通过 JavaScript 用户定义的函数 (UDF) 包含这些转换即可。因架构不匹配、JOSN 格式不正确或执行转换时发生的所有错误都会记录在未处理消息的 BigQuery 表中,并随附输入消息。如果在执行之前不存在任何未处理记录的表,则流水线会自动创建该表。

对此流水线的要求

  • Pub/Sub 订阅必须存在,并且消息必须采用有效的 JSON 格式进行编码。
  • MongoDB 集群必须存在,并且应该可通过 Dataflow 工作器机器访问。

模板参数

参数 说明
inputSubscription Pub/Sub 订阅的名称。例如:projects/my-project-id/subscriptions/my-subscription-id
mongoDBUri 以英文逗号分隔的 MongoDB 服务器列表。例如:192.285.234.12:27017,192.287.123.11:27017
database 存储集合的 MongoDB 数据库。例如:my-db
collection MongoDB 数据库中集合的名称。例如:my-collection
deadletterTable 存储失败消息(架构不匹配、JSON 格式错误)的 BigQuery 表。例如:project-id:dataset-name.table-name
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
batchSize (可选)用于将文档批量插入 MongoDB 的批次大小。默认值:1000
batchSizeBytes (可选)批次大小(以字节为单位)。默认值:5242880
maxConnectionIdleTime (可选)在出现连接超时之前所允许的空闲时间上限(以秒为单位)。默认值:60000
sslEnabled (可选)用于指示与 MongoDB 的连接是否启用了 SSL 的布尔值。默认值:true
ignoreSSLCertificate (可选)用于指示是否应忽略 SSL 证书的布尔值。默认值:true
withOrdered (可选)允许依次批量插入 MongoDB 的布尔值。默认值:true
withSSLInvalidHostNameAllowed (可选)用于指示是否允许 SSL 连接使用无效主机名的布尔值。默认值:true

运行 Pub/Sub to MongoDB 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to MongoDB template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • INPUT_SUBSCRIPTION:Pub/Sub 订阅(例如 projects/my-project-id/subscriptions/my-subscription-id
  • MONGODB_URI:MongoDB 服务器地址(例如 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE:MongoDB 数据库名称(例如 users
  • COLLECTION:MongoDB 集合名称(例如 profiles
  • UNPROCESSED_TABLE:BigQuery 表名称(例如 your-project:your-dataset.your-table-name

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • INPUT_SUBSCRIPTION:Pub/Sub 订阅(例如 projects/my-project-id/subscriptions/my-subscription-id
  • MONGODB_URI:MongoDB 服务器地址(例如 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE:MongoDB 数据库名称(例如 users
  • COLLECTION:MongoDB 集合名称(例如 profiles
  • UNPROCESSED_TABLE:BigQuery 表名称(例如 your-project:your-dataset.your-table-name

Pub/Sub to Elasticsearch

Pub/Sub to Elasticsearch 模板是一种流处理流水线,可从 Pub/Sub 订阅读取消息、执行用户定义的函数 (UDF) 并将其作为文档写入 Elasticsearch。Dataflow 模板使用 Elasticsearch 的数据流功能跨多个索引存储时间序列数据,同时为请求提供单个命名资源。数据流非常适合存储在 Pub/Sub 中的日志、指标、跟踪记录和其他持续生成的数据。

对此流水线的要求

  • 来源 Pub/Sub 订阅必须存在,并且消息必须采用有效的 JSON 格式进行编码。
  • GCP 实例上或 Elastic Cloud 上可公开访问的 Elasticsearch 主机,用于 Elasticsearch 7.0 或更高版本。如需了解详情,请参阅适用于 Elastic 的 Google Cloud 集成
  • 用于错误输出的 Pub/Sub 主题。

模板参数

参数 说明
inputSubscription 要使用的 Pub/Sub 订阅。该名称应采用 projects/<project-id>/subscriptions/<subscription-name> 格式。
connectionUrl Elasticsearch 网址,格式为 https://hostname:[port] 或指定 CloudID(如果使用 Elastic Cloud)。
apiKey 用于身份验证的 Base64 编码 API 密钥。
errorOutputTopic Pub/Sub 输出主题,用于发布失败的记录,格式为 projects/<project-id>/topics/<topic-name>
dataset (可选)通过 Pub/Sub 发送的日志类型,我们为其提供了开箱即用的信息中心。已知日志类型值为 audit、vpcflow 和 firewall。默认值:pubsub
namespace (可选)任意分组,例如环境(dev、prod 或 qa)、团队或战略性业务部门。默认值:default
batchSize (可选)文档数量中的批次大小。默认值:1000
batchSizeBytes (可选)批次大小(以字节为单位)。默认值:5242880 (5mb)。
maxRetryAttempts (可选)尝试次数上限,必须大于 0。默认值:no retries
maxRetryDuration (可选)重试时长上限(以毫秒为单位),必须大于 0。默认值:no retries
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
propertyAsIndex (可选)要编入索引的文档中的一个属性,其值将指定批量请求要包含在文档中的 _index 元数据(优先于 _index UDF)。默认值:none。
propertyAsId (可选)要编入索引的文档中的一个属性,其值将指定批量请求要包含在文档中的 _id 元数据(优先于 _id UDF)。默认值:none。
javaScriptIndexFnGcsPath (可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _index 元数据。默认值:none。
javaScriptIndexFnName (可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _index 元数据。默认值:none。
javaScriptIdFnGcsPath (可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _id 元数据。默认值:none。
javaScriptIdFnName (可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _id 元数据。默认值:none。
javaScriptTypeFnGcsPath (可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _type 元数据。默认值:none。
javaScriptTypeFnName (可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _type 元数据。默认值:none。
javaScriptIsDeleteFnGcsPath (可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将确定是否应删除文档,而不是插入或更新文档。该函数应返回字符串值 "true""false"。默认值:none。
javaScriptIsDeleteFnName (可选)函数的 UDF JavaScript 函数名称,该函数将确定是否应删除文档,而不是插入或更新文档。该函数应返回字符串值 "true""false"。默认值:none。
usePartialUpdate (可选)是否在 Elasticsearch 请求中使用部分更新(更新而不是创建或索引,允许部分文档)。默认值:false
bulkInsertMethod (可选)在 Elasticsearch 批量请求中使用 INDEX(索引,允许执行更新插入操作)还是 CREATE(创建,会对重复 _id 报错)。默认值:CREATE

运行 Pub/Sub to Elasticsearch 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to Elasticsearch template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (