运行并行执行其他工作流的工作流


本教程介绍如何创建和运行并行执行多个子工作流的父工作流。

在下图中,调用了子工作流的四个并行执行。这样,父工作流就能在并行分支中处理数据,从而缩短总执行时间。父级工作流会等待所有子工作流执行完成,然后再返回成功和失败执行作业的摘要,从而简化任何错误检测。

调用子工作流并行迭代的父级工作流

目标

在此教程中,您将学习以下操作:

  1. 创建和部署从父级工作流接收数据的子工作流。
  2. 创建并部署使用并行 for 循环执行多个子工作流的父工作流。
  3. 运行可调用子工作流并行执行的父工作流。
  4. 所有成功和失败的子工作流执行的结果都会存储在映射中并返回。

您可以在 Google Cloud 控制台中运行以下命令,也可以在终端或 Cloud Shell 中使用 Google Cloud CLI 来运行以下命令。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

您的组织定义的安全限制条件可能会导致您无法完成以下步骤。如需了解相关问题排查信息,请参阅在受限的 Google Cloud 环境中开发应用

控制台

  1. 登录您的 Google 账号。

    如果您还没有 Google 账号,请注册新账号

  2. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  3. 确保您的 Google Cloud 项目已启用结算功能

  4. 启用 Workflow Executions and Workflows API。

    启用 API

  5. 创建服务帐号:

    1. 在 Google Cloud 控制台中,转到创建服务帐号页面。

      转到“创建服务帐号”
    2. 选择您的项目。
    3. 服务帐号名称字段中,输入一个名称。Google Cloud 控制台会根据此名称填充服务帐号 ID 字段。

      服务帐号说明字段中,输入说明。例如,Service account for quickstart

    4. 点击创建并继续
    5. Workflows > Workflows Invoker 角色授予服务帐号。

      如需授予该角色,请找到选择角色列表,然后选择 Workflows > Workflows Invoker

    6. 点击继续
    7. 点击完成以完成服务帐号的创建过程。

  6. 在 Google Cloud Console 中的项目选择器页面上,选择或创建一个 Google Cloud 项目

    转到“项目选择器”

  7. 确保您的 Google Cloud 项目已启用结算功能

  8. 启用 Workflow Executions and Workflows API。

    启用 API

  9. 创建服务帐号:

    1. 在 Google Cloud 控制台中,转到创建服务帐号页面。

      转到“创建服务帐号”
    2. 选择您的项目。
    3. 服务帐号名称字段中,输入一个名称。Google Cloud 控制台会根据此名称填充服务帐号 ID 字段。

      服务帐号说明字段中,输入说明。例如,Service account for quickstart

    4. 点击创建并继续
    5. Workflows > Workflows Invoker 角色授予服务帐号。

      如需授予该角色,请找到选择角色列表,然后选择 Workflows > Workflows Invoker

    6. 点击继续
    7. 点击完成以完成服务帐号的创建过程。

gcloud

  1. 登录您的 Google 账号。

    如果您还没有 Google 账号,请注册新账号

  2. 安装 Google Cloud CLI。
  3. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  4. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. Enable the Workflow Executions and Workflows APIs:

    gcloud services enable workflowexecutions.googleapis.com workflows.googleapis.com
  7. 设置身份验证:

    1. 创建服务帐号:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      SERVICE_ACCOUNT_NAME 替换为服务帐号的名称。

    2. 向服务帐号授予 roles/workflows.invoker IAM 角色:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/workflows.invoker

      请替换以下内容:

      • SERVICE_ACCOUNT_NAME:服务帐号的名称
      • PROJECT_ID:您在其中创建服务帐号的项目的 ID
  8. 安装 Google Cloud CLI。
  9. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  10. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  11. 确保您的 Google Cloud 项目已启用结算功能

  12. Enable the Workflow Executions and Workflows APIs:

    gcloud services enable workflowexecutions.googleapis.com workflows.googleapis.com
  13. 设置身份验证:

    1. 创建服务帐号:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      SERVICE_ACCOUNT_NAME 替换为服务帐号的名称。

    2. 向服务帐号授予 roles/workflows.invoker IAM 角色:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=roles/workflows.invoker

      请替换以下内容:

      • SERVICE_ACCOUNT_NAME:服务帐号的名称
      • PROJECT_ID:您在其中创建服务帐号的项目的 ID

创建和部署子工作流

子工作流可以接收和处理来自父级工作流的数据。子工作流通过执行以下操作来演示这一点:

  • 接收整数作为参数
  • 休眠 10 秒以模拟某些处理操作
  • 返回一个指示符(基于整数是奇数还是偶数),以模拟工作流执行成功或失败

控制台

  1. 在 Google Cloud 控制台中,前往工作流页面。

    进入 Workflows

  2. 点击 创建

  3. 为新工作流输入名称 workflow-child

  4. 区域列表中,选择 us-central1

  5. 选择您之前创建的服务帐号

  6. 点击下一步

  7. 在工作流编辑器中,为工作流输入以下定义:

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - check_iteration_even_or_odd:
            switch:
              - condition: ${iteration % 2 == 0}
                next: raise_error
        - return_message:
            return: ${"Hello world"+iteration}
        - raise_error:
            raise: ${"Error with iteration "+iteration}
  8. 点击部署

gcloud

  1. 为工作流创建源代码文件:

    touch workflow-child.yaml
    
  2. 在文本编辑器中打开您的源代码文件,并将以下工作流复制到该文件中。

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - check_iteration_even_or_odd:
            switch:
              - condition: ${iteration % 2 == 0}
                next: raise_error
        - return_message:
            return: ${"Hello world"+iteration}
        - raise_error:
            raise: ${"Error with iteration "+iteration}
  3. 部署工作流:

    gcloud workflows deploy workflow-child \
        --source=workflow-child.yaml \
        --location=us-central1 \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
    

    SERVICE_ACCOUNT_NAME 替换为您之前创建的服务帐号的名称。

创建和部署父级工作流

父工作流使用并行 for 循环执行子工作流的多个分支。

  1. 复制工作流定义的源代码。它包含以下几个部分:

    1. 映射用于存储子工作流执行的结果。如需了解详情,请参阅地图

      main:
        steps:
          - init:
              assign:
                - execution_results: {} # results from each execution
                - execution_results.success: {} # successful executions saved under 'success' key
                - execution_results.failure: {} # failed executions saved under 'failure' key
    2. 系统会并行执行 for 循环以调用子工作流。如需了解详情,请参阅并行步骤迭代

      - execute_child_workflows:
          parallel:
            shared: [execution_results]
            for:
              value: iteration
              in: [1, 2, 3, 4]
              steps:
                  - iterate:
    3. 使用连接器调用子工作流。系统会向子工作流的每次迭代传递 iteration 参数。父工作流会等待并存储每个子工作流执行的结果。如需了解详情,请参阅 Workflows Executions API 连接器运行时参数

      try:
        steps:
          - execute_child_workflow:
              call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
              args:
                workflow_id: workflow-child
                #location: ...
                #project_id: ...
                argument:
                  iteration: ${iteration}
              result: execution_result
          - save_successful_execution:
              assign:
                - execution_results.success[string(iteration)]: ${execution_result}
      except:
          as: e
          steps:
            - save_failed_execution:
                assign:
                  - execution_results.failure[string(iteration)]: ${e}
    4. 返回执行结果。如需了解详情,请参阅完成工作流的执行

      - return_execution_results:
          return: ${execution_results}
  2. 部署工作流:

    控制台

    1. 在 Google Cloud 控制台中,转到工作流页面:

      进入 Workflows

    2. 点击 创建

    3. 为新工作流输入名称 workflow-parent

    4. 区域列表中,选择 us-central1

    5. 选择您之前创建的服务帐号

    6. 点击下一步

    7. 在工作流编辑器中,粘贴父级工作流的定义。

    8. 点击部署

    gcloud

    1. 为工作流创建源代码文件:

      touch workflow-parent.yaml
      
    2. 在文本编辑器中打开您的源代码文件,并粘贴父级工作流的定义。

    3. 部署工作流:

      gcloud workflows deploy workflow-parent \
          --source=workflow-parent.yaml \
          --location=us-central1 \
          --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
      

      SERVICE_ACCOUNT_NAME 替换为您之前创建的服务帐号的名称。

执行父级工作流

执行父工作流,以便并行运行子工作流的调用。执行过程应该大约需要 10 秒才能完成。

控制台

  1. 在 Google Cloud 控制台中,转到工作流页面:

    进入 Workflows

  2. 工作流页面上,点击 workflow-parent 工作流以转到其详情页面。

  3. Workflow details 页面上,点击 Execute

  4. 再次点击执行

  5. 输出窗格中查看工作流的结果。

    结果应类似于以下内容,表示迭代 2 和 4 出错,迭代 1 和 3 成功。

    "failure": {
      "2": {
        "message": "Execution failed or cancelled.",
        "operation": {
          "argument": "{\"iteration\":2}",
          "duration": "10.157992541s",
          "endTime": "2023-07-11T13:13:13.028424329Z",
          "error": {
            "context": "RuntimeError: \"Error with iteration 2\"\nin step \"raise_error\", routine \"main\", line: 18",
            "payload": "\"Error with iteration 2\"",
    ...
      "4": {
        "message": "Execution failed or cancelled.",
        "operation": {
          "argument": "{\"iteration\":4}",
          "duration": "10.157929734s",
          "endTime": "2023-07-11T13:13:13.061289142Z",
          "error": {
            "context": "RuntimeError: \"Error with iteration 4\"\nin step \"raise_error\", routine \"main\", line: 18",
            "payload": "\"Error with iteration 4\"",
    ...
    "success": {
      "1": "Hello world1",
      "3": "Hello world3"

gcloud

执行工作流:

gcloud workflows run workflow-parent \
    --location=us-central1

结果应类似于以下内容,表示迭代 2 和 4 出错,迭代 1 和 3 成功。

Waiting for execution [06c753e4-6947-4c62-ac0b-2a9d53fb1b8f] to complete...done.
argument: 'null'
duration: 14.065415004s
endTime: '2023-07-11T12:50:43.929023883Z'
name: projects/386837416586/locations/us-central1/workflows/workflow-parent/executions/06c753e4-6947-4c62-ac0b-2a9d53fb1b8f
result: '{"failure":{"2":{"message":"Execution failed or cancelled.","operation":{"argument":"{\"iteration\":2}","duration":"10.143718070s","endTime":"2023-07-11T12:50:40.673209821Z","error":{"context":"RuntimeError:
...
"Error with iteration 2\"\nin step \"raise_error\", routine \"main\", line: 18","payload":"\"Error
...
"Error with iteration 4\"\nin step \"raise_error\", routine \"main\", line: 18","payload":"\"Error
...
"success":{"1":"Hello world1","3":"Hello world3"}}'
startTime: '2023-07-11T12:50:29.863608879Z'
state: SUCCEEDED

您已成功创建并部署了一个工作流,该工作流会调用子工作流,并行执行子工作流的四次迭代,并针对每个子工作流执行返回成功或失败指示器。

清理

如果您为本教程创建了一个新项目,请删除项目。 如果您使用的是现有项目,希望保留此项目且不保留本教程中添加的任何更改,请删除为教程创建的资源

删除项目

若要避免产生费用,最简单的方法是删除您为本教程创建的项目。

要删除项目,请执行以下操作:

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除教程资源

删除本教程中创建的工作流:

gcloud workflows delete workflow-child
gcloud workflows delete workflow-parent

后续步骤