从 AWS Step Functions 迁移到 Workflows

为帮助您准备好从 Amazon Web Services (AWS) 步骤函数迁移到 Google Cloud 上的 Workflows,本页面介绍了这两种产品之间的主要相似之处和不同之处。此信息旨在帮助已经熟悉步骤函数的用户使用 Workflows 实现类似的架构。

与 Step Functions 一样,Workflows 是一个基于状态的全代管式编排平台,可以按您定义的顺序执行服务,即工作流。这些工作流可以组合使用如下服务:托管在 Cloud Run 或 Cloud Functions 上的自定义服务、Cloud Vision AI 和 BigQuery 等 Google Cloud 服务,以及任何基于 HTTP 的 API。

请注意,步骤函数 Express Workflows 是一种 AWS 步骤函数工作流类型,此处不考虑该类型,因为 Express 工作流的时长有限,并且不支持“正好一次”工作流执行。

Hello World

在步骤函数中,状态机是一种工作流,任务是工作流中的一种状态,表示另一 AWS 服务执行的单个工作单元。 单步函数要求每个状态都定义下一个状态。

在 Workflows 中,一系列使用 Workflows 语法的步骤描述了要执行的任务。Workflows 会将步骤视为位于有序列表中,并一次执行一个步骤,直到所有步骤都运行完毕。

以下“Hello world”示例演示了如何在步骤函数和 Workflows 的步骤中使用 states

步数函数

  {
    "Comment": "Hello world example of Pass states in Amazon States Language",
    "StartAt": "Hello",
    "States": {
      "Hello": {
        "Type": "Pass",
        "Result": "Hello",
        "Next": "World"
      },
      "World": {
        "Type": "Pass",
        "Result": "World",
        "End": true
      }
    }
  }

工作流 YAML

  ---
  # Hello world example of steps using Google Cloud Workflows syntax
  main:
      steps:
      - Hello:
          next: World
      - World:
          next: end
  ...

Workflows JSON

  {
    "main": {
      "steps": [
        {
          "Hello": {
            "next": "World"
          }
        },
        {
          "World": {
            "next": "end"
          }
        }
      ]
    }
  }

对比情况概览

本部分将更详细地比较这两种产品。

步数函数Workflows
语法JSON(工具中的 YAML) YAML 或 JSON
控制流状态之间的转换 命令式流控制与步骤
工作器资源 (ARN) HTTP 请求和连接器
可靠性捕获/重试 捕获/重试
最大并行数量受支持 受支持
状态数据状态会传递 Workflows 变量
AuthenticationIAM IAM
用户体验Workflow Studio、CLI、SDK、IaC Google Cloud 控制台、CLI、SDK、IaC
价格 Step Functions 价格 Workflows 价格
语法

Step Functions 主要使用 JSON 来定义函数,不直接支持 YAML;但在适用于 Visual Studio Code 的 AWS Toolkit 和 AWS CloudFormation 中,您可以使用 YAML 定义步骤函数。

您可以使用 Workflows 语法描述 Workflows 步骤,并且可以使用 YAML 或 JSON 编写这些步骤。大多数工作流都采用 YAML。本页中的示例展示了 YAML 的优势,包括易于读取和写入,以及对注释的原生支持。如需详细了解 Workflows 语法,请参阅语法参考文档

控制流

Workflows 和步骤函数会将工作流建模为一系列任务:Workflows 中的 steps 和步骤函数中的 states。两者都允许任务指示下一个要执行的任务,并且支持类似开关的条件,以便根据当前状态选择下一个工作单元。一个关键的区别在于,单步函数要求每个状态定义下一个状态,而 Workflows 会按照指定顺序执行步骤(包括其他后续步骤)。如需了解详情,请参阅条件步骤

工作器

这两种产品都可以编排函数、容器和其他网络服务等计算资源来完成任务。在步骤函数中,工作器由 Resource 字段标识,该字段在语法上是 URI。用于标识工作器资源的 URI 采用 Amazon 资源名称 (ARN) 格式,用户无法直接调用任意 HTTP 端点。

Workflows 可以向任意 HTTP 端点发送 HTTP 请求并获取响应。借助连接器,您可以更轻松地在工作流中连接到其他 Google Cloud API,并将工作流与 Pub/Sub、BigQuery 或 Cloud Build 等其他 Google Cloud 产品集成。连接器简化了调用服务,因为它们会为您处理请求的格式设置,提供方法和参数,因此您无需了解 Google Cloud API 的详细信息。您还可以配置超时和轮询政策。

可靠性

如果任务失败,您的工作流必须能够适当地重试,在必要时捕获异常,并根据需要重新路由工作流。步骤函数和 Workflows 都使用类似的机制来实现可靠性:通过重试捕获异常,以及最终分派到工作流中的其他位置。有关详情,请参阅工作流程错误

并行数量

您可能希望工作流并行编排多个任务。步骤函数提供了两种方法来实现这一点:您可以获取一个数据项并将其并行传递到多个不同的任务;或者,您可以使用数组并将其元素传递给同一任务。

在 Workflows 中,您可以定义工作流中可以同时执行两个或多个步骤的部分。您可以定义并发运行的分支,也可以定义并发运行的循环。如需了解详情,请参阅并行执行工作流步骤

状态数据

工作流引擎的一个好处是,无需外部数据存储区即可为您维护状态数据。在步骤函数中,状态数据以 JSON 结构从一个状态传递到另一个状态。

在 Workflows 中,您可以将状态数据保存在全局变量中。由于最长可设为一年的执行时长,因此只要实例仍在执行,您就可以保留状态数据。

Authentication

这两种产品都依赖于底层 Identity and Access Management (IAM) 系统进行身份验证和访问权限控制。例如,您可以使用 IAM 角色来调用 Step Functions。

在 Workflows 中,您可以使用服务帐号调用工作流;可以使用 OAuth 2.0 或 OIDC 连接 Google Cloud API;也可以使用授权请求标头进行第三方 API 身份验证。如需了解详情,请参阅授予工作流访问 Google Cloud 资源的权限从工作流发出经过身份验证的请求

用户体验

您可以使用命令行工具或基础架构即代码 (IaC)(如 Terraform)来定义和管理步骤函数及 Workflows。

此外,Workflows 还支持使用客户端库、在 Google Cloud 控制台中、使用 Google Cloud CLI 或通过向 Workflows REST API 发送请求来执行工作流。如需了解详情,请参阅执行工作流

价格

这两种产品都有一个免费层级。如需了解详情,请参阅相应的价格页面:步骤函数价格Workflows 价格

将状态类型映射到步骤

步骤函数中有八种状态类型。状态是状态机中的元素,这些元素可以根据其输入做出决策、执行操作,并将输出传递给其他状态。在从步骤函数迁移到 Workflows 之前,请确保您了解如何将每种状态类型转换为 Workflows 步骤。

Choice

Choice 状态会向状态机添加分支逻辑。

在 Workflows 中,您可以使用 switch 块作为选择机制,以允许表达式的值控制工作流的执行流程。如果某个值匹配,系统会执行该条件的语句。 如需了解详情,请参阅条件

步数函数

  "ChoiceState": {
    "Type": "Choice",
    "Choices": [
      {
        "Variable": "$.foo",
        "NumericEquals": 1,
        "Next": "FirstMatchState"
      },
      {
        "Variable": "$.foo",
        "NumericEquals": 2,
        "Next": "SecondMatchState"
      }
    ],
    "Default": "DefaultState"
  }

工作流 YAML

  switch:
    - condition: ${result.body.SomeField < 10}
      next: small
    - condition: ${result.body.SomeField < 100}
      next: medium
    - condition: true
      next: default_step

Workflows JSON

  {
    "switch": [
      {
        "condition": "${result.body.SomeField < 10}",
        "next": "small"
      },
      {
        "condition": "${result.body.SomeField < 100}",
        "next": "medium"
      },
      {
        "condition": true,
        "next": "default_step"
      }
    ]
  }

Fail

Fail 状态会停止状态机的执行,并将其标记为失败。

在 Workflows 中,您可以使用 raise 语法引发自定义错误,也可以使用 try/except 代码块捕获和处理错误。如需了解详情,请参阅引发错误

步数函数

  "FailState": {
      "Type": "Fail",
      "Error": "ErrorA",
      "Cause": "Kaiju attack"
  }

工作流 YAML

  raise:
      code: 55
      message: "Something went wrong."

Workflows JSON

  {
    "raise": {
      "code": 55,
      "message": "Something went wrong."
    }
  }

Map

Map 状态可用于为输入数组的每个元素运行一组步骤。

在 Workflows 中,您可以将 for 循环用于迭代

步数函数

  { "StartAt": "ExampleMapState",
    "States": {
      "ExampleMapState": {
        "Type": "Map",
        "Iterator": {
           "StartAt": "CallLambda",
           "States": {
             "CallLambda": {
               "Type": "Task",
               "Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction",
               "End": true
             }
           }
        }, "End": true
      }
    }
  }

工作流 YAML

  - assignStep:
      assign:
        - map:
            1: 10
            2: 20
            3: 30
        - sum: 0
  - loopStep:
      for:
          value: key
          in: ${keys(map)}
          steps:
            - sumStep:
                assign:
                  - sum: ${sum + map[key]}
  - returnStep:
      return: ${sum}

Workflows JSON

  [
    {
      "assignStep": {
        "assign": [
          {
            "map": {
              "1": 10,
              "2": 20,
              "3": 30
            }
          },
          {
            "sum": 0
          }
        ]
      }
    },
    {
      "loopStep": {
        "for": {
          "value": "key",
          "in": "${keys(map)}",
          "steps": [
            {
              "sumStep": {
                "assign": [
                  {
                    "sum": "${sum + map[key]}"
                  }
                ]
              }
            }
          ]
        }
      }
    },
    {
      "returnStep": {
        "return": "${sum}"
      }
    }
  ]

Parallel

Parallel 状态可用于在状态机中创建并行执行分支。在下列步骤函数示例中,地址和手机号码查找是并行执行的。

在 Workflows 中,您可以使用 parallel 步骤来定义可以并发执行两个或多个步骤的工作流的一部分。如需了解详情,请参阅并行步骤

步数函数

  { "StartAt": "LookupCustomerInfo",
    "States": {
      "LookupCustomerInfo": {
        "Type": "Parallel",
        "End": true,
        "Branches": [
          {
           "StartAt": "LookupAddress",
           "States": {
             "LookupAddress": {
               "Type": "Task",
               "Resource": "arn:aws:lambda:us-east-1:123456789012:function:AddressFinder",
               "End": true
             }
           }
         },
         {
           "StartAt": "LookupPhone",
           "States": {
             "LookupPhone": {
               "Type": "Task",
               "Resource": "arn:aws:lambda:us-east-1:123456789012:function:PhoneFinder",
               "End": true
             }
           }
         }
        ]
      }
    }
  }

工作流 YAML

  main:
     params: [args]
     steps:
     - init:
         assign:
         - workflow_id: "lookupAddress"
         - customer_to_lookup:
             - address: ${args.customerId}
             - phone: ${args.customerId}
         - addressed: ["", ""] # to write to this variable, you must share it
     - parallel_address:
         parallel:
             shared: [addressed]
             for:
                 in: ${customer_to_lookup}
                 index: i # optional, use if index is required
                 value: arg
                 steps:
                 - address:
                     call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                     args:
                         workflow_id: ${workflow_id}
                         argument: ${arg}
                     result: r
                 - set_result:
                     assign:
                     - addressed[i]: ${r}
     - return:
             return: ${addressed}

Workflows JSON

  {
    "main": {
      "params": [
        "args"
      ],
      "steps": [
        {
          "init": {
            "assign": [
              {
                "workflow_id": "lookupAddress"
              },
              {
                "customer_to_lookup": [
                  {
                    "address": "${args.customerId}"
                  },
                  {
                    "phone": "${args.customerId}"
                  }
                ]
              },
              {
                "addressed": [
                  "",
                  ""
                ]
              }
            ]
          }
        },
        {
          "parallel_address": {
            "parallel": {
              "shared": [
                "addressed"
              ],
              "for": {
                "in": "${customer_to_lookup}",
                "index": "i",
                "value": "arg",
                "steps": [
                  {
                    "address": {
                      "call": "googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run",
                      "args": {
                        "workflow_id": "${workflow_id}",
                        "argument": "${arg}"
                      },
                      "result": "r"
                    }
                  },
                  {
                    "set_result": {
                      "assign": [
                        {
                          "addressed[i]": "${r}"
                        }
                      ]
                    }
                  }
                ]
              }
            }
          }
        },
        {
          "return": {
            "return": "${addressed}"
          }
        }
      ]
    }
  }

Pass

Pass 状态会将其输入传递到其输出,而不执行操作。这通常用于操作 JSON 中的状态数据。

由于 Workflows 不会以这种方式传递数据,因此您可以将状态保留为空操作,也可以使用分配步骤来修改变量。如需了解详情,请参阅分配变量

步数函数

  "No-op": {
    "Type": "Pass",
    "Result": {
      "x-datum": 0.38,
      "y-datum": 622.22
    },
    "ResultPath": "$.coords",
    "Next": "End"
  }

工作流 YAML

  assign:
      - number: 5
      - number_plus_one: ${number+1}
      - other_number: 10
      - string: "hello"

Workflows JSON

  {
    "assign": [
      {
        "number": 5
      },
      {
        "number_plus_one": "${number+1}"
      },
      {
        "other_number": 10
      },
      {
        "string": "hello"
      }
    ]
  }

成功

Succeed 状态会成功停止执行。

在 Workflows 中,您可以在主工作流中使用 return 来停止工作流的执行。您还可以通过完成最后一步(假设该步骤不会跳转到另一个步骤)来完成工作流;或者,如果不需要返回值,您可以使用 next: end 来停止工作流的执行。如需了解详情,请参阅完成工作流的执行

步数函数

  "SuccessState": {
    "Type": "Succeed"
  }

工作流 YAML

  return: "Success!"
  next: end

Workflows JSON

  {
    "return": "Success!",
    "next": "end"
  }

Task

Task 状态表示状态机执行的单个工作单元。在下面的“步骤函数”示例中,它调用了 Lambda 函数。(Activity 是一项 AWS Step Functions 功能,让您可以通过状态机在其他位置执行工作的任务)。

在 Workflows 示例中,对 HTTP 端点发出调用以调用 Cloud Function。您还可以使用连接器,以便轻松访问其他 Google Cloud 产品。此外,您还可以暂停工作流并轮询数据。或者,您也可以使用回调端点向您的工作流发出指定事件已发生的信号,并等待该事件,但不进行轮询。

步数函数

  "HelloWorld": {
    "Type": "Task",
    "Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction",
    "End": true
  }

工作流 YAML

  - HelloWorld:
      call: http.get
      args:
          url: https://REGION-PROJECT_ID.cloudfunctions.net/helloworld
      result: helloworld_result

Workflows JSON

  [
    {
      "HelloWorld": {
        "call": "http.get",
        "args": {
          "url": "https://REGION-PROJECT_ID.cloudfunctions.net/helloworld"
        },
        "result": "helloworld_result"
      }
    }
  ]

Wait

Wait 状态会延迟状态机继续运行指定的时间。

您可以使用 Workflows sys.sleep 标准库函数将执行暂停给定的秒数,上限为 31536000(一年)。

步数函数

  "wait_ten_seconds" : {
    "Type" : "Wait",
    "Seconds" : 10,
    "Next": "NextState"
  }

工作流 YAML

  - someSleep:
      call: sys.sleep
      args:
          seconds: 10

Workflows JSON

  [
    {
      "someSleep": {
        "call": "sys.sleep",
        "args": {
          "seconds": 10
        }
      }
    }
  ]

示例:编排微服务

下面的步骤函数示例会检查股票价格,确定是买入还是卖出,并报告结果。示例中的状态机通过传递参数与 AWS Lambda 集成,使用 Amazon SQS 队列请求人工批准,并使用 Amazon SNS 主题返回查询结果。

{
      "StartAt": "Check Stock Price",
      "Comment": "An example of integrating Lambda functions in Step Functions state machine",
      "States": {
          "Check Stock Price": {
              "Type": "Task",
              "Resource": "CHECK_STOCK_PRICE_LAMBDA_ARN",
              "Next": "Generate Buy/Sell recommendation"
          },
          "Generate Buy/Sell recommendation": {
              "Type": "Task",
              "Resource": "GENERATE_BUY_SELL_RECOMMENDATION_LAMBDA_ARN",
              "ResultPath": "$.recommended_type",
              "Next": "Request Human Approval"
          },
          "Request Human Approval": {
              "Type": "Task",
              "Resource": "arn:PARTITION:states:::sqs:sendMessage.waitForTaskToken",
              "Parameters": {
                  "QueueUrl": "REQUEST_HUMAN_APPROVAL_SQS_URL",
                  "MessageBody": {
                      "Input.$": "$",
                      "TaskToken.$": "$$.Task.Token"
                  }
              },
              "ResultPath": null,
              "Next": "Buy or Sell?"
          },
          "Buy or Sell?": {
              "Type": "Choice",
              "Choices": [
                  {
                      "Variable": "$.recommended_type",
                      "StringEquals": "buy",
                      "Next": "Buy Stock"
                  },
                  {
                      "Variable": "$.recommended_type",
                      "StringEquals": "sell",
                      "Next": "Sell Stock"
                  }
              ]
          },
          "Buy Stock": {
              "Type": "Task",
              "Resource": "BUY_STOCK_LAMBDA_ARN",
              "Next": "Report Result"
          },
          "Sell Stock": {
              "Type": "Task",
              "Resource": "SELL_STOCK_LAMBDA_ARN",
              "Next": "Report Result"
          },
          "Report Result": {
              "Type": "Task",
              "Resource": "arn:PARTITION:states:::sns:publish",
              "Parameters": {
                  "TopicArn": "REPORT_RESULT_SNS_TOPIC_ARN",
                  "Message": {
                      "Input.$": "$"
                  }
              },
              "End": true
          }
      }
  }

Migrate to Workflows

如需将前面的步骤函数示例迁移到 Workflows,您可以创建等效的 Workflows 步骤,方法是集成 Cloud Functions,支持等待 HTTP 请求到达该端点的回调端点,并使用 Workflows 连接器发布到 Pub/Sub 主题来代替 Amazon SNS 主题:

  1. 完成创建工作流的步骤,但先不要部署。

  2. 在工作流的定义中,添加一个步骤来创建等待人工输入的回调端点,以及一个使用 Workflows 连接器发布到 Pub/Sub 主题的步骤。例如:

    工作流 YAML

      ---
      main:
        steps:
          - init:
              assign:
                - projectId: '${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}'
                - region: LOCATION
                - topic: PUBSUB_TOPIC_NAME
          - Check Stock Price:
              call: http.get
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/CheckStockPrice"}
                auth:
                  type: OIDC
              result: stockPriceResponse
          - Generate Buy/Sell Recommendation:
              call: http.get
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/BuySellRecommend"}
                auth:
                  type: OIDC
                query:
                  price: ${stockPriceResponse.body.stock_price}
              result: recommendResponse
          - Create Approval Callback:
              call: events.create_callback_endpoint
              args:
                  http_callback_method: "GET"
              result: callback_details
          - Print Approval Callback Details:
              call: sys.log
              args:
                  severity: "INFO"
                  text: ${"Listening for callbacks on " + callback_details.url}
          - Await Approval Callback:
              call: events.await_callback
              args:
                  callback: ${callback_details}
                  timeout: 3600
              result: approvalResponse
          - Approval?:
              try:
                switch:
                  - condition: ${approvalResponse.http_request.query.response[0] == "yes"}
                    next: Buy or Sell?
              except:
                as: e
                steps:
                  - unknown_response:
                      raise: ${"Unknown response:" + e.message}
                      next: end
          - Buy or Sell?:
              switch:
                - condition: ${recommendResponse.body == "buy"}
                  next: Buy Stock
                - condition: ${recommendResponse.body == "sell"}
                  next: Sell Stock
                - condition: true
                  raise: ${"Unknown recommendation:" + recommendResponse.body}
          - Buy Stock:
              call: http.post
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/BuyStock"}
                auth:
                  type: OIDC
                body:
                  action: ${recommendResponse.body}
              result: message
          - Sell Stock:
              call: http.post
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/SellStock"}
                auth:
                  type: OIDC
                body:
                  action: ${recommendResponse.body}
              result: message
          - Report Result:
              call: googleapis.pubsub.v1.projects.topics.publish
              args:
                topic: ${"projects/" + projectId + "/topics/" + topic}
                body:
                  messages:
                  - data: '${base64.encode(json.encode(message))}'
              next: end
      ...

    Workflows JSON

      {
        "main": {
          "steps": [
            {
              "init": {
                "assign": [
                  {
                    "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
                  },
                  {
                    "region": "LOCATION"
                  },
                  {
                    "topic": [
                      "PUBSUB_TOPIC_NAME"
                    ]
                  }
                ]
              }
            },
            {
              "Check Stock Price": {
                "call": "http.get",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/CheckStockPrice\"}",
                  "auth": {
                    "type": "OIDC"
                  }
                },
                "result": "stockPriceResponse"
              }
            },
            {
              "Generate Buy/Sell Recommendation": {
                "call": "http.get",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/BuySellRecommend\"}",
                  "auth": {
                    "type": "OIDC"
                  },
                  "query": {
                    "price": "${stockPriceResponse.body.stock_price}"
                  }
                },
                "result": "recommendResponse"
              }
            },
            {
              "Create Approval Callback": {
                "call": "events.create_callback_endpoint",
                "args": {
                  "http_callback_method": "GET"
                },
                "result": "callback_details"
              }
            },
            {
              "Print Approval Callback Details": {
                "call": "sys.log",
                "args": {
                  "severity": "INFO",
                  "text": "${\"Listening for callbacks on \" + callback_details.url}"
                }
              }
            },
            {
              "Await Approval Callback": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "approvalResponse"
              }
            },
            {
              "Approval?": {
                "try": {
                  "switch": [
                    {
                      "condition": "${approvalResponse.http_request.query.response[0] == \"yes\"}",
                      "next": "Buy or Sell?"
                    }
                  ]
                },
                "except": {
                  "as": "e",
                  "steps": [
                    {
                      "unknown_response": {
                        "raise": "${\"Unknown response:\" + e.message}",
                        "next": "end"
                      }
                    }
                  ]
                }
              }
            },
            {
              "Buy or Sell?": {
                "switch": [
                  {
                    "condition": "${recommendResponse.body == \"buy\"}",
                    "next": "Buy Stock"
                  },
                  {
                    "condition": "${recommendResponse.body == \"sell\"}",
                    "next": "Sell Stock"
                  },
                  {
                    "condition": true,
                    "raise": "${\"Unknown recommendation:\" + recommendResponse.body}"
                  }
                ]
              }
            },
            {
              "Buy Stock": {
                "call": "http.post",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/BuyStock\"}",
                  "auth": {
                    "type": "OIDC"
                  },
                  "body": {
                    "action": "${recommendResponse.body}"
                  }
                },
                "result": "message"
              }
            },
            {
              "Sell Stock": {
                "call": "http.post",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/SellStock\"}",
                  "auth": {
                    "type": "OIDC"
                  },
                  "body": {
                    "action": "${recommendResponse.body}"
                  }
                },
                "result": "message"
              }
            },
            {
              "Report Result": {
                "call": "googleapis.pubsub.v1.projects.topics.publish",
                "args": {
                  "topic": "${\"projects/\" + projectId + \"/topics/\" + topic}",
                  "body": {
                    "messages": [
                      {
                        "data": "${base64.encode(json.encode(message))}"
                      }
                    ]
                  }
                },
                "next": "end"
              }
            }
          ]
        }
      }

    替换以下内容:

    • LOCATION:受支持的 Google Cloud 区域。例如 us-central1
    • PUBSUB_TOPIC_NAME:您的 Pub/Sub 主题的名称。例如 my_stock_example
  3. 部署,然后执行工作流

  4. 在工作流执行期间,它会暂停并等待您调用回调端点。您可以使用 curl 命令执行此操作。例如:

    curl -H "Authorization: Bearer $(gcloud auth print-access-token)"
    https://workflowexecutions.googleapis.com/v1/projects/CALLBACK_URL?response=yes
    

    CALLBACK_URL 替换为回调端点的路径的其余部分。

  5. 工作流成功完成后,您可以接收来自 Pub/Sub 订阅的消息。例如:

    gcloud pubsub subscriptions pull stock_example-sub  --format="value(message.data)" | jq
    

    输出消息应类似于以下内容(buysell):

    {
      "body": "buy",
      "code": 200,
      "headers": {
      [...]
      }
    

后续步骤