在创建并暂存 Dataflow 模板后,您可以使用 Google Cloud Console、REST API 或 gcloud
命令行工具来运行该模板。您可以在众多环境中部署 Dataflow 模板作业,包括 App Engine 标准环境、Cloud Functions 和其他受限环境。
注意:除了模板文件之外,模板化流水线的运行还依赖于创建模板时暂存和引用的文件。如果您移动或移除暂存文件,您的流水线作业将会失败。
使用 Cloud Console
您可以使用 Cloud Console 来运行 Google 提供的和自定义的 Dataflow 模板。
Google 提供的模板
如需运行 Google 提供的模板,请执行以下操作:
- 转到 Cloud Console 中的 Dataflow 页面。 转到 Dataflow 页面
- 点击 add_box基于模板创建作业。
- 从 Dataflow 模板下拉菜单中,选择要运行的 Google 提供的模板。
- 在作业名称字段中输入作业名称。
- 在提供的参数字段中输入参数值。使用 Google 提供的模板时,您不需要填写附加参数部分。
- 点击运行作业。


自定义模板
如需运行自定义模板,请执行以下操作:
- 转到 Cloud Console 中的 Dataflow 页面。 转到 Dataflow 页面
- 点击基于模板创建作业。
- 从 Dataflow 模板下拉菜单中,选择自定义模板。
- 在作业名称字段中输入作业名称。
- 在模板 Cloud Storage 路径字段中输入模板文件的 Cloud Storage 路径。
- 如果模板需要参数,请点击附加参数部分中的 add添加参数。输入参数的名称和值。对每个所需参数重复此步骤。
- 点击运行作业。


使用 REST API
如需使用 REST API 请求运行模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要获得授权。
请参阅 projects.templates.launch 的 REST API 参考以详细了解可用的参数。
示例 1:创建自定义模板批处理作业
此示例 projects.templates.launch 请求从模板创建批量作业(读取文本文件并写入输出文本文件)。如果请求成功,则响应正文包含一个 LaunchTemplateResponse 实例。
您必须修改以下值:
- 将
YOUR_PROJECT_ID
替换为您的项目 ID。 - 将
JOB_NAME
替换为您选择的作业名称。 - 将
YOUR_BUCKET_NAME
替换为 Cloud Storage 存储分区的名称。 - 将
gcsPath
设置为模板文件的 Cloud Storage 位置。 - 将
parameters
设置为您的键值对列表。 - 将
tempLocation
设置为您具有写入权限的位置。如需运行 Google 提供的模板,您必须输入此值。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName { "jobName": "JOB_NAME", "parameters": { "inputFile" : "gs://YOUR_BUCKET_NAME/input/my_input.txt", "outputFile": "gs://YOUR_BUCKET_NAME/output/my_output" }, "environment": { "tempLocation": "gs://YOUR_BUCKET_NAME/temp", "zone": "us-central1-f" } }
示例 2:创建自定义模板流处理作业
此 projects.templates.launch 请求示例会通过模板创建一个可从 Pub/Sub 主题读取数据并将数据写入 BigQuery 表的流处理作业。BigQuery 表必须已存在且具有适当的架构。如果请求成功,则响应正文包含一个 LaunchTemplateResponse 实例。
您必须修改以下值:
- 将
YOUR_PROJECT_ID
替换为您的项目 ID。 - 将
JOB_NAME
替换为您选择的作业名称。 - 将
YOUR_BUCKET_NAME
替换为 Cloud Storage 存储分区的名称。 - 将
YOUR_TOPIC_NAME
替换为您的 Pub/Sub 主题名称。 - 将
YOUR_DATASET
替换为 BigQuery 数据集,并将YOUR_TABLE_NAME
替换为 BigQuery 表名称。 - 将
gcsPath
设置为模板文件的 Cloud Storage 位置。 - 将
parameters
设置为您的键值对列表。 - 将
tempLocation
设置为您具有写入权限的位置。如需运行 Google 提供的模板,您必须输入此值。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName { "jobName": "JOB_NAME", "parameters": { "topic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME", "table": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME" }, "environment": { "tempLocation": "gs://YOUR_BUCKET_NAME/temp", "zone": "us-central1-f" } }
示例 3:更新自定义模板流处理作业
此 projects.templates.launch 请求示例展示了如何更新模板流处理作业。
- 运行示例 2:创建自定义模板流处理作业以启动流处理模板作业。
- 发送以下 HTTP POST 请求,其中包含以下修改后的值:
- 将
YOUR_PROJECT_ID
替换为您的项目 ID。 - 将
JOB_NAME
替换为您选择的作业名称。 - 将
YOUR_BUCKET_NAME
替换为 Cloud Storage 存储分区的名称。 - 将
YOUR_TOPIC_NAME
替换为您的 Pub/Sub 主题名称。 - 将
YOUR_DATASET
替换为 BigQuery 数据集,并将YOUR_TABLE_NAME
替换为 BigQuery 表名称。 - 将
gcsPath
设置为模板文件的 Cloud Storage 位置。 - 将
parameters
设置为您的键值对列表。 - 将
tempLocation
设置为您具有写入权限的位置。要运行 Google 提供的模板,您需要此值。
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName { "jobName": "JOB_NAME", "parameters": { "topic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME", "table": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME" }, "environment": { "tempLocation": "gs://YOUR_BUCKET_NAME/temp", "zone": "us-central1-f" } "update": true }
- 将
- 访问 Dataflow 监控界面并验证是否创建了具有相同名称的新作业。此作业的状态为已更新。
使用 Google API 客户端库
请考虑使用 Google API 客户端库以轻松调用 Dataflow REST API。此示例脚本使用 Python 版 Google API 客户端库。
在此示例中,您必须设置以下变量:
project
:设置为您的项目 ID。job
:设置为您选择的唯一作业名称。template
:设置为模板文件的 Cloud Storage 位置。parameters
:设置为包含模板参数的字典。
如需详细了解可用的选项,请参阅 Cloud Dataflow REST API 参考中的 projects.templates.launch
方法。
使用 gcloud
注意:如需使用 gcloud
命令行工具运行模板,您必须具备 Cloud SDK 138.0.0 或更高版本。
在 gcloud
命令行工具中,您可以使用 gcloud dataflow jobs run
命令运行自定义模板或 Google 提供的模板。如需查看运行 Google 提供的模板的示例,请参阅 Google 提供的模板页面。
对于以下自定义模板示例,请设置以下值:
- 将
JOB_NAME
替换为您选择的作业名称。 - 将
YOUR_BUCKET_NAME
替换为 Cloud Storage 存储分区的名称。 - 您必须添加
--gcs-location
标志。将--gcs-location
设置为模板文件的 Cloud Storage 位置。 - 将
--parameters
设置为要传递给作业的参数的逗号分隔列表。不允许在逗号和值之间添加空格。
示例 1:自定义模板,批量作业
此示例从模板创建批量作业(读取文本文件并写入输出文本文件)。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \ --parameters inputFile=gs://YOUR_BUCKET_NAME/input/my_input.txt,outputFile=gs://YOUR_BUCKET_NAME/output/my_output
该请求返回具有以下格式的响应。
id: 2016-10-11_17_10_59-1234530157620696789 projectId: YOUR_PROJECT_ID type: JOB_TYPE_BATCH
示例 2:自定义模板,流处理作业
此示例会通过模板创建一个从 Pub/Sub 主题读取数据并将数据写入 BigQuery 表的流处理作业。BigQuery 表必须已存在且具有适当的架构。
gcloud dataflow jobs run JOB_NAME \ --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \ --parameters topic=projects/project-identifier/topics/resource-name,table=my_project:my_dataset.my_table_name
该请求返回具有以下格式的响应。
id: 2016-10-11_17_10_59-1234530157620696789 projectId: YOUR_PROJECT_ID type: JOB_TYPE_STREAMING
如需查看 gcloud dataflow jobs run
命令的完整标志列表,请参阅 gcloud 工具参考。
监控和问题排查
Dataflow 监控界面可让您监控 Dataflow 作业。如果作业失败,您可以在流水线问题排查指南中找到问题排查提示、调试策略和常见错误目录。