Google 提供的流处理模板

Google 提供了一组开源 Dataflow 模板。如需了解有关模板的一般信息,请参阅概览页面。如需查看 Google 提供的所有模板的列表,请参阅开始使用 Google 提供的模板页面。

本页面介绍了流处理模板:

Pub/Sub Subscription to BigQuery

Pub/Sub Subscription to BigQuery 模板是一种流处理流水线,可从 Pub/Sub 订阅读取 JSON 格式的消息并将其写入 BigQuery 表格中。您可以使用该模板作为将 Pub/Sub 数据移动到 BigQuery 的快速解决方案。此模板可从 Pub/Sub 中读取 JSON 格式的消息并将其转换为 BigQuery 元素。

对此流水线的要求

  • Pub/Sub 消息必须采用 JSON 格式(请参阅此处)。例如,您可以将 {"k1":"v1", "k2":"v2"} 格式的消息插入具有两个列(k1k2)且数据类型为字符串的 BigQuery 表格中。
  • 在运行该流水线之前,输出表必须已存在。

模板参数

参数 说明
inputSubscription 要读取的 Pub/Sub 输入订阅,格式为 projects/<project>/subscriptions/<subscription>
outputTableSpec BigQuery 输出表位置,格式为 <my-project>:<my-dataset>.<my-table>
outputDeadletterTable 未能到达输出表的消息的 BigQuery 表,格式为 <my-project>:<my-dataset>.<my-table>。如果该表不存在,则系统会在流水线执行期间创建它。如果未指定此参数,则系统会改用 <outputTableSpec>_error_records

运行 Pub/Sub Subscription to BigQuery 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Pub/Sub Subscription to BigQuery template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称。
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • SUBSCRIPTION_NAME:您的 Pub/Sub 订阅名称
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/PubSub_Subscription_to_BigQuery

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称。
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • SUBSCRIPTION_NAME:您的 Pub/Sub 订阅名称
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_Subscription_to_BigQuery
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   },
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
}

Pub/Sub Topic to BigQuery

Pub/Sub Topic to BigQuery 模板是一种流处理流水线,可从 Cloud Pub/Sub 主题读取 JSON 格式的消息并将其写入 BigQuery 表格中。您可以使用该模板作为将 Pub/Sub 数据移动到 BigQuery 的快速解决方案。此模板可从 Pub/Sub 中读取 JSON 格式的消息并将其转换为 BigQuery 元素。

对此流水线的要求

  • Pub/Sub 消息必须采用 JSON 格式(请参阅此处)。例如,您可以将 {"k1":"v1", "k2":"v2"} 格式的消息插入具有两个列(k1k2)且数据类型为字符串的 BigQuery 表格中。
  • 输出表必须存在才能执行流水线。

模板参数

参数 说明
inputTopic 要读取的 Cloud Pub/Sub 输入主题,格式为 projects/<project>/topics/<topic>
outputTableSpec BigQuery 输出表位置,格式为 <my-project>:<my-dataset>.<my-table>
outputDeadletterTable 无法到达输出表的消息的 BigQuery 表。格式应为 <my-project>:<my-dataset>.<my-table>。 如果该表不存在,则系统会在流水线执行期间创建它。如果未指定此参数,则系统会改用 <outputTableSpec>_error_records

运行 Cloud Pub/Sub Topic to BigQuery 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Pub/Sub Topic to BigQuery template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/PubSub_to_BigQuery

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • DATASET:您的 BigQuery 数据集
  • TABLE_NAME:您的 BigQuery 表名称
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/PubSub_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
   }
}

Pub/Sub Avro to BigQuery

Pub/Sub Avro to BigQuery 模板是一种流处理流水线,用于将 Pub/Sub 订阅中的 Avro 数据提取到 BigQuery 表中。向 BigQuery 表写入数据时发生的任何错误都会流式传输到 Pub/Sub 未处理的主题。

对此流水线的要求

  • 用作输入来源的 Pub/Sub 订阅必须存在。
  • Avro 记录的架构文件必须存在于 Cloud Storage 存储空间中。
  • 未处理的 Pub/Sub 主题必须存在。
  • 用作输出目标的 BigQuery 数据集必须已存在。

模板参数

参数 说明
schemaPath Avro 架构文件的 Cloud Storage 位置。例如 gs://path/to/my/schema.avsc
inputSubscription 要读取的 Pub/Sub 输入订阅。例如 projects/<project>/subscriptions/<subscription>
outputTopic 要用于未处理的记录的 Pub/Sub 主题。例如 projects/<project-id>/topics/<topic-name>
outputTableSpec BigQuery 输出表位置。例如 <my-project>:<my-dataset>.<my-table>。 根据指定的 createDisposition,系统可能会使用用户提供的 Avro 架构自动创建输出表。
writeDisposition (可选)BigQuery WriteDisposition。例如 WRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE。默认:WRITE_APPEND
createDisposition (可选)BigQuery CreateDisposition。例如 CREATE_IF_NEEDEDCREATE_NEVER。默认:CREATE_IF_NEEDED

运行 Pub/Sub Avro to BigQuery 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Pub/Sub Avro to BigQuery template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须具备 Cloud SDK 284.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

请替换以下内容:

  • JOB_NAME:您选择的作业名称
  • REGION_NAME:Dataflow 地区名称(例如 us-central1
  • SCHEMA_PATH:Avro 架构文件的 Cloud Storage 路径(例如 gs://MyBucket/file.avsc
  • SUBSCRIPTION_NAME:Pub/Sub 输入订阅名称
  • BIGQUERY_TABLE:BigQuery 输出表名称
  • DEADLETTER_TOPIC:要用于未处理的队列的 Pub/Sub 主题
gcloud beta dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/PubSub_Avro_to_BigQuery

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • JOB_NAME:您选择的作业名称
  • LOCATION:Dataflow 地区名称(例如 us-central1
  • SCHEMA_PATH:Avro 架构文件的 Cloud Storage 路径(例如 gs://MyBucket/file.avsc
  • SUBSCRIPTION_NAME:Pub/Sub 输入订阅名称
  • BIGQUERY_TABLE:BigQuery 输出表名称
  • DEADLETTER_TOPIC:要用于未处理的队列的 Pub/Sub 主题
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Pub/Sub to Pub/Sub

Pub/Sub to Pub/Sub 模板是一种流处理流水线,可从 Pub/Sub 订阅中读取消息,并将这些消息写入其他 Pub/Sub 主题。该流水线还接受一个可选的消息属性键以及值,该值可用于过滤应写入 Pub/Sub 主题的消息。您可以使用此模板将消息从 Pub/Sub 订阅复制到带有可选消息过滤器的其他 Pub/Sub 主题。

对此流水线的要求

  • 源 Pub/Sub 订阅必须已存在才能执行流水线。
  • 目的地 Pub/Sub 主题必须已存在才能执行此流水线。

模板参数

参数 说明
inputSubscription 要从中读取输入的 Pub/Sub 订阅,例如 projects/<project-id>/subscriptions/<subscription-name>
outputTopic 要将输出写入其中的 Cloud Pub/Sub 主题,例如 projects/<project-id>/topics/<topic-name>
filterKey [可选] 根据特性键过滤事件。如果未指定 filterKey,则不会应用过滤器。
filterValue [可选] 提供了 filterKey 时要使用的过滤器特性值。默认情况下,filterValue 为 null。

运行 Pub/Sub to Pub/Sub 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Pub/Sub to Pub/Sub template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • TOPIC_NAME:Pub/Sub 主题名称
  • FILTER_KEY:用于过滤事件的属性键。如果未指定键,则不会应用过滤器。
  • FILTER_VALUE:提供事件过滤键时要使用的过滤器属性值。接受有效的 Java 正则表达式字符串作为事件过滤值。如果提供了正则表达式,则应匹配整个表达式以过滤消息。部分匹配(如子字符串)不会被过滤。默认使用 null 事件过滤值。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
filterKey=FILTER_KEY,\
filterValue=FILTER_VALUE

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Cloud_PubSub

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • TOPIC_NAME:Pub/Sub 主题名称
  • FILTER_KEY:用于过滤事件的属性键。如果未指定键,则不会应用过滤器。
  • FILTER_VALUE:提供事件过滤键时要使用的过滤器属性值。接受有效的 Java 正则表达式字符串作为事件过滤值。如果提供了正则表达式,则应匹配整个表达式以过滤消息。部分匹配(如子字符串)不会被过滤。默认使用 null 事件过滤值。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "filterKey": "FILTER_KEY",
       "filterValue": "FILTER_VALUE"
   }
}

Pub/Sub to Splunk

Pub/Sub to Splunk 模板是一种流处理流水线,可从 Pub/Sub 订阅中读取消息,并通过 Splunk 的 HTTP Event Collector (HEC) 将消息载荷写入 Splunk。在写入 Splunk 之前,您还可以将 JavaScript 用户定义函数应用于消息载荷。任何未能成功处理的的消息都会被转发到 Pub/Sub 未处理的主题,以便进一步进行问题排查并重新处理。

要为 HEC 令牌提供额外保护,您还可以传入 Cloud KMS 密钥和使用 Cloud KMS 密钥加密的 base64 编码 HEC 令牌参数。如需详细了解如何对 HEC 令牌参数进行加密,请参阅 Cloud KMS API 加密端点

对此流水线的要求

  • 源 Pub/Sub 订阅必须已存在才能运行此流水线。
  • 在运行此流水线之前,Pub/Sub 未处理的主题必须已存在。
  • Splunk HEC 端点必须可从 Dataflow 工作器的网络访问。
  • Splunk HEC 令牌必须已生成并且可用。

模板参数

参数 说明
inputSubscription 要从中读取输入的 Pub/Sub 订阅,例如 projects/<project-id>/subscriptions/<subscription-name>
token Splunk HEC 身份验证令牌。此 base64 编码的字符串可以使用 Cloud KMS 密钥进行加密,以提高安全性。
url Splunk HEC 网址,必须可从运行流水线的 VPC 路由。例如,https://splunk-hec-host:8088。
outputDeadletterTopic 用于转发无法递送的消息的 Pub/Sub 主题,例如 projects/<project-id>/topics/<topic-name>
javascriptTextTransformGcsPath [可选] 包含所有 JavaScript 代码的 Cloud Storage 路径,例如 gs://mybucket/mytransforms/*.js
javascriptTextTransformFunctionName [可选] 要调用的 JavaScript 函数的名称。例如,如果您的 JavaScript 函数为 function myTransform(inJson) { ...dostuff...},则函数名称为 myTransform
batchCount [可选] 向 Splunk 发送多个事件的批次大小。默认值为 1(无批处理)。
parallelism [可选] 最大并行请求数。默认值为 1(无并行)。
disableCertificateValidation [可选] 停用 SSL 证书验证。默认为 false(已启用验证)。
includePubsubMessage [可选] 在载荷中包含完整的 Pub/Sub 消息。默认值为 false(只有数据元素包含在载荷中)。
tokenKMSEncryptionKey [可选] 用于解密 HEC 令牌字符串的 Cloud KMS 密钥。如果提供了 Cloud KMS 密钥,则必须以加密方式传递 HEC 令牌字符串。

运行 Pub/Sub to Splunk 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Pub/Sub to Splunk template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • INPUT_SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • TOKEN:Splunk 的 HTTP Event Collector 令牌
  • URL:Splunk 的 HTTP Event Collector 的网址路径(例如 https://splunk-hec-host:8088
  • DEADLETTER_TOPIC_NAME:Pub/Sub 主题名称
  • JAVASCRIPT_FUNCTION:您的 JavaScript 函数名称
  • PATH_TO_JAVASCRIPT_UDF_FILE:包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径(例如 gs://your-bucket/your-function.js
  • BATCH_COUNT:用于向 Splunk 发送多个事件的批次大小
  • PARALLELISM:用于向 Splunk 发送事件的并行请求数
  • DISABLE_VALIDATION:如果要停用 SSL 证书验证则为 true
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Splunk

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • INPUT_SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • TOKEN:Splunk 的 HTTP Event Collector 令牌
  • URL:Splunk 的 HTTP Event Collector 的网址路径(例如 https://splunk-hec-host:8088
  • DEADLETTER_TOPIC_NAME:Pub/Sub 主题名称
  • JAVASCRIPT_FUNCTION:您的 JavaScript 函数名称
  • PATH_TO_JAVASCRIPT_UDF_FILE:包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径(例如 gs://your-bucket/your-function.js
  • BATCH_COUNT:用于向 Splunk 发送多个事件的批次大小
  • PARALLELISM:用于向 Splunk 发送事件的并行请求数
  • DISABLE_VALIDATION:如果要停用 SSL 证书验证则为 true
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION"
   }
}

Pub/Sub to Avro Files on Cloud Storage

Pub/Sub to Avro files on Cloud Storage 模板是一个流处理流水线,可从 Pub/Sub 主题中读取数据,并将 Avro 文件写入指定的 Cloud Storage 存储分区。

对此流水线的要求

  • Pub/Sub 输入主题必须已存在才能执行此流水线。

模板参数

参数 说明
inputTopic 要订阅用来处理消息的 Cloud Pub/Sub 主题。主题名称应采用 projects/<project-id>/topics/<topic-name> 格式。
outputDirectory 要用于归档输出 Avro 文件的输出目录。在末尾添加“/”。例如:gs://example-bucket/example-directory/
avroTempDirectory 临时 Avro 文件的目录。请在末尾添加 /,例如 gs://example-bucket/example-directory/
outputFilenamePrefix [可选] Avro 文件的输出文件名前缀。
outputFilenameSuffix [可选] Avro 文件的输出文件名后缀。
outputShardTemplate [可选] 输出文件的分片模板。以一系列重复“S”或“N”字母的形式指定(例如 SSS-NNN)。 这些字母会分别替换成分片编号或分片数。 未指定此参数时,默认模板格式为“W-P-SS-of-NN”。
numShards [可选]写入时产生的最大输出分片数。默认最大分片数为 1。

运行 Pub/Sub to Cloud Storage Avro 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Pub/Sub to Cloud Storage Avro template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:Pub/Sub 主题名称
  • BUCKET_NAME - Cloud Storage 存储分区的名称。
  • FILENAME_PREFIX:首选输出文件名前缀
  • FILENAME_SUFFIX:首选输出文件名后缀
  • SHARD_TEMPLATE:首选输出分片模板
  • NUM_SHARDS:输出分片数量
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=FILENAME_PREFIX,\
outputFilenameSuffix=FILENAME_SUFFIX,\
outputShardTemplate=SHARD_TEMPLATE,\
numShards=NUM_SHARDS,\
avroTempDirectory=gs://BUCKET_NAME/temp/

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_Avro

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:Pub/Sub 主题名称
  • BUCKET_NAME - Cloud Storage 存储分区的名称。
  • FILENAME_PREFIX:首选输出文件名前缀
  • FILENAME_SUFFIX:首选输出文件名后缀
  • SHARD_TEMPLATE:首选输出分片模板
  • NUM_SHARDS:输出分片数量
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_Avro
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": TEMP_LOCATION,
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "avroTempDirectory": "gs://BUCKET_NAME/temp/",
       "outputFilenamePrefix": "FILENAME_PREFIX",
       "outputFilenameSuffix": "FILENAME_SUFFIX",
       "outputShardTemplate": "SHARD_TEMPLATE",
       "numShards": "NUM_SHARDS",
   }
}

Pub/Sub to Text Files on Cloud Storage

Pub/Sub to Cloud Storage Text 模板是一种流处理流水线,可从 Pub/Sub 读取记录并将其保存为一系列文本格式的 Cloud Storage 文件。使用此模板,您可以快速地保存 Pub/Sub 中的数据以留待将来使用。默认情况下,此模板每 5 分钟生成一个新文件。

对此流水线的要求

  • Pub/Sub 主题必须已存在才能执行此流水线。
  • 发布到主题的消息必须采用文本格式。
  • 发布到主题的消息不得包含任何换行符。请注意,每条 Pub/Sub 消息在输出文件中均会保存为一行。

模板参数

参数 说明
inputTopic 要从中读取输入的 Pub/Sub 主题。主题名称应采用 projects/<project-id>/topics/<topic-name> 格式。
outputDirectory 用于写入输出文件的路径和文件名前缀,例如 gs://bucket-name/path/。该值必须以斜杠结尾。
outputFilenamePrefix 要在各窗口文件上放置的前缀。例如 output-
outputFilenameSuffix 要放置于每个窗口化文件上的后缀,通常是文件扩展名,例如 .txt.csv
outputShardTemplate 分片式模板定义每个窗口文件的动态部分。默认情况下,该管道使用单一碎片输出到各窗口内的文件系统。这意味着所有数据都将按窗口归入单个文件中。outputShardTemplate 默认为 W-P-SS-of-NN,其中 W 是窗口日期范围,P 是窗格信息,S 是分片编号,而 N 是分片数。对于单个文件,outputShardTemplateSS-of-NN 部分将为 00-of-01

运行 Pub/Sub to Text Files on Cloud Storage 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Pub/Sub to Text Files on Cloud Storage template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME - Cloud Storage 存储分区的名称。
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Cloud_PubSub_to_GCS_Text

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME - Cloud Storage 存储分区的名称。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_IDlocations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}

Pub/Sub to MongoDB

Pub/Sub to MongoDB 模板是一种流处理流水线,可从 Pub/Sub 订阅读取 JSON 编码的消息并将其以文档的形式写入 MongoDB。如果需要,此流水线还能支持额外的转换,只需通过 JavaScript 用户定义的函数 (UDF) 包含这些转换即可。因架构不匹配、JOSN 格式不正确或执行转换时发生的所有错误都会记录在未处理消息的 BigQuery 表中,并随附输入消息。如果在执行之前不存在任何未处理记录的表,则流水线会自动创建该表。

对此流水线的要求

  • Pub/Sub 订阅必须存在,并且消息必须采用有效的 JSON 格式进行编码。
  • MongoDB 集群必须存在,并且应该可通过 Dataflow 工作器机器访问。

模板参数

参数 说明
inputSubscription Pub/Sub 订阅的名称。例如:projects/<project-id>/subscriptions/<subscription-name>
mongoDBUri 以英文逗号分隔的 MongoDB 服务器列表。例如:192.285.234.12:27017,192.287.123.11:27017
database 存储集合的 MongoDB 数据库。例如:my-db
collection MongoDB 数据库中集合的名称。例如:my-collection
deadletterTable 存储失败消息(架构不匹配、JSON 格式错误)的 BigQuery 表。例如:project-id:dataset-name.table-name
javascriptTextTransformGcsPath [可选] 包含 UDF 转换的 JavaScript 文件的 Cloud Storage 位置。例如:gs://mybucket/filename.json
javascriptTextTransformFunctionName [可选] JavaScript UDF 的名称。例如:transform
batchSize [可选] 用于将文档批量插入 MongoDB 的批次大小。默认值:1000
batchSizeBytes [可选] 批次大小(以字节为单位)。默认值:5242880
maxConnectionIdleTime [可选] 在出现连接超时之前所允许的空闲时间上限(以秒为单位)。默认值:60000
sslEnabled [可选] 用于指示与 MongoDB 的连接是否启用了 SSL 的布尔值。默认值:true
ignoreSSLCertificate [可选] 用于指示是否应忽略 SSL 证书的布尔值。默认值:true
withOrdered [可选] 允许依次批量插入 MongoDB 的布尔值。默认值:true
withSSLInvalidHostNameAllowed [可选] 用于指示是否允许 SSL 连接使用无效主机名的布尔值。默认值:true

运行 Pub/Sub to MongoDB 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 Pub/Sub to MongoDB template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须具备 Cloud SDK 284.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • REGION_NAME:Dataflow 地区名称(例如 us-central1
  • JOB_NAME:您选择的作业名称
  • INPUT_SUBSCRIPTION:Pub/Sub 订阅(例如 projects/<project-id>/subscriptions/<subscription-name>
  • MONGODB_URI:MongoDB 服务器地址(例如 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE:MongoDB 数据库名称(例如 users
  • COLLECTION:MongoDB 集合名称(例如 profiles
  • UNPROCESSED_TABLE:BigQuery 表名称(例如 your-project:your-dataset.your-table-name
gcloud beta dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/Cloud_PubSub_to_MongoDB

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • LOCATION:Dataflow 地区名称(例如 us-central1
  • JOB_NAME:您选择的作业名称
  • INPUT_SUBSCRIPTION:Pub/Sub 订阅(例如 projects/<project-id>/subscriptions/<subscription-name>
  • MONGODB_URI:MongoDB 服务器地址(例如 192.285.234.12:27017,192.287.123.11:27017
  • DATABASE:MongoDB 数据库名称(例如 users
  • COLLECTION:MongoDB 集合名称(例如 profiles
  • UNPROCESSED_TABLE:BigQuery 表名称(例如 your-project:your-dataset.your-table-name
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Text Files on Cloud Storage to BigQuery (Stream)

Text Files on Cloud Storage to BigQuery 流水线是一种流处理流水线,可用于流式传输 Cloud Storage 中存储的文本文件,使用您提供的 JavaScript 用户定义函数 (UDF) 转换这些文件,然后将结果输出到 BigQuery。

对此流水线的要求

  • 创建一个 JSON 格式的 BigQuery 架构文件,用于描述您的输出表。
    {
        'fields': [{
            'name': 'location',
            'type': 'STRING'
        }, {
            'name': 'name',
            'type': 'STRING'
        }, {
            'name': 'age',
            'type': 'STRING',
        }, {
            'name': 'color',
            'type': 'STRING'
        }, {
            'name': 'coffee',
            'type': 'STRING',
            'mode': 'REQUIRED'
        }, {
            'name': 'cost',
            'type': 'NUMERIC',
            'mode': 'REQUIRED'
        }]
    }
    
  • 使用您的 UDF 函数(该函数提供转换文本行的逻辑)创建一个 JavaScript (.js) 文件。请注意,此函数必须返回 JSON 字符串。

    例如,以下函数将拆分 CSV 文件的每行文本,并通过转换值返回 JSON 字符串。

    function transform(line) {
    var values = line.split(',');
    
    var obj = new Object();
    obj.location = values[0];
    obj.name = values[1];
    obj.age = values[2];
    obj.color = values[3];
    obj.coffee = values[4];
    var jsonString = JSON.stringify(obj);
    
    return jsonString;
    }
    

模板参数

参数 说明
javascriptTextTransformGcsPath 您的 JavaScript UDF 的 Cloud Storage 位置。例如:gs://my_bucket/my_function.js
JSONPath 您的 BigQuery 架构文件的 Cloud Storage 位置,以 JSON 格式描述。例如:gs://path/to/my/schema.json
javascriptTextTransformFunctionName 要作为 UDF 调用的 JavaScript 函数的名称,例如:transform
outputTable 完全限定的 BigQuery 表,例如 my-project:dataset.table
inputFilePattern 您要处理的文本的 Cloud Storage 位置,例如:gs://my-bucket/my-files/text.txt
bigQueryLoadingTemporaryDirectory BigQuery 加载进程的临时目录,例如:gs://my-bucket/my-files/temp_dir
outputDeadletterTable 无法到达输出表的消息表。例如 my-project:dataset.my-unprocessed-table。如果消息表不存在,则系统将在执行流水线期间创建该表。 如果未指定此参数,则系统会改用 <outputTableSpec>_error_records

运行 Cloud Storage Text to BigQuery (Stream) 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Cloud Storage Text to BigQuery template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • JAVASCRIPT_FUNCTION:UDF 的名称
  • PATH_TO_BIGQUERY_SCHEMA_JSON:包含架构定义的 JSON 文件的 Cloud Storage 路径
  • PATH_TO_JAVASCRIPT_UDF_FILE:包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径
  • PATH_TO_TEXT_DATA:文本数据集的 Cloud Storage 路径
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • BIGQUERY_UNPROCESSED_TABLE:未处理消息的 BigQuery 表名称
  • PATH_TO_TEMP_DIR_ON_GCS:临时目录的 Cloud Storage 路径
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_BigQuery

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • JAVASCRIPT_FUNCTION:UDF 的名称
  • PATH_TO_BIGQUERY_SCHEMA_JSON:包含架构定义的 JSON 文件的 Cloud Storage 路径
  • PATH_TO_JAVASCRIPT_UDF_FILE:包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径
  • PATH_TO_TEXT_DATA:文本数据集的 Cloud Storage 路径
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • BIGQUERY_UNPROCESSED_TABLE:未处理消息的 BigQuery 表名称
  • PATH_TO_TEMP_DIR_ON_GCS:临时目录的 Cloud Storage 路径
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/REGION/templates:launch?gcsPath=gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
   }
}

Text Files on Cloud Storage to Pub/Sub (Stream)

此模板可以创建一种流处理流水线,该流水线可持续轮询新上传到 Cloud Storage 的文本文件,并逐行读取每个文件,然后将字符串发布到 Pub/Sub 主题。此外,此模板还能以包含 JSON 记录的换行符分隔文件或 CSV 文件形式,将记录发布到 Pub/Sub 主题进行实时处理。您可以使用此模板将数据重放到 Pub/Sub。

目前,轮询间隔固定为 10 秒。此模板不会对个别记录设置任何时间戳,因此在执行期间,事件时间将与发布时间相同。如果您的流水线依赖准确的事件时间来执行处理,建议不要使用此流水线。

对此流水线的要求

  • 输入文件必须采用换行符分隔的 JSON 或 CSV 格式。跨越源文件中多个行的记录会导致下游问题,因为文件中的每一行都将以消息形式发布到 Pub/Sub。
  • Pub/Sub 主题必须已存在才能执行此流水线。
  • 流水线无限期运行,需要手动终止。

模板参数

参数 说明
inputFilePattern 需要读取的输入文件格式。例如 gs://bucket-name/files/*.jsongs://bucket-name/path/*.csv
outputTopic 要向其写入数据的 Pub/Sub 输入主题。该名称应采用 projects/<project-id>/topics/<topic-name> 格式。

运行 Text Files on Cloud Storage to Pub/Sub (Stream) 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Text Files on Cloud Storage to Pub/Sub (Stream) template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME:Cloud Storage 存储分区的名称
  • FILE_PATTERN:要从 Cloud Storage 存储分区中读取的文件格式 glob(例如 path/*.csv
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub \
    --region REGION\
    --staging-location TEMP_LOCATION\
    --parameters \
inputFilePattern=gs://BUCKET_NAME/FILE_PATTERN,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Stream_GCS_Text_to_Cloud_PubSub

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • PROJECT_ID:您的项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME:Cloud Storage 存储分区的名称
  • FILE_PATTERN:要从 Cloud Storage 存储分区中读取的文件格式 glob(例如 path/*.csv
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "gs://your-bucket/temp",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/FILE_PATTERN",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   }
}

Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP)

Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) 模板是一种流处理流水线,会从 Cloud Storage 存储分区读取 csv 文件,调用 Cloud Data Loss Prevention (Cloud DLP) API 进行去标识化处理,并将经过去标识化处理的数据写入指定的 BigQuery 表中。此模板支持使用 Cloud DLP 检查模板和 Cloud DLP 去标识化模板。 这样,用户不但能检查潜在的敏感信息并执行去标识化,而且还能对结构化数据(其中有数据列被指定为要进行去标识化处理且无需检查)执行去标识化。

对此流水线的要求

  • 要令牌化的输入数据必须存在。
  • Cloud DLP 模板(例如,DeidentifyTemplate 和 InspectTemplate)必须存在。如需了解更多详细信息,请参阅 Cloud DLP 模板
  • BigQuery 数据集必须存在。

模板参数

参数 说明
inputFilePattern 要从中读取输入数据记录的 csv 文件。也可以使用通配符,例如 gs://mybucket/my_csv_filename.csvgs://mybucket/file-*.csv
dlpProjectId 拥有 Cloud DLP API 资源的 Cloud DLP 项目 ID。 此 Cloud DLP 项目可以是拥有 Cloud DLP 模板的项目,也可以是其他项目; 例如 my_dlp_api_project
deidentifyTemplateName 要用于 API 请求的 Cloud DLP 去标识化模板;以 projects/{template_project_id}/deidentifyTemplates/{deIdTemplateId} 模式指定,例如 projects/my_project/deidentifyTemplates/100
datasetName 用于发送标记化结果的 BigQuery 数据集。
batchSize 用于发送数据以进行检查和/或去标记化处理的区块/批次大小。对于 csv 文件,batchSize 是一个批次包含的行数。用户必须根据记录大小和文件大小确定批次大小。请注意,Cloud DLP API 的载荷大小上限为每个 API 调用 524 KB。
inspectTemplateName [可选] 要用于 API 请求的 Cloud DLP 检查模板;以 projects/{template_project_id}/identifyTemplates/{idTemplateId} 模式指定,例如 projects/my_project/identifyTemplates/100

运行 Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Data Masking/Tokenization from Cloud Storage to BigQuery (using Cloud DLP) template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须拥有 Cloud SDK 138.0.0 或更高版本。

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

请替换以下内容:

  • TEMPLATE_PROJECT_ID:您的模板项目 ID
  • DLP_API_PROJECT_ID:您的 Cloud DLP API 项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • INPUT_DATA:输入文件路径
  • DEIDENTIFY_TEMPLATE:Cloud DLPDeidentify 模板编号
  • DATASET_NAME:BigQuery 数据集名称
  • INSPECT_TEMPLATE_NUMBER:Cloud DLPInspect 模板编号
  • BATCH_SIZE_VALUE:批次大小(对于 csv 文件,批次大小是每个 API 的行数)
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery \
    --region REGION \
    --staging-location TEMP_LOCATION \
    --parameters \
inputFilePattern=INPUT_DATA,\
datasetName=DATASET_NAME,\
batchSize=BATCH_SIZE_VALUE,\
dlpProjectId=DLP_API_PROJECT_ID,\
deidentifyTemplateName=projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE,\
inspectTemplateName=projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER

API

通过 REST API 运行

运行此模板时,您需要使用模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/Stream_DLP_GCS_Text_to_BigQuery

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • TEMPLATE_PROJECT_ID:您的模板项目 ID
  • DLP_API_PROJECT_ID:您的 Cloud DLP API 项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION:地区端点(例如 us-west1
  • TEMP_LOCATION:写入临时文件的位置(例如 gs://your-bucket/temp
  • INPUT_DATA:输入文件路径
  • DEIDENTIFY_TEMPLATE:Cloud DLPDeidentify 模板编号
  • DATASET_NAME:BigQuery 数据集名称
  • INSPECT_TEMPLATE_NUMBER:Cloud DLPInspect 模板编号
  • BATCH_SIZE_VALUE:批次大小(对于 csv 文件,批次大小是每个 API 的行数)
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/REGION/templates:launch?gcsPath=gs://dataflow-templates-REGION/latest/Stream_DLP_GCS_Text_to_BigQuery
{
   "jobName": "JOB_NAME",
   "environment": {
       "bypassTempDirValidation": false,
       "tempLocation": "TEMP_LOCATION",
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
      "inputFilePattern":INPUT_DATA,
      "datasetName": "DATASET_NAME",
      "batchSize": "BATCH_SIZE_VALUE",
      "dlpProjectId": "DLP_API_PROJECT_ID",
      "deidentifyTemplateName": "projects/TEMPLATE_PROJECT_ID/deidentifyTemplates/DEIDENTIFY_TEMPLATE",
      "inspectTemplateName": "projects/TEMPLATE_PROJECT_ID/identifyTemplates/INSPECT_TEMPLATE_NUMBER"
   }
}

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream)

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub 模版是一种流处理流水线,用于从 MySQL 数据库中读取包含更改数据的 Pub/Sub 消息并将记录写入 BigQuery。Debezium 连接器捕获对 MySQL 数据库的更改,并将更改后的数据发布到 Pub/Sub。然后,该模板读取 Pub/Sub 消息并将其写入 BigQuery。

您可以使用此模板同步 MySQL 数据库和 BigQuery 表。流水线将更改后的数据写入 BigQuery 暂存表,并间歇性地更新复制 MySQL 数据库的 BigQuery 表。

对此流水线的要求

  • Debezium 连接器必须已部署
  • Pub/Sub 消息必须在 Beam 行中序列化。

模板参数

参数 说明
inputSubscriptions 要读取的 Pub/Sub 输入订阅列表(以英文逗号分隔),格式为 <subscription>,<subscription>, ...
changeLogDataset 用于存储暂存表的 BigQuery 数据集,格式为 <my-dataset>
replicaDataset 用于存储副本表的 BigQuery 数据集的位置,格式为 <my-dataset>
updateFrequencySecs (可选)流水线更新复制 MySQL 数据库的 BigQuery 表的时间间隔。

运行 Change Data Capture using Debezium and MySQL from Pub/Sub to BigQuery 模版

如需运行此模板,请执行以下步骤:

  1. 在本地机器上,克隆 DataflowTemplates 代码库
  2. 切换到 v2/cdc-parent 目录:
  3. 确保已部署 Debezium 连接器
  4. 使用 Maven,运行 Dataflow 模板。

    替换以下值:

    • PROJECT_ID:您的项目 ID。
    • YOUR_SUBSCRIPTIONS:以英文逗号分隔的 Pub/Sub 订阅名称列表。
    • YOUR_CHANGELOG_DATASET:用于变更日志数据的 BigQuery 数据集。
    • YOUR_REPLICA_DATASET:您的副本表的 BigQuery 数据集。
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
                --inputSubscriptions=YOUR_SUBSCRIPTIONS \
                --updateFrequencySecs=300 \
                --changeLogDataset=YOUR_CHANGELOG_DATASET \
                --replicaDataset=YOUR_REPLICA_DATASET \
                --project=PROJECT_ID"
      

Apache Kafka to BigQuery

Apache Kafka to BigQuery 模板是一种流处理流水线,可从 Apache Kafka 提取文本数据、执行用户定义的函数 (UDF) 并将生成的记录输出到 BigQuery。在转换数据、执行 UDF 或插入输出表时出现的任何错误都将被插入 BigQuery 中单独的错误表。如果在执行之前错误表不存在,则创建该表。

对此流水线的要求

  • 输出 BigQuery 表必须已存在。
  • Apache Kafka 代理服务器必须正在运行并可从 Dataflow 工作器机器进行访问。
  • Apache Kafka 主题必须存在,并且消息必须采用有效的 JSON 格式编码。

模板参数

参数 说明
outputTableSpec 要写入 Apache Kafka 消息的 BigQuery 输出表位置,格式为 my-project:dataset.table
inputTopics 以英文逗号分隔的列表中可读取的 Apache Kafka 输入主题。例如:messages
bootstrapServers 以英文逗号分隔的列表中正在运行的 Apache Kafka Broker 服务器的主机地址,每个主机地址的格式为 35.70.252.199:9092
javascriptTextTransformGcsPath (可选)JavaScript UDF 的 Cloud Storage 位置路径。例如:gs://my_bucket/my_function.js
javascriptTextTransformFunctionName (可选)要作为 UDF 调用的 JavaScript 的名称。例如 transform
outputDeadletterTable (可选)未能到达输出表的消息的 BigQuery 表,格式为 my-project:dataset.my-deadletter-table。如果不存在,则系统会在流水线执行期间创建该表。如果未指定此参数,则系统会改用 <outputTableSpec>_error_records

运行 Apache Kafka to BigQuery 模板

CONSOLE

通过 Google Cloud Console 运行
  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中选择 the Apache Kafka to BigQuery template。
  6. 作业名称字段中输入作业名称。
  7. 在提供的参数字段中输入参数值。
  8. 点击运行作业

GCLOUD

通过 gcloud 命令行工具运行

注意:如需使用 gcloud 命令行工具运行模板,您必须具备 Cloud SDK 284.0.0 或更高版本。

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

请替换以下内容:

  • YOUR_PROJECT_ID:您的模板项目 ID
  • JOB_NAME:您选择的作业名称
  • REGION_NAME:Dataflow 地区名称(例如 us-central1
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • KAFKA_TOPICS:Apache Kakfa 主题列表。如果提供了多个主题,请按照说明了解如何转义英文逗号。
  • PATH_TO_JAVASCRIPT_UDF_FILE:包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径
  • YOUR_JAVASCRIPT_FUNCTION:UDF 的名称
  • KAFKA_SERVER_ADDRESSES:Apache Kafka broker 服务器 IP 地址列表。每个 IP 地址都应与服务器可访问的端口号相匹配。例如:35.70.252.199:9092。如果提供了多个地址,请按照说明了解如何转义英文逗号。
gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=YOUR_JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

API

通过 REST API 运行

运行此模板时,您需要模板的 Cloud Storage 路径:

gs://dataflow-templates/VERSION/flex/Kafka_to_BigQuery

如需使用 REST API 请求运行此模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要授权

请替换以下内容:

  • YOUR_PROJECT_ID:您的模板项目 ID
  • JOB_NAME:您选择的作业名称
  • LOCATION:Dataflow 地区名称(例如 us-central1
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • KAFKA_TOPICS:Apache Kakfa 主题列表。如果提供了多个主题,请按照说明了解如何转义英文逗号。
  • PATH_TO_JAVASCRIPT_UDF_FILE:包含 JavaScript 代码的 .js 文件的 Cloud Storage 路径
  • YOUR_JAVASCRIPT_FUNCTION:UDF 的名称
  • KAFKA_SERVER_ADDRESSES:Apache Kafka broker 服务器 IP 地址列表。每个 IP 地址都应与服务器可访问的端口号相匹配。例如:35.70.252.199:9092。如果提供了多个地址,请按照说明了解如何转义英文逗号。
POST  https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "YOUR_JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Kafka_to_BigQuery",
   }
}