从 AWS 步骤函数迁移到 Workflows

为帮助您准备好从 Amazon Web Services (AWS) Step Functions 迁移到 Google Cloud上的 Workflows,本页介绍了这两种产品之间的主要相似之处和不同之处。本文旨在帮助已熟悉 Step Functions 的用户使用 Workflow 实现类似的架构。

与 Step Functions 一样,Workflows 是一个全代管式、基于状态的编排平台,该平台会按照您定义的顺序执行服务:工作流。这些工作流可以组合各种服务,包括 Cloud Run 或 Cloud Run functions 上托管的自定义服务、 Google Cloud Cloud Vision AI 和 BigQuery 等服务以及任何基于 HTTP 的 API。

请注意,由于极速工作流的持续时间有限,并且不支持精确一次工作流执行,因此我们在此处未考虑 Step Functions 极速工作流这一 AWS Step Functions 工作流类型。

Hello World

在 Step Functions 中,状态机就是工作流,任务是工作流中的一种状态,表示其他 AWS 服务执行的单个工作单元。步骤函数要求每个状态都定义下一个状态。

在 Workflows 中,一系列使用 Workflows 语法的步骤用于描述要执行的任务。Workflows 会将步骤视为有序列表并逐个执行,直到所有步骤都运行完毕。

以下“Hello World”示例演示了如何在步骤函数中使用状态,以及如何在工作流中使用步骤

步骤函数

  {
    "Comment": "Hello world example of Pass states in Amazon States Language",
    "StartAt": "Hello",
    "States": {
      "Hello": {
        "Type": "Pass",
        "Result": "Hello",
        "Next": "World"
      },
      "World": {
        "Type": "Pass",
        "Result": "World",
        "End": true
      }
    }
  }

工作流 YAML

  ---
  # Hello world example of steps using Google Cloud Workflows syntax
  main:
      steps:
      - Hello:
          next: World
      - World:
          next: end
  ...

工作流 JSON

  {
    "main": {
      "steps": [
        {
          "Hello": {
            "next": "World"
          }
        },
        {
          "World": {
            "next": "end"
          }
        }
      ]
    }
  }

比较概览

本部分将更详细地比较这两款产品。

步骤函数Workflows
语法JSON(在工具中为 YAML) YAML 或 JSON
控制流在状态之间转换 包含步骤的命令式流控制
Worker资源 (ARN) 和 HTTP 任务 HTTP 请求和连接器
可靠性捕获/重试 捕获/重试
最大并行数量支持 支持
状态数据传递状态 工作流变量
AuthenticationIAM IAM
用户体验Workflow Studio、CLI、SDK、IaC Google Cloud 控制台、CLI、SDK、IaC
价格 步骤函数价格 Workflows 价格
语法

Step Functions 主要使用 JSON 来定义函数,不直接支持 YAML;不过,在适用于 Visual Studio Code 的 AWS 工具包和 AWS CloudFormation 中,您可以使用 YAML 来定义 Step Functions。

您可以使用 Workflows 语法描述 Workflows 步骤,这些步骤可以使用 YAML 或 JSON 编写。大多数工作流都是采用 YAML 格式。本页中的示例展示了 YAML 的优势,包括易于读写以及对注释的原生支持。如需详细了解 Workflows 语法,请参阅语法参考文档

控制流

Workflows 和 Step Functions 都将工作流建模为一系列任务:Workflows 中的步骤,Step Functions 中的状态。这两种方式都允许任务指明下一步要执行哪项任务,并且支持类似于开关的条件,以便根据当前状态选择下一个工作单元。一个关键区别是,Step Functions 要求每个状态都定义下一个状态,而 Workflows 会按照指定的顺序(包括备选后续步骤)执行步骤。如需了解详情,请参阅条件步骤

工作器

这两款产品都可以编排函数、容器和其他 Web 服务等计算资源,以便完成各种任务。在 Step Functions 中,工作器由 Resource 字段标识,该字段在语法上是 URI。用于标识工作器资源的 URI 采用 Amazon 资源名称 (ARN) 格式。如需直接调用任意 HTTP 端点,您可以定义 HTTP 任务

工作流可以向任意 HTTP 端点发送 HTTP 请求并获取响应。借助连接器,您可以更轻松地连接到工作流中的其他 Google Cloud API,并将工作流与 Pub/Sub、BigQuery 或 Cloud Build 等其他 Google Cloud 产品集成。连接器会为您处理请求的格式,从而为您处理请求的格式设置并提供方法和参数,这样您就无需了解Google Cloud API 的详细信息。您还可以配置超时和轮询政策。

可靠性

如果任务失败,您的工作流必须能够适当地重试、在必要时捕获异常,并根据需要重新路由工作流。步骤函数和工作流都使用类似的机制来实现可靠性:使用重试捕获异常,并最终将其分派到工作流中的其他位置。如需了解详情,请参阅工作流错误

最大并行数量

您可能希望工作流程并行协调多个任务。步骤函数提供了两种实现方式:您可以获取一个数据项,并将其并行传递给多个不同的任务;或者,您可以使用数组,并将其元素传递给同一任务。

在工作流中,您可以定义工作流的某个部分,让两个或更多步骤可以并发执行。您可以定义并发运行的分支,也可以定义迭代并发运行的循环。如需了解详情,请参阅并行执行工作流步骤

状态数据

工作流引擎的一个好处是,它会为您维护状态数据,而无需外部数据存储区。在 Step Functions 中,状态数据会以 JSON 结构的形式从一个状态传递到另一个状态。

在 Workflows 中,您可以在全局变量中保存状态数据。由于您最多可以设置一年的执行时长,因此只要实例仍在执行,您就可以保留状态数据。

身份验证

这两款产品都依赖于底层 Identity and Access Management (IAM) 系统进行身份验证和访问权限控制。例如,您可以使用 IAM 角色调用 Step Functions。

在 Workflows 中,您可以使用服务账号调用工作流;您可以使用 OAuth 2.0 或 OIDC 与 Google CloudAPI 连接;您还可以使用授权请求标头对第三方 API 进行身份验证。如需了解详情,请参阅向工作流授予访问 Google Cloud 资源的权限通过工作流发出经过身份验证的请求

用户体验

您可以使用命令行工具或 Terraform 等基础架构即代码 (IaC) 来定义和管理 Step Functions 和工作流。

此外,Workflows 支持使用客户端库在 Google Cloud 控制台中执行工作流,也可以使用 Google Cloud CLI 或向 Workflows REST API 发送请求来执行工作流。如需了解详情,请参阅执行工作流

价格

这两款产品都有免费层级。如需了解详情,请参阅各自的价格页面:步骤函数价格工作流价格

将状态类型映射到步骤

Step Functions 中有八种状态类型。状态是状态机中的元素,可以根据其输入做出决策、执行操作,并将输出传递给其他状态。在从 Step Functions 迁移到 Workflows 之前,请确保您了解如何将每种状态类型转换为 Workflows 步骤。

Choice

Choice 状态会向状态机添加分支逻辑。

在工作流中,您可以使用 switch 块作为选择机制,以允许表达式的值控制工作流的执行流。如果某个值匹配,系统会执行该条件的语句。如需了解详情,请参阅条件

步骤函数

  "ChoiceState": {
    "Type": "Choice",
    "Choices": [
      {
        "Variable": "$.foo",
        "NumericEquals": 1,
        "Next": "FirstMatchState"
      },
      {
        "Variable": "$.foo",
        "NumericEquals": 2,
        "Next": "SecondMatchState"
      }
    ],
    "Default": "DefaultState"
  }

工作流 YAML

  switch:
    - condition: ${result.body.SomeField < 10}
      next: small
    - condition: ${result.body.SomeField < 100}
      next: medium
    - condition: true
      next: default_step

工作流 JSON

  {
    "switch": [
      {
        "condition": "${result.body.SomeField < 10}",
        "next": "small"
      },
      {
        "condition": "${result.body.SomeField < 100}",
        "next": "medium"
      },
      {
        "condition": true,
        "next": "default_step"
      }
    ]
  }

Fail

Fail 状态会停止状态机的执行,并将其标记为失败。

在 Workflows 中,您可以使用 raise 语法引发自定义错误,还可以使用 try/except 块捕获和处理错误。如需了解详情,请参阅抛出错误

步骤函数

  "FailState": {
      "Type": "Fail",
      "Error": "ErrorA",
      "Cause": "Kaiju attack"
  }

工作流 YAML

  raise:
      code: 55
      message: "Something went wrong."

工作流 JSON

  {
    "raise": {
      "code": 55,
      "message": "Something went wrong."
    }
  }

Map

Map 状态可用于针对输入数组的每个元素运行一组步骤。

在 Workflows 中,您可以使用 for 循环进行迭代

步骤函数

  { "StartAt": "ExampleMapState",
    "States": {
      "ExampleMapState": {
        "Type": "Map",
        "Iterator": {
           "StartAt": "CallLambda",
           "States": {
             "CallLambda": {
               "Type": "Task",
               "Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction",
               "End": true
             }
           }
        }, "End": true
      }
    }
  }

工作流 YAML

  - assignStep:
      assign:
        - map:
            1: 10
            2: 20
            3: 30
        - sum: 0
  - loopStep:
      for:
          value: key
          in: ${keys(map)}
          steps:
            - sumStep:
                assign:
                  - sum: ${sum + map[key]}
  - returnStep:
      return: ${sum}

工作流 JSON

  [
    {
      "assignStep": {
        "assign": [
          {
            "map": {
              "1": 10,
              "2": 20,
              "3": 30
            }
          },
          {
            "sum": 0
          }
        ]
      }
    },
    {
      "loopStep": {
        "for": {
          "value": "key",
          "in": "${keys(map)}",
          "steps": [
            {
              "sumStep": {
                "assign": [
                  {
                    "sum": "${sum + map[key]}"
                  }
                ]
              }
            }
          ]
        }
      }
    },
    {
      "returnStep": {
        "return": "${sum}"
      }
    }
  ]

Parallel

Parallel 状态可用于在状态机中创建并行执行分支。在以下 Step Functions 示例中,系统会并行执行地址和电话号码查询。

在工作流中,您可以使用 parallel 步骤来定义工作流的某个部分,其中两个或更多步骤可以并发执行。如需了解详情,请参阅并行步骤

步骤函数

  { "StartAt": "LookupCustomerInfo",
    "States": {
      "LookupCustomerInfo": {
        "Type": "Parallel",
        "End": true,
        "Branches": [
          {
           "StartAt": "LookupAddress",
           "States": {
             "LookupAddress": {
               "Type": "Task",
               "Resource": "arn:aws:lambda:us-east-1:123456789012:function:AddressFinder",
               "End": true
             }
           }
         },
         {
           "StartAt": "LookupPhone",
           "States": {
             "LookupPhone": {
               "Type": "Task",
               "Resource": "arn:aws:lambda:us-east-1:123456789012:function:PhoneFinder",
               "End": true
             }
           }
         }
        ]
      }
    }
  }

工作流 YAML

  main:
     params: [args]
     steps:
     - init:
         assign:
         - workflow_id: "lookupAddress"
         - customer_to_lookup:
             - address: ${args.customerId}
             - phone: ${args.customerId}
         - addressed: ["", ""] # to write to this variable, you must share it
     - parallel_address:
         parallel:
             shared: [addressed]
             for:
                 in: ${customer_to_lookup}
                 index: i # optional, use if index is required
                 value: arg
                 steps:
                 - address:
                     call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                     args:
                         workflow_id: ${workflow_id}
                         argument: ${arg}
                     result: r
                 - set_result:
                     assign:
                     - addressed[i]: ${r}
     - return:
             return: ${addressed}

工作流 JSON

  {
    "main": {
      "params": [
        "args"
      ],
      "steps": [
        {
          "init": {
            "assign": [
              {
                "workflow_id": "lookupAddress"
              },
              {
                "customer_to_lookup": [
                  {
                    "address": "${args.customerId}"
                  },
                  {
                    "phone": "${args.customerId}"
                  }
                ]
              },
              {
                "addressed": [
                  "",
                  ""
                ]
              }
            ]
          }
        },
        {
          "parallel_address": {
            "parallel": {
              "shared": [
                "addressed"
              ],
              "for": {
                "in": "${customer_to_lookup}",
                "index": "i",
                "value": "arg",
                "steps": [
                  {
                    "address": {
                      "call": "googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run",
                      "args": {
                        "workflow_id": "${workflow_id}",
                        "argument": "${arg}"
                      },
                      "result": "r"
                    }
                  },
                  {
                    "set_result": {
                      "assign": [
                        {
                          "addressed[i]": "${r}"
                        }
                      ]
                    }
                  }
                ]
              }
            }
          }
        },
        {
          "return": {
            "return": "${addressed}"
          }
        }
      ]
    }
  }

Pass

Pass 状态会将其输入传递给其输出,而不会执行工作。这通常用于操作 JSON 中的状态数据。

由于 Workflow 不会以这种方式传递数据,因此您可以将状态保留为无操作,也可以使用分配步骤来修改变量。如需了解详情,请参阅分配变量

步骤函数

  "No-op": {
    "Type": "Pass",
    "Result": {
      "x-datum": 0.38,
      "y-datum": 622.22
    },
    "ResultPath": "$.coords",
    "Next": "End"
  }

工作流 YAML

  assign:
      - number: 5
      - number_plus_one: ${number+1}
      - other_number: 10
      - string: "hello"

工作流 JSON

  {
    "assign": [
      {
        "number": 5
      },
      {
        "number_plus_one": "${number+1}"
      },
      {
        "other_number": 10
      },
      {
        "string": "hello"
      }
    ]
  }

成功

Succeed 状态表示成功停止执行。

在工作流中,您可以在主工作流中使用 return 来停止工作流的执行。您还可以通过完成最终步骤来完成工作流(假设该步骤不会跳转到其他步骤),或者如果您不需要返回值,也可以使用 next: end 停止工作流的执行。如需了解详情,请参阅完成工作流的执行

步骤函数

  "SuccessState": {
    "Type": "Succeed"
  }

工作流 YAML

  return: "Success!"
  next: end

工作流 JSON

  {
    "return": "Success!",
    "next": "end"
  }

Task

Task 状态表示状态机执行的单个工作单元。在以下 Step Functions 示例中,它会调用 Lambda 函数。(activity 是 AWS 步骤函数的一项功能,可让您在状态机中创建任务,以便在其他位置执行工作。)

在工作流示例中,系统会调用 HTTP 端点来调用 Cloud Run 函数。您还可以使用连接器,轻松访问其他 Google Cloud 产品。此外,您还可以暂停工作流并轮询数据。或者,您也可以使用回调端点告知工作流指定事件已发生,然后等待该事件而不进行轮询。

步骤函数

  "HelloWorld": {
    "Type": "Task",
    "Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction",
    "End": true
  }

工作流 YAML

  - HelloWorld:
      call: http.get
      args:
          url: https://REGION-PROJECT_ID.cloudfunctions.net/helloworld
      result: helloworld_result

工作流 JSON

  [
    {
      "HelloWorld": {
        "call": "http.get",
        "args": {
          "url": "https://REGION-PROJECT_ID.cloudfunctions.net/helloworld"
        },
        "result": "helloworld_result"
      }
    }
  ]

Wait

Wait 状态会延迟状态机在指定时间内继续运行。

您可以使用 Workflows sys.sleep 标准库函数在指定的秒数内暂停执行,最长持续 31536000(一年)。

步骤函数

  "wait_ten_seconds" : {
    "Type" : "Wait",
    "Seconds" : 10,
    "Next": "NextState"
  }

工作流 YAML

  - someSleep:
      call: sys.sleep
      args:
          seconds: 10

工作流 JSON

  [
    {
      "someSleep": {
        "call": "sys.sleep",
        "args": {
          "seconds": 10
        }
      }
    }
  ]

示例:编排微服务

以下 Step Functions 示例会检查股票价格、确定是买入还是卖出,并报告结果。示例中的状态机通过传递参数与 AWS Lambda 集成,使用 Amazon SQS 队列请求人工审批,并使用 Amazon SNS 主题返回查询结果。

{
      "StartAt": "Check Stock Price",
      "Comment": "An example of integrating Lambda functions in Step Functions state machine",
      "States": {
          "Check Stock Price": {
              "Type": "Task",
              "Resource": "CHECK_STOCK_PRICE_LAMBDA_ARN",
              "Next": "Generate Buy/Sell recommendation"
          },
          "Generate Buy/Sell recommendation": {
              "Type": "Task",
              "Resource": "GENERATE_BUY_SELL_RECOMMENDATION_LAMBDA_ARN",
              "ResultPath": "$.recommended_type",
              "Next": "Request Human Approval"
          },
          "Request Human Approval": {
              "Type": "Task",
              "Resource": "arn:PARTITION:states:::sqs:sendMessage.waitForTaskToken",
              "Parameters": {
                  "QueueUrl": "REQUEST_HUMAN_APPROVAL_SQS_URL",
                  "MessageBody": {
                      "Input.$": "$",
                      "TaskToken.$": "$$.Task.Token"
                  }
              },
              "ResultPath": null,
              "Next": "Buy or Sell?"
          },
          "Buy or Sell?": {
              "Type": "Choice",
              "Choices": [
                  {
                      "Variable": "$.recommended_type",
                      "StringEquals": "buy",
                      "Next": "Buy Stock"
                  },
                  {
                      "Variable": "$.recommended_type",
                      "StringEquals": "sell",
                      "Next": "Sell Stock"
                  }
              ]
          },
          "Buy Stock": {
              "Type": "Task",
              "Resource": "BUY_STOCK_LAMBDA_ARN",
              "Next": "Report Result"
          },
          "Sell Stock": {
              "Type": "Task",
              "Resource": "SELL_STOCK_LAMBDA_ARN",
              "Next": "Report Result"
          },
          "Report Result": {
              "Type": "Task",
              "Resource": "arn:PARTITION:states:::sns:publish",
              "Parameters": {
                  "TopicArn": "REPORT_RESULT_SNS_TOPIC_ARN",
                  "Message": {
                      "Input.$": "$"
                  }
              },
              "End": true
          }
      }
  }

迁移到 Workflows

如需将上述 Step Functions 示例迁移到 Workflows,您可以通过集成 Cloud Run 函数、支持等待 HTTP 请求到达该端点的回调端点,以及使用 Workflows 连接器发布到 Pub/Sub 主题(而非 Amazon SNS 主题)来创建等效的 Workflows 步骤:

  1. 完成创建工作流的步骤,但请勿进行部署。

  2. 在工作流的定义中,添加一个步骤来创建等待人工输入的回调端点,以及一个步骤来使用 Workflows 连接器发布到 Pub/Sub 主题。例如:

    工作流 YAML

      ---
      main:
        steps:
          - init:
              assign:
                - projectId: '${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}'
                - region: LOCATION
                - topic: PUBSUB_TOPIC_NAME
          - Check Stock Price:
              call: http.get
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/CheckStockPrice"}
                auth:
                  type: OIDC
              result: stockPriceResponse
          - Generate Buy/Sell Recommendation:
              call: http.get
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/BuySellRecommend"}
                auth:
                  type: OIDC
                query:
                  price: ${stockPriceResponse.body.stock_price}
              result: recommendResponse
          - Create Approval Callback:
              call: events.create_callback_endpoint
              args:
                  http_callback_method: "GET"
              result: callback_details
          - Print Approval Callback Details:
              call: sys.log
              args:
                  severity: "INFO"
                  text: ${"Listening for callbacks on " + callback_details.url}
          - Await Approval Callback:
              call: events.await_callback
              args:
                  callback: ${callback_details}
                  timeout: 3600
              result: approvalResponse
          - Approval?:
              try:
                switch:
                  - condition: ${approvalResponse.http_request.query.response[0] == "yes"}
                    next: Buy or Sell?
              except:
                as: e
                steps:
                  - unknown_response:
                      raise: ${"Unknown response:" + e.message}
                      next: end
          - Buy or Sell?:
              switch:
                - condition: ${recommendResponse.body == "buy"}
                  next: Buy Stock
                - condition: ${recommendResponse.body == "sell"}
                  next: Sell Stock
                - condition: true
                  raise: ${"Unknown recommendation:" + recommendResponse.body}
          - Buy Stock:
              call: http.post
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/BuyStock"}
                auth:
                  type: OIDC
                body:
                  action: ${recommendResponse.body}
              result: message
          - Sell Stock:
              call: http.post
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/SellStock"}
                auth:
                  type: OIDC
                body:
                  action: ${recommendResponse.body}
              result: message
          - Report Result:
              call: googleapis.pubsub.v1.projects.topics.publish
              args:
                topic: ${"projects/" + projectId + "/topics/" + topic}
                body:
                  messages:
                  - data: '${base64.encode(json.encode(message))}'
              next: end
      ...

    工作流 JSON

      {
        "main": {
          "steps": [
            {
              "init": {
                "assign": [
                  {
                    "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
                  },
                  {
                    "region": "LOCATION"
                  },
                  {
                    "topic": [
                      "PUBSUB_TOPIC_NAME"
                    ]
                  }
                ]
              }
            },
            {
              "Check Stock Price": {
                "call": "http.get",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/CheckStockPrice\"}",
                  "auth": {
                    "type": "OIDC"
                  }
                },
                "result": "stockPriceResponse"
              }
            },
            {
              "Generate Buy/Sell Recommendation": {
                "call": "http.get",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/BuySellRecommend\"}",
                  "auth": {
                    "type": "OIDC"
                  },
                  "query": {
                    "price": "${stockPriceResponse.body.stock_price}"
                  }
                },
                "result": "recommendResponse"
              }
            },
            {
              "Create Approval Callback": {
                "call": "events.create_callback_endpoint",
                "args": {
                  "http_callback_method": "GET"
                },
                "result": "callback_details"
              }
            },
            {
              "Print Approval Callback Details": {
                "call": "sys.log",
                "args": {
                  "severity": "INFO",
                  "text": "${\"Listening for callbacks on \" + callback_details.url}"
                }
              }
            },
            {
              "Await Approval Callback": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "approvalResponse"
              }
            },
            {
              "Approval?": {
                "try": {
                  "switch": [
                    {
                      "condition": "${approvalResponse.http_request.query.response[0] == \"yes\"}",
                      "next": "Buy or Sell?"
                    }
                  ]
                },
                "except": {
                  "as": "e",
                  "steps": [
                    {
                      "unknown_response": {
                        "raise": "${\"Unknown response:\" + e.message}",
                        "next": "end"
                      }
                    }
                  ]
                }
              }
            },
            {
              "Buy or Sell?": {
                "switch": [
                  {
                    "condition": "${recommendResponse.body == \"buy\"}",
                    "next": "Buy Stock"
                  },
                  {
                    "condition": "${recommendResponse.body == \"sell\"}",
                    "next": "Sell Stock"
                  },
                  {
                    "condition": true,
                    "raise": "${\"Unknown recommendation:\" + recommendResponse.body}"
                  }
                ]
              }
            },
            {
              "Buy Stock": {
                "call": "http.post",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/BuyStock\"}",
                  "auth": {
                    "type": "OIDC"
                  },
                  "body": {
                    "action": "${recommendResponse.body}"
                  }
                },
                "result": "message"
              }
            },
            {
              "Sell Stock": {
                "call": "http.post",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/SellStock\"}",
                  "auth": {
                    "type": "OIDC"
                  },
                  "body": {
                    "action": "${recommendResponse.body}"
                  }
                },
                "result": "message"
              }
            },
            {
              "Report Result": {
                "call": "googleapis.pubsub.v1.projects.topics.publish",
                "args": {
                  "topic": "${\"projects/\" + projectId + \"/topics/\" + topic}",
                  "body": {
                    "messages": [
                      {
                        "data": "${base64.encode(json.encode(message))}"
                      }
                    ]
                  }
                },
                "next": "end"
              }
            }
          ]
        }
      }

    替换以下内容:

    • LOCATION:受支持的 Google Cloud 区域。例如 us-central1
    • PUBSUB_TOPIC_NAME:您的 Pub/Sub 主题的名称。例如 my_stock_example
  3. 部署,然后执行工作流

  4. 在工作流执行期间,它会暂停并等待您调用回调端点。您可以使用 curl 命令执行此操作。例如:

    curl -H "Authorization: Bearer $(gcloud auth print-access-token)"
    https://workflowexecutions.googleapis.com/v1/projects/CALLBACK_URL?response=yes
    

    CALLBACK_URL 替换为回调端点的路径的其余部分。

  5. 工作流程成功完成后,您可以从 Pub/Sub 订阅接收消息。例如:

    gcloud pubsub subscriptions pull stock_example-sub  --format="value(message.data)" | jq
    

    输出消息应类似如下所示(buysell):

    {
      "body": "buy",
      "code": 200,
      "headers": {
      [...]
      }
    

后续步骤