运行并行执行其他工作流的工作流

本教程将介绍如何创建和运行可并行执行多个子工作流的父工作流。

在下图中,子工作流的 4 个并行执行被调用。这样一来,父工作流就可以在并行分支中处理数据,从而缩短总体执行时间。父工作流会等待所有子工作流执行完毕,然后返回成功和失败执行的摘要,从而简化任何错误检测。

父工作流调用子工作流的并行迭代

创建和部署子工作流

子工作流可以接收和处理来自父工作流的数据。子工作流通过执行以下操作来演示这一点:

  • 接收一个整数作为实参
  • 休眠 10 秒以模拟某些处理
  • 返回一个指示器(基于整数是奇数还是偶数),以模拟工作流程执行的成功或失败

控制台

  1. 在 Google Cloud 控制台中,前往 Workflows 页面。

    进入 Workflows

  2. 点击 创建

  3. 输入新工作流的名称 workflow-child

  4. 区域列表中,选择 us-central1

  5. 选择您之前创建的服务账号

  6. 点击下一步

  7. 在工作流编辑器中,输入工作流的定义:

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - check_iteration_even_or_odd:
            switch:
              - condition: ${iteration % 2 == 0}
                next: raise_error
        - return_message:
            return: ${"Hello world"+iteration}
        - raise_error:
            raise: ${"Error with iteration "+iteration}
  8. 点击部署

gcloud

  1. 为工作流创建源代码文件:

    touch workflow-child.yaml
  2. 在文本编辑器中打开源代码文件,然后将以下工作流复制到该文件中。

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - check_iteration_even_or_odd:
            switch:
              - condition: ${iteration % 2 == 0}
                next: raise_error
        - return_message:
            return: ${"Hello world"+iteration}
        - raise_error:
            raise: ${"Error with iteration "+iteration}
  3. 部署工作流:

    gcloud workflows deploy workflow-child \
        --source=workflow-child.yaml \
        --location=us-central1 \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    SERVICE_ACCOUNT_NAME 替换为您之前创建的服务账号的名称。

创建和部署父工作流

父工作流使用并行 for 循环执行子工作流的多个分支。

  1. 复制工作流定义的源代码。它由以下部分组成:

    1. 映射用于存储子工作流执行的结果。如需了解详情,请参阅地图

      main:
        steps:
          - init:
              assign:
                - execution_results: {} # results from each execution
                - execution_results.success: {} # successful executions saved under 'success' key
                - execution_results.failure: {} # failed executions saved under 'failure' key
    2. for 循环并行执行,以调用子工作流。如需了解详情,请参阅并行步骤迭代

      - execute_child_workflows:
          parallel:
            shared: [execution_results]
            for:
              value: iteration
              in: [1, 2, 3, 4]
              steps:
                  - iterate:
    3. 使用连接器调用子工作流。子工作流的每次迭代都会传递 iteration 实参。父工作流程会等待并存储每个子工作流程的执行结果。如需了解详情,请参阅 Workflows Executions API 连接器运行时参数

      try:
        steps:
          - execute_child_workflow:
              call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
              args:
                workflow_id: workflow-child
                #location: ...
                #project_id: ...
                argument:
                  iteration: ${iteration}
              result: execution_result
          - save_successful_execution:
              assign:
                - execution_results.success[string(iteration)]: ${execution_result}
      except:
          as: e
          steps:
            - save_failed_execution:
                assign:
                  - execution_results.failure[string(iteration)]: ${e}
    4. 系统会返回执行结果。如需了解详情,请参阅完成工作流的执行

      - return_execution_results:
          return: ${execution_results}
  2. 部署工作流:

    控制台

    1. 在 Google Cloud 控制台中,前往 Workflows 页面:

      进入 Workflows

    2. 点击 创建

    3. 输入新工作流的名称 workflow-parent

    4. 区域列表中,选择 us-central1

    5. 选择您之前创建的服务账号

    6. 点击下一步

    7. 在工作流编辑器中,粘贴父工作流的定义。

    8. 点击部署

    gcloud

    1. 为工作流创建源代码文件:

      touch workflow-parent.yaml
    2. 在文本编辑器中打开源代码文件,然后粘贴父工作流的定义。

    3. 部署工作流:

      gcloud workflows deploy workflow-parent \
          --source=workflow-parent.yaml \
          --location=us-central1 \
          --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

      SERVICE_ACCOUNT_NAME 替换为您之前创建的服务账号的名称。

执行父工作流

执行父工作流,以便并行运行子工作流的调用。执行大约需要 10 秒才能完成。

控制台

  1. 在 Google Cloud 控制台中,前往 Workflows 页面:

    进入 Workflows

  2. 工作流页面上,点击 workflow-parent 工作流以转到其详情页面。

  3. 工作流详情页面上,点击 执行

  4. 再次点击执行

  5. 输出窗格中查看工作流的结果。

    结果应与以下内容类似,表明迭代 2 和 4 出现错误,而迭代 1 和 3 成功。

    "failure": {
      "2": {
        "message": "Execution failed or cancelled.",
        "operation": {
          "argument": "{\"iteration\":2}",
          "duration": "10.157992541s",
          "endTime": "2023-07-11T13:13:13.028424329Z",
          "error": {
            "context": "RuntimeError: \"Error with iteration 2\"\nin step \"raise_error\", routine \"main\", line: 18",
            "payload": "\"Error with iteration 2\"",
    ...
      "4": {
        "message": "Execution failed or cancelled.",
        "operation": {
          "argument": "{\"iteration\":4}",
          "duration": "10.157929734s",
          "endTime": "2023-07-11T13:13:13.061289142Z",
          "error": {
            "context": "RuntimeError: \"Error with iteration 4\"\nin step \"raise_error\", routine \"main\", line: 18",
            "payload": "\"Error with iteration 4\"",
    ...
    "success": {
      "1": "Hello world1",
      "3": "Hello world3"

gcloud

执行工作流:

gcloud workflows run workflow-parent \
    --location=us-central1

结果应与以下内容类似,表明迭代 2 和 4 出现错误,而迭代 1 和 3 成功。

Waiting for execution [06c753e4-6947-4c62-ac0b-2a9d53fb1b8f] to complete...done.
argument: 'null'
duration: 14.065415004s
endTime: '2023-07-11T12:50:43.929023883Z'
name: projects/386837416586/locations/us-central1/workflows/workflow-parent/executions/06c753e4-6947-4c62-ac0b-2a9d53fb1b8f
result: '{"failure":{"2":{"message":"Execution failed or cancelled.","operation":{"argument":"{\"iteration\":2}","duration":"10.143718070s","endTime":"2023-07-11T12:50:40.673209821Z","error":{"context":"RuntimeError:
...
"Error with iteration 2\"\nin step \"raise_error\", routine \"main\", line: 18","payload":"\"Error
...
"Error with iteration 4\"\nin step \"raise_error\", routine \"main\", line: 18","payload":"\"Error
...
"success":{"1":"Hello world1","3":"Hello world3"}}'
startTime: '2023-07-11T12:50:29.863608879Z'
state: SUCCEEDED

您已成功创建并部署了一个工作流,该工作流可调用子工作流、在并行分支中执行子工作流的四次迭代,并返回每次子工作流执行的成功或失败指示。