运行经典模板

在创建并暂存 Dataflow 模板后,您可以使用 Google Cloud Console、REST API 或 gcloud 命令行工具来运行该模板。您可以在众多环境中部署 Dataflow 模板作业,包括 App Engine 标准环境、Cloud Functions 和其他受限环境。

注意:除了模板文件之外,模板化流水线的运行还依赖于创建模板时暂存和引用的文件。如果您移动或移除暂存文件,您的流水线作业将会失败。

使用 Cloud Console

您可以使用 Cloud Console 来运行 Google 提供的和自定义的 Dataflow 模板。

Google 提供的模板

如需运行 Google 提供的模板,请执行以下操作:

  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击 基于模板创建作业
  4. Cloud Platform Console“从模板创建作业”按钮
  5. Dataflow 模板下拉菜单中,选择要运行的 Google 提供的模板。
  6. WordCount 模板执行表单
  7. 作业名称字段中输入作业名称。
  8. 在提供的参数字段中输入参数值。使用 Google 提供的模板时,您不需要填写附加参数部分。
  9. 点击运行作业

自定义模板

如需运行自定义模板,请执行以下操作:

  1. 转到 Cloud Console 中的 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Cloud Platform Console 的“基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中,选择自定义模板
  6. 自定义模板执行表单
  7. 作业名称字段中输入作业名称。
  8. 在模板 Cloud Storage 路径字段中输入模板文件的 Cloud Storage 路径。
  9. 如果模板需要参数,请点击附加参数部分中的 添加参数。输入参数的名称。对每个所需参数重复此步骤。
  10. 点击运行作业

使用 REST API

如需使用 REST API 请求运行模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要获得授权

请参阅 projects.templates.launch 的 REST API 参考以详细了解可用的参数。

示例 1:创建自定义模板批处理作业

此示例 projects.templates.launch 请求从模板创建批量作业(读取文本文件并写入输出文本文件)。如果请求成功,则响应正文包含一个 LaunchTemplateResponse 实例。

您必须修改以下值:

  • YOUR_PROJECT_ID 替换为您的项目 ID。
  • LOCATION 替换为您选择的 Dataflow 区域端点
  • JOB_NAME 替换为您选择的作业名称。
  • YOUR_BUCKET_NAME 替换为 Cloud Storage 存储分区的名称。
  • gcsPath 设置为模板文件的 Cloud Storage 位置。
  • parameters 设置为您的键值对列表。
  • tempLocation 设置为您具有写入权限的位置。如需运行 Google 提供的模板,您必须输入此值。
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputFile" : "gs://YOUR_BUCKET_NAME/input/my_input.txt",
            "outputFile": "gs://YOUR_BUCKET_NAME/output/my_output"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

示例 2:创建自定义模板流处理作业

projects.templates.launch 请求示例会通过模板创建一个可从 Pub/Sub 主题读取数据并将数据写入 BigQuery 表的流处理作业。BigQuery 表必须已存在且具有适当的架构。如果请求成功,则响应正文包含一个 LaunchTemplateResponse 实例。

您必须修改以下值:

  • YOUR_PROJECT_ID 替换为您的项目 ID。
  • LOCATION 替换为您选择的 Dataflow 区域端点
  • JOB_NAME 替换为您选择的作业名称。
  • YOUR_BUCKET_NAME 替换为 Cloud Storage 存储分区的名称。
  • YOUR_TOPIC_NAME 替换为您的 Pub/Sub 主题名称。
  • YOUR_DATASET 替换为 BigQuery 数据集,并将 YOUR_TABLE_NAME 替换为 BigQuery 表名称。
  • gcsPath 设置为模板文件的 Cloud Storage 位置。
  • parameters 设置为您的键值对列表。
  • tempLocation 设置为您具有写入权限的位置。如需运行 Google 提供的模板,您必须输入此值。
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "topic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
            "table": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

示例 3:更新自定义模板流处理作业

projects.templates.launch 请求示例展示了如何更新模板流处理作业。

  1. 运行示例 2:创建自定义模板流处理作业以启动流处理模板作业。
  2. 发送以下 HTTP POST 请求,其中包含以下修改后的值:
    • YOUR_PROJECT_ID 替换为您的项目 ID。
    • LOCATION 替换为您选择的 Dataflow 区域端点
    • JOB_NAME 替换为您选择的作业名称。
    • YOUR_BUCKET_NAME 替换为 Cloud Storage 存储分区的名称。
    • YOUR_TOPIC_NAME 替换为您的 Pub/Sub 主题名称。
    • YOUR_DATASET 替换为 BigQuery 数据集,并将 YOUR_TABLE_NAME 替换为 BigQuery 表名称。
    • gcsPath 设置为模板文件的 Cloud Storage 位置。
    • parameters 设置为您的键值对列表。
    • tempLocation 设置为您具有写入权限的位置。要运行 Google 提供的模板,您需要此值。
        POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
        {
            "jobName": "JOB_NAME",
            "parameters": {
                "topic": "projects/YOUR_PROJECT_ID/topics/YOUR_TOPIC_NAME",
                "table": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
            },
            "environment": {
                "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
                "zone": "us-central1-f"
            }
            "update": true
        }
    
  3. 访问 Dataflow 监控界面并验证是否创建了具有相同名称的新作业。此作业的状态为已更新

使用 Google API 客户端库

请考虑使用 Google API 客户端库以轻松调用 Dataflow REST API。此示例脚本使用 Python 版 Google API 客户端库

在此示例中,您必须设置以下变量:

  • project:设置为您的项目 ID。
  • job:设置为您选择的唯一作业名称。
  • template:设置为模板文件的 Cloud Storage 位置。
  • parameters:设置为包含模板参数的字典。
from googleapiclient.discovery import build

# project = 'your-gcp-project'
# job = 'unique-job-name'
# template = 'gs://dataflow-templates/latest/Word_Count'
# parameters = {
#     'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
#     'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
# }

dataflow = build('dataflow', 'v1b3')
request = dataflow.projects().templates().launch(
    projectId=project,
    gcsPath=template,
    body={
        'jobName': job,
        'parameters': parameters,
    }
)

response = request.execute()

如需详细了解可用的选项,请参阅 Cloud Dataflow REST API 参考中的 projects.templates.launch 方法

使用 gcloud

注意:如需使用 gcloud 命令行工具运行模板,您必须具备 Cloud SDK 138.0.0 或更高版本。

gcloud 命令行工具中,您可以使用 gcloud dataflow jobs run 命令运行自定义模板或 Google 提供的模板。如需查看运行 Google 提供的模板的示例,请参阅 Google 提供的模板页面

对于以下自定义模板示例,请设置以下值:

  • JOB_NAME 替换为您选择的作业名称。
  • YOUR_BUCKET_NAME 替换为 Cloud Storage 存储分区的名称。
  • 您必须添加 --gcs-location 标志。将 --gcs-location 设置为模板文件的 Cloud Storage 位置。
  • --parameters 设置为要传递给作业的参数的逗号分隔列表。不允许在逗号和值之间添加空格。

示例 1:自定义模板,批量作业

此示例从模板创建批量作业(读取文本文件并写入输出文本文件)。

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters inputFile=gs://YOUR_BUCKET_NAME/input/my_input.txt,outputFile=gs://YOUR_BUCKET_NAME/output/my_output

该请求返回具有以下格式的响应。

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_BATCH

示例 2:自定义模板,流处理作业

此示例会通过模板创建一个从 Pub/Sub 主题读取数据并将数据写入 BigQuery 表的流处理作业。BigQuery 表必须已存在且具有适当的架构。

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters topic=projects/project-identifier/topics/resource-name,table=my_project:my_dataset.my_table_name

该请求返回具有以下格式的响应。

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_STREAMING

如需查看 gcloud dataflow jobs run 命令的完整标志列表,请参阅 gcloud 工具参考

监控和问题排查

Dataflow 监控界面可让您监控 Dataflow 作业。如果作业失败,您可以在流水线问题排查指南中找到问题排查提示、调试策略和常见错误目录。