Google 提供的流处理模板

Google 提供了一组开源 Dataflow 模板。如需了解有关模板的一般信息,请参阅 Dataflow 模板。如需查看 Google 提供的所有模板的列表,请参阅开始使用 Google 提供的模板

本指南介绍了流式模板。

Pub/Sub Subscription to BigQuery

Pub/Sub Subscription to BigQuery 模板是一种流处理流水线,可从 Pub/Sub 订阅读取 JSON 格式的消息并将其写入 BigQuery 表格中。您可以使用该模板作为将 Pub/Sub 数据移动到 BigQuery 的快速解决方案。此模板可从 Pub/Sub 中读取 JSON 格式的消息并将其转换为 BigQuery 元素。

对此流水线的要求

  • Pub/Sub 消息必须采用 JSON 格式(请参阅此处)。例如,您可以将 {"k1":"v1", "k2":"v2"} 格式的消息插入具有两个列(k1k2)且数据类型为字符串的 BigQuery 表格中。
  • 在运行该流水线之前,输出表必须已存在。 表架构必须与输入 JSON 对象相匹配。

模板参数

参数 说明
inputSubscription 要读取的 Pub/Sub 输入订阅,格式为 projects/<project>/subscriptions/<subscription>
outputTableSpec BigQuery 输出表位置,格式为 <my-project>:<my-dataset>.<my-table>
outputDeadletterTable 未能到达输出表的消息的 BigQuery 表,格式为 <my-project>:<my-dataset>.<my-table>。如果该表不存在,则系统会在流水线执行期间创建它。如果未指定此参数,则系统会改用 OUTPUT_TABLE_SPEC_error_records
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

运行 Pub/Sub Subscription to BigQuery 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub Subscription to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • SUBSCRIPTION_NAME:您的 Pub/Sub 订阅名称
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • SUBSCRIPTION_NAME:您的 Pub/Sub 订阅名称
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称

Pub/Sub Topic to BigQuery

Pub/Sub Topic to BigQuery 模板是一种流处理流水线,可从 Cloud Pub/Sub 主题读取 JSON 格式的消息并将其写入 BigQuery 表格中。您可以使用该模板作为将 Pub/Sub 数据移动到 BigQuery 的快速解决方案。此模板可从 Pub/Sub 中读取 JSON 格式的消息并将其转换为 BigQuery 元素。

对此流水线的要求

  • Pub/Sub 消息必须采用 JSON 格式(请参阅此处)。例如,您可以将 {"k1":"v1", "k2":"v2"} 格式的消息插入具有两个列(k1k2)且数据类型为字符串的 BigQuery 表格中。
  • 在运行该流水线之前,输出表必须已存在。 表架构必须与输入 JSON 对象相匹配。

模板参数

参数 说明
inputTopic 要读取的 Cloud Pub/Sub 输入主题,格式为 projects/<project>/topics/<topic>
outputTableSpec BigQuery 输出表位置,格式为 <my-project>:<my-dataset>.<my-table>
outputDeadletterTable 无法到达输出表的消息的 BigQuery 表。格式应为 <my-project>:<my-dataset>.<my-table>。 如果该表不存在,则系统会在流水线执行期间创建它。如果未指定此参数,则系统会改用 <outputTableSpec>_error_records
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

运行 Cloud Pub/Sub Topic to BigQuery 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub Topic to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称

Pub/Sub Avro to BigQuery

Pub/Sub Avro to BigQuery 模板是一种流处理流水线,用于将 Pub/Sub 订阅中的 Avro 数据提取到 BigQuery 表中。向 BigQuery 表写入数据时发生的任何错误都会流式传输到 Pub/Sub 未处理的主题。

对此流水线的要求

  • 用作输入来源的 Pub/Sub 订阅必须存在。
  • Avro 记录的架构文件必须存在于 Cloud Storage 存储空间中。
  • 未处理的 Pub/Sub 主题必须存在。
  • 用作输出目标的 BigQuery 数据集必须已存在。

模板参数

参数 说明
schemaPath Avro 架构文件的 Cloud Storage 位置。例如 gs://path/to/my/schema.avsc
inputSubscription 要读取的 Pub/Sub 输入订阅。例如 projects/<project>/subscriptions/<subscription>
outputTopic 要用于未处理的记录的 Pub/Sub 主题。例如 projects/<project-id>/topics/<topic-name>
outputTableSpec BigQuery 输出表位置。例如 <my-project>:<my-dataset>.<my-table>。 根据指定的 createDisposition,系统可能会使用用户提供的 Avro 架构自动创建输出表。
writeDisposition (可选)BigQuery WriteDisposition。例如 WRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE。默认:WRITE_APPEND
createDisposition (可选)BigQuery CreateDisposition。例如 CREATE_IF_NEEDEDCREATE_NEVER。默认:CREATE_IF_NEEDED

运行 Pub/Sub Avro to BigQuery 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub Avro to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • SCHEMA_PATH:Avro 架构文件的 Cloud Storage 路径(例如 gs://MyBucket/file.avsc
  • SUBSCRIPTION_NAME:Pub/Sub 输入订阅名称
  • BIGQUERY_TABLE:BigQuery 输出表名称
  • DEADLETTER_TOPIC:要用于未处理的队列的 Pub/Sub 主题

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • SCHEMA_PATH:Avro 架构文件的 Cloud Storage 路径(例如 gs://MyBucket/file.avsc
  • SUBSCRIPTION_NAME:Pub/Sub 输入订阅名称
  • BIGQUERY_TABLE:BigQuery 输出表名称
  • DEADLETTER_TOPIC:要用于未处理的队列的 Pub/Sub 主题

Pub/Sub Proto to BigQuery

Pub/Sub proto to BigQuery 模板是一种流处理流水线,用于将 Pub/Sub 订阅中的 Avro 数据提取到 BigQuery 表中。向 BigQuery 表写入数据时发生的任何错误都会流式传输到 Pub/Sub 未处理的主题。

可以提供 JavaScript 用户定义函数 (UDF) 来转换数据。可以将在执行 UDF 期间发生的错误发送到单独的 Pub/Sub 主题或与 BigQuery 错误相同的未处理主题。

对此流水线的要求

  • 用作输入来源的 Pub/Sub 订阅必须存在。
  • Proto 记录的架构文件必须存在于 Cloud Storage 中。
  • 输出 Pub/Sub 主题必须存在。
  • 用作输出目标的 BigQuery 数据集必须已存在。
  • 如果 BigQuery 表存在,则无论 createDisposition 值如何,该表都必须具有与 proto 数据匹配的架构。

模板参数

参数 说明
protoSchemaPath 独立的 proto 架构文件的 Cloud Storage 位置。例如 gs://path/to/my/file.pb。 您可以使用 protoc 命令的 --descriptor_set_out 标志生成此文件。--include-imports 标志可确保文件是独立的。
fullMessageName 完整的 proto 消息名称。例如 package.name.MessageName,其中 package.name 是为 package 语句(而不是 java_package 语句)提供的值。
inputSubscription 要读取的 Pub/Sub 输入订阅。例如 projects/<project>/subscriptions/<subscription>
outputTopic 要用于未处理的记录的 Pub/Sub 主题。例如 projects/<project-id>/topics/<topic-name>
outputTableSpec BigQuery 输出表位置。例如 my-project:my_dataset.my_table。 根据指定的 createDisposition,系统可能会使用输入架构文件自动创建输出表。
preserveProtoFieldNames (可选)true 用于保留 JSON 中的原始 Proto 字段名称。false 用于使用更多标准 JSON 名称。例如,false 会将 field_name 更改为 fieldName。(默认:false
bigQueryTableSchemaPath (可选)BigQuery 架构路径到 Cloud Storage 路径。例如 gs://path/to/my/schema.json。如果未提供,系统将根据 Proto 架构推断架构。
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
udfOutputTopic (可选)存储 UDF 错误的 Pub/Sub 主题。例如 projects/<project-id>/topics/<topic-name>。如果未提供,则 UDF 错误将发送到与 outputTopic 相同的主题。
writeDisposition (可选)BigQuery WriteDisposition。例如 WRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE。默认值:WRITE_APPEND
createDisposition (可选)BigQuery CreateDisposition。例如 CREATE_IF_NEEDEDCREATE_NEVER。默认值为 CREATE_IF_NEEDED

运行 Cloud Pub/Sub Topic to BigQuery 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub Proto to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
fullMessageName=PROTO_MESSAGE_NAME,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=UNPROCESSED_TOPIC
  

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • SCHEMA_PATH:Proto 架构文件的 Cloud Storage 路径(例如 gs://MyBucket/file.pb
  • PROTO_MESSAGE_NAME:Proto 消息名称(例如 package.name.MessageName
  • SUBSCRIPTION_NAME:Pub/Sub 输入订阅名称
  • BIGQUERY_TABLE:BigQuery 输出表名称
  • UNPROCESSED_TOPIC:要用于未处理的队列的 Pub/Sub 主题

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "fullMessageName": "PROTO_MESSAGE_NAME",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "UNPROCESSED_TOPIC"
      }
   }
}
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • SCHEMA_PATH:Proto 架构文件的 Cloud Storage 路径(例如 gs://MyBucket/file.pb
  • PROTO_MESSAGE_NAME:Proto 消息名称(例如 package.name.MessageName
  • SUBSCRIPTION_NAME:Pub/Sub 输入订阅名称
  • BIGQUERY_TABLE:BigQuery 输出表名称
  • UNPROCESSED_TOPIC:要用于未处理的队列的 Pub/Sub 主题

Pub/Sub to Pub/Sub

Pub/Sub to Pub/Sub 模板是一种流处理流水线,可从 Pub/Sub 订阅中读取消息,并将这些消息写入其他 Pub/Sub 主题。该流水线还接受一个可选的消息属性键以及值,该值可用于过滤应写入 Pub/Sub 主题的消息。您可以使用此模板将消息从 Pub/Sub 订阅复制到带有可选消息过滤器的其他 Pub/Sub 主题。

对此流水线的要求

  • 源 Pub/Sub 订阅必须已存在才能执行流水线。
  • 目的地 Pub/Sub 主题必须已存在才能执行此流水线。

模板参数

参数 说明
inputSubscription 要从中读取输入的 Pub/Sub 订阅,例如 projects/<project-id>/subscriptions/<subscription-name>
outputTopic 要将输出写入其中的 Cloud Pub/Sub 主题,例如 projects/<project-id>/topics/<topic-name>
filterKey (可选)根据特性键过滤事件。如果未指定 filterKey,则不会应用过滤器。
filterValue (可选)提供了 filterKey 时要使用的过滤器特性值。默认情况下,filterValue 为 null。

运行 Pub/Sub to Pub/Sub 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to Pub/Sub template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • TOPIC_NAME:Pub/Sub 主题名称
  • FILTER_KEY:用于过滤事件的属性键。如果未指定键,则不会应用过滤器。
  • FILTER_VALUE:提供事件过滤键时要使用的过滤器属性值。接受有效的 Java 正则表达式字符串作为事件过滤值。如果提供了正则表达式,则应匹配整个表达式以过滤消息。部分匹配(如子字符串)不会被过滤。默认使用 null 事件过滤值。

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • TOPIC_NAME:Pub/Sub 主题名称
  • FILTER_KEY:用于过滤事件的属性键。如果未指定键,则不会应用过滤器。
  • FILTER_VALUE:提供事件过滤键时要使用的过滤器属性值。接受有效的 Java 正则表达式字符串作为事件过滤值。如果提供了正则表达式,则应匹配整个表达式以过滤消息。部分匹配(如子字符串)不会被过滤。默认使用 null 事件过滤值。

Pub/Sub to Splunk

Pub/Sub to Splunk 模板是一种流处理流水线,可从 Pub/Sub 订阅中读取消息,并通过 Splunk 的 HTTP Event Collector (HEC) 将消息载荷写入 Splunk。此模板的最常见使用场景是将日志导出到 Splunk。如需查看底层工作流的示例,请参阅使用 Dataflow 将支持生产环境的日志导出部署到 Splunk

在写入 Splunk 之前,您还可以将 JavaScript 用户定义函数应用于消息载荷。任何未能成功处理的的消息都会被转发到 Pub/Sub 未处理主题,以便进一步进行问题排查并重新处理。

要为 HEC 令牌提供额外保护,您还可以传入 Cloud KMS 密钥和使用 Cloud KMS 密钥加密的 base64 编码 HEC 令牌参数。如需详细了解如何对 HEC 令牌参数进行加密,请参阅 Cloud KMS API 加密端点

对此流水线的要求

  • 源 Pub/Sub 订阅必须已存在才能运行此流水线。
  • 在运行此流水线之前,Pub/Sub 未处理的主题必须已存在。
  • Splunk HEC 端点必须可从 Dataflow 工作器的网络访问。
  • Splunk HEC 令牌必须已生成并且可用。

模板参数

参数 说明
inputSubscription 要从中读取输入的 Pub/Sub 订阅,例如 projects/<project-id>/subscriptions/<subscription-name>
token Splunk HEC 身份验证令牌。此 base64 编码的字符串可以使用 Cloud KMS 密钥进行加密,以提高安全性。
url Splunk HEC 网址,必须可从运行流水线的 VPC 路由。例如,https://splunk-hec-host:8088。
outputDeadletterTopic 用于转发无法递送的消息的 Pub/Sub 主题,例如 projects/<project-id>/topics/<topic-name>
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
batchCount (可选)向 Splunk 发送多个事件的批次大小。默认值为 1(无批处理)。
parallelism (可选)最大并行请求数。默认值为 1(无并行)。
disableCertificateValidation (可选)停用 SSL 证书验证。默认为 false(已启用验证)。 如果为 true,则不验证证书(所有证书均受信任),并且忽略“rootCaCertificatePath”参数。
includePubsubMessage (可选)在载荷中包含完整的 Pub/Sub 消息。默认值为 false(只有数据元素包含在载荷中)。
tokenKMSEncryptionKey (可选)用于解密 HEC 令牌字符串的 Cloud KMS 密钥。如果提供了 Cloud KMS 密钥,则必须以加密方式传递 HEC 令牌字符串。
rootCaCertificatePath (可选)Cloud Storage 中根 CA 证书的完整网址。例如 gs://mybucket/mycerts/privateCA.crt。Cloud Storage 中提供的证书必须采用 DER 编码,并且可能以二进制或可打印 (Base64) 编码提供。如果证书是使用 Base64 编码提供的,则它必须以 -----BEGIN CERTIFICATE----- 开头为界,并且必须以 -----END CERTIFICATE----- 结尾为界。如果提供此参数,系统将提取此私有 CA 证书文件并将其添加到 Dataflow 工作器的信任库,以便验证 Splunk HEC 端点的 SSL 证书。如果未提供此参数,则使用默认信任库。

运行 Pub/Sub to Splunk 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to Splunk template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION,\
rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • INPUT_SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • TOKEN:Splunk 的 HTTP Event Collector 令牌
  • URL:Splunk 的 HTTP Event Collector 的网址路径(例如 https://splunk-hec-host:8088
  • DEADLETTER_TOPIC_NAME:Pub/Sub 主题名称
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT:用于向 Splunk 发送多个事件的批次大小
  • PARALLELISM:用于向 Splunk 发送事件的并行请求数
  • DISABLE_VALIDATION:如果要停用 SSL 证书验证则为 true
  • ROOT_CA_CERTIFICATE_PATH:Cloud Storage 中根 CA 证书的路径(例如 gs://your-bucket/privateCA.crt

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION",
       "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • INPUT_SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • TOKEN:Splunk 的 HTTP Event Collector 令牌
  • URL:Splunk 的 HTTP Event Collector 的网址路径(例如 https://splunk-hec-host:8088
  • DEADLETTER_TOPIC_NAME:Pub/Sub 主题名称
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT:用于向 Splunk 发送多个事件的批次大小
  • PARALLELISM:用于向 Splunk 发送事件的并行请求数
  • DISABLE_VALIDATION:如果要停用 SSL 证书验证则为 true
  • ROOT_CA_CERTIFICATE_PATH:Cloud Storage 中根 CA 证书的路径(例如 gs://your-bucket/privateCA.crt

Pub/Sub to Avro Files on Cloud Storage

Pub/Sub to Avro files on Cloud Storage 模板是一个流处理流水线,可从 Pub/Sub 主题中读取数据,并将 Avro 文件写入指定的 Cloud Storage 存储分区。

对此流水线的要求

  • Pub/Sub 输入主题必须已存在才能执行此流水线。

模板参数

参数 说明
inputTopic 要订阅用来处理消息的 Pub/Sub 主题。主题名称必须采用 projects/<project-id>/topics/<topic-name> 格式。
outputDirectory 要用于归档输出 Avro 文件的输出目录。末尾必须包含 /。例如:gs://example-bucket/example-directory/
avroTempDirectory 临时 Avro 文件的目录。末尾必须包含 /。例如:gs://example-bucket/example-directory/
outputFilenamePrefix (可选)Avro 文件的输出文件名前缀。
outputFilenameSuffix (可选)Avro 文件的输出文件名后缀。
outputShardTemplate (可选)输出文件的分片模板。它被指定为字母 SN 的重复序列。例如 SSS-NNN。这些字母会分别替换成分片编号或分片总数。如果未指定此参数,则默认模板格式为 W-P-SS-of-NN

运行 Pub/Sub to Cloud Storage Avro 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to Avro Files on Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置;例如 gs://your-bucket/temp
  • TOPIC_NAME:Pub/Sub 主题名称
  • BUCKET_NAME - Cloud Storage 存储分区的名称。
  • FILENAME_PREFIX:首选输出文件名前缀
  • FILENAME_SUFFIX:首选输出文件名后缀
  • SHARD_TEMPLATE:首选输出分片模板

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置;例如 gs://your-bucket/temp
  • TOPIC_NAME:Pub/Sub 主题名称
  • BUCKET_NAME - Cloud Storage 存储分区的名称。
  • FILENAME_PREFIX:首选输出文件名前缀
  • FILENAME_SUFFIX:首选输出文件名后缀
  • SHARD_TEMPLATE:首选输出分片模板

Pub/Sub to Text Files on Cloud Storage

Pub/Sub to Cloud Storage Text 模板是一种流处理流水线,可从 Pub/Sub 读取记录并将其保存为一系列文本格式的 Cloud Storage 文件。使用此模板,您可以快速地保存 Pub/Sub 中的数据以留待将来使用。默认情况下,此模板每 5 分钟生成一个新文件。

对此流水线的要求

  • Pub/Sub 主题必须已存在才能执行此流水线。
  • 发布到主题的消息必须采用文本格式。
  • 发布到主题的消息不得包含任何换行符。请注意,每条 Pub/Sub 消息在输出文件中均会保存为一行。

模板参数

参数 说明
inputTopic 要从中读取输入的 Pub/Sub 主题。主题名称应采用 projects/<project-id>/topics/<topic-name> 格式。
outputDirectory 用于写入输出文件的路径和文件名前缀,例如 gs://bucket-name/path/。该值必须以斜杠结尾。
outputFilenamePrefix 要在各窗口文件上放置的前缀。例如 output-
outputFilenameSuffix 要放置于每个窗口化文件上的后缀,通常是文件扩展名,例如 .txt.csv
outputShardTemplate 分片式模板定义每个窗口文件的动态部分。默认情况下,该管道使用单一碎片输出到各窗口内的文件系统。这意味着所有数据都将按窗口归入单个文件中。outputShardTemplate 默认为 W-P-SS-of-NN,其中 W 是窗口日期范围,P 是窗格信息,S 是分片编号,而 N 是分片数。对于单个文件,outputShardTemplateSS-of-NN 部分将为 00-of-01

运行 Pub/Sub to Text Files on Cloud Storage 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to Text Files on Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME - Cloud Storage 存储分区的名称。

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME - Cloud Storage 存储分区的名称。

Pub/Sub to MongoDB

Pub/Sub to MongoDB 模板是一种流处理流水线,可从 Pub/Sub 订阅读取 JSON 编码的消息并将其以文档的形式写入 MongoDB。如果需要,此流水线还能支持额外的转换,只需通过 JavaScript 用户定义的函数 (UDF) 包含这些转换即可。因架构不匹配、JOSN 格式不正确或执行转换时发生的所有错误都会记录在未处理消息的 BigQuery 表中,并随附输入消息。如果在执行之前不存在任何未处理记录的表,则流水线会自动创建该表。

对此流水线的要求

  • Pub/Sub 订阅必须存在,并且消息必须采用有效的 JSON 格式进行编码。
  • MongoDB 集群必须存在,并且应该可通过 Dataflow 工作器机器访问。

模板参数

参数 说明
inputSubscription Pub/Sub 订阅的名称。例如:projects/my-project-id/subscriptions/my-subscription-id
mongoDBUri 以英文逗号分隔的 MongoDB 服务器列表。例如:192.285.234.12:27017,192.287.123.11:27017
database 存储集合的 MongoDB 数据库。例如:my-db
collection MongoDB 数据库中集合的名称。例如:my-collection
deadletterTable 存储失败消息(架构不匹配、JSON 格式错误)的 BigQuery 表。例如:project-id:dataset-name.table-name
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
batchSize (可选)用于将文档批量插入 MongoDB 的批次大小。默认值:1000
batchSizeBytes (可选)批次大小(以字节为单位)。默认值:5242880
maxConnectionIdleTime (可选)在出现连接超时之前所允许的空闲时间上限(以秒为单位)。默认值:60000
sslEnabled (可选)用于指示与 MongoDB 的连接是否启用了 SSL 的布尔值。默认值:true
ignoreSSLCertificate (可选)用于指示是否应忽略 SSL 证书的布尔值。默认值:true
withOrdered (可选)允许依次批量插入 MongoDB 的布尔值。默认值:true
withSSLInvalidHostNameAllowed (可选)用于指示是否允许 SSL 连接使用无效主机名的布尔值。默认值:true

运行 Pub/Sub to MongoDB 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to MongoDB template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • INPUT_SUBSCRIPTION:Pub/Sub 订阅(例如 projects/my-project-id/subscriptions/my-subscription-id
  • MONGODB_URI:MongoDB 服务器地址(例如 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE:MongoDB 数据库名称(例如 users
  • COLLECTION:MongoDB 集合名称(例如 profiles
  • UNPROCESSED_TABLE:BigQuery 表名称(例如 your-project:your-dataset.your-table-name

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • INPUT_SUBSCRIPTION:Pub/Sub 订阅(例如 projects/my-project-id/subscriptions/my-subscription-id
  • MONGODB_URI:MongoDB 服务器地址(例如 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE:MongoDB 数据库名称(例如 users
  • COLLECTION:MongoDB 集合名称(例如 profiles
  • UNPROCESSED_TABLE:BigQuery 表名称(例如 your-project:your-dataset.your-table-name

Pub/Sub to Elasticsearch

Pub/Sub to Elasticsearch 模板是一种流处理流水线,可从 Pub/Sub 订阅读取消息、执行用户定义的函数 (UDF) 并将其作为文档写入 Elasticsearch。Dataflow 模板使用 Elasticsearch 的数据流功能跨多个索引存储时间序列数据,同时为请求提供单个命名资源。数据流非常适合存储在 Pub/Sub 中的日志、指标、跟踪记录和其他持续生成的数据。

对此流水线的要求

  • 来源 Pub/Sub 订阅必须存在,并且消息必须采用有效的 JSON 格式进行编码。
  • GCP 实例上或 Elastic Cloud 上可公开访问的 Elasticsearch 主机,用于 Elasticsearch 7.0 或更高版本。如需了解详情,请参阅适用于 Elastic 的 Google Cloud 集成
  • 用于错误输出的 Pub/Sub 主题。

模板参数

参数 说明
inputSubscription 要使用的 Pub/Sub 订阅。该名称应采用 projects/<project-id>/subscriptions/<subscription-name> 格式。
connectionUrl Elasticsearch 网址,格式为 https://hostname:[port] 或指定 CloudID(如果使用 Elastic Cloud)。
apiKey 用于身份验证的 Base64 编码 API 密钥。
errorOutputTopic Pub/Sub 输出主题,用于发布失败的记录,格式为 projects/<project-id>/topics/<topic-name>
dataset (可选)通过 Pub/Sub 发送的日志类型,我们为其提供了开箱即用的信息中心。已知日志类型值为 audit、vpcflow 和 firewall。默认值:pubsub
namespace (可选)任意分组,例如环境(dev、prod 或 qa)、团队或战略性业务部门。默认值:default
batchSize (可选)文档数量中的批次大小。默认值:1000
batchSizeBytes (可选)批次大小(以字节为单位)。默认值:5242880 (5mb)。
maxRetryAttempts (可选)尝试次数上限,必须大于 0。默认值:no retries
maxRetryDuration (可选)重试时长上限(以毫秒为单位),必须大于 0。默认值:no retries
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

运行 Pub/Sub to Elasticsearch 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to Elasticsearch template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • ERROR_OUTPUT_TOPIC:用于错误输出的 Pub/Sub 主题
  • SUBSCRIPTION_NAME:您的 Pub/Sub 订阅名称
  • CONNECTION_URL:您的 Elasticsearch 网址
  • DATASET:您的日志类型
  • NAMESPACE:数据集的命名空间
  • APIKEY:用于身份验证的 base64 编码 API 密钥

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch",
   }
}
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • ERROR_OUTPUT_TOPIC:用于错误输出的 Pub/Sub 主题
  • SUBSCRIPTION_NAME:您的 Pub/Sub 订阅名称
  • CONNECTION_URL:您的 Elasticsearch 网址
  • DATASET:您的日志类型
  • NAMESPACE:数据集的命名空间
  • APIKEY:用于身份验证的 base64 编码 API 密钥

Datastream to Cloud Spanner

Datastream to Cloud Spanner 模板是一种流处理流水线,可从 Cloud Storage 存储分区中读取 Datastream 事件并将它们写入 Cloud Spanner 数据库。它预期用于将数据从 Datastream 迁移到 Cloud Spanner。

在执行模板之前,执行迁移所需的所有表必须存在于目标 Cloud Spanner 数据库中。因此,在数据迁移之前,必须完成从源数据库到目标 Cloud Spanner 的架构迁移。在迁移之前,数据可能存在表中。此模板不会将 DataStream 架构更改传播到 Cloud Spanner 数据库。

只有在所有数据都写入 Cloud Spanner 后,才能在迁移结束时保证数据一致性。为了存储对写入 Cloud Spanner 的每条记录的排序信息,此模板为 Cloud Spanner 数据库中的每个表创建了一个额外的表(称为影子表)。这用于确保迁移结束时的一致性。影子表在迁移后不会被删除,可在迁移结束时用于进行验证。

操作期间发生的任何错误(例如架构不匹配、JSON 文件格式错误或执行转换产生的错误)都会记录在错误队列中。错误队列是一个 Cloud Storage 文件夹,它以文本格式存储遇到错误的所有 Datastream 事件以及错误原因。这些错误可能是暂时性的,也可能是永久性的,它们存储在错误队列的相应 Cloud Storage 文件夹中。系统会自动重试暂时性错误,但不会自动重试永久性错误。如果发生永久性错误,您可以选择在模板运行期间更正更改事件,并将它们转移到可重试的存储分区。

对此流水线的要求

  • 处于正在运行未启动状态的 Datastream 数据流。
  • 要在其中复制 Datastream 事件的 Cloud Storage 存储分区。
  • 具有现有表的 Cloud Spanner 数据库。这些表可以为空,也可以包含数据。

模板参数

参数 说明
inputFilePattern Cloud Storage 中要复制的 Datastream 文件的位置。通常,这是数据流的根路径。
streamName 用于轮询架构信息和来源类型的数据流的名称或模板。
instanceId 在其中复制更改的 Cloud Spanner 实例。
databaseId 在其中复制更改的 Cloud Spanner 数据库。
projectId Cloud Spanner 项目 ID。
deadLetterQueueDirectory (可选)用于存储错误队列输出的文件路径。默认值为 Dataflow 作业的临时位置下的目录。
inputFileFormat (可选)Datastream 生成的输出文件的格式。例如:avro,json。默认值:avro
shadowTablePrefix (可选)用于为影子表命名的前缀。默认值:shadow_

运行 Datastream to Cloud Spanner 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Datastream to Spanner template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Datastream_to_CloudSpanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • GCS_FILE_PATH:用于存储 Datastream 事件的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE:您的 Cloud Spanner 实例。
  • CLOUDSPANNER_DATABASE:您的 Cloud Spanner 数据库。
  • DLQ:错误队列目录的 Cloud Storage 路径。

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Datastream_to_CloudSpanner",
   }
}
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • GCS_FILE_PATH:用于存储 Datastream 事件的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE:您的 Cloud Spanner 实例。
  • CLOUDSPANNER_DATABASE:您的 Cloud Spanner 数据库。
  • DLQ:错误队列目录的 Cloud Storage 路径。

Text Files on Cloud Storage to BigQuery (Stream)

Text Files on Cloud Storage to BigQuery 流水线是一种流处理流水线,可用于流式传输 Cloud Storage 中存储的文本文件,使用您提供的 JavaScript 用户定义函数 (UDF) 转换这些文件,然后将结果附加到 BigQuery。

流水线无限期运行,需要通过取消而非排空手动终止,原因是其使用 Watch 转换,该转换是不支持排空的可拆分 DoFn

对此流水线的要求

  • 创建一个用于描述 BigQuery 中的输出表架构的 JSON 文件。

    确保有一个名为 BigQuery Schema 的顶级 JSON 数组,且该数组的内容遵循 {"name": "COLUMN_NAME", "type": "DATA_TYPE"} 格式。例如:

    {
      "BigQuery Schema": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING",
          "mode": "REQUIRED"
        },
        {
          "name": "coffee",
          "type": "STRING",
          "mode": "REQUIRED"
        }
      ]
    }
    
  • 使用 UDF 函数(该函数提供转换文本行的逻辑)创建一个 JavaScript (.js) 文件。请注意,此函数必须返回 JSON 字符串。

    例如,以下函数将拆分 CSV 文件的每行文本,并通过转换值返回 JSON 字符串。

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

模板参数

参数 说明
javascriptTextTransformGcsPath .js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
JSONPath 您的 BigQuery 架构文件的 Cloud Storage 位置,以 JSON 格式描述。例如:gs://path/to/my/schema.json
javascriptTextTransformFunctionName 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
outputTable 完全限定的 BigQuery 表,例如 my-project:dataset.table
inputFilePattern 您要处理的文本的 Cloud Storage 位置,例如:gs://my-bucket/my-files/text.txt
bigQueryLoadingTemporaryDirectory BigQuery 加载进程的临时目录,例如:gs://my-bucket/my-files/temp_dir
outputDeadletterTable 无法到达输出表的消息表。例如 my-project:dataset.my-unprocessed-table。如果消息表不存在,则系统将在执行流水线期间创建该表。 如果未指定此参数,则系统会改用 <outputTableSpec>_error_records

运行 Cloud Storage Text to BigQuery (Stream) 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Text Files on Cloud Storage to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_BIGQUERY_SCHEMA_JSON:包含架构定义的 JSON 文件的 Cloud Storage 路径
  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA:文本数据集的 Cloud Storage 路径
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • BIGQUERY_UNPROCESSED_TABLE:未处理消息的 BigQuery 表名称
  • PATH_TO_TEMP_DIR_ON_GCS:临时目录的 Cloud Storage 路径

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_BIGQUERY_SCHEMA_JSON:包含架构定义的 JSON 文件的 Cloud Storage 路径
  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA:文本数据集的 Cloud Storage 路径
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • BIGQUERY_UNPROCESSED_TABLE:未处理消息的 BigQuery 表名称
  • PATH_TO_TEMP_DIR_ON_GCS:临时目录的 Cloud Storage 路径

Text Files on Cloud Storage to Pub/Sub (Stream)

此模板可以创建一种流处理流水线,该流水线可持续轮询新上传到 Cloud Storage 的文本文件,并逐行读取每个文件,然后将字符串发布到 Pub/Sub 主题。此外,此模板还能以包含 JSON 记录的换行符分隔文件或 CSV 文件形式,将记录发布到 Pub/Sub 主题进行实时处理。您可以使用此模板将数据重放到 Pub/Sub。

流水线无限期运行,需要通过“取消”而非“排空”手动终止,原因是其使用“Watch”转换,该转换是一个不支持排空的“SplittableDoFn”。

目前,轮询间隔固定为 10 秒。此模板不会对个别记录设置任何时间戳,因此在执行期间,事件时间将与发布时间相同。如果您的流水线依赖准确的事件时间来执行处理,建议不要使用此流水线。

对此流水线的要求

  • 输入文件必须采用换行符分隔的 JSON 或 CSV 格式。跨越源文件中多个行的记录会导致下游问题,因为文件中的每一行都将以消息形式发布到 Pub/Sub。
  • Pub/Sub 主题必须已存在才能执行此流水线。
  • 流水线无限期运行,需要手动终止。

模板参数

参数 说明
inputFilePattern 需要读取的输入文件格式。例如 gs://bucket-name/files/*.jsongs://bucket-name/path/*.csv
outputTopic 要向其写入数据的 Pub/Sub 输入主题。该名称应采用 projects/<project-id>/topics/<topic-name> 格式。

运行 Text Files on Cloud Storage to Pub/Sub (Stream) 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Text Files on Cloud Storage to Pub/Sub (Stream) template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME\
    --staging-location TEMP_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME:Cloud Storage 存储分区的名称
  • FILE_PATTERN:要从 Cloud Storage 存储分区中读取的文件格式 glob(例如 path/*.csv

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME:Cloud Storage 存储分区的名称
  • FILE_PATTERN:要从 Cloud Storage 存储分区中读取的文件格式 glob(例如 path/*.csv

Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP)

Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) 模板是一种流处理流水线,会从 Cloud Storage 存储分区读取 csv 文件,调用 Cloud Data Loss Prevention (Cloud DLP) API 进行去标识化处理,并将经过去标识化处理的数据写入指定的 BigQuery 表中。此模板支持使用 Cloud DLP 检查模板和 Cloud DLP 去标识化模板。 这样,用户不但能检查潜在的敏感信息并执行去标识化,而且还能对结构化数据(其中有数据列被指定为要进行去标识化处理且无需检查)执行去标识化。

对此流水线的要求

  • 要令牌化的输入数据必须存在。
  • Cloud DLP 模板(例如,DeidentifyTemplate 和 InspectTemplate)必须存在。如需了解更多详细信息,请参阅 Cloud DLP 模板
  • BigQuery 数据集必须存在。

模板参数

参数 说明
inputFilePattern 要从中读取输入数据记录的 csv 文件。也可以使用通配符,例如 gs://mybucket/my_csv_filename.csvgs://mybucket/file-*.csv
dlpProjectId 拥有 Cloud DLP API 资源的 Cloud DLP 项目 ID。 此 Cloud DLP 项目可以是拥有 Cloud DLP 模板的项目,也可以是其他项目; 例如 my_dlp_api_project
deidentifyTemplateName 要用于 API 请求的 Cloud DLP 去标识化模板;以 projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId} 模式指定,例如 projects/my_project/deidentifyTemplates/100
datasetName 用于发送标记化结果的 BigQuery 数据集。
batchSize 用于发送数据以进行检查和/或去标记化处理的区块/批次大小。对于 csv 文件,batchSize 是一个批次包含的行数。用户必须根据记录大小和文件大小确定批次大小。请注意,Cloud DLP API 的载荷大小上限为每个 API 调用 524 KB。
inspectTemplateName (可选)要用于 API 请求的 Cloud DLP 检查模板;以 projects/{template_project_id}/identifyTemplates/{idTemplateId} 模式指定,例如 projects/my_project/identifyTemplates/100

运行 Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location TEMP_LOCATION \
    --parameters \
inputFilePattern=INPUT_DATA,\
datasetName=DATASET_NAME,\
batchSize=BATCH_SIZE_VALUE,\
dlpProjectId=DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER

替换以下内容:

  • DLP_API_PROJECT_ID:您的 Cloud DLP API 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • INPUT_DATA:输入文件路径
  • DEIDENTIFY_TEMPLATE:Cloud DLPDeidentify 模板编号
  • DATASET_NAME:BigQuery 数据集名称
  • INSPECT_TEMPLATE_NUMBER:Cloud DLPInspect 模板编号
  • BATCH_SIZE_VALUE:批次大小(对于 csv 文件,批次大小是每个 API 的行数)

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
      "inputFilePattern":INPUT_DATA,
      "datasetName": "DATASET_NAME",
      "batchSize": "BATCH_SIZE_VALUE",
      "dlpProjectId": "DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE",
      "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • DLP_API_PROJECT_ID:您的 Cloud DLP API 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • INPUT_DATA:输入文件路径
  • DEIDENTIFY_TEMPLATE:Cloud DLPDeidentify 模板编号
  • DATASET_NAME:BigQuery 数据集名称
  • INSPECT_TEMPLATE_NUMBER:Cloud DLPInspect 模板编号
  • BATCH_SIZE_VALUE:批次大小(对于 csv 文件,批次大小是每个 API 的行数)

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream)

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub 模版是一种流处理流水线,用于从 MySQL 数据库中读取包含更改数据的 Pub/Sub 消息并将记录写入 BigQuery。Debezium 连接器捕获对 MySQL 数据库的更改,并将更改后的数据发布到 Pub/Sub。然后,该模板读取 Pub/Sub 消息并将其写入 BigQuery。

您可以使用此模板同步 MySQL 数据库和 BigQuery 表。流水线将更改后的数据写入 BigQuery 暂存表,并间歇性地更新复制 MySQL 数据库的 BigQuery 表。

对此流水线的要求

  • Debezium 连接器必须已部署
  • Pub/Sub 消息必须在 Beam 行中序列化。

模板参数

参数 说明
inputSubscriptions 要读取的 Pub/Sub 输入订阅列表(以英文逗号分隔),格式为 <subscription>,<subscription>, ...
changeLogDataset 用于存储暂存表的 BigQuery 数据集,格式为 <my-dataset>
replicaDataset 用于存储副本表的 BigQuery 数据集的位置,格式为 <my-dataset>
updateFrequencySecs (可选)流水线更新复制 MySQL 数据库的 BigQuery 表的时间间隔。

运行 Change Data Capture using Debezium and MySQL from Pub/Sub to BigQuery 模版

如需运行此模板,请执行以下步骤:

  1. 在本地机器上,克隆 DataflowTemplates 代码库
  2. 切换到 v2/cdc-parent 目录:
  3. 确保已部署 Debezium 连接器
  4. 使用 Maven,运行 Dataflow 模板。
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
        --inputSubscriptions=SUBSCRIPTIONS \
        --updateFrequencySecs=300 \
        --changeLogDataset=CHANGELOG_DATASET \
        --replicaDataset=REPLICA_DATASET \
        --project=PROJECT_ID \
        --region=REGION_NAME"
      

    替换以下内容:

    • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
    • SUBSCRIPTIONS:以英文逗号分隔的 Pub/Sub 订阅名称列表
    • CHANGELOG_DATASET:用于变更日志数据的 BigQuery 数据集
    • REPLICA_DATASET:用于副本表的 BigQuery 数据集

Apache Kafka to BigQuery