Google 提供的 Dataflow 流处理模板

Google 提供了一组开源 Dataflow 模板。

这些 Dataflow 模板可帮助您解决大型数据任务,包括数据导入、数据导出、数据备份、数据恢复和批量 API 操作,所有这些均无需使用专用开发环境。这些模板基于 Apache Beam 构建,并使用 Dataflow 转换数据。

如需了解有关模板的一般信息,请参阅 Dataflow 模板。如需查看 Google 提供的所有模板的列表,请参阅开始使用 Google 提供的模板

本指南介绍了流式模板。

Pub/Sub Subscription to BigQuery

Pub/Sub Subscription to BigQuery 模板是一种流处理流水线,可从 Pub/Sub 订阅读取 JSON 格式的消息并将其写入 BigQuery 表格中。您可以使用该模板作为将 Pub/Sub 数据移动到 BigQuery 的快速解决方案。此模板可从 Pub/Sub 中读取 JSON 格式的消息并将其转换为 BigQuery 元素。

对此流水线的要求

  • Pub/Sub 消息的 data 字段必须使用 JSON 格式(如此 JSON 指南中所述)。例如,您可以将 data 字段中值为 {"k1":"v1", "k2":"v2"} 格式的消息插入具有名为 k1k2 的列且数据类型为字符串的 BigQuery 表中。
  • 在运行该流水线之前,输出表必须已存在。表架构必须与输入 JSON 对象相匹配。

模板参数

参数 说明
inputSubscription 要读取的 Pub/Sub 输入订阅,格式为 projects/<project>/subscriptions/<subscription>
outputTableSpec BigQuery 输出表位置,格式为 <my-project>:<my-dataset>.<my-table>
outputDeadletterTable 未能到达输出表的消息的 BigQuery 表,格式为 <my-project>:<my-dataset>.<my-table>。如果该表不存在,则系统会在流水线执行期间创建它。如果未指定此参数,则系统会改用 OUTPUT_TABLE_SPEC_error_records
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

运行 Pub/Sub Subscription to BigQuery 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub Subscription to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • SUBSCRIPTION_NAME:您的 Pub/Sub 订阅名称
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • SUBSCRIPTION_NAME:您的 Pub/Sub 订阅名称
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称

Pub/Sub Topic to BigQuery

Pub/Sub Topic to BigQuery 模板是一种流处理流水线,可从 Cloud Pub/Sub 主题读取 JSON 格式的消息并将其写入 BigQuery 表格中。您可以使用该模板作为将 Pub/Sub 数据移动到 BigQuery 的快速解决方案。此模板可从 Pub/Sub 中读取 JSON 格式的消息并将其转换为 BigQuery 元素。

对此流水线的要求

  • Pub/Sub 消息的 data 字段必须使用 JSON 格式(如此 JSON 指南中所述)。例如,您可以将 data 字段中值为 {"k1":"v1", "k2":"v2"} 格式的消息插入具有名为 k1k2 的列且数据类型为字符串的 BigQuery 表中。
  • 在运行该流水线之前,输出表必须已存在。表架构必须与输入 JSON 对象相匹配。

模板参数

参数 说明
inputTopic 要读取的 Cloud Pub/Sub 输入主题,格式为 projects/<project>/topics/<topic>
outputTableSpec BigQuery 输出表位置,格式为 <my-project>:<my-dataset>.<my-table>
outputDeadletterTable 无法到达输出表的消息的 BigQuery 表。格式应为 <my-project>:<my-dataset>.<my-table>。 如果该表不存在,则系统会在流水线执行期间创建它。如果未指定此参数,则系统会改用 <outputTableSpec>_error_records
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

运行 Cloud Pub/Sub Topic to BigQuery 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub Topic to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称

Pub/Sub Avro to BigQuery

Pub/Sub Avro to BigQuery 模板是一种流处理流水线,用于将 Pub/Sub 订阅中的 Avro 数据提取到 BigQuery 表中。向 BigQuery 表写入数据时发生的任何错误都会流式传输到 Pub/Sub 未处理的主题。

对此流水线的要求

  • 用作输入来源的 Pub/Sub 订阅必须存在。
  • Avro 记录的架构文件必须存在于 Cloud Storage 存储空间中。
  • 未处理的 Pub/Sub 主题必须存在。
  • 用作输出目标的 BigQuery 数据集必须已存在。

模板参数

参数 说明
schemaPath Avro 架构文件的 Cloud Storage 位置。例如 gs://path/to/my/schema.avsc
inputSubscription 要读取的 Pub/Sub 输入订阅。例如 projects/<project>/subscriptions/<subscription>
outputTopic 要用于未处理的记录的 Pub/Sub 主题。例如 projects/<project-id>/topics/<topic-name>
outputTableSpec BigQuery 输出表位置。例如 <my-project>:<my-dataset>.<my-table>。 根据指定的 createDisposition,系统可能会使用用户提供的 Avro 架构自动创建输出表。
writeDisposition (可选)BigQuery WriteDisposition。例如 WRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE。默认:WRITE_APPEND
createDisposition (可选)BigQuery CreateDisposition。例如 CREATE_IF_NEEDEDCREATE_NEVER。默认:CREATE_IF_NEEDED

运行 Pub/Sub Avro to BigQuery 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub Avro to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • SCHEMA_PATH:Avro 架构文件的 Cloud Storage 路径(例如 gs://MyBucket/file.avsc
  • SUBSCRIPTION_NAME:Pub/Sub 输入订阅名称
  • BIGQUERY_TABLE:BigQuery 输出表名称
  • DEADLETTER_TOPIC:要用于未处理的队列的 Pub/Sub 主题

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • SCHEMA_PATH:Avro 架构文件的 Cloud Storage 路径(例如 gs://MyBucket/file.avsc
  • SUBSCRIPTION_NAME:Pub/Sub 输入订阅名称
  • BIGQUERY_TABLE:BigQuery 输出表名称
  • DEADLETTER_TOPIC:要用于未处理的队列的 Pub/Sub 主题

Pub/Sub Proto to BigQuery

Pub/Sub proto to BigQuery 模板是一种流处理流水线,用于将 Pub/Sub 订阅中的 Avro 数据提取到 BigQuery 表中。向 BigQuery 表写入数据时发生的任何错误都会流式传输到 Pub/Sub 未处理的主题。

可以提供 JavaScript 用户定义函数 (UDF) 来转换数据。可以将在执行 UDF 期间发生的错误发送到单独的 Pub/Sub 主题或与 BigQuery 错误相同的未处理主题。

对此流水线的要求

  • 用作输入来源的 Pub/Sub 订阅必须存在。
  • Proto 记录的架构文件必须存在于 Cloud Storage 中。
  • 输出 Pub/Sub 主题必须存在。
  • 用作输出目标的 BigQuery 数据集必须已存在。
  • 如果 BigQuery 表存在,则无论 createDisposition 值如何,该表都必须具有与 proto 数据匹配的架构。

模板参数

参数 说明
protoSchemaPath 独立的 proto 架构文件的 Cloud Storage 位置。例如 gs://path/to/my/file.pb。 您可以使用 protoc 命令的 --descriptor_set_out 标志生成此文件。--include_imports 标志可确保文件是独立的。
fullMessageName 完整的 proto 消息名称。例如 package.name.MessageName,其中 package.name 是为 package 语句(而不是 java_package 语句)提供的值。
inputSubscription 要读取的 Pub/Sub 输入订阅。例如 projects/<project>/subscriptions/<subscription>
outputTopic 要用于未处理的记录的 Pub/Sub 主题。例如 projects/<project-id>/topics/<topic-name>
outputTableSpec BigQuery 输出表位置。例如 my-project:my_dataset.my_table。 根据指定的 createDisposition,系统可能会使用输入架构文件自动创建输出表。
preserveProtoFieldNames (可选)true 用于保留 JSON 中的原始 Proto 字段名称。false 用于使用更多标准 JSON 名称。例如,false 会将 field_name 更改为 fieldName。(默认:false
bigQueryTableSchemaPath (可选)BigQuery 架构路径到 Cloud Storage 路径。例如 gs://path/to/my/schema.json。如果未提供,则根据 Proto 架构推断架构。
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
udfOutputTopic (可选)存储 UDF 错误的 Pub/Sub 主题。例如 projects/<project-id>/topics/<topic-name>。如果未提供,则会将 UDF 错误发送到 outputTopic 所在的主题。
writeDisposition (可选)BigQuery WriteDisposition。例如 WRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE。默认值:WRITE_APPEND
createDisposition (可选)BigQuery CreateDisposition。例如 CREATE_IF_NEEDEDCREATE_NEVER。默认值为 CREATE_IF_NEEDED

运行 Pub/Sub Proto to BigQuery 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub Proto to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
fullMessageName=PROTO_MESSAGE_NAME,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=UNPROCESSED_TOPIC
  

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • SCHEMA_PATH:Proto 架构文件的 Cloud Storage 路径(例如 gs://MyBucket/file.pb
  • PROTO_MESSAGE_NAME:Proto 消息名称(例如 package.name.MessageName
  • SUBSCRIPTION_NAME:Pub/Sub 输入订阅名称
  • BIGQUERY_TABLE:BigQuery 输出表名称
  • UNPROCESSED_TOPIC:要用于未处理的队列的 Pub/Sub 主题

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_Proto_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "fullMessageName": "PROTO_MESSAGE_NAME",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "UNPROCESSED_TOPIC"
      }
   }
}
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • SCHEMA_PATH:Proto 架构文件的 Cloud Storage 路径(例如 gs://MyBucket/file.pb
  • PROTO_MESSAGE_NAME:Proto 消息名称(例如 package.name.MessageName
  • SUBSCRIPTION_NAME:Pub/Sub 输入订阅名称
  • BIGQUERY_TABLE:BigQuery 输出表名称
  • UNPROCESSED_TOPIC:要用于未处理的队列的 Pub/Sub 主题

Pub/Sub to Pub/Sub

Pub/Sub to Pub/Sub 模板是一种流处理流水线,可从 Pub/Sub 订阅中读取消息,并将这些消息写入其他 Pub/Sub 主题。该流水线还接受一个可选的消息属性键以及值,该值可用于过滤应写入 Pub/Sub 主题的消息。您可以使用此模板将消息从 Pub/Sub 订阅复制到带有可选消息过滤器的其他 Pub/Sub 主题。

对此流水线的要求

  • 源 Pub/Sub 订阅必须已存在才能执行流水线。
  • 源 Pub/Sub 订阅必须是拉取订阅
  • 目的地 Pub/Sub 主题必须已存在才能执行此流水线。

模板参数

参数 说明
inputSubscription 要从中读取输入的 Pub/Sub 订阅,例如 projects/<project-id>/subscriptions/<subscription-name>
outputTopic 要将输出写入其中的 Cloud Pub/Sub 主题,例如 projects/<project-id>/topics/<topic-name>
filterKey (可选)根据特性键过滤事件。如果未指定 filterKey,则不会应用过滤器。
filterValue (可选)提供了 filterKey 时要使用的过滤器特性值。默认情况下,filterValue 为 null。

运行 Pub/Sub to Pub/Sub 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to Pub/Sub template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • TOPIC_NAME:Pub/Sub 主题名称
  • FILTER_KEY:用于过滤事件的属性键。如果未指定键,则不会应用过滤器。
  • FILTER_VALUE:提供事件过滤键时要使用的过滤器属性值。接受有效的 Java 正则表达式字符串作为事件过滤值。如果提供了正则表达式,则应匹配整个表达式以过滤消息。不过滤部分匹配(如子字符串)。默认使用 null 事件过滤值。

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • TOPIC_NAME:Pub/Sub 主题名称
  • FILTER_KEY:用于过滤事件的属性键。如果未指定键,则不会应用过滤器。
  • FILTER_VALUE:提供事件过滤键时要使用的过滤器属性值。接受有效的 Java 正则表达式字符串作为事件过滤值。如果提供了正则表达式,则应匹配整个表达式以过滤消息。不过滤部分匹配(如子字符串)。默认使用 null 事件过滤值。

Pub/Sub to Splunk

Pub/Sub to Splunk 模板是一种流处理流水线,可从 Pub/Sub 订阅中读取消息,并通过 Splunk 的 HTTP Event Collector (HEC) 将消息载荷写入 Splunk。此模板的最常见使用场景是将日志导出到 Splunk。如需查看底层工作流的示例,请参阅使用 Dataflow 将支持生产环境的日志导出部署到 Splunk

在写入 Splunk 之前,您还可以将 JavaScript 用户定义函数应用于消息载荷。任何未能成功处理的消息都会被转发到 Pub/Sub 未处理主题,以便进一步进行问题排查并重新处理。

要为 HEC 令牌提供额外保护,您还可以传入 Cloud KMS 密钥和使用 Cloud KMS 密钥加密的 base64 编码 HEC 令牌参数。如需详细了解如何对 HEC 令牌参数进行加密,请参阅 Cloud KMS API 加密端点

对此流水线的要求

  • 源 Pub/Sub 订阅必须已存在才能运行此流水线。
  • 在运行此流水线之前,Pub/Sub 未处理的主题必须已存在。
  • Splunk HEC 端点必须可从 Dataflow 工作器的网络访问。
  • Splunk HEC 令牌必须已生成并且可用。

模板参数

参数 说明
inputSubscription 要从中读取输入的 Pub/Sub 订阅,例如 projects/<project-id>/subscriptions/<subscription-name>
token (可选)Splunk HEC 身份验证令牌。如果 tokenSource 设置为 PlaINTEXT 或 KMS,则必须提供。
url Splunk HEC 网址,必须可从运行流水线的 VPC 路由。例如,https://splunk-hec-host:8088。
outputDeadletterTopic 用于转发无法递送的消息的 Pub/Sub 主题,例如 projects/<project-id>/topics/<topic-name>
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
batchCount (可选)向 Splunk 发送多个事件的批次大小。默认值为 1(无批处理)。
parallelism (可选)最大并行请求数。默认值为 1(无并行)。
disableCertificateValidation (可选)停用 SSL 证书验证。默认为 false(已启用验证)。 如果为 true,则不验证证书(所有证书均受信任),并且忽略“rootCaCertificatePath”参数。
includePubsubMessage (可选)在载荷中包含完整的 Pub/Sub 消息。默认值为 false(只有数据元素包含在载荷中)。
tokenSource 令牌的来源。PLAINTEXT、KMS 或 SECRET_MANAGER 之一。如果使用了 Secret Manager,则必须提供此参数。如果 tokenSource 设置为 KMS,则必须提供 tokenKMSEncryptionKey 和加密的 token。如果 tokenSource 设置为 SECRET_MANAGER,则必须提供 tokenSecretId。如果 tokenSource 设置为 DLPINTEXT,则必须提供 token
tokenKMSEncryptionKey (可选)用于解密 HEC 令牌字符串的 Cloud KMS 密钥。 如果 tokenSource 设置为 KMS,则必须提供此参数。如果提供了 Cloud KMS 密钥,则必须以加密方式传递 HEC 令牌字符串。
tokenSecretId (可选)令牌的 Secret Manager Secret ID。如果 tokenSource 设置为 SECRET_MANAGER,则必须提供此参数。应采用以下格式:projects/<project-id>/secrets/<secret-name>/versions/<secret-version>
rootCaCertificatePath (可选)Cloud Storage 中根 CA 证书的完整网址。例如 gs://mybucket/mycerts/privateCA.crt。Cloud Storage 中提供的证书必须采用 DER 编码,并且可能以二进制或可打印 (Base64) 编码提供。如果证书是使用 Base64 编码提供的,则它必须以 -----BEGIN CERTIFICATE----- 开头为界,并且必须以 -----END CERTIFICATE----- 结尾为界。如果提供此参数,系统会提取此私有 CA 证书文件并将其添加到 Dataflow 工作器的信任库,以便验证 Splunk HEC 端点的 SSL 证书。如果未提供此参数,则使用默认信任库。
enableBatchLogs (可选)指定是否应为写入 Splunk 的批次启用日志。默认值:true
enableGzipHttpCompression (可选)指定是否应压缩发送到 Splunk HEC 的 HTTP 请求(gzip 内容编码)。默认值:true

运行 Pub/Sub to Splunk 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to Splunk template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION,\
rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • INPUT_SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • TOKEN:Splunk 的 HTTP Event Collector 令牌
  • URL:Splunk 的 HTTP Event Collector 的网址路径(例如 https://splunk-hec-host:8088
  • DEADLETTER_TOPIC_NAME:Pub/Sub 主题名称
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT:用于向 Splunk 发送多个事件的批次大小
  • PARALLELISM:用于向 Splunk 发送事件的并行请求数
  • DISABLE_VALIDATION:如果要停用 SSL 证书验证则为 true
  • ROOT_CA_CERTIFICATE_PATH:Cloud Storage 中根 CA 证书的路径(例如 gs://your-bucket/privateCA.crt

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION",
       "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • INPUT_SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • TOKEN:Splunk 的 HTTP Event Collector 令牌
  • URL:Splunk 的 HTTP Event Collector 的网址路径(例如 https://splunk-hec-host:8088
  • DEADLETTER_TOPIC_NAME:Pub/Sub 主题名称
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT:用于向 Splunk 发送多个事件的批次大小
  • PARALLELISM:用于向 Splunk 发送事件的并行请求数
  • DISABLE_VALIDATION:如果要停用 SSL 证书验证则为 true
  • ROOT_CA_CERTIFICATE_PATH:Cloud Storage 中根 CA 证书的路径(例如 gs://your-bucket/privateCA.crt

Pub/Sub to Avro Files on Cloud Storage

Pub/Sub to Avro files on Cloud Storage 模板是一个流处理流水线,可从 Pub/Sub 主题中读取数据,并将 Avro 文件写入指定的 Cloud Storage 存储桶。

对此流水线的要求

  • Pub/Sub 输入主题必须已存在才能执行此流水线。

模板参数

参数 说明
inputTopic 要订阅用来处理消息的 Pub/Sub 主题。主题名称必须采用 projects/<project-id>/topics/<topic-name> 格式。
outputDirectory 要用于归档输出 Avro 文件的输出目录。末尾必须包含 /。例如:gs://example-bucket/example-directory/
avroTempDirectory 临时 Avro 文件的目录。末尾必须包含 /。例如:gs://example-bucket/example-directory/
outputFilenamePrefix (可选)Avro 文件的输出文件名前缀。
outputFilenameSuffix (可选)Avro 文件的输出文件名后缀。
outputShardTemplate (可选)输出文件的分片模板。它被指定为字母 SN 的重复序列。例如 SSS-NNN。这些字母会分别替换成分片编号或分片总数。如果未指定此参数,则默认模板格式为 W-P-SS-of-NN

运行 Pub/Sub to Cloud Storage Avro 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to Avro Files on Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • TEMP_LOCATION:写入临时文件的位置;例如 gs://your-bucket/temp
  • TOPIC_NAME:Pub/Sub 主题名称
  • BUCKET_NAME - Cloud Storage 存储桶的名称。
  • FILENAME_PREFIX:首选输出文件名前缀
  • FILENAME_SUFFIX:首选输出文件名后缀
  • SHARD_TEMPLATE:首选输出分片模板

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • TEMP_LOCATION:写入临时文件的位置;例如 gs://your-bucket/temp
  • TOPIC_NAME:Pub/Sub 主题名称
  • BUCKET_NAME - Cloud Storage 存储桶的名称。
  • FILENAME_PREFIX:首选输出文件名前缀
  • FILENAME_SUFFIX:首选输出文件名后缀
  • SHARD_TEMPLATE:首选输出分片模板

Pub/Sub Topic to Text Files on Cloud Storage

Pub/Sub to Cloud Storage Text 模板是一种流处理流水线,可从 Pub/Sub 主题读取记录并将其保存为一系列文本格式的 Cloud Storage 文件。使用此模板,您可以快速地保存 Pub/Sub 中的数据以留待将来使用。默认情况下,此模板每 5 分钟生成一个新文件。

对此流水线的要求

  • Pub/Sub 主题必须已存在才能执行此流水线。
  • 发布到主题的消息必须采用文本格式。
  • 发布到主题的消息不得包含任何换行符。请注意,每条 Pub/Sub 消息在输出文件中均会保存为一行。

模板参数

参数 说明
inputTopic 要从中读取输入的 Pub/Sub 主题。主题名称应采用 projects/<project-id>/topics/<topic-name> 格式。
outputDirectory 用于写入输出文件的路径和文件名前缀,例如 gs://bucket-name/path/。该值必须以斜杠结尾。
outputFilenamePrefix 要在各窗口文件上放置的前缀。例如 output-
outputFilenameSuffix 要放置于每个窗口化文件上的后缀,通常是文件扩展名,例如 .txt.csv
outputShardTemplate 分片式模板定义每个窗口文件的动态部分。默认情况下,该管道使用单一碎片输出到各窗口内的文件系统。这意味着每个窗口的所有数据都会输出到单个文件中。outputShardTemplate 默认为 W-P-SS-of-NN,其中 W 是窗口日期范围,P 是窗格信息,S 是分片编号,而 N 是分片数。对于单个文件,outputShardTemplateSS-of-NN 部分为 00-of-01

运行 Pub/Sub Topic to Text Files on Cloud Storage 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to Text Files on Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME - Cloud Storage 存储桶的名称。

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME - Cloud Storage 存储桶的名称。

Pub/Sub Topic or Subscription to Text Files on Cloud Storage

Pub/Sub Topic or Subscription to Cloud Storage Text 模板是一种流处理流水线,可从 Pub/Sub 读取记录并将其保存为一系列文本格式的 Cloud Storage 文件。使用此模板,您可以快速地保存 Pub/Sub 中的数据以留待将来使用。默认情况下,此模板每 5 分钟生成一个新文件。

对此流水线的要求

  • Pub/Sub 主题或订阅必须已存在才能执行此流水线。
  • 发布到主题的消息必须采用文本格式。
  • 发布到主题的消息不得包含任何换行符。请注意,每条 Pub/Sub 消息在输出文件中均会保存为一行。

模板参数

参数 说明
inputTopic 要从中读取输入的 Pub/Sub 主题。主题名称应采用 projects/<project-id>/topics/<topic-name> 格式。 如果提供此参数,则不应提供 inputSubscription
inputSubscription 要从中读取输入的 Pub/Sub 订阅。订阅名称应采用 projects/<project-id>/subscription/<subscription-name> 格式。如果提供此参数,则不应提供 inputTopic
outputDirectory 用于写入输出文件的路径和文件名前缀,例如 gs://bucket-name/path/。该值必须以斜杠结尾。
outputFilenamePrefix 要在各窗口文件上放置的前缀。例如 output-
outputFilenameSuffix 要放置于每个窗口化文件上的后缀,通常是文件扩展名,例如 .txt.csv
outputShardTemplate 分片式模板定义每个窗口文件的动态部分。默认情况下,该管道使用单一碎片输出到各窗口内的文件系统。这意味着每个窗口的所有数据都会输出到单个文件中。outputShardTemplate 默认为 W-P-SS-of-NN,其中 W 是窗口日期范围,P 是窗格信息,S 是分片编号,而 N 是分片数。对于单个文件,outputShardTemplateSS-of-NN 部分为 00-of-01
windowDuration (可选)窗口时长是将数据写入输出目录的时间间隔。请根据流水线的吞吐量配置时长。例如,较高的吞吐量可能需要较短的窗口时长,以便数据适应内存。默认为 5 分钟,最短可为 1 秒。允许的格式如下:[int]s(表示数秒,例如 5s)、[int]m(表示数分钟,例如 12m)、[int]h(表示数小时,例如 2h)。

运行 Pub/Sub Topic or Subscription to Text Files on Cloud Storage 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template jobs run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • SUBSCRIPTION_NAME:您的 Pub/Sub 订阅名称
  • BUCKET_NAME:Cloud Storage 存储桶的名称

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
    },
    "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex",
  }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • SUBSCRIPTION_NAME:您的 Pub/Sub 订阅名称
  • BUCKET_NAME:Cloud Storage 存储桶的名称

Pub/Sub to MongoDB

Pub/Sub to MongoDB 模板是一种流处理流水线,可从 Pub/Sub 订阅读取 JSON 编码的消息并将其以文档的形式写入 MongoDB。如果需要,此流水线还能支持额外的转换,只需通过 JavaScript 用户定义的函数 (UDF) 包含这些转换即可。因架构不匹配、JOSN 格式不正确或执行转换时发生的所有错误都会记录在未处理消息的 BigQuery 表中,并随附输入消息。如果在执行之前不存在任何未处理记录的表,则流水线会自动创建该表。

对此流水线的要求

  • Pub/Sub 订阅必须存在,并且消息必须采用有效的 JSON 格式进行编码。
  • MongoDB 集群必须存在,并且应该可通过 Dataflow 工作器机器访问。

模板参数

参数 说明
inputSubscription Pub/Sub 订阅的名称。例如:projects/my-project-id/subscriptions/my-subscription-id
mongoDBUri 以英文逗号分隔的 MongoDB 服务器列表。例如:192.285.234.12:27017,192.287.123.11:27017
database 存储集合的 MongoDB 数据库。例如:my-db
collection MongoDB 数据库中集合的名称。例如:my-collection
deadletterTable 存储失败消息(架构不匹配、JSON 格式错误)的 BigQuery 表。例如:project-id:dataset-name.table-name
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
batchSize (可选)用于将文档批量插入 MongoDB 的批次大小。默认值:1000
batchSizeBytes (可选)批次大小(以字节为单位)。默认值:5242880
maxConnectionIdleTime (可选)在出现连接超时之前所允许的空闲时间上限(以秒为单位)。默认值:60000
sslEnabled (可选)用于指示与 MongoDB 的连接是否启用了 SSL 的布尔值。默认值:true
ignoreSSLCertificate (可选)用于指示是否应忽略 SSL 证书的布尔值。默认值:true
withOrdered (可选)允许依次批量插入 MongoDB 的布尔值。默认值:true
withSSLInvalidHostNameAllowed (可选)用于指示是否允许 SSL 连接使用无效主机名的布尔值。默认值:true

运行 Pub/Sub to MongoDB 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to MongoDB template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • INPUT_SUBSCRIPTION:Pub/Sub 订阅(例如 projects/my-project-id/subscriptions/my-subscription-id
  • MONGODB_URI:MongoDB 服务器地址(例如 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE:MongoDB 数据库名称(例如 users
  • COLLECTION:MongoDB 集合名称(例如 profiles
  • UNPROCESSED_TABLE:BigQuery 表名称(例如 your-project:your-dataset.your-table-name

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • INPUT_SUBSCRIPTION:Pub/Sub 订阅(例如 projects/my-project-id/subscriptions/my-subscription-id
  • MONGODB_URI:MongoDB 服务器地址(例如 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE:MongoDB 数据库名称(例如 users
  • COLLECTION:MongoDB 集合名称(例如 profiles
  • UNPROCESSED_TABLE:BigQuery 表名称(例如 your-project:your-dataset.your-table-name

Pub/Sub to Elasticsearch

Pub/Sub to Elasticsearch 模板是一种流处理流水线,可从 Pub/Sub 订阅读取消息、执行用户定义的函数 (UDF) 并将其作为文档写入 Elasticsearch。Dataflow 模板使用 Elasticsearch 的数据流功能跨多个索引存储时间序列数据,同时为请求提供单个命名资源。数据流非常适合存储在 Pub/Sub 中的日志、指标、跟踪记录和其他持续生成的数据。

对此流水线的要求

  • 来源 Pub/Sub 订阅必须存在,并且消息必须采用有效的 JSON 格式进行编码。
  • GCP 实例上或 Elastic Cloud 上可公开访问的 Elasticsearch 主机,用于 Elasticsearch 7.0 或更高版本。如需了解详情,请参阅适用于 Elastic 的 Google Cloud 集成
  • 用于错误输出的 Pub/Sub 主题。

模板参数

参数 说明
inputSubscription 要使用的 Pub/Sub 订阅。该名称应采用 projects/<project-id>/subscriptions/<subscription-name> 格式。
connectionUrl Elasticsearch 网址,格式为 https://hostname:[port] 或指定 CloudID(如果使用 Elastic Cloud)。
apiKey 用于身份验证的 Base64 编码 API 密钥。
errorOutputTopic Pub/Sub 输出主题,用于发布失败的记录,格式为 projects/<project-id>/topics/<topic-name>
dataset (可选)通过 Pub/Sub 发送的日志类型,我们为其提供了开箱即用的信息中心。已知日志类型值为 audit、vpcflow 和 firewall。默认值:pubsub
namespace (可选)任意分组,例如环境(dev、prod 或 qa)、团队或战略性业务部门。默认值:default
batchSize (可选)文档数量中的批次大小。默认值:1000
batchSizeBytes (可选)批次大小(以字节为单位)。默认值:5242880 (5mb)。
maxRetryAttempts (可选)尝试次数上限,必须大于 0。默认值:no retries
maxRetryDuration (可选)重试时长上限(以毫秒为单位),必须大于 0。默认值:no retries
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
propertyAsIndex (可选)要编入索引的文档中的一个属性,其值将指定批量请求要包含在文档中的 _index 元数据(优先于 _index UDF)。默认值:none。
propertyAsId (可选)要编入索引的文档中的一个属性,其值将指定批量请求要包含在文档中的 _id 元数据(优先于 _id UDF)。默认值:none。
javaScriptIndexFnGcsPath (可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _index 元数据。默认值:none。
javaScriptIndexFnName (可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _index 元数据。默认值:none。
javaScriptIdFnGcsPath (可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _id 元数据。默认值:none。
javaScriptIdFnName (可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _id 元数据。默认值:none。
javaScriptTypeFnGcsPath (可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _type 元数据。默认值:none。
javaScriptTypeFnName (可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _type 元数据。默认值:none。
javaScriptIsDeleteFnGcsPath (可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将确定是否应删除文档,而不是插入或更新文档。该函数应返回字符串值 "true""false"。默认值:none。
javaScriptIsDeleteFnName (可选)函数的 UDF JavaScript 函数名称,该函数将确定是否应删除文档,而不是插入或更新文档。该函数应返回字符串值 "true""false"。默认值:none。
usePartialUpdate (可选)是否在 Elasticsearch 请求中使用部分更新(更新而不是创建或索引,允许部分文档)。默认值:false
bulkInsertMethod (可选)在 Elasticsearch 批量请求中使用 INDEX(索引,允许执行更新插入操作)还是 CREATE(创建,会对重复 _id 报错)。默认值:CREATE

运行 Pub/Sub to Elasticsearch 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to Elasticsearch template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • ERROR_OUTPUT_TOPIC:用于错误输出的 Pub/Sub 主题
  • SUBSCRIPTION_NAME:您的 Pub/Sub 订阅名称
  • CONNECTION_URL:您的 Elasticsearch 网址
  • DATASET:您的日志类型
  • NAMESPACE:数据集的命名空间
  • APIKEY:用于身份验证的 base64 编码 API 密钥

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/PubSub_to_Elasticsearch",
   }
}
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • ERROR_OUTPUT_TOPIC:用于错误输出的 Pub/Sub 主题
  • SUBSCRIPTION_NAME:您的 Pub/Sub 订阅名称
  • CONNECTION_URL:您的 Elasticsearch 网址
  • DATASET:您的日志类型
  • NAMESPACE:数据集的命名空间
  • APIKEY:用于身份验证的 base64 编码 API 密钥

Datastream to Cloud Spanner

Datastream to Cloud Spanner 模板是一种流处理流水线,可从 Cloud Storage 存储桶中读取 Datastream 事件并将它们写入 Cloud Spanner 数据库。它预期用于将数据从 Datastream 迁移到 Cloud Spanner。

在执行模板之前,执行迁移所需的所有表必须存在于目标 Cloud Spanner 数据库中。因此,在数据迁移之前,必须完成从源数据库到目标 Cloud Spanner 的架构迁移。在迁移之前,数据可能存在表中。此模板不会将 DataStream 架构更改传播到 Cloud Spanner 数据库。

只有在所有数据都写入 Cloud Spanner 后,才能在迁移结束时保证数据一致性。为了存储对写入 Cloud Spanner 的每条记录的排序信息,此模板为 Cloud Spanner 数据库中的每个表创建了一个额外的表(称为影子表)。这用于确保迁移结束时的一致性。影子表在迁移后不会被删除,可在迁移结束时用于进行验证。

操作期间发生的任何错误(例如架构不匹配、JSON 文件格式错误或执行转换产生的错误)都会记录在错误队列中。错误队列是一个 Cloud Storage 文件夹,它以文本格式存储遇到错误的所有 Datastream 事件以及错误原因。这些错误可能是暂时性的,也可能是永久性的,它们存储在错误队列的相应 Cloud Storage 文件夹中。系统会自动重试暂时性错误,但不会自动重试永久性错误。如果发生永久性错误,您可以选择在模板运行期间更正更改事件,并将它们转移到可重试的存储桶。

对此流水线的要求

  • 处于正在运行未启动状态的 Datastream 数据流。
  • 要在其中复制 Datastream 事件的 Cloud Storage 存储桶。
  • 具有现有表的 Cloud Spanner 数据库。这些表可以为空,也可以包含数据。

模板参数

参数 说明
inputFilePattern Cloud Storage 中要复制的 Datastream 文件的位置。通常,这是数据流的根路径。
streamName 用于轮询架构信息和来源类型的数据流的名称或模板。
instanceId 在其中复制更改的 Cloud Spanner 实例。
databaseId 在其中复制更改的 Cloud Spanner 数据库。
projectId Cloud Spanner 项目 ID。
deadLetterQueueDirectory (可选)用于存储错误队列输出的文件路径。默认值为 Dataflow 作业的临时位置下的目录。
inputFileFormat (可选)Datastream 生成的输出文件的格式。例如:avro,json。默认值:avro
shadowTablePrefix (可选)用于为影子表命名的前缀。默认值:shadow_

运行 Datastream to Cloud Spanner 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Datastream to Spanner template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • GCS_FILE_PATH:用于存储 Datastream 事件的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE:您的 Cloud Spanner 实例。
  • CLOUDSPANNER_DATABASE:您的 Cloud Spanner 数据库。
  • DLQ:错误队列目录的 Cloud Storage 路径。

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • GCS_FILE_PATH:用于存储 Datastream 事件的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE:您的 Cloud Spanner 实例。
  • CLOUDSPANNER_DATABASE:您的 Cloud Spanner 数据库。
  • DLQ:错误队列目录的 Cloud Storage 路径。

Text Files on Cloud Storage to BigQuery (Stream)

Text Files on Cloud Storage to BigQuery 流水线是一种流处理流水线,可用于流式传输 Cloud Storage 中存储的文本文件,使用您提供的 JavaScript 用户定义函数 (UDF) 转换这些文件,然后将结果附加到 BigQuery。

流水线无限期运行,需要通过取消而非排空手动终止,原因是其使用 Watch 转换,该转换是不支持排空的可拆分 DoFn

对此流水线的要求

  • 创建一个用于描述 BigQuery 中的输出表架构的 JSON 文件。

    确保有一个名为 fields 的顶级 JSON 数组,且该数组的内容遵循 {"name": "COLUMN_NAME", "type": "DATA_TYPE"} 格式。例如:

    {
      "fields": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING",
          "mode": "REQUIRED"
        },
        {
          "name": "coffee",
          "type": "STRING",
          "mode": "REQUIRED"
        }
      ]
    }
    
  • 使用 UDF 函数(该函数提供转换文本行的逻辑)创建一个 JavaScript (.js) 文件。请注意,此函数必须返回 JSON 字符串。

    例如,以下函数将拆分 CSV 文件的每行文本,并通过转换值返回 JSON 字符串。

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

模板参数

参数 说明
javascriptTextTransformGcsPath .js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
JSONPath 您的 BigQuery 架构文件的 Cloud Storage 位置,以 JSON 格式描述。例如:gs://path/to/my/schema.json
javascriptTextTransformFunctionName 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
outputTable 完全限定的 BigQuery 表,例如 my-project:dataset.table
inputFilePattern 您要处理的文本的 Cloud Storage 位置,例如:gs://my-bucket/my-files/text.txt
bigQueryLoadingTemporaryDirectory BigQuery 加载进程的临时目录,例如:gs://my-bucket/my-files/temp_dir
outputDeadletterTable 无法到达输出表的消息表。例如 my-project:dataset.my-unprocessed-table。如果该表不存在,则系统会在流水线执行期间创建它。如果未指定此参数,则系统会改用 <outputTableSpec>_error_records

运行 Cloud Storage Text to BigQuery (Stream) 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Text Files on Cloud Storage to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_BIGQUERY_SCHEMA_JSON:包含架构定义的 JSON 文件的 Cloud Storage 路径
  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA:文本数据集的 Cloud Storage 路径
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • BIGQUERY_UNPROCESSED_TABLE:未处理消息的 BigQuery 表名称
  • PATH_TO_TEMP_DIR_ON_GCS:临时目录的 Cloud Storage 路径

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_BIGQUERY_SCHEMA_JSON:包含架构定义的 JSON 文件的 Cloud Storage 路径
  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA:文本数据集的 Cloud Storage 路径
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • BIGQUERY_UNPROCESSED_TABLE:未处理消息的 BigQuery 表名称
  • PATH_TO_TEMP_DIR_ON_GCS:临时目录的 Cloud Storage 路径

Text Files on Cloud Storage to Pub/Sub (Stream)

此模板可以创建一种流处理流水线,该流水线可持续轮询新上传到 Cloud Storage 的文本文件,并逐行读取每个文件,然后将字符串发布到 Pub/Sub 主题。此外,此模板还能以包含 JSON 记录的换行符分隔文件或 CSV 文件形式,将记录发布到 Pub/Sub 主题进行实时处理。您可以使用此模板将数据重放到 Pub/Sub。

流水线无限期运行,需要通过“取消”而非“排空”手动终止,原因是其使用“Watch”转换,该转换是一个不支持排空的“SplittableDoFn”。

目前,轮询间隔固定为 10 秒。此模板不会对个别记录设置任何时间戳,因此在执行期间,事件时间与发布时间相同。如果您的流水线依赖准确的事件时间来执行处理,建议不要使用此流水线。

对此流水线的要求

  • 输入文件必须采用换行符分隔的 JSON 或 CSV 格式。跨越源文件中多个行的记录会导致下游问题,因为文件中的每一行都以消息形式发布到 Pub/Sub。
  • Pub/Sub 主题必须已存在才能执行此流水线。
  • 流水线无限期运行,需要手动终止。

模板参数

参数 说明
inputFilePattern 需要读取的输入文件格式。例如 gs://bucket-name/files/*.jsongs://bucket-name/path/*.csv
outputTopic 要向其写入数据的 Pub/Sub 输入主题。该名称应采用 projects/<project-id>/topics/<topic-name> 格式。

运行 Text Files on Cloud Storage to Pub/Sub (Stream) 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Text Files on Cloud Storage to Pub/Sub (Stream) template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME\
    --staging-location STAGING_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME:Cloud Storage 存储桶的名称
  • FILE_PATTERN:要从 Cloud Storage 存储桶中读取的文件格式 glob(例如 path/*.csv

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME:Cloud Storage 存储桶的名称
  • FILE_PATTERN:要从 Cloud Storage 存储桶中读取的文件格式 glob(例如 path/*.csv

Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP)

Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) 模板是一种流处理流水线,会从 Cloud Storage 存储桶读取 csv 文件,调用 Cloud Data Loss Prevention (Cloud DLP) API 进行去标识化处理,并将经过去标识化处理的数据写入指定的 BigQuery 表中。此模板支持使用 Cloud DLP 检查模板和 Cloud DLP 去标识化模板。 这样用户就能够检查潜在的敏感信息并进行去标识化处理,以及对结构化数据进行去标识化处理,其中列指定进行去标识化处理并且无需检查。值得注意的是,此模板不支持去标识化模板位置的区域路径。仅支持全局路径。

对此流水线的要求

  • 要令牌化的输入数据必须存在。
  • Cloud DLP 模板(例如,DeidentifyTemplate 和 InspectTemplate)必须存在。如需了解更多详细信息,请参阅 Cloud DLP 模板
  • BigQuery 数据集必须存在。

模板参数

参数 说明
inputFilePattern 要从中读取输入数据记录的 csv 文件。也可以使用通配符,例如 gs://mybucket/my_csv_filename.csvgs://mybucket/file-*.csv
dlpProjectId 拥有 Cloud DLP API 资源的 Cloud DLP 项目 ID。 此 Cloud DLP 项目可以是拥有 Cloud DLP 模板的项目,也可以是其他项目; 例如 my_dlp_api_project
deidentifyTemplateName 要用于 API 请求的 Cloud DLP 去标识化模板;以 projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId} 模式指定,例如 projects/my_project/deidentifyTemplates/100
datasetName 用于发送标记化结果的 BigQuery 数据集。
batchSize 用于发送数据以进行检查和/或去标记化处理的区块/批次大小。对于 csv 文件,batchSize 是一个批次包含的行数。用户必须根据记录大小和文件大小确定批次大小。请注意,Cloud DLP API 的载荷大小上限为每个 API 调用 524 KB。
inspectTemplateName (可选)要用于 API 请求的 Cloud DLP 检查模板;以 projects/{template_project_id}/identifyTemplates/{idTemplateId} 模式指定,例如 projects/my_project/identifyTemplates/100

运行 Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputFilePattern=INPUT_DATA,\
datasetName=DATASET_NAME,\
batchSize=BATCH_SIZE_VALUE,\
dlpProjectId=DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER

替换以下内容:

  • DLP_API_PROJECT_ID:您的 Cloud DLP API 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • INPUT_DATA:输入文件路径
  • DEIDENTIFY_TEMPLATE:Cloud DLPDeidentify 模板编号
  • DATASET_NAME:BigQuery 数据集名称
  • INSPECT_TEMPLATE_NUMBER:Cloud DLPInspect 模板编号
  • BATCH_SIZE_VALUE:批次大小(对于 csv 文件,批次大小是每个 API 的行数)

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
      "inputFilePattern":INPUT_DATA,
      "datasetName": "DATASET_NAME",
      "batchSize": "BATCH_SIZE_VALUE",
      "dlpProjectId": "DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE",
      "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • DLP_API_PROJECT_ID:您的 Cloud DLP API 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • INPUT_DATA:输入文件路径
  • DEIDENTIFY_TEMPLATE:Cloud DLPDeidentify 模板编号
  • DATASET_NAME:BigQuery 数据集名称
  • INSPECT_TEMPLATE_NUMBER:Cloud DLPInspect 模板编号
  • BATCH_SIZE_VALUE:批次大小(对于 csv 文件,批次大小是每个 API 的行数)

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream)

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub 模版是一种流处理流水线,用于从 MySQL 数据库中读取包含更改数据的 Pub/Sub 消息并将记录写入 BigQuery。Debezium 连接器捕获对 MySQL 数据库的更改,并将更改后的数据发布到 Pub/Sub。然后,该模板读取 Pub/Sub 消息并将其写入 BigQuery。

您可以使用此模板同步 MySQL 数据库和 BigQuery 表。流水线将更改后的数据写入 BigQuery 暂存表,并间歇性地更新复制 MySQL 数据库的 BigQuery 表。

对此流水线的要求

  • Debezium 连接器必须已部署
  • Pub/Sub 消息必须在 Beam 行中序列化。

模板参数

参数 说明
inputSubscriptions 要读取的 Pub/Sub 输入订阅列表(以英文逗号分隔),格式为 <subscription>,<subscription>, ...
changeLogDataset 用于存储暂存表的 BigQuery 数据集,格式为 <my-dataset>
replicaDataset 用于存储副本表的 BigQuery 数据集的位置,格式为 <my-dataset>
updateFrequencySecs (可选)流水线更新复制 MySQL 数据库的 BigQuery 表的时间间隔。

运行 Change Data Capture using Debezium and MySQL from Pub/Sub to BigQuery 模版

如需运行此模板,请执行以下步骤:

  1. 在本地机器上,克隆 DataflowTemplates 代码库
  2. 切换到 v2/cdc-parent 目录:
  3. 确保已部署 Debezium 连接器
  4. 使用 Maven,运行 Dataflow 模板。
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
        --inputSubscriptions=SUBSCRIPTIONS \
        --updateFrequencySecs=300 \
        --changeLogDataset=CHANGELOG_DATASET \
        --replicaDataset=REPLICA_DATASET \
        --project=PROJECT_ID \
        --region=REGION_NAME"
      

    替换以下内容:

    • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
    • SUBSCRIPTIONS:以英文逗号分隔的 Pub/Sub 订阅名称列表
    • CHANGELOG_DATASET:用于变更日志数据的 BigQuery 数据集
    • REPLICA_DATASET:用于副本表的 BigQuery 数据集

Apache Kafka to BigQuery

Apache Kafka to BigQuery 模板是一种流处理流水线,可从 Apache Kafka 提取文本数据、执行用户定义的函数 (UDF) 并将生成的记录输出到 BigQuery。在转换数据、执行 UDF 或插入输出表时出现的任何错误都将被插入 BigQuery 中单独的错误表。如果在执行之前错误表不存在,则创建该表。

对此流水线的要求

  • 输出 BigQuery 表必须已存在。
  • Apache Kafka 代理服务器必须正在运行并可从 Dataflow 工作器机器进行访问。
  • Apache Kafka 主题必须存在,并且消息必须采用有效的 JSON 格式编码。

模板参数

参数 说明
outputTableSpec 要写入 Apache Kafka 消息的 BigQuery 输出表位置,格式为 my-project:dataset.table
inputTopics 以英文逗号分隔的列表中可读取的 Apache Kafka 输入主题。例如:messages
bootstrapServers 以英文逗号分隔的列表中正在运行的 Apache Kafka Broker 服务器的主机地址,每个主机地址的格式为 35.70.252.199:9092
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
outputDeadletterTable (可选)未能到达输出表的消息的 BigQuery 表,格式为 my-project:dataset.my-deadletter-table。如果不存在,则系统会在流水线执行期间创建该表。如果未指定此参数,则系统会改用 <outputTableSpec>_error_records

运行 Apache Kafka to BigQuery 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Kafka to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • KAFKA_TOPICS:Apache Kakfa 主题列表。如果提供了多个主题,请按照说明了解如何转义英文逗号。
  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • KAFKA_SERVER_ADDRESSES:Apache Kafka broker 服务器 IP 地址列表。每个 IP 地址都应与服务器可访问的端口号相匹配。例如:35.70.252.199:9092。如果提供了多个地址,请按照说明了解如何转义英文逗号。

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery",
   }
}
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • KAFKA_TOPICS:Apache Kakfa 主题列表。如果提供了多个主题,请按照说明了解如何转义英文逗号。
  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • KAFKA_SERVER_ADDRESSES:Apache Kafka broker 服务器 IP 地址列表。每个 IP 地址都应与服务器可访问的端口号相匹配。例如:35.70.252.199:9092。如果提供了多个地址,请按照说明了解如何转义英文逗号。

如需了解详情,请参阅使用 Dataflow 将数据从 Kafka 写入 BigQuery

Datastream to BigQuery (Stream)

Datastream to BigQuery 模板是一种流处理流水线,可读取 Datastream 数据并将其复制到 BigQuery 中。该模板使用 Pub/Sub 通知从 Cloud Storage 中读取数据,并将其复制到时间分区的 BigQuery 暂存表中。复制后,该模板会在 BigQuery 中执行 MERGE,以将所有更改数据捕获 (CDC) 更改插入/更新到源表的副本中。

此模板处理创建和更新通过复制管理的 BigQuery 表。当需要数据定义语言 (DDL) 时,对 Datastream 的回调将提取源表架构并将其转换为 BigQuery 数据类型。支持的操作包括下列操作:

  • 在插入数据时创建新表。
  • 在 BigQuery 表中添加新列,且初始值为 null。
  • 在 BigQuery 中忽略丢弃的列,且未来的值为 null。
  • 重命名的列将作为新列添加到 BigQuery 中。
  • 类型更改不会传播到 BigQuery。

对此流水线的要求

  • 已准备好或已在复制数据的 Datastream 数据流。
  • 已经为 Datastream 数据启用 Cloud Storage Pub/Sub 通知
  • BigQuery 目标数据集已创建,而 Compute Engine 服务帐号已获授予管理员的访问权限。
  • 若要创建目标副本表,源表中必须有主键。

模板参数

参数 说明
inputFilePattern Cloud Storage 中要复制的 Datastream 文件的位置。此文件位置通常是数据流的根路径。
gcsPubSubSubscription 包含 Datastream 文件通知的 Pub/Sub 订阅。例如 projects/my-project-id/subscriptions/my-subscription-id
inputFileFormat Datastream 生成的输出文件的格式。例如:avro,json。默认值:avro
outputStagingDatasetTemplate 包含暂存表的现有数据集的名称。您可以将模板 {_metadata_dataset} 包括为占位符,它会被替换为源数据集/架构的名称(例如 {_metadata_dataset}_log)。
outputDatasetTemplate 包含副本表的现有数据集的名称。您可以将模板 {_metadata_dataset} 包括为占位符,它会被替换为源数据集/架构的名称(例如 {_metadata_dataset})。
deadLetterQueueDirectory 用于存储任何未处理消息以及无法处理原因的文件路径。默认值为 Dataflow 作业的临时位置下的目录。在大多数情况下,默认值就可以了。
outputStagingTableNameTemplate (可选)暂存表的名称模板。默认值为 {_metadata_table}_log。如果您要复制多个架构,建议使用 {_metadata_schema}_{_metadata_table}_log
outputTableNameTemplate (可选)副本表名称的模板。默认值:{_metadata_table}。如果您要复制多个架构,建议使用 {_metadata_schema}_{_metadata_table}
outputProjectId (可选)BigQuery 数据集的项目,用于将数据输出到其中。此参数的默认项目是 Dataflow 流水线在其中运行的项目。
streamName (可选)用于轮询架构信息的数据流的名称或模板。默认值:{_metadata_stream}
mergeFrequencyMinutes (可选)合并给定表格的时间间隔(分钟)。默认值:5。
dlqRetryMinutes (可选)死信队列 (DLQ) 重试之间的分钟数。默认值:10。
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

运行 Datastream to BigQuery 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Datastream to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
outputStagingDatasetTemplate=BIGQUERY_DATASET,\
outputDatasetTemplate=BIGQUERY_DATASET,\
outputStagingTableNameTemplate=BIGQUERY_TABLE,\
outputTableNameTemplate=BIGQUERY_TABLE_log
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH:Datastream 数据的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME:要从中读取已更改文件的 Pub/Sub 订阅。例如:projects/my-project-id/subscriptions/my-subscription-id
  • BIGQUERY_DATASET:您的 BigQuery 数据集名称。
  • BIGQUERY_TABLE:您的 BigQuery 表模板。例如 {_metadata_schema}_{_metadata_table}_log

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
          "outputDatasetTemplate": "BIGQUERY_DATASET",
          "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
          "outputTableNameTemplate": "BIGQUERY_TABLE_log"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery",
   }
}
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH:Datastream 数据的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME:要从中读取已更改文件的 Pub/Sub 订阅。例如:projects/my-project-id/subscriptions/my-subscription-id
  • BIGQUERY_DATASET:您的 BigQuery 数据集名称。
  • BIGQUERY_TABLE:您的 BigQuery 表模板。例如 {_metadata_schema}_{_metadata_table}_log

Datastream to MySQL 或 PostgreSQL (Stream)

Datastream to SQL 模板是一种流处理流水线,可读取 Datastream 数据并将其复制到任何 MySQL 或 PostgreSQL 数据库。该模板使用 Pub/Sub 通知从 Cloud Storage 中读取数据,并将此数据复制到 SQL 副本表。

该模板不支持数据定义语言 (DDL),要求数据库中已存在所有表。复制使用 Dataflow 有状态转换来过滤过时的数据,并确保数据无序的一致性。例如,如果传递了某行的较新版本,则会忽略该行较晚到达的版本。执行的数据操纵语言 (DML) 是将源数据最佳复制到目标数据的最佳尝试。执行的 DML 语句遵循以下规则:

  • 如果存在主键,则插入和更新操作使用插入/更新语法(例如 INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE)。
  • 如果存在主键,则删除将作为删除 DML 复制。
  • 如果不存在主键,则会将插入和更新操作插入表中。
  • 如果不存在主键,则会忽略删除操作。

如果您使用的是 Oracle to Postgres 实用程序,请在 SQL 中添加 ROWID 作为主键(如果主键不存在)。

对此流水线的要求如下

  • 已准备好或已在复制数据的 Datastream 数据流。
  • 已经为 Datastream 数据启用 Cloud Storage Pub/Sub 通知
  • 为 PostgreSQL 数据库提供了所需的架构。
  • 已在 Dataflow 工作器和 PostgreSQL 之间设置网络访问权限。

模板参数

参数 说明
inputFilePattern Cloud Storage 中要复制的 Datastream 文件的位置。此文件位置通常是数据流的根路径。
gcsPubSubSubscription 包含 Datastream 文件通知的 Pub/Sub 订阅。例如 projects/my-project-id/subscriptions/my-subscription-id
inputFileFormat Datastream 生成的输出文件的格式。例如:avro,json。默认值:avro
databaseHost 要连接的 SQL 主机。
databaseUser 具有写入副本中所有表所需的全部权限的 SQL 用户。
databasePassword 给定 SQL 用户的密码。
databasePort (可选)要连接到的 SQL 数据库端口。默认值:5432。
databaseName (可选)要连接到的 SQL 数据库的名称。默认值:postgres。
streamName (可选)用于轮询架构信息的数据流的名称或模板。默认值:{_metadata_stream}

运行 Datastream to SQL 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Datastream to SQL template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH:Datastream 数据的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME:要从中读取已更改文件的 Pub/Sub 订阅。例如:projects/my-project-id/subscriptions/my-subscription-id
  • DATABASE_HOST:您的 SQL 主机 IP。
  • DATABASE_USER:您的 SQL 用户。
  • DATABASE_PASSWORD:您的 SQL 密码。

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

    • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates/latest/
    • the version name, like 2021-09-20-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates/
  • GCS_FILE_PATH:Datastream 数据的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME:要从中读取已更改文件的 Pub/Sub 订阅。例如:projects/my-project-id/subscriptions/my-subscription-id
  • DATABASE_HOST:您的 SQL 主机 IP。
  • DATABASE_USER:您的 SQL 用户。
  • DATABASE_PASSWORD:您的 SQL 密码。

Pub/Sub to Java Database Connectivity (JDBC)

Pub/Sub to Java Database Connectivity (JDBC) 模板是一种流处理流水线,可从预先存在的 Cloud Pub/Sub 订阅注入数据作为 JSON 字符串,并将生成的记录写入 JDBC。

对此流水线的要求

  • Cloud Pub/Sub 订阅必须已存在才能运行此流水线。
  • 在运行流水线之前,JDBC 源必须已存在。
  • 在运行流水线之前,Cloud Pub/Sub 输出死信主题必须已存在。

模板参数

参数 说明
driverClassName JDBC 驱动程序类名称。例如 com.mysql.jdbc.Driver
connectionUrl JDBC 连接网址字符串。例如 jdbc:mysql://some-host:3306/sampledb。可作为 Base64 编码,然后使用 Cloud KMS 密钥加密的字符串传入。
driverJars 以英文逗号分隔的 JDBC 驱动程序 Cloud Storage 路径。例如 gs://your-bucket/driver_jar1.jar,gs://your-bucket/driver_jar2.jar
username (可选)用于 JDBC 连接的用户名。该参数可以作为使用 Cloud KMS 密钥加密的 Base64 编码字符串传入。
password (可选)用于 JDBC 连接的密码。该参数可以作为使用 Cloud KMS 密钥加密的 Base64 编码字符串传入。
connectionProperties (可选)用于 JDBC 连接的属性字符串。 字符串的格式必须为 [propertyName=property;]*。例如 unicode=true;characterEncoding=UTF-8
statement 针对数据库运行的语句。该语句必须以任意顺序指定表的列名。只会从 JSON 中读取指定列名称的值并将其添加到语句中。例如 INSERT INTO tableName (column1, column2) VALUES (?,?)
inputSubscription 要读取的 Pub/Sub 输入订阅,格式为 projects/<project>/subscriptions/<subscription>
outputDeadletterTopic 用于转发无法递送的消息的 Pub/Sub 主题,例如 projects/<project-id>/topics/<topic-name>
KMSEncryptionKey (可选)用于对用户名、密码和连接字符串进行解密的 Cloud KMS 加密密钥。如果传入了 Cloud KMS 密钥,则用户名、密码和连接字符串都必须以加密方式进行传递。
extraFilesToStage 用于将文件暂存在工作器中的 Cloud Storage 路径或 Secret Manager 密文,以逗号分隔。这些文件将保存在每个工作器的 /extra_files 目录下。例如 gs://<my-bucket>/file.txt,projects/<project-id>/secrets/<secret-id>/versions/<version-id>

运行 Pub/Sub to Java Database Connectivity (JDBC) 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to JDBC template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/VERSION/PubSub_to_Jdbc \
    --region REGION_NAME \
    --parameters \
driverClassName=DRIVER_CLASS_NAME,\
connectionURL=JDBC_CONNECTION_URL,\
driverJars=DRIVER_PATHS,\
username=CONNECTION_USERNAME,\
password=CONNECTION_PASSWORD,\
connectionProperties=CONNECTION_PROPERTIES,\
statement=SQL_STATEMENT,\
inputSubscription=INPUT_SUBSCRIPTION,\
outputDeadletterTopic=OUTPUT_DEADLETTER_TOPIC,\
KMSEncryptionKey=KMS_ENCRYPTION_KEY

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • DRIVER_CLASS_NAME:驱动程序类名称
  • JDBC_CONNECTION_URL:JDBC 连接网址
  • DRIVER_PATHS:JDBC 驱动程序以英文逗号分隔的 Cloud Storage 路径
  • CONNECTION_USERNAME:JDBC 连接用户名
  • CONNECTION_PASSWORD:JDBC 连接密码
  • CONNECTION_PROPERTIES:JDBC 连接属性(如有需要)
  • SQL_STATEMENT:要对数据库执行的 SQL 语句
  • INPUT_SUBSCRIPTION:要读取的 Pub/Sub 输入订阅。
  • OUTPUT_DEADLETTER_TOPIC:用于转发无法递送的消息的 Pub/Sub
  • KMS_ENCRYPTION_KEY:Cloud KMS 加密密钥

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates/VERSION/PubSub_to_Jdbc
{
   "jobName": "JOB_NAME",
   "parameters": {
       "driverClassName": "DRIVER_CLASS_NAME",
       "connectionURL": "JDBC_CONNECTION_URL",
       "driverJars": "DRIVER_PATHS",
       "username": "CONNECTION_USERNAME",
       "password": "CONNECTION_PASSWORD",
       "connectionProperties": "CONNECTION_PROPERTIES",
       "statement": "SQL_STATEMENT",
       "inputSubscription": "INPUT_SUBSCRIPTION",
       "outputDeadletterTopic": "OUTPUT_DEADLETTER_TOPIC",
       "KMSEncryptionKey":"KMS_ENCRYPTION_KEY"
   },
   "environment": { "zone": "us-central1-f" },
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • DRIVER_CLASS_NAME:驱动程序类名称
  • JDBC_CONNECTION_URL:JDBC 连接网址
  • DRIVER_PATHS:JDBC 驱动程序以英文逗号分隔的 Cloud Storage 路径
  • CONNECTION_USERNAME:JDBC 连接用户名
  • CONNECTION_PASSWORD:JDBC 连接密码
  • CONNECTION_PROPERTIES:JDBC 连接属性(如有需要)
  • SQL_STATEMENT:要对数据库执行的 SQL 语句
  • INPUT_SUBSCRIPTION:要读取的 Pub/Sub 输入订阅。
  • OUTPUT_DEADLETTER_TOPIC:用于转发无法递送的消息的 Pub/Sub
  • KMS_ENCRYPTION_KEY:Cloud KMS 加密密钥

Cloud Spanner change streams to Cloud Storage

Cloud Spanner change streams to Cloud Storage 模板是一种流处理流水线,可流式传输 Spanner 数据更改记录并使用 Dataflow Runner V2 将其写入 Cloud Storage 存储桶。

流水线根据 Spanner 变更数据流记录的时间戳将其分组到窗口中,每个窗口代表一个时长,您可以使用此模板配置该时长。时间戳属于某个窗口的所有记录都保证在该窗口中;不会有延迟到达。您还可以定义多个输出分片。流水线会为每个窗口的每个分片创建一个 Cloud Storage 输出文件。在输出文件中,记录是无序的。输出文件可以采用 JSON 或 AVRO 格式编写,具体取决于用户配置。

请注意,通过在与 Cloud Spanner 实例或 Cloud Storage 存储桶相同的区域运行 Dataflow 作业,您可以最大限度地减少网络延迟和网络传输费用。如果您使用位于作业区域之外的源、接收器、暂存文件位置或临时文件位置,则数据可能会跨区域发送。详细了解 Dataflow 区域端点

详细了解变更数据流如何构建变更数据流 Dataflow 流水线最佳实践

对此流水线的要求

  • 在运行流水线之前,Cloud Spanner 实例必须已存在。
  • 在运行流水线之前,Cloud Spanner 数据库必须已存在。
  • 在运行流水线之前,Cloud Spanner 元数据实例必须已存在。
  • 在运行流水线之前,Cloud Spanner 元数据数据库必须已存在。
  • 在运行流水线之前,Cloud Spanner 变更数据流必须已存在。
  • 在运行流水线之前,Cloud Storage 输出存储桶必须已存在。

模板参数

参数 说明
spannerInstanceId 要从中读取变更数据流数据的 Cloud Spanner 实例 ID。
spannerDatabase 要从中读取变更数据流数据的 Cloud Spanner 数据库。
spannerMetadataInstanceId 用于变更数据流连接器元数据表的 Cloud Spanner 实例 ID。
spannerMetadataDatabase 要用于变更数据流连接器元数据表的 Cloud Spanner 数据库。
spannerChangeStreamName 要读取的 Cloud Spanner 变更数据流的名称。
gcsOutputDirectory 变更数据流输出在 Cloud Storage 中的文件位置,格式为:“gs://${BUCKET}/${ROOT_PATH}/”。
outputFilenamePrefix (可选)要写入的文件的文件名前缀。默认文件前缀设置为“output”。
spannerProjectId (可选)从中读取变更数据流的项目。这也是创建变更数据流连接器元数据表的项目。此参数的默认项目是 Dataflow 流水线在其中运行的项目。
startTimestamp (可选)要用于读取变更数据流的起始 DateTime(含边界值)。Ex-2021-10-12T07:20:50.52Z。默认为流水线启动时的时间戳,即当前时间。
endTimestamp (可选)要用于读取变更数据流的结束 DateTime(含边界值)。Ex-2021-10-12T07:20:50.52Z。默认为未来的无限时间。
outputFileFormat (可选)输出 Cloud Storage 文件的格式。允许的格式为 TEXT、AVRO。默认为 AVRO。
windowDuration (可选)窗口时长是将数据写入输出目录的时间间隔。请根据流水线的吞吐量配置时长。例如,较高的吞吐量可能需要较短的窗口时长,以便数据适应内存。默认为 5 分钟,最短可为 1 秒。允许的格式如下:[int]s(表示数秒,例如 5s)、[int]m(表示数分钟,例如 12m)、[int]h(表示数小时,例如 2h)。
rpcPriority (可选)Cloud Spanner 调用的请求优先级。该值必须为 [HIGH,MEDIUM,LOW] 之一。(默认值:HIGH)
numShards (可选)写入时产生的最大输出分片数。默认值为 20。分片数量越多,写入 Cloud Storage 的吞吐量越高,但处理输出 Cloud Storage 文件时跨分片聚合数据的费用也可能更高。
spannerMetadataTableName (可选)要使用的 Cloud Spanner 变更数据流连接器元数据表名称。如果未提供,系统会在流水线流期间自动创建 Cloud Spanner 变更数据流元数据表。更新现有流水线时必须提供此参数,其他情况下则不应提供。

运行 Cloud Spanner change streams to Cloud Storage 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Spanner change streams to Google Cloud Storage template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
gcsOutputDirectory=GCS_OUTPUT_DIRECTORY

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • SPANNER_INSTANCE_ID:Cloud Spanner 实例 ID
  • SPANNER_DATABASE:Cloud Spanner 数据库
  • SPANNER_METADATA_INSTANCE_ID:Cloud Spanner 元数据实例 ID
  • SPANNER_METADATA_DATABASE:Cloud Spanner 元数据数据库
  • SPANNER_CHANGE_STREAM:Cloud Spanner 变更数据流
  • GCS_OUTPUT_DIRECTORY:变更数据流输出的文件位置

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "gcsOutputDirectory": "GCS_OUTPUT_DIRECTORY"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_Google_Cloud_Storage",
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • SPANNER_INSTANCE_ID:Cloud Spanner 实例 ID
  • SPANNER_DATABASE:Cloud Spanner 数据库
  • SPANNER_METADATA_INSTANCE_ID:Cloud Spanner 元数据实例 ID
  • SPANNER_METADATA_DATABASE:Cloud Spanner 元数据数据库
  • SPANNER_CHANGE_STREAM:Cloud Spanner 变更数据流
  • GCS_OUTPUT_DIRECTORY:变更数据流输出的文件位置

Cloud Spanner change streams to BigQuery

Cloud Spanner change stream to BigQuery 模板是一种流处理流水线,用于流式传输 Cloud Spanner 数据更改记录并使用 Dataflow Runner V2 将其写入 BigQuery 表。

如果需要的 BigQuery 表不存在,流水线会创建这些表。否则将使用现有 BigQuery 表。现有 BigQuery 表的架构必须包含 Cloud Spanner 表的相应跟踪列以及“ignoreFields”选项未显式忽略的其他元数据列(请参阅以下列表中元数据字段的说明)。每个新的 BigQuery 行都包含变更数据流在更改记录的时间戳从 Cloud Spanner 表中的对应行中监控的所有列。

所有变更数据流监控的列都会包含在每个 BigQuery 表行中,无论它们是否被 Cloud Spanner 事务修改。未监控的列不会包含在 BigQuery 行中。任何小于 Dataflow 水印的 Cloud Spanner 更改都会成功应用于 BigQuery 表,或存储在死信队列中进行重试。与原始 Cloud Spanner 提交时间戳排序相比,BigQuery 行插入是乱序的。

以下元数据字段会添加到 BigQuery 表中:

  • _metadata_spanner_mod_type:从变更数据流数据更改记录中提取。
  • _metadata_spanner_table_name:Cloud Spanner 表名称。请注意,这不是连接器的元数据表名称。
  • _metadata_spanner_commit_timestamp:从变更数据流数据更改记录中提取。
  • _metadata_spanner_server_transaction_id:从变更数据流数据更改记录中提取。
  • _metadata_spanner_record_sequence:从变更数据流数据更改记录中提取。
  • _metadata_spanner_is_last_record_in_transaction_in_partition:从变更数据流数据更改记录中提取。
  • _metadata_spanner_number_of_records_in_transaction:从变更数据流数据更改记录中提取。
  • _metadata_spanner_number_of_partitions_in_transaction:从变更数据流数据更改记录中提取。
  • _metadata_big_query_commit_timestamp:行插入 BigQuery 的提交时间戳。

注意:

  • 此模板不会将 Cloud Spanner 架构更改传播到 BigQuery。由于在 Cloud Spanner 中执行架构更改可能会中断流水线,因此您可能需要在架构更改后重新创建流水线。
  • 对于 OLD_AND_NEW_VALUESNEW_VALUES 值捕获类型,当数据更改记录包含 UPDATE 更改时,模板需要在数据更改记录的提交时间戳对 Cloud Spanner 执行过时读取,以检索未更改但受监控的列。请确保为过时读取正确配置数据库“version_retention_period”。对于 NEW_ROW 值捕获类型,模板效率更高,因为数据更改记录会捕获整个新行(包括 UPDATE 中未更新的列),并且模板不需要执行过时读取。
  • 您可以通过在与 Cloud Spanner 实例或 BigQuery 表相同的区域运行 Dataflow 作业来最大限度地减少网络延迟和网络传输费用。如果您使用位于作业区域之外的源、接收器、暂存文件位置或临时文件位置,则数据可能会跨区域发送。详细了解 Dataflow 区域端点
  • 此模板支持所有有效的 Cloud Spanner 数据类型,但如果 BigQuery 类型比 Cloud Spanner 类型更精确,则在转换期间可能会发生精确率下降。具体而言:
    • 对于 Cloud Spanner JSON 类型,对象的成员顺序按字典顺序排列,但对于 BigQuery JSON 类型没有此类保证。
    • Cloud Spanner 支持纳秒时间戳类型,BigQuery 仅支持微秒时间戳类型。

详细了解变更数据流如何构建变更数据流 Dataflow 流水线最佳实践

对此流水线的要求

  • 在运行流水线之前,Cloud Spanner 实例必须已存在。
  • 在运行流水线之前,Cloud Spanner 数据库必须已存在。
  • 在运行流水线之前,Cloud Spanner 元数据实例必须已存在。
  • 在运行流水线之前,Cloud Spanner 元数据数据库必须已存在。
  • 在运行流水线之前,Cloud Spanner 变更数据流必须已存在。
  • 在运行流水线之前,BigQuery 数据集必须已存在。

模板参数

参数 说明
spannerInstanceId 要从中读取变更数据流的 Cloud Spanner 实例。
spannerDatabase 要从中读取变更数据流的 Cloud Spanner 数据库。
spannerMetadataInstanceId 要用于变更数据流连接器元数据表的 Cloud Spanner 实例。
spannerMetadataDatabase 要用于变更数据流连接器元数据表的 Cloud Spanner 数据库。
spannerChangeStreamName 要读取的 Cloud Spanner 变更数据流的名称。
bigQueryDataSet 变更数据流输出的 BigQuery 数据集。
spannerProjectId (可选)从中读取变更数据流的项目。这也是创建变更数据流连接器元数据表的项目。此参数的默认项目是 Dataflow 流水线在其中运行的项目。
spannerMetadataTableName (可选)要使用的 Cloud Spanner 变更数据流连接器元数据表名称。如果未提供,系统会在流水线流期间自动创建 Cloud Spanner 变更数据流连接器元数据表。更新现有流水线时必须提供此参数,其他情况下则不应提供。
rpcPriority (可选)Cloud Spanner 调用的请求优先级。该值必须为 [HIGH,MEDIUM,LOW] 之一。(默认值:HIGH)
startTimestamp (可选)要用于读取变更数据流的起始 DateTime(含边界值)。Ex-2021-10-12T07:20:50.52Z。默认为流水线启动时的时间戳,即当前时间。
endTimestamp (可选)要用于读取变更数据流的结束 DateTime(含边界值)。Ex-2021-10-12T07:20:50.52Z。默认为未来的无限时间。
bigQueryProjectId (可选)BigQuery 项目。默认为 Dataflow 作业的项目。
bigQueryChangelogTableNameTemplate (可选)BigQuery 更新日志表名称的模板。默认为 {_metadata_spanner_table_name}_changelog。
deadLetterQueueDirectory (可选)用于存储未处理记录以及无法处理原因的文件路径。默认值为 Dataflow 作业的临时位置下的目录。在大多数情况下,默认值就可以了。
dlqRetryMinutes (可选)死信队列重试之间的分钟数。默认值为 10。
ignoreFields (可选)要忽略的字段(区分大小写),以逗号分隔列表形式列出。这些字段可以是被监控表的字段,也可以是流水线添加的元数据字段。被忽略的字段不会插入到 BigQuery 中。

运行 Cloud Spanner change streams to BigQuery 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Spanner change streams to BigQuery template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • SPANNER_INSTANCE_ID:Cloud Spanner 实例 ID
  • SPANNER_DATABASE:Cloud Spanner 数据库
  • SPANNER_METADATA_INSTANCE_ID:Cloud Spanner 元数据实例 ID
  • SPANNER_METADATA_DATABASE:Cloud Spanner 元数据数据库
  • SPANNER_CHANGE_STREAM:Cloud Spanner 变更数据流
  • BIGQUERY_DATASET:变更数据流输出的 BigQuery 数据集

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • SPANNER_INSTANCE_ID:Cloud Spanner 实例 ID
  • SPANNER_DATABASE:Cloud Spanner 数据库
  • SPANNER_METADATA_INSTANCE_ID:Cloud Spanner 元数据实例 ID
  • SPANNER_METADATA_DATABASE:Cloud Spanner 元数据数据库
  • SPANNER_CHANGE_STREAM:Cloud Spanner 变更数据流
  • BIGQUERY_DATASET:变更数据流输出的 BigQuery 数据集

Cloud Spanner change streams to Pub/Sub

Cloud Spanner change streams to the Pub/Sub 模板是一种流处理流水线,用于使用 Dataflow Runner V2 流式传输 Cloud Spanner 数据更改记录并将其写入 Pub/Sub 主题。

如需将数据输出到新的 Pub/Sub 主题,您需要先创建主题。创建后,Pub/Sub 会自动生成订阅并将其附加到新主题。如果您尝试将数据输出到不存在的 Pub/Sub 主题,则 Dataflow 流水线会抛出异常,并且流水线会在不断尝试建立连接时卡住。

如果存在必要的 Pub/Sub 主题,则可以将数据输出到该主题。

如需了解详情,请参阅关于变更数据流使用 Dataflow 构建变更数据流连接变更数据流最佳实践

对此流水线的要求

  • 在运行流水线之前,Cloud Spanner 实例必须已存在。
  • 在运行流水线之前,Cloud Spanner 数据库必须已存在。
  • 在运行流水线之前,Cloud Spanner 元数据实例必须已存在。
  • 在运行流水线之前,Cloud Spanner 元数据数据库必须已存在。
  • 在运行流水线之前,Cloud Spanner 变更数据流必须已存在。
  • 在运行此流水线之前,Pub/Sub 主题必须已存在。

模板参数

参数 说明
spannerInstanceId 要从中读取变更数据流的 Cloud Spanner 实例。
spannerDatabase 要从中读取变更数据流的 Cloud Spanner 数据库。
spannerMetadataInstanceId 要用于变更数据流连接器元数据表的 Cloud Spanner 实例。
spannerMetadataDatabase 要用于变更数据流连接器元数据表的 Cloud Spanner 数据库。
spannerChangeStreamName 要读取的 Cloud Spanner 变更数据流的名称。
pubsubTopic 变更数据流输出的 Pub/Sub 主题。
spannerProjectId (可选)从中读取变更数据流的项目。这也是创建变更数据流连接器元数据表的项目。此参数的默认项目是 Dataflow 流水线在其中运行的项目。
spannerMetadataTableName (可选)要使用的 Cloud Spanner 变更数据流连接器元数据表名称。如果未提供,Cloud Spanner 将在流水线流更改期间自动创建流连接器元数据表。更新现有流水线时,您必须提供此参数。请勿将此参数用于其他情况。
rpcPriority (可选)Cloud Spanner 调用的请求优先级。该值必须为 [HIGH,MEDIUM,LOW] 之一。(默认值:HIGH)
startTimestamp (可选)要用于读取变更数据流的开始 DateTime(含边界值)。例如 ex-2021-10-12T07:20:50.52Z。默认为流水线启动时的时间戳,即当前时间。
endTimestamp (可选)要用于读取变更数据流的结束 DateTime(含边界值)。例如 ex-2021-10-12T07:20:50.52Z。默认为未来的无限时间。
outputFileFormat (可选)输出的格式。输出会封装在许多 PubsubMessage 中,并发送到 Pub/Sub 主题。允许的格式为 JSON 和 AVRO。默认值为 JSON。
pubsubAPI (可选)用于实现流水线的 Pub/Sub API。允许的 API 包括 pubsubionative_client。对于少量每秒查询次数 (QPS),native_client 的延迟时间较短。对于 QPS 而言,pubsubio 可提供更好且更稳定的性能。默认值为 pubsubio

运行 Cloud Spanner change streams to the Pub/Sub 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Spanner change streams to Pub/Sub template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

    gcloud beta dataflow flex-template run JOB_NAME \
        --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_PubSub \
        --region REGION_NAME \
        --parameters \
    spannerInstanceId=SPANNER_INSTANCE_ID,\
    spannerDatabase=SPANNER_DATABASE,\
    spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
    spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
    spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
    pubsubTopic=PUBSUB_TOPIC
    

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • SPANNER_INSTANCE_ID:Cloud Spanner 实例 ID
  • SPANNER_DATABASE:Cloud Spanner 数据库
  • SPANNER_METADATA_INSTANCE_ID:Cloud Spanner 元数据实例 ID
  • SPANNER_METADATA_DATABASE:Cloud Spanner 元数据数据库
  • SPANNER_CHANGE_STREAM:Cloud Spanner 变更数据流
  • PUBSUB_TOPIC:变更数据流输出的 Pub/Sub 主题

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

  POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
  {
    "launch_parameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "spannerInstanceId": "SPANNER_INSTANCE_ID",
            "spannerDatabase": "SPANNER_DATABASE",
            "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
            "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
            "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
            "pubsubTopic": "PUBSUB_TOPIC"
        },
        "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/Spanner_Change_Streams_to_PubSub",
    }
  }
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • SPANNER_INSTANCE_ID:Cloud Spanner 实例 ID
  • SPANNER_DATABASE:Cloud Spanner 数据库
  • SPANNER_METADATA_INSTANCE_ID:Cloud Spanner 元数据实例 ID
  • SPANNER_METADATA_DATABASE:Cloud Spanner 元数据数据库
  • SPANNER_CHANGE_STREAM:Cloud Spanner 变更数据流
  • PUBSUB_TOPIC:变更数据流输出的 Pub/Sub 主题

MongoDB to BigQuery (CDC)

MongoDB to BigQuery CDC(变更数据捕获)模板是一种流处理流水线,可与 MongoDB 变更数据流结合使用。该流水线读取通过 MongoDB 变更数据流推送到 Pub/Sub 的 JSON 记录,并将其写入 BigQuery(由 userOption 参数指定)。

对此流水线的要求

  • 目标 BigQuery 数据集必须已存在。
  • 必须可从 Dataflow 工作器机器访问源 MongoDB 实例。
  • 从 MongoDB 向 Pub/Sub 推送更改的变更数据流应该正在运行。

模板参数

参数 说明
mongoDbUri MongoDB 连接 URI,格式为 mongodb+srv://:@
database 从中读取集合的 MongoDB 数据库。例如:my-db
collection MongoDB 数据库中集合的名称。例如:my-collection
outputTableSpec 要写入的 BigQuery 表。例如 bigquery-project:dataset.output_table
userOption FLATTENNONEFLATTEN 将文档展平至第一级。NONE 将整个文档存储为 JSON 字符串。
inputTopic 要读取的 Cloud Pub/Sub 输入主题,格式为 projects/<project>/topics/<topic>

运行 MongoDB to BigQuery (CDC) 模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域性端点为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the MongoDB to BigQuery (CDC) template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery_CDC \
    --parameters \
outputTableSpec=OUTPUT_TABLE_SPEC,\
mongoDbUri=MONGO_DB_URI,\
database=DATABASE,\
collection=COLLECTION,\
userOption=USER_OPTION,\
inputTopic=INPUT_TOPIC

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • OUTPUT_TABLE_SPEC:您的 BigQuery 目标表的名称。
  • MONGO_DB_URI:您的 MongoDB URI。
  • DATABASE:您的 MongoDB 数据库。
  • COLLECTION:您的 MongoDB 集合。
  • USER_OPTION:FLATTEN 或 NONE。
  • INPUT_TOPIC:您的 Pub/Sub 输入主题。

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "mongoDbUri": "MONGO_DB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "userOption": "USER_OPTION",
          "inputTopic": "INPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/VERSION/flex/MongoDB_to_BigQuery_CDC",
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域端点,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

    • latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates/latest/) 中可用
    • 版本名称(如 2021-09-20-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates/) 中
  • OUTPUT_TABLE_SPEC:您的 BigQuery 目标表的名称。
  • MONGO_DB_URI:您的 MongoDB URI。
  • DATABASE:您的 MongoDB 数据库。
  • COLLECTION:您的 MongoDB 集合。
  • USER_OPTION:FLATTEN 或 NONE。
  • INPUT_TOPIC:您的 Pub/Sub 输入主题。