运行经典模板

在创建并暂存 Dataflow 模板后,您可以使用 Google Cloud 控制台、REST API 或 Google Cloud CLI 来运行该模板。您可以在众多环境中部署 Dataflow 模板作业,包括 App Engine 标准环境、Cloud Run functions 和其他受限环境。

使用 Google Cloud 控制台

您可以使用 Google Cloud 控制台来运行 Google 提供的和自定义的 Dataflow 模板。

Google 提供的模板

如需运行 Google 提供的模板,请执行以下操作:

  1. 在 Google Cloud 控制台中,转到 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击 基于模板创建作业
  4. Google Cloud 控制台 “基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中,选择要运行的 Google 提供的模板。
  6. WordCount 模板执行表单
  7. 作业名称字段中输入作业名称。
  8. 在提供的参数字段中输入参数值。使用 Google 提供的模板时,您不需要填写附加参数部分。
  9. 点击运行作业

自定义模板

如需运行自定义模板,请执行以下操作:

  1. 在 Google Cloud 控制台中,转到 Dataflow 页面。
  2. 转到 Dataflow 页面
  3. 点击基于模板创建作业
  4. Google Cloud 控制台 “基于模板创建作业”按钮
  5. Dataflow 模板下拉菜单中,选择自定义模板
  6. 自定义模板执行表单
  7. 作业名称字段中输入作业名称。
  8. 在模板 Cloud Storage 路径字段中输入模板文件的 Cloud Storage 路径。
  9. 如果模板需要参数,请点击附加参数部分中的 添加参数。输入参数的名称。对每个所需参数重复此步骤。
  10. 点击 Run Job(运行任务)。

使用 REST API

如需使用 REST API 请求运行模板,请发送带有项目 ID 的 HTTP POST 请求。此请求需要获得授权

请参阅 projects.locations.templates.launch 的 REST API 参考以详细了解可用的参数。

创建自定义模板批处理作业

projects.locations.templates.launch 请求示例会基于模板创建一个可读取文本文件并写入输出文本文件的批处理作业。如果请求成功,则响应正文包含一个 LaunchTemplateResponse 实例。

修改以下值:

  • YOUR_PROJECT_ID 替换为您的项目 ID。
  • LOCATION 替换为您选择的 Dataflow 区域
  • JOB_NAME 替换为您选择的作业名称。
  • YOUR_BUCKET_NAME 替换为 Cloud Storage 存储桶的名称。
  • gcsPath 设置为模板文件的 Cloud Storage 位置。
  • parameters 设置为您的键值对列表。
  • tempLocation 设置为您具有写入权限的位置。如需运行 Google 提供的模板,您必须输入此值。
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://YOUR_BUCKET_NAME/templates/TemplateName
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputFile" : "gs://YOUR_BUCKET_NAME/input/my_input.txt",
            "output": "gs://YOUR_BUCKET_NAME/output/my_output"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

创建自定义模板流处理作业

projects.locations.templates.launch 请求示例会通过模板创建一个可从 Pub/Sub 订阅读取数据并将数据写入 BigQuery 表的流处理作业。如果要启动 Flex 模板,请改用 projects.locations.flexTemplates.launch示例模板是 Google 提供的模板。您可以修改模板中的路径,使其指向自定义模板。启动 Google 提供的模板和自定义模板使用的是相同的逻辑。在此示例中,BigQuery 表必须已存在且具有适当的架构。如果成功,则响应正文包含一个 LaunchTemplateResponse 实例。

修改以下值:

  • YOUR_PROJECT_ID 替换为您的项目 ID。
  • LOCATION 替换为您选择的 Dataflow 区域
  • JOB_NAME 替换为您选择的作业名称。
  • YOUR_BUCKET_NAME 替换为 Cloud Storage 存储桶的名称。
  • GCS_PATH 替换为模板文件的 Cloud Storage 位置。位置应以 gs:// 开头
  • parameters 设置为您的键值对列表。列出的参数特定于此模板示例。 如果您使用的是自定义模板,请根据需要修改参数。如果您使用的是示例模板,请替换以下变量。
    • YOUR_SUBSCRIPTION_NAME 替换为您的 Pub/Sub 订阅名称。
    • YOUR_DATASET 替换为 BigQuery 数据集,并将 YOUR_TABLE_NAME 替换为 BigQuery 表名。
  • tempLocation 设置为您具有写入权限的位置。如需运行 Google 提供的模板,您必须输入此值。
    POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH
    {
        "jobName": "JOB_NAME",
        "parameters": {
            "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_SUBSCRIPTION_NAME",
            "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
        },
        "environment": {
            "tempLocation": "gs://YOUR_BUCKET_NAME/temp",
            "zone": "us-central1-f"
        }
    }

更新自定义模板流处理作业

projects.locations.templates.launch 请求示例展示了如何更新模板流处理作业。如果要更新 Flex 模板,请改用 projects.locations.flexTemplates.launch

  1. 运行示例 2:创建自定义模板流处理作业以启动流处理模板作业。
  2. 发送以下 HTTP POST 请求,其中包含以下修改后的值:
    • YOUR_PROJECT_ID 替换为您的项目 ID。
    • LOCATION 替换为您要更新的作业的 Dataflow 区域
    • JOB_NAME 替换为您要更新的作业的确切名称。
    • GCS_PATH 替换为模板文件的 Cloud Storage 位置。位置应以 gs:// 开头
    • parameters 设置为您的键值对列表。列出的参数特定于此模板示例。 如果您使用的是自定义模板,请根据需要修改参数。如果您使用的是示例模板,请替换以下变量。
      • YOUR_SUBSCRIPTION_NAME 替换为您的 Pub/Sub 订阅名称。
      • YOUR_DATASET 替换为 BigQuery 数据集,并将 YOUR_TABLE_NAME 替换为 BigQuery 表名。
    • 使用 environment 参数更改环境设置,例如机器类型。此示例使用 n2-highmem-2 机器类型,该类型每个工作器的内存和 CPU 数量多于默认机器类型。
        POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=GCS_PATH
        {
            "jobName": "JOB_NAME",
            "parameters": {
                "inputSubscription": "projects/YOUR_PROJECT_ID/subscriptions/YOUR_TOPIC_NAME",
                "outputTableSpec": "YOUR_PROJECT_ID:YOUR_DATASET.YOUR_TABLE_NAME"
            },
            "environment": {
                "machineType": "n2-highmem-2"
            },
            "update": true
        }
    
  3. 访问 Dataflow 监控界面并验证是否创建了具有相同名称的新作业。此作业的状态为已更新

使用 Google API 客户端库

请考虑使用 Google API 客户端库以轻松调用 Dataflow REST API。此示例脚本使用 Python 版 Google API 客户端库

在此示例中,您必须设置以下变量:

  • project:设置为您的项目 ID。
  • job:设置为您选择的唯一作业名称。
  • template:设置为模板文件的 Cloud Storage 位置。
  • parameters:设置为包含模板参数的字典。

如需设置区域,请添加 location 参数。

from googleapiclient.discovery import build

# project = 'your-gcp-project'
# job = 'unique-job-name'
# template = 'gs://dataflow-templates/latest/Word_Count'
# parameters = {
#     'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
#     'output': 'gs://<your-gcs-bucket>/wordcount/outputs',
# }

dataflow = build("dataflow", "v1b3")
request = (
    dataflow.projects()
    .templates()
    .launch(
        projectId=project,
        gcsPath=template,
        body={
            "jobName": job,
            "parameters": parameters,
        },
    )
)

response = request.execute()

如需详细了解可用的选项,请参阅 Dataflow REST API 参考中的 projects.locations.templates.launch 方法

使用 gcloud CLI

gcloud CLI 可以使用 gcloud dataflow jobs run 命令运行自定义模板或 Google 提供的模板。如需查看运行 Google 提供的模板的示例,请参阅 Google 提供的模板页面

对于以下自定义模板示例,请设置以下值:

  • JOB_NAME 替换为您选择的作业名称。
  • YOUR_BUCKET_NAME 替换为 Cloud Storage 存储桶的名称。
  • --gcs-location 设置为模板文件的 Cloud Storage 位置。
  • --parameters 设置为要传递给作业的参数的逗号分隔列表。不允许在逗号和值之间添加空格。
  • 如需阻止虚拟机接受存储在项目元数据中的 SSH 密钥,请将 additional-experiments 标志与 block_project_ssh_keys 服务选项结合使用:--additional-experiments=block_project_ssh_keys

创建自定义模板批处理作业

此示例从模板创建批量作业(读取文本文件并写入输出文本文件)。

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters inputFile=gs://YOUR_BUCKET_NAME/input/my_input.txt,output=gs://YOUR_BUCKET_NAME/output/my_output

该请求返回具有以下格式的响应。

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_BATCH

创建自定义模板流处理作业

此示例会通过模板创建一个从 Pub/Sub 主题读取数据并将数据写入 BigQuery 表的流处理作业。BigQuery 表必须已存在且具有适当的架构。

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://YOUR_BUCKET_NAME/templates/MyTemplate \
        --parameters topic=projects/project-identifier/topics/resource-name,table=my_project:my_dataset.my_table_name

该请求返回具有以下格式的响应。

    id: 2016-10-11_17_10_59-1234530157620696789
    projectId: YOUR_PROJECT_ID
    type: JOB_TYPE_STREAMING

如需查看 gcloud dataflow jobs run 命令的完整标志列表,请参阅 gcloud CLI 参考

监控和问题排查

Dataflow 监控界面可让您监控 Dataflow 作业。如果作业失败,您可以在流水线问题排查指南中找到问题排查提示、调试策略和常见错误目录。