Google 提供的流处理模板

Google 提供了一组开源 Dataflow 模板。如需了解有关模板的一般信息,请参阅概览页面。如需查看 Google 提供的所有模板的列表,请参阅开始使用 Google 提供的模板页面。

本页面介绍流处理模板。

Pub/Sub Subscription to BigQuery

Pub/Sub Subscription to BigQuery 模板是一种流处理流水线,可从 Pub/Sub 订阅读取 JSON 格式的消息并将其写入 BigQuery 表格中。您可以使用该模板作为将 Pub/Sub 数据移动到 BigQuery 的快速解决方案。此模板可从 Pub/Sub 中读取 JSON 格式的消息并将其转换为 BigQuery 元素。

对此流水线的要求

  • Pub/Sub 消息必须采用 JSON 格式(请参阅此处)。例如,您可以将 {"k1":"v1", "k2":"v2"} 格式的消息插入具有两个列(k1k2)且数据类型为字符串的 BigQuery 表格中。
  • 在运行该流水线之前,输出表必须已存在。 表架构必须与输入 JSON 对象相匹配。

模板参数

参数 说明
inputSubscription 要读取的 Pub/Sub 输入订阅,格式为 projects/<project>/subscriptions/<subscription>
outputTableSpec BigQuery 输出表位置,格式为 <my-project>:<my-dataset>.<my-table>
outputDeadletterTable 未能到达输出表的消息的 BigQuery 表,格式为 <my-project>:<my-dataset>.<my-table>。如果该表不存在,则系统会在流水线执行期间创建它。如果未指定此参数,则系统会改用 <outputTableSpec>_error_records

运行 Pub/Sub Subscription to BigQuery 模板

控制台

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击 基于模板创建作业
  4. Cloud Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Pub/Sub Subscription to BigQuery template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称。
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • SUBSCRIPTION_NAME:您的 Pub/Sub 订阅名称
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称。
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • SUBSCRIPTION_NAME:您的 Pub/Sub 订阅名称
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

Pub/Sub Topic to BigQuery

Pub/Sub Topic to BigQuery 模板是一种流处理流水线,可从 Cloud Pub/Sub 主题读取 JSON 格式的消息并将其写入 BigQuery 表格中。您可以使用该模板作为将 Pub/Sub 数据移动到 BigQuery 的快速解决方案。此模板可从 Pub/Sub 中读取 JSON 格式的消息并将其转换为 BigQuery 元素。

对此流水线的要求

  • Pub/Sub 消息必须采用 JSON 格式(请参阅此处)。例如,您可以将 {"k1":"v1", "k2":"v2"} 格式的消息插入具有两个列(k1k2)且数据类型为字符串的 BigQuery 表格中。
  • 在运行该流水线之前,输出表必须已存在。 表架构必须与输入 JSON 对象相匹配。

模板参数

参数 说明
inputTopic 要读取的 Cloud Pub/Sub 输入主题,格式为 projects/<project>/topics/<topic>
outputTableSpec BigQuery 输出表位置,格式为 <my-project>:<my-dataset>.<my-table>
outputDeadletterTable 无法到达输出表的消息的 BigQuery 表。格式应为 <my-project>:<my-dataset>.<my-table>。 如果该表不存在,则系统会在流水线执行期间创建它。如果未指定此参数,则系统会改用 <outputTableSpec>_error_records

运行 Cloud Pub/Sub Topic to BigQuery 模板

控制台

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击 基于模板创建作业
  4. Cloud Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Pub/Sub Topic to BigQuery template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

Pub/Sub Avro to BigQuery

Pub/Sub Avro to BigQuery 模板是一种流处理流水线,用于将 Pub/Sub 订阅中的 Avro 数据提取到 BigQuery 表中。向 BigQuery 表写入数据时发生的任何错误都会流式传输到 Pub/Sub 未处理的主题。

对此流水线的要求

  • 用作输入来源的 Pub/Sub 订阅必须存在。
  • Avro 记录的架构文件必须存在于 Cloud Storage 存储空间中。
  • 未处理的 Pub/Sub 主题必须存在。
  • 用作输出目标的 BigQuery 数据集必须已存在。

模板参数

参数 说明
schemaPath Avro 架构文件的 Cloud Storage 位置。例如 gs://path/to/my/schema.avsc
inputSubscription 要读取的 Pub/Sub 输入订阅。例如 projects/<project>/subscriptions/<subscription>
outputTopic 要用于未处理的记录的 Pub/Sub 主题。例如 projects/<project-id>/topics/<topic-name>
outputTableSpec BigQuery 输出表位置。例如 <my-project>:<my-dataset>.<my-table>。 根据指定的 createDisposition,系统可能会使用用户提供的 Avro 架构自动创建输出表。
writeDisposition (可选)BigQuery WriteDisposition。例如 WRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE。默认:WRITE_APPEND
createDisposition (可选)BigQuery CreateDisposition。例如 CREATE_IF_NEEDEDCREATE_NEVER。默认:CREATE_IF_NEEDED

运行 Pub/Sub Avro to BigQuery 模板

控制台

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击 基于模板创建作业
  4. Cloud Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Pub/Sub Avro to BigQuery template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须具备 Cloud SDK 284.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

请替换以下内容:

  • JOB_NAME:您选择的作业名称
  • REGION_NAME:Dataflow 地区名称(例如 us-central1
  • SCHEMA_PATH:Avro 架构文件的 Cloud Storage 路径(例如 gs://MyBucket/file.avsc
  • SUBSCRIPTION_NAME:Pub/Sub 输入订阅名称
  • BIGQUERY_TABLE:BigQuery 输出表名称
  • DEADLETTER_TOPIC:要用于未处理的队列的 Pub/Sub 主题
gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • JOB_NAME:您选择的作业名称
  • LOCATION:Dataflow 地区名称(例如 us-central1
  • SCHEMA_PATH:Avro 架构文件的 Cloud Storage 路径(例如 gs://MyBucket/file.avsc
  • SUBSCRIPTION_NAME:Pub/Sub 输入订阅名称
  • BIGQUERY_TABLE:BigQuery 输出表名称
  • DEADLETTER_TOPIC:要用于未处理的队列的 Pub/Sub 主题
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Pub/Sub to Pub/Sub

Pub/Sub to Pub/Sub 模板是一种流处理流水线,可从 Pub/Sub 订阅中读取消息,并将这些消息写入其他 Pub/Sub 主题。该流水线还接受一个可选的消息属性键以及值,该值可用于过滤应写入 Pub/Sub 主题的消息。您可以使用此模板将消息从 Pub/Sub 订阅复制到带有可选消息过滤器的其他 Pub/Sub 主题。

对此流水线的要求

  • 源 Pub/Sub 订阅必须已存在才能执行流水线。
  • 目的地 Pub/Sub 主题必须已存在才能执行此流水线。

模板参数

参数 说明
inputSubscription 要从中读取输入的 Pub/Sub 订阅,例如 projects/<project-id>/subscriptions/<subscription-name>
outputTopic 要将输出写入其中的 Cloud Pub/Sub 主题,例如 projects/<project-id>/topics/<topic-name>
filterKey [可选] 根据特性键过滤事件。如果未指定 filterKey,则不会应用过滤器。
filterValue [可选] 提供了 filterKey 时要使用的过滤器特性值。默认情况下,filterValue 为 null。

运行 Pub/Sub to Pub/Sub 模板

控制台

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击 基于模板创建作业
  4. Cloud Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Pub/Sub to Pub/Sub template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • TOPIC_NAME:Pub/Sub 主题名称
  • FILTER_KEY:用于过滤事件的属性键。如果未指定键,则不会应用过滤器。
  • FILTER_VALUE:提供事件过滤键时要使用的过滤器属性值。接受有效的 Java 正则表达式字符串作为事件过滤值。如果提供了正则表达式,则应匹配整个表达式以过滤消息。部分匹配(如子字符串)不会被过滤。默认使用 null 事件过滤值。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • TOPIC_NAME:Pub/Sub 主题名称
  • FILTER_KEY:用于过滤事件的属性键。如果未指定键,则不会应用过滤器。
  • FILTER_VALUE:提供事件过滤键时要使用的过滤器属性值。接受有效的 Java 正则表达式字符串作为事件过滤值。如果提供了正则表达式,则应匹配整个表达式以过滤消息。部分匹配(如子字符串)不会被过滤。默认使用 null 事件过滤值。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Pub/Sub to Splunk

Pub/Sub to Splunk 模板是一种流处理流水线,可从 Pub/Sub 订阅中读取消息,并通过 Splunk 的 HTTP Event Collector (HEC) 将消息载荷写入 Splunk。此模板的最常见使用场景是将日志导出到 Splunk。如需查看底层工作流的示例,请参阅使用 Dataflow 将支持生产环境的日志导出部署到 Splunk

在写入 Splunk 之前,您还可以将 JavaScript 用户定义函数应用于消息载荷。任何未能成功处理的的消息都会被转发到 Pub/Sub 未处理主题,以便进一步进行问题排查并重新处理。

要为 HEC 令牌提供额外保护,您还可以传入 Cloud KMS 密钥和使用 Cloud KMS 密钥加密的 base64 编码 HEC 令牌参数。如需详细了解如何对 HEC 令牌参数进行加密,请参阅 Cloud KMS API 加密端点

对此流水线的要求

  • 源 Pub/Sub 订阅必须已存在才能运行此流水线。
  • 在运行此流水线之前,Pub/Sub 未处理的主题必须已存在。
  • Splunk HEC 端点必须可从 Dataflow 工作器的网络访问。
  • Splunk HEC 令牌必须已生成并且可用。

模板参数

参数 说明
inputSubscription 要从中读取输入的 Pub/Sub 订阅,例如 projects/<project-id>/subscriptions/<subscription-name>
token Splunk HEC 身份验证令牌。此 base64 编码的字符串可以使用 Cloud KMS 密钥进行加密,以提高安全性。
url Splunk HEC 网址,必须可从运行流水线的 VPC 路由。例如,https://splunk-hec-host:8088。
outputDeadletterTopic 用于转发无法递送的消息的 Pub/Sub 主题,例如 projects/<project-id>/topics/<topic-name>
javascriptTextTransformGcsPath [可选] 包含所有 JavaScript 代码的 Cloud Storage 路径,例如 gs://mybucket/mytransforms/*.js
javascriptTextTransformFunctionName [可选] 要调用的 JavaScript 函数的名称。例如,如果您的 JavaScript 函数为 function myTransform(inJson) { ...dostuff...},则函数名称为 myTransform
batchCount [可选] 向 Splunk 发送多个事件的批次大小。默认值为 1(无批处理)。
parallelism [可选] 最大并行请求数。默认值为 1(无并行)。
disableCertificateValidation [可选] 停用 SSL 证书验证。默认为 false(已启用验证)。
includePubsubMessage [可选] 在载荷中包含完整的 Pub/Sub 消息。默认值为 false(只有数据元素包含在载荷中)。
tokenKMSEncryptionKey [可选] 用于解密 HEC 令牌字符串的 Cloud KMS 密钥。如果提供了 Cloud KMS 密钥,则必须以加密方式传递 HEC 令牌字符串。

运行 Pub/Sub to Splunk 模板

控制台

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击 基于模板创建作业
  4. Cloud Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Pub/Sub to Splunk template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • INPUT_SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • TOKEN:Splunk 的 HTTP Event Collector 令牌
  • URL:Splunk 的 HTTP Event Collector 的网址路径(例如 https://splunk-hec-host:8088
  • DEADLETTER_TOPIC_NAME:Pub/Sub 主题名称
  • JAVASCRIPT_FUNCTION:您的 JavaScript 函数名称
  • PATH_TO_JAVASCRIPT_UDF_FILE:包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径(例如 gs://your-bucket/your-function.js
  • BATCH_COUNT:用于向 Splunk 发送多个事件的批次大小
  • PARALLELISM:用于向 Splunk 发送事件的并行请求数
  • DISABLE_VALIDATION:如果要停用 SSL 证书验证则为 true
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • INPUT_SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • TOKEN:Splunk 的 HTTP Event Collector 令牌
  • URL:Splunk 的 HTTP Event Collector 的网址路径(例如 https://splunk-hec-host:8088
  • DEADLETTER_TOPIC_NAME:Pub/Sub 主题名称
  • JAVASCRIPT_FUNCTION:您的 JavaScript 函数名称
  • PATH_TO_JAVASCRIPT_UDF_FILE:包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径(例如 gs://your-bucket/your-function.js
  • BATCH_COUNT:用于向 Splunk 发送多个事件的批次大小
  • PARALLELISM:用于向 Splunk 发送事件的并行请求数
  • DISABLE_VALIDATION:如果要停用 SSL 证书验证则为 true
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION"
   }
}

Pub/Sub to Avro Files on Cloud Storage

Pub/Sub to Avro files on Cloud Storage 模板是一个流处理流水线,可从 Pub/Sub 主题中读取数据,并将 Avro 文件写入指定的 Cloud Storage 存储分区。

对此流水线的要求

  • Pub/Sub 输入主题必须已存在才能执行此流水线。

模板参数

参数 说明
inputTopic 要订阅用来处理消息的 Pub/Sub 主题。主题名称必须采用 projects/<project-id>/topics/<topic-name> 格式。
outputDirectory 归档输出 Avro 文件的输出目录。末尾必须包含 /。 例如:gs://example-bucket/example-directory/
avroTempDirectory 临时 Avro 文件的目录。末尾必须包含 /。例如:gs://example-bucket/example-directory/
outputFilenamePrefix (可选)Avro 文件的输出文件名前缀。
outputFilenameSuffix (可选)Avro 文件的输出文件名后缀。
outputShardTemplate (可选)输出文件的分片模板。它被指定为字母 SN 的重复序列。例如 SSS-NNN。这些字母会分别替换成分片编号或分片总数。如果未指定此参数,则默认模板格式为 W-P-SS-of-NN

运行 Pub/Sub to Cloud Storage Avro 模板

控制台

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击 基于模板创建作业
  4. Cloud Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Pub/Sub to Cloud Storage Avro template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
avroTempDirectory=gs://BUCKET_NAME/temp/

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:区域端点;例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置;例如 gs://your-bucket/temp
  • TOPIC_NAME:Pub/Sub 主题名称
  • BUCKET_NAME - Cloud Storage 存储分区的名称。
  • FILENAME_PREFIX:首选输出文件名前缀
  • FILENAME_SUFFIX:首选输出文件名后缀
  • SHARD_TEMPLATE:首选输出分片模板

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE"
   }
}

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:区域端点;例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置;例如 gs://your-bucket/temp
  • TOPIC_NAME:Pub/Sub 主题名称
  • BUCKET_NAME - Cloud Storage 存储分区的名称。
  • FILENAME_PREFIX:首选输出文件名前缀
  • FILENAME_SUFFIX:首选输出文件名后缀
  • SHARD_TEMPLATE:首选输出分片模板

Pub/Sub to Text Files on Cloud Storage

Pub/Sub to Cloud Storage Text 模板是一种流处理流水线,可从 Pub/Sub 读取记录并将其保存为一系列文本格式的 Cloud Storage 文件。使用此模板,您可以快速地保存 Pub/Sub 中的数据以留待将来使用。默认情况下,此模板每 5 分钟生成一个新文件。

对此流水线的要求

  • Pub/Sub 主题必须已存在才能执行此流水线。
  • 发布到主题的消息必须采用文本格式。
  • 发布到主题的消息不得包含任何换行符。请注意,每条 Pub/Sub 消息在输出文件中均会保存为一行。

模板参数

参数 说明
inputTopic 要从中读取输入的 Pub/Sub 主题。主题名称应采用 projects/<project-id>/topics/<topic-name> 格式。
outputDirectory 用于写入输出文件的路径和文件名前缀,例如 gs://bucket-name/path/。该值必须以斜杠结尾。
outputFilenamePrefix 要在各窗口文件上放置的前缀。例如 output-
outputFilenameSuffix 要放置于每个窗口化文件上的后缀,通常是文件扩展名,例如 .txt.csv
outputShardTemplate 分片式模板定义每个窗口文件的动态部分。默认情况下,该管道使用单一碎片输出到各窗口内的文件系统。这意味着所有数据都将按窗口归入单个文件中。outputShardTemplate 默认为 W-P-SS-of-NN,其中 W 是窗口日期范围,P 是窗格信息,S 是分片编号,而 N 是分片数。对于单个文件,outputShardTemplateSS-of-NN 部分将为 00-of-01

运行 Pub/Sub to Text Files on Cloud Storage 模板

控制台

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击 基于模板创建作业
  4. Cloud Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Pub/Sub to Text Files on Cloud Storage template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME - Cloud Storage 存储分区的名称。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME - Cloud Storage 存储分区的名称。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_IDlocations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Pub/Sub to MongoDB

Pub/Sub to MongoDB 模板是一种流处理流水线,可从 Pub/Sub 订阅读取 JSON 编码的消息并将其以文档的形式写入 MongoDB。如果需要,此流水线还能支持额外的转换,只需通过 JavaScript 用户定义的函数 (UDF) 包含这些转换即可。因架构不匹配、JOSN 格式不正确或执行转换时发生的所有错误都会记录在未处理消息的 BigQuery 表中,并随附输入消息。如果在执行之前不存在任何未处理记录的表,则流水线会自动创建该表。

对此流水线的要求

  • Pub/Sub 订阅必须存在,并且消息必须采用有效的 JSON 格式进行编码。
  • MongoDB 集群必须存在,并且应该可通过 Dataflow 工作器机器访问。

模板参数

参数 说明
inputSubscription Pub/Sub 订阅的名称。例如:projects/<project-id>/subscriptions/<subscription-name>
mongoDBUri 以英文逗号分隔的 MongoDB 服务器列表。例如:192.285.234.12:27017,192.287.123.11:27017
database 存储集合的 MongoDB 数据库。例如:my-db
collection MongoDB 数据库中集合的名称。例如:my-collection
deadletterTable 存储失败消息(架构不匹配、JSON 格式错误)的 BigQuery 表。例如:project-id:dataset-name.table-name
javascriptTextTransformGcsPath [可选] 包含 UDF 转换的 JavaScript 文件的 Cloud Storage 位置。例如:gs://mybucket/filename.json
javascriptTextTransformFunctionName [可选] JavaScript UDF 的名称。例如:transform
batchSize [可选] 用于将文档批量插入 MongoDB 的批次大小。默认值:1000
batchSizeBytes [可选] 批次大小(以字节为单位)。默认值:5242880
maxConnectionIdleTime [可选] 在出现连接超时之前所允许的空闲时间上限(以秒为单位)。默认值:60000
sslEnabled [可选] 用于指示与 MongoDB 的连接是否启用了 SSL 的布尔值。默认值:true
ignoreSSLCertificate [可选] 用于指示是否应忽略 SSL 证书的布尔值。默认值:true
withOrdered [可选] 允许依次批量插入 MongoDB 的布尔值。默认值:true
withSSLInvalidHostNameAllowed [可选] 用于指示是否允许 SSL 连接使用无效主机名的布尔值。默认值:true

运行 Pub/Sub to MongoDB 模板

控制台

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击 基于模板创建作业
  4. Cloud Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 Pub/Sub to MongoDB template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须具备 Cloud SDK 284.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • REGION_NAME:Dataflow 地区名称(例如 us-central1
  • JOB_NAME:您选择的作业名称
  • INPUT_SUBSCRIPTION:Pub/Sub 订阅(例如 projects/<project-id>/subscriptions/<subscription-name>
  • MONGODB_URI:MongoDB 服务器地址(例如 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE:MongoDB 数据库名称(例如 users
  • COLLECTION:MongoDB 集合名称(例如 profiles
  • UNPROCESSED_TABLE:BigQuery 表名称(例如 your-project:your-dataset.your-table-name
gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • LOCATION:Dataflow 地区名称(例如 us-central1
  • JOB_NAME:您选择的作业名称
  • INPUT_SUBSCRIPTION:Pub/Sub 订阅(例如 projects/<project-id>/subscriptions/<subscription-name>
  • MONGODB_URI:MongoDB 服务器地址(例如 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE:MongoDB 数据库名称(例如 users
  • COLLECTION:MongoDB 集合名称(例如 profiles
  • UNPROCESSED_TABLE:BigQuery 表名称(例如 your-project:your-dataset.your-table-name
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Datastream to Cloud Spanner

Datastream to Cloud Spanner 模板是一种流处理流水线,可从 Cloud Storage 存储分区中读取 Datastream 事件并将它们写入 Cloud Spanner 数据库。它预期用于将数据从 Datastream 迁移到 Cloud Spanner。

在执行模板之前,执行迁移所需的所有表必须存在于目标 Cloud Spanner 数据库中。因此,在数据迁移之前,必须完成从源数据库到目标 Cloud Spanner 的架构迁移。在迁移之前,数据可能存在表中。此模板不会将 DataStream 架构更改传播到 Cloud Spanner 数据库。

只有在所有数据都写入 Cloud Spanner 后,才能在迁移结束时保证数据一致性。为了存储对写入 Cloud Spanner 的每条记录的排序信息,此模板为 Cloud Spanner 数据库中的每个表创建了一个额外的表(称为影子表)。这用于确保迁移结束时的一致性。影子表在迁移后不会被删除,可在迁移结束时用于进行验证。

操作期间发生的任何错误(例如架构不匹配、JSON 文件格式错误或执行转换产生的错误)都会记录在错误队列中。错误队列是一个 Cloud Storage 文件夹,它以文本格式存储遇到错误的所有 Datastream 事件以及错误原因。这些错误可能是暂时性的,也可能是永久性的,它们存储在错误队列的相应 Cloud Storage 文件夹中。系统会自动重试暂时性错误,但不会自动重试永久性错误。如果发生永久性错误,您可以选择在模板运行期间更正更改事件,并将它们转移到可重试的存储分区。

对此流水线的要求

  • 处于正在运行未启动状态的 Datastream 数据流。
  • 要在其中复制 Datastream 事件的 Cloud Storage 存储分区。
  • 具有现有表的 Cloud Spanner 数据库。这些表可以为空,也可以包含数据。

模板参数

参数 说明
inputFilePattern Cloud Storage 中要复制的 Datastream 文件的位置。通常,这是数据流的根路径。
streamName 用于轮询架构信息和来源类型的数据流的名称或模板。
instanceId 在其中复制更改的 Cloud Spanner 实例。
databaseId 在其中复制更改的 Cloud Spanner 数据库。
projectId Cloud Spanner 项目 ID。
deadLetterQueueDirectory (可选)用于存储错误队列输出的文件路径。默认值为 Dataflow 作业的临时位置下的目录。
inputFileFormat (可选)Datastream 生成的输出文件的格式。例如:avro,json。默认值:avro
shadowTablePrefix (可选)用于为影子表命名的前缀。默认值:shadow_

运行 Datastream to Cloud Spanner 模板

控制台

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击 基于模板创建作业
  4. Cloud Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 Datastream to Cloud Spanner template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须具备 Cloud SDK 284.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/Datastream_to_CloudSpanner

请替换以下内容:

  • YOUR_PROJECT_ID:您的模板项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION_NAME:Dataflow 地区名称(例如 us-central1
  • GCS_FILE_PATH:用于存储 Datastream 事件的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE:您的 Cloud Spanner 实例。
  • CLOUDSPANNER_DATABASE:您的 Cloud Spanner 数据库。
  • DLQ:错误队列目录的 Cloud Storage 路径。
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Datastream_to_CloudSpanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/Datastream_to_CloudSpanner

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • YOUR_PROJECT_ID:您的模板项目 ID
  • JOB_NAME:您选择的作业名称
  • LOCATION:Dataflow 地区名称(例如 us-central1
  • GCS_FILE_PATH:用于存储 Datastream 事件的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE:您的 Cloud Spanner 实例。
  • CLOUDSPANNER_DATABASE:您的 Cloud Spanner 数据库。
  • DLQ:错误队列目录的 Cloud Storage 路径。
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Datastream_to_CloudSpanner",
   }
}
  

Text Files on Cloud Storage to BigQuery (Stream)

Text Files on Cloud Storage to BigQuery 流水线是一种流处理流水线,可用于流式传输 Cloud Storage 中存储的文本文件,使用您提供的 JavaScript 用户定义函数 (UDF) 转换这些文件,然后将结果附加到 BigQuery。

流水线无限期运行,需要通过取消而非排空手动终止,原因是其使用 Watch 转换,该转换是不支持排空的可拆分 DoFn

对此流水线的要求

  • 创建一个用于描述 BigQuery 中的输出表架构的 JSON 文件。

    确保有一个名为 BigQuery Schema 的顶级 JSON 数组,且该数组的内容遵循 {"name": "COLUMN_NAME", "type": "DATA_TYPE"} 格式。例如:

    {
      "BigQuery Schema": [
        {
          "name": "location",
          "type": "STRING"
        },
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "STRING"
        },
        {
          "name": "color",
          "type": "STRING",
          "mode": "REQUIRED"
        },
        {
          "name": "coffee",
          "type": "STRING",
          "mode": "REQUIRED"
        }
      ]
    }
    
  • 使用 UDF 函数(该函数提供转换文本行的逻辑)创建一个 JavaScript (.js) 文件。请注意,此函数必须返回 JSON 字符串。

    例如,以下函数将拆分 CSV 文件的每行文本,并通过转换值返回 JSON 字符串。

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

模板参数

参数 说明
javascriptTextTransformGcsPath 您的 JavaScript UDF 的 Cloud Storage 位置。例如:gs://my_bucket/my_function.js
JSONPath 您的 BigQuery 架构文件的 Cloud Storage 位置,以 JSON 格式描述。例如:gs://path/to/my/schema.json
javascriptTextTransformFunctionName 要作为 UDF 调用的 JavaScript 函数的名称,例如:transform
outputTable 完全限定的 BigQuery 表,例如 my-project:dataset.table
inputFilePattern 您要处理的文本的 Cloud Storage 位置,例如:gs://my-bucket/my-files/text.txt
bigQueryLoadingTemporaryDirectory BigQuery 加载进程的临时目录,例如:gs://my-bucket/my-files/temp_dir
outputDeadletterTable 无法到达输出表的消息表。例如 my-project:dataset.my-unprocessed-table。如果消息表不存在,则系统将在执行流水线期间创建该表。 如果未指定此参数,则系统会改用 <outputTableSpec>_error_records

运行 Cloud Storage Text to BigQuery (Stream) 模板

控制台

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击 基于模板创建作业
  4. Cloud Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Cloud Storage Text to BigQuery template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • JAVASCRIPT_FUNCTION:UDF 的名称
  • PATH_TO_BIGQUERY_SCHEMA_JSON:包含架构定义的 JSON 文件的 Cloud Storage 路径
  • PATH_TO_JAVASCRIPT_UDF_FILE:包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径
  • PATH_TO_TEXT_DATA:文本数据集的 Cloud Storage 路径
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • BIGQUERY_UNPROCESSED_TABLE:未处理消息的 BigQuery 表名称
  • PATH_TO_TEMP_DIR_ON_GCS:临时目录的 Cloud Storage 路径
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • JAVASCRIPT_FUNCTION:UDF 的名称
  • PATH_TO_BIGQUERY_SCHEMA_JSON:包含架构定义的 JSON 文件的 Cloud Storage 路径
  • PATH_TO_JAVASCRIPT_UDF_FILE:包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径
  • PATH_TO_TEXT_DATA:文本数据集的 Cloud Storage 路径
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • BIGQUERY_UNPROCESSED_TABLE:未处理消息的 BigQuery 表名称
  • PATH_TO_TEMP_DIR_ON_GCS:临时目录的 Cloud Storage 路径
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

Text Files on Cloud Storage to Pub/Sub (Stream)

此模板可以创建一种流处理流水线,该流水线可持续轮询新上传到 Cloud Storage 的文本文件,并逐行读取每个文件,然后将字符串发布到 Pub/Sub 主题。此外,此模板还能以包含 JSON 记录的换行符分隔文件或 CSV 文件形式,将记录发布到 Pub/Sub 主题进行实时处理。您可以使用此模板将数据重放到 Pub/Sub。

流水线无限期运行,需要通过“取消”而非“排空”手动终止,原因是其使用“Watch”转换,该转换是一个不支持排空的“SplittableDoFn”。

目前,轮询间隔固定为 10 秒。此模板不会对个别记录设置任何时间戳,因此在执行期间,事件时间将与发布时间相同。如果您的流水线依赖准确的事件时间来执行处理,建议不要使用此流水线。

对此流水线的要求

  • 输入文件必须采用换行符分隔的 JSON 或 CSV 格式。跨越源文件中多个行的记录会导致下游问题,因为文件中的每一行都将以消息形式发布到 Pub/Sub。
  • Pub/Sub 主题必须已存在才能执行此流水线。
  • 流水线无限期运行,需要手动终止。

模板参数

参数 说明
inputFilePattern 需要读取的输入文件格式。例如 gs://bucket-name/files/*.jsongs://bucket-name/path/*.csv
outputTopic 要向其写入数据的 Pub/Sub 输入主题。该名称应采用 projects/<project-id>/topics/<topic-name> 格式。

运行 Text Files on Cloud Storage to Pub/Sub (Stream) 模板

控制台

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击 基于模板创建作业
  4. Cloud Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Text Files on Cloud Storage to Pub/Sub (Stream) template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME:Cloud Storage 存储分区的名称
  • FILE_PATTERN:要从 Cloud Storage 存储分区中读取的文件格式 glob(例如 path/*.csv
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION\
    --staging-location TEMP_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME:Cloud Storage 存储分区的名称
  • FILE_PATTERN:要从 Cloud Storage 存储分区中读取的文件格式 glob(例如 path/*.csv
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP)

Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) 模板是一种流处理流水线,会从 Cloud Storage 存储分区读取 csv 文件,调用 Cloud Data Loss Prevention (Cloud DLP) API 进行去标识化处理,并将经过去标识化处理的数据写入指定的 BigQuery 表中。此模板支持使用 Cloud DLP 检查模板和 Cloud DLP 去标识化模板。 这样,用户不但能检查潜在的敏感信息并执行去标识化,而且还能对结构化数据(其中有数据列被指定为要进行去标识化处理且无需检查)执行去标识化。

对此流水线的要求

  • 要令牌化的输入数据必须存在。
  • Cloud DLP 模板(例如,DeidentifyTemplate 和 InspectTemplate)必须存在。如需了解更多详细信息,请参阅 Cloud DLP 模板
  • BigQuery 数据集必须存在。

模板参数

参数 说明
inputFilePattern 要从中读取输入数据记录的 csv 文件。也可以使用通配符,例如 gs://mybucket/my_csv_filename.csvgs://mybucket/file-*.csv
dlpProjectId 拥有 Cloud DLP API 资源的 Cloud DLP 项目 ID。 此 Cloud DLP 项目可以是拥有 Cloud DLP 模板的项目,也可以是其他项目; 例如 my_dlp_api_project
deidentifyTemplateName 要用于 API 请求的 Cloud DLP 去标识化模板;以 projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId} 模式指定,例如 projects/my_project/deidentifyTemplates/100
datasetName 用于发送标记化结果的 BigQuery 数据集。
batchSize 用于发送数据以进行检查和/或去标记化处理的区块/批次大小。对于 csv 文件,batchSize 是一个批次包含的行数。用户必须根据记录大小和文件大小确定批次大小。请注意,Cloud DLP API 的载荷大小上限为每个 API 调用 524 KB。
inspectTemplateName [可选] 要用于 API 请求的 Cloud DLP 检查模板;以 projects/{template_project_id}/identifyTemplates/{idTemplateId} 模式指定,例如 projects/my_project/identifyTemplates/100

运行 Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) 模板

控制台

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击 基于模板创建作业
  4. Cloud Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

请替换以下内容:

  • TEMPLATE_PROJECT_ID:您的模板项目 ID
  • DLP_API_PROJECT_ID:您的 Cloud DLP API 项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • INPUT_DATA:输入文件路径
  • DEIDENTIFY_TEMPLATE:Cloud DLPDeidentify 模板编号
  • DATASET_NAME:BigQuery 数据集名称
  • INSPECT_TEMPLATE_NUMBER:Cloud DLPInspect 模板编号
  • BATCH_SIZE_VALUE:批次大小(对于 csv 文件,批次大小是每个 API 的行数)
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputFilePattern=INPUT_DATA,\
datasetName=DATASET_NAME,\
batchSize=BATCH_SIZE_VALUE,\
dlpProjectId=DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • TEMPLATE_PROJECT_ID:您的模板项目 ID
  • DLP_API_PROJECT_ID:您的 Cloud DLP API 项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • INPUT_DATA:输入文件路径
  • DEIDENTIFY_TEMPLATE:Cloud DLPDeidentify 模板编号
  • DATASET_NAME:BigQuery 数据集名称
  • INSPECT_TEMPLATE_NUMBER:Cloud DLPInspect 模板编号
  • BATCH_SIZE_VALUE:批次大小(对于 csv 文件,批次大小是每个 API 的行数)
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
      "inputFilePattern":INPUT_DATA,
      "datasetName": "DATASET_NAME",
      "batchSize": "BATCH_SIZE_VALUE",
      "dlpProjectId": "DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE",
      "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER"
   }
}

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream)

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub 模版是一种流处理流水线,用于从 MySQL 数据库中读取包含更改数据的 Pub/Sub 消息并将记录写入 BigQuery。Debezium 连接器捕获对 MySQL 数据库的更改,并将更改后的数据发布到 Pub/Sub。然后,该模板读取 Pub/Sub 消息并将其写入 BigQuery。

您可以使用此模板同步 MySQL 数据库和 BigQuery 表。流水线将更改后的数据写入 BigQuery 暂存表,并间歇性地更新复制 MySQL 数据库的 BigQuery 表。

对此流水线的要求

  • Debezium 连接器必须已部署
  • Pub/Sub 消息必须在 Beam 行中序列化。

模板参数

参数 说明
inputSubscriptions 要读取的 Pub/Sub 输入订阅列表(以英文逗号分隔),格式为 <subscription>,<subscription>, ...
changeLogDataset 用于存储暂存表的 BigQuery 数据集,格式为 <my-dataset>
replicaDataset 用于存储副本表的 BigQuery 数据集的位置,格式为 <my-dataset>
updateFrequencySecs (可选)流水线更新复制 MySQL 数据库的 BigQuery 表的时间间隔。

运行 Change Data Capture using Debezium and MySQL from Pub/Sub to BigQuery 模版

如需运行此模板,请执行以下步骤:

  1. 在本地机器上,克隆 DataflowTemplates 代码库
  2. 切换到 v2/cdc-parent 目录:
  3. 确保已部署 Debezium 连接器
  4. 使用 Maven,运行 Dataflow 模板。

    替换以下值:

    • PROJECT_ID:您的项目 ID。
    • YOUR_SUBSCRIPTIONS:以英文逗号分隔的 Pub/Sub 订阅名称列表。
    • YOUR_CHANGELOG_DATASET:用于变更日志数据的 BigQuery 数据集。
    • YOUR_REPLICA_DATASET:您的副本表的 BigQuery 数据集。
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
                --inputSubscriptions=YOUR_SUBSCRIPTIONS \
                --updateFrequencySecs=300 \
                --changeLogDataset=YOUR_CHANGELOG_DATASET \
                --replicaDataset=YOUR_REPLICA_DATASET \
                --project=PROJECT_ID"
      

Apache Kafka to BigQuery

Apache Kafka to BigQuery 模板是一种流处理流水线,可从 Apache Kafka 提取文本数据、执行用户定义的函数 (UDF) 并将生成的记录输出到 BigQuery。在转换数据、执行 UDF 或插入输出表时出现的任何错误都将被插入 BigQuery 中单独的错误表。如果在执行之前错误表不存在,则创建该表。

对此流水线的要求

  • 输出 BigQuery 表必须已存在。
  • Apache Kafka 代理服务器必须正在运行并可从 Dataflow 工作器机器进行访问。
  • Apache Kafka 主题必须存在,并且消息必须采用有效的 JSON 格式编码。

模板参数

参数 说明
outputTableSpec 要写入 Apache Kafka 消息的 BigQuery 输出表位置,格式为 my-project:dataset.table
inputTopics 以英文逗号分隔的列表中可读取的 Apache Kafka 输入主题。例如:messages
bootstrapServers 以英文逗号分隔的列表中正在运行的 Apache Kafka Broker 服务器的主机地址,每个主机地址的格式为 35.70.252.199:9092
javascriptTextTransformGcsPath (可选)JavaScript UDF 的 Cloud Storage 位置路径。例如:gs://my_bucket/my_function.js
javascriptTextTransformFunctionName (可选)要作为 UDF 调用的 JavaScript 的名称。例如 transform
outputDeadletterTable (可选)未能到达输出表的消息的 BigQuery 表,格式为 my-project:dataset.my-deadletter-table。如果不存在,则系统会在流水线执行期间创建该表。如果未指定此参数,则系统会改用 <outputTableSpec>_error_records

运行 Apache Kafka to BigQuery 模板

控制台

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击 基于模板创建作业
  4. Cloud Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Apache Kafka to BigQuery template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须具备 Cloud SDK 284.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

请替换以下内容:

  • YOUR_PROJECT_ID:您的模板项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION_NAME:Dataflow 地区名称(例如 us-central1
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • KAFKA_TOPICS:Apache Kakfa 主题列表。如果提供了多个主题,请按照说明了解如何转义英文逗号。
  • PATH_TO_JAVASCRIPT_UDF_FILE:包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径
  • YOUR_JAVASCRIPT_FUNCTION:UDF 的名称
  • KAFKA_SERVER_ADDRESSES:Apache Kafka broker 服务器 IP 地址列表。每个 IP 地址都应与服务器可访问的端口号相匹配。例如:35.70.252.199:9092。如果提供了多个地址,请按照说明了解如何转义英文逗号。
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • YOUR_PROJECT_ID:您的模板项目 ID
  • JOB_NAME:您选择的作业名称
  • LOCATION:Dataflow 地区名称(例如 us-central1
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • KAFKA_TOPICS:Apache Kakfa 主题列表。如果提供了多个主题,请按照说明了解如何转义英文逗号。
  • PATH_TO_JAVASCRIPT_UDF_FILE:包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径
  • YOUR_JAVASCRIPT_FUNCTION:UDF 的名称
  • KAFKA_SERVER_ADDRESSES:Apache Kafka broker 服务器 IP 地址列表。每个 IP 地址都应与服务器可访问的端口号相匹配。例如:35.70.252.199:9092。如果提供了多个地址,请按照说明了解如何转义英文逗号。
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Kafka_to_BigQuery",
   }
}
  

Datastream to BigQuery (Stream)

Datastream to BigQuery 模板是一种流处理流水线,可读取 Datastream 数据并将其复制到 BigQuery 中。该模板使用 Pub/Sub 通知从 Cloud Storage 中读取数据,并将其复制到时间分区的 BigQuery 暂存表中。复制后,该模板会在 BigQuery 中执行 MERGE,以将所有更改数据捕获 (CDC) 更改插入/更新到源表的副本中。

此模板处理创建和更新通过复制管理的 BigQuery 表。当需要数据定义语言 (DDL) 时,对 Datastream 的回调将提取源表架构并将其转换为 BigQuery 数据类型。支持的操作包括下列操作:

  • 在插入数据时创建新表。
  • 在 BigQuery 表中添加新列,且初始值为 null。
  • 在 BigQuery 中忽略丢弃的列,且未来的值为 null。
  • 重命名的列将作为新列添加到 BigQuery 中。
  • 类型更改不会传播到 BigQuery。

对此流水线的要求

  • 已准备好或已在复制数据的 Datastream 数据流。
  • 已经为 Datastream 数据启用 Cloud Storage Pub/Sub 通知
  • BigQuery 目标数据集已创建,而 Compute Engine 服务帐号已获授予管理员的访问权限。

模板参数

参数 说明
inputFilePattern Cloud Storage 中要复制的 Datastream 文件的位置。此文件位置通常是数据流的根路径。
gcsPubSubSubscription 包含 Datastream 文件通知的 Pub/Sub 订阅。例如 projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME
inputFileFormat Datastream 生成的输出文件的格式。例如:avro,json。默认值:avro
outputStagingDatasetTemplate 包含暂存表的现有数据集的名称。您可以将模板 {_metadata_dataset} 包括为占位符,它将被替换为源数据集/架构的名称(例如 {_metadata_dataset}_log)。
outputDatasetTemplate 包含副本表的现有数据集的名称。您可以将模板 {_metadata_dataset} 包括为占位符,它将被替换为源数据集/架构的名称(例如 {_metadata_dataset})。
outputStagingTableNameTemplate (可选)暂存表的名称模板。默认值为 {_metadata_table}_log。如果您要复制多个架构,建议使用 {_metadata_schema}_{_metadata_table}_log
outputTableNameTemplate (可选)副本表名称的模板。默认值:{_metadata_table}。如果您要复制多个架构,建议使用 {_metadata_schema}_{_metadata_table}
outputProjectId (可选)BigQuery 数据集的项目,用于将数据输出到其中。此参数的默认项目是 Dataflow 流水线在其中运行的项目。
deadLetterQueueDirectory (可选)用于存储未处理消息以及无法处理原因的文件路径。默认值为 Dataflow 作业的临时位置下的目录。在大多数情况下,默认值就可以了。
streamName (可选)用于轮询架构信息的数据流的名称或模板。默认值:{_metadata_stream}
mergeFrequencyMinutes (可选)合并给定表格的时间间隔(分钟)。默认值:5。
dlqRetryMinutes (可选)死信队列 (DLQ) 重试之间的分钟数。默认值:10。

运行 Datastream to BigQuery 模板

控制台

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击 基于模板创建作业
  4. Cloud Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 Datastream to BigQuery template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须具备 Cloud SDK 284.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery

请替换以下内容:

  • YOUR_PROJECT_ID:您的模板项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION_NAME:Dataflow 地区名称(例如 us-central1
  • GCS_FILE_PATH:Datastream 数据的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME:要从中读取已更改文件的 Pub/Sub 订阅。例如 projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME
  • BIGQUERY_DATASET:您的 BigQuery 数据集名称。
  • BIGQUERY_TABLE:您的 BigQuery 表模板。例如 {_metadata_schema}_{_metadata_table}_log
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_Datastream_to_BigQuery \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
outputStagingDatasetTemplate=BIGQUERY_DATASET,\
outputDatasetTemplate=BIGQUERY_DATASET,\
outputStagingTableNameTemplate=BIGQUERY_TABLE,\
outputTableNameTemplate=BIGQUERY_TABLE_log
  

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_BigQuery

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • YOUR_PROJECT_ID:您的模板项目 ID
  • JOB_NAME:您选择的作业名称
  • LOCATION:Dataflow 地区名称(例如 us-central1
  • GCS_FILE_PATH:Datastream 数据的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME:要从中读取已更改文件的 Pub/Sub 订阅。例如 projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME
  • BIGQUERY_DATASET:您的 BigQuery 数据集名称。
  • BIGQUERY_TABLE:您的 BigQuery 表模板。例如 {_metadata_schema}_{_metadata_table}_log
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
          "outputDatasetTemplate": "BIGQUERY_DATASET",
          "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
          "outputTableNameTemplate": "BIGQUERY_TABLE_log"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_Datastream_to_BigQuery",
   }
}
  

Datastream to PostgreSQL (Stream)

Datastream to PostgreSQL 模板是一种流处理流水线,用于读取 Datastream 数据并将其复制到任何 PostgreSQL 数据库。该模板使用 Pub/Sub 通知从 Cloud Storage 中读取数据,并将此数据复制到 PostgreSQL 副本表。

该模板不支持数据定义语言 (DDL),并要求所有表已存在于 PostgreSQL 中。复制使用 Dataflow 有状态转换来过滤过时的数据,并确保数据无序的一致性。例如,如果传递了某行的较新版本,则会忽略该行较晚到达的版本。执行的数据操纵语言 (DML) 是将源数据最佳复制到目标数据的最佳尝试。执行的 DML 语句遵循以下规则:

  • 如果存在主键,则插入和更新操作使用插入/更新语法(例如 INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE)。
  • 如果存在主键,则删除将作为删除 DML 复制。
  • 如果不存在主键,则会将插入和更新操作插入表中。
  • 如果不存在主键,则会忽略删除操作。

如果您使用的是 Oracle to Postgres 实用程序,请在 PostgreSQL 中添加 ROWID 作为主键(如果主键不存在)。

对此流水线的要求如下

  • 已准备好或已在复制数据的 Datastream 数据流。
  • 已经为 Datastream 数据启用 Cloud Storage Pub/Sub 通知
  • 为 PostgreSQL 数据库提供了所需的架构。
  • 已在 Dataflow 工作器和 PostgreSQL 之间设置网络访问权限。

模板参数

参数 说明
inputFilePattern Cloud Storage 中要复制的 Datastream 文件的位置。此文件位置通常是数据流的根路径。
gcsPubSubSubscription 包含 Datastream 文件通知的 Pub/Sub 订阅。例如 projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME
inputFileFormat Datastream 生成的输出文件的格式。例如:avro,json。默认值:avro
databaseHost 要在其上执行连接的 PostgreSQL 主机。
databaseUser 具有写入副本中所有表所需的全部权限的 PostgreSQL 用户。
databasePassword 给定 PostgreSQL 用户的密码。
databasePort (可选)要连接到的 PostgreSQL 数据库端口。默认值:5432。
databaseName (可选)要连接到的 PostgreSQL 数据库的名称。默认值:postgres。
streamName (可选)用于轮询架构信息的数据流的名称或模板。默认值:{_metadata_stream}

运行 Datastream to PostgreSQL 模板

控制台

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击 基于模板创建作业
  4. Cloud Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 Datastream to PostgreSQL template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

gcloud

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须具备 Cloud SDK 284.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Postgres

请替换以下内容:

  • YOUR_PROJECT_ID:您的模板项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION_NAME:Dataflow 地区名称(例如 us-central1
  • GCS_FILE_PATH:Datastream 数据的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME:要从中读取已更改文件的 Pub/Sub 订阅。例如 projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME
  • DATABASE_HOST:您的 PostgreSQL 主机 IP。
  • DATABASE_USER:您的 PostgreSQL 用户。
  • DATABASE_PASSWORD:您的 PostgreSQL 密码。
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_Datastream_to_Postgres \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/Cloud_Datastream_to_Postgres

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • YOUR_PROJECT_ID:您的模板项目 ID
  • JOB_NAME:您选择的作业名称
  • LOCATION:Dataflow 地区名称(例如 us-central1
  • GCS_FILE_PATH:Datastream 数据的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME:要从中读取已更改文件的 Pub/Sub 订阅。例如 projects/PROJECT-ID/subscriptions/SUBSCRIPTION-NAME
  • DATABASE_HOST:您的 PostgreSQL 主机 IP。
  • DATABASE_USER:您的 PostgreSQL 用户。
  • DATABASE_PASSWORD:您的 PostgreSQL 密码。
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_Datastream_to_Postgres",
   }
}