从 AWS Step Functions 迁移到 Workflows

为了帮助您做好从 Amazon Web Services (AWS) Step Functions 迁移到 Google Cloud 上的 Workflows,本页介绍了主要的相似之处 两款产品之间的差异本文旨在帮助已熟悉 Step Functions 的用户使用 Workflow 实现类似的架构。

与 Step Functions 一样,Workflows 是一个全代管式、基于状态的编排平台,该平台会按照您定义的顺序执行服务:工作流。这些工作流可以结合使用服务(包括自定义服务) 托管在 Cloud Run 或 Cloud Run 函数上、Google Cloud 服务 例如 Cloud Vision AI 和 BigQuery,以及任何基于 HTTP 的 API。

请注意,Step Functions Express Workflows 是 AWS Step Functions 工作流 类型,则此处不作考虑,因为 Express Workflow 的持续时间为 受限,并且不支持“正好一次”工作流执行。

Hello World

在步骤函数中,状态机是一种工作流,而任务则是指 代表其他 AWS 服务执行的单个工作单元的工作流。 步骤函数要求每个状态都定义下一个状态。

在 Workflows 中,一系列使用 Workflows 语法的步骤用于描述要执行的任务。Workflows 会将步骤视为有序列表并逐个执行,直到所有步骤都运行完毕。

以下“Hello World”示例演示了如何在步骤函数中使用状态,以及如何在工作流中使用步骤

步骤函数

  {
    "Comment": "Hello world example of Pass states in Amazon States Language",
    "StartAt": "Hello",
    "States": {
      "Hello": {
        "Type": "Pass",
        "Result": "Hello",
        "Next": "World"
      },
      "World": {
        "Type": "Pass",
        "Result": "World",
        "End": true
      }
    }
  }

工作流 YAML

  ---
  # Hello world example of steps using Google Cloud Workflows syntax
  main:
      steps:
      - Hello:
          next: World
      - World:
          next: end
  ...

工作流 JSON

  {
    "main": {
      "steps": [
        {
          "Hello": {
            "next": "World"
          }
        },
        {
          "World": {
            "next": "end"
          }
        }
      ]
    }
  }

对比概览

本部分将对这两款产品进行更详细的比较。

步骤函数Workflows
语法JSON(工具中的 YAML) YAML 或 JSON
控制流在状态之间转换 包含步骤的命令式流控制
Worker资源 (ARN) HTTP 请求和连接器
可靠性捕获/重试 捕获/重试
最大并行数量支持 支持
状态数据状态通过 Workflows 变量
AuthenticationIAM IAM
用户体验Workflow Studio、CLI、SDK、IaC Google Cloud 控制台、CLI、SDK、IaC
价格 Step Functions 价格 Workflows 价格
语法

Step Functions 主要使用 JSON 来定义函数,不支持 YAML;但在 AWS Toolkit for Visual Studio Code 和 AWS 中 CloudFormation 格式,您可以使用 YAML 定义步骤函数。

您可以使用 Workflows 来描述 Workflows 步骤。 语法,可以使用 YAML 或 JSON 编写。大部分 使用 YAML 格式本页中的示例展示了 YAML 的优势,包括易于读写以及对注释的原生支持。有关 Workflows 语法的详细说明,请参阅 语法参考

控制流

Workflows 和 Step Functions 都将工作流建模为一系列任务:Workflows 中的步骤,Step Functions 中的状态。这两种方式都允许任务指明下一步要执行哪项任务,并且支持类似于开关的条件,以便根据当前状态选择下一个工作单元。一把钥匙 不同之处在于,步进函数要求每种状态定义下一个状态, 而 Workflows 会按照 (包括其他后续步骤)。如需了解详情,请参阅条件步骤

工作器

这两款产品都可以编排函数、容器和 其他网络服务来完成任务在步骤函数中,工作器 由 Resource 字段标识,该字段在语法上是一个 URI。用于标识工作器资源的 URI 采用 Amazon 资源名称 (ARN) 格式,并且用户无法直接调用任意 HTTP 端点。

Workflows 可以发送 HTTP 请求, 任意 HTTP 端点并获取响应。连接器 可以更轻松地在工作流中连接到其他 Google Cloud API, 并可将您的工作流与 Google Cloud 产品 Pub/Sub、BigQuery 或 Cloud Build。连接器 简化服务的调用,因为它们会为您处理请求的格式, 提供方法和参数,这样您就无需了解 Google Cloud API。您还可以配置超时和轮询 政策。

可靠性

如果任务失败,您的工作流必须能够适当地重试、在必要时捕获异常,并根据需要重新路由工作流。步骤函数和工作流都使用类似的机制来实现可靠性:使用重试捕获异常,并最终将其分派到工作流中的其他位置。如需了解详情,请参阅工作流错误

最大并行数量

您可能希望工作流程并行协调多个任务。第 函数提供了两种实现方法:您可以获取一个数据项并传递 可以并行执行多个不同的任务;或者,您可以使用数组并传递 将其元素添加到同一任务中。

在工作流中,您可以定义工作流的某个部分,让两个或更多步骤可以并发执行。你可以定义 或者迭代同时运行的循环。如需了解详情,请参阅并行执行工作流步骤

状态数据

工作流引擎的一个好处是,它会为您维护状态数据,而无需外部数据存储区。在 Step Functions 中,状态数据会以 JSON 结构的形式从一个状态传递到另一个状态。

在 Workflows 中,您可以在全局变量中保存状态数据。由于您最多可以设置一年的执行时长,因此只要实例仍在执行,您就可以保留状态数据。

身份验证

这两款产品都依赖于底层 Identity and Access Management (IAM) 系统进行身份验证和访问权限控制。例如,您可以使用 IAM 角色调用 Step Functions。

在 Workflows 中,您可以使用服务账号来调用 workflow;您可以使用 OAuth 2.0 或 OIDC 连接到 Google Cloud API;您可以使用授权请求标头 第三方 API。如需了解详情,请参阅向工作流授予访问 Google Cloud 资源的权限通过工作流发出经过身份验证的请求

用户体验

您可以使用命令行工具或 Terraform 等基础架构即代码 (IaC) 来定义和管理 Step Functions 和工作流。

此外,Workflows 支持使用客户端库在 Google Cloud 控制台中执行工作流,也可以使用 Google Cloud CLI 或向 Workflows REST API 发送请求来执行工作流。如需了解详情,请参阅执行工作流

价格

这两种产品都有免费层级。如需了解详情,请参阅相应价格 页面:Step Functions 价格Workflows 价格

将状态类型映射到步骤

Step Functions 中有八种状态类型。状态是指 可以基于输入做出决策、执行操作,并通过 输出到其他状态。从步骤函数迁移到 Workflows,请确保您了解如何翻译 状态类型转换为 Workflows 步骤。

Choice

Choice 状态会向状态机添加分支逻辑。

在工作流中,您可以使用 switch 块作为选择机制,以允许表达式的值控制工作流的执行流。如果某个值匹配,系统会执行该条件的语句。如需了解详情,请参阅条件

步骤函数

  "ChoiceState": {
    "Type": "Choice",
    "Choices": [
      {
        "Variable": "$.foo",
        "NumericEquals": 1,
        "Next": "FirstMatchState"
      },
      {
        "Variable": "$.foo",
        "NumericEquals": 2,
        "Next": "SecondMatchState"
      }
    ],
    "Default": "DefaultState"
  }

工作流 YAML

  switch:
    - condition: ${result.body.SomeField < 10}
      next: small
    - condition: ${result.body.SomeField < 100}
      next: medium
    - condition: true
      next: default_step

工作流 JSON

  {
    "switch": [
      {
        "condition": "${result.body.SomeField < 10}",
        "next": "small"
      },
      {
        "condition": "${result.body.SomeField < 100}",
        "next": "medium"
      },
      {
        "condition": true,
        "next": "default_step"
      }
    ]
  }

Fail

Fail 状态会停止状态机的执行,并将其标记为失败。

在 Workflows 中,您可以使用 raise 来报告自定义错误, 语法,您可以使用 try/except 代码块捕获并处理错误。如需了解详情,请参阅抛出错误

步骤函数

  "FailState": {
      "Type": "Fail",
      "Error": "ErrorA",
      "Cause": "Kaiju attack"
  }

工作流 YAML

  raise:
      code: 55
      message: "Something went wrong."

工作流 JSON

  {
    "raise": {
      "code": 55,
      "message": "Something went wrong."
    }
  }

Map

Map 状态可用于为输入中的每个元素运行一组步骤 数组。

在工作流中,您可以使用 for 循环进行迭代

步骤函数

  { "StartAt": "ExampleMapState",
    "States": {
      "ExampleMapState": {
        "Type": "Map",
        "Iterator": {
           "StartAt": "CallLambda",
           "States": {
             "CallLambda": {
               "Type": "Task",
               "Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction",
               "End": true
             }
           }
        }, "End": true
      }
    }
  }

工作流 YAML

  - assignStep:
      assign:
        - map:
            1: 10
            2: 20
            3: 30
        - sum: 0
  - loopStep:
      for:
          value: key
          in: ${keys(map)}
          steps:
            - sumStep:
                assign:
                  - sum: ${sum + map[key]}
  - returnStep:
      return: ${sum}

工作流 JSON

  [
    {
      "assignStep": {
        "assign": [
          {
            "map": {
              "1": 10,
              "2": 20,
              "3": 30
            }
          },
          {
            "sum": 0
          }
        ]
      }
    },
    {
      "loopStep": {
        "for": {
          "value": "key",
          "in": "${keys(map)}",
          "steps": [
            {
              "sumStep": {
                "assign": [
                  {
                    "sum": "${sum + map[key]}"
                  }
                ]
              }
            }
          ]
        }
      }
    },
    {
      "returnStep": {
        "return": "${sum}"
      }
    }
  ]

Parallel

Parallel 状态可用于在状态机中创建并行执行分支。在以下 Step Functions 示例中,系统会并行执行地址和电话号码查询。

在工作流中,您可以使用 parallel 步骤来定义工作流的某个部分,其中两个或更多步骤可以并发执行。有关 相关信息,请参阅并行步骤

步骤函数

  { "StartAt": "LookupCustomerInfo",
    "States": {
      "LookupCustomerInfo": {
        "Type": "Parallel",
        "End": true,
        "Branches": [
          {
           "StartAt": "LookupAddress",
           "States": {
             "LookupAddress": {
               "Type": "Task",
               "Resource": "arn:aws:lambda:us-east-1:123456789012:function:AddressFinder",
               "End": true
             }
           }
         },
         {
           "StartAt": "LookupPhone",
           "States": {
             "LookupPhone": {
               "Type": "Task",
               "Resource": "arn:aws:lambda:us-east-1:123456789012:function:PhoneFinder",
               "End": true
             }
           }
         }
        ]
      }
    }
  }

工作流 YAML

  main:
     params: [args]
     steps:
     - init:
         assign:
         - workflow_id: "lookupAddress"
         - customer_to_lookup:
             - address: ${args.customerId}
             - phone: ${args.customerId}
         - addressed: ["", ""] # to write to this variable, you must share it
     - parallel_address:
         parallel:
             shared: [addressed]
             for:
                 in: ${customer_to_lookup}
                 index: i # optional, use if index is required
                 value: arg
                 steps:
                 - address:
                     call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                     args:
                         workflow_id: ${workflow_id}
                         argument: ${arg}
                     result: r
                 - set_result:
                     assign:
                     - addressed[i]: ${r}
     - return:
             return: ${addressed}

工作流 JSON

  {
    "main": {
      "params": [
        "args"
      ],
      "steps": [
        {
          "init": {
            "assign": [
              {
                "workflow_id": "lookupAddress"
              },
              {
                "customer_to_lookup": [
                  {
                    "address": "${args.customerId}"
                  },
                  {
                    "phone": "${args.customerId}"
                  }
                ]
              },
              {
                "addressed": [
                  "",
                  ""
                ]
              }
            ]
          }
        },
        {
          "parallel_address": {
            "parallel": {
              "shared": [
                "addressed"
              ],
              "for": {
                "in": "${customer_to_lookup}",
                "index": "i",
                "value": "arg",
                "steps": [
                  {
                    "address": {
                      "call": "googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run",
                      "args": {
                        "workflow_id": "${workflow_id}",
                        "argument": "${arg}"
                      },
                      "result": "r"
                    }
                  },
                  {
                    "set_result": {
                      "assign": [
                        {
                          "addressed[i]": "${r}"
                        }
                      ]
                    }
                  }
                ]
              }
            }
          }
        },
        {
          "return": {
            "return": "${addressed}"
          }
        }
      ]
    }
  }

Pass

Pass 状态会将其输入传递给其输出,而不执行工作。这是 通常用于操作 JSON 中的状态数据。

由于工作流不会以这种方式传递数据,因此您可以将状态保留为无操作,也可以使用分配步骤修改变量。如需了解详情,请参阅 分配变量

步骤函数

  "No-op": {
    "Type": "Pass",
    "Result": {
      "x-datum": 0.38,
      "y-datum": 622.22
    },
    "ResultPath": "$.coords",
    "Next": "End"
  }

工作流 YAML

  assign:
      - number: 5
      - number_plus_one: ${number+1}
      - other_number: 10
      - string: "hello"

工作流 JSON

  {
    "assign": [
      {
        "number": 5
      },
      {
        "number_plus_one": "${number+1}"
      },
      {
        "other_number": 10
      },
      {
        "string": "hello"
      }
    ]
  }

成功

Succeed 状态表示成功停止执行。

在 Workflows 中,您可以在主工作流中使用 return 来停止 工作流执行情况您也可以通过完成 最后一步(假设该步骤不会跳转到其他步骤),也可以使用 next: end 来停止工作流的执行。 如需了解详情,请参阅 完成工作流的执行过程

步骤函数

  "SuccessState": {
    "Type": "Succeed"
  }

工作流 YAML

  return: "Success!"
  next: end

工作流 JSON

  {
    "return": "Success!",
    "next": "end"
  }

Task

Task 状态表示状态机执行的单个工作单元。在 下面的步骤函数示例中,它会调用 Lambda 函数。 (activity 是 AWS 步骤函数的一项功能,可让您在状态机中创建任务,以便在其他位置执行工作。)

在工作流示例中,系统会向 HTTP 端点发出调用来调用 Cloud Run 函数。您还可以使用连接器,轻松访问其他 Google Cloud 产品。此外, 您可以暂停工作流和轮询数据。或者,您也可以使用回调端点告知工作流指定事件已发生,然后等待该事件而不进行轮询。

步骤函数

  "HelloWorld": {
    "Type": "Task",
    "Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction",
    "End": true
  }

工作流 YAML

  - HelloWorld:
      call: http.get
      args:
          url: https://REGION-PROJECT_ID.cloudfunctions.net/helloworld
      result: helloworld_result

工作流 JSON

  [
    {
      "HelloWorld": {
        "call": "http.get",
        "args": {
          "url": "https://REGION-PROJECT_ID.cloudfunctions.net/helloworld"
        },
        "result": "helloworld_result"
      }
    }
  ]

Wait

Wait 状态会延迟状态机在指定时间内继续运行。

您可以使用 Workflows sys.sleep 标准库函数在指定的秒数内暂停执行,最长持续 31536000(一年)。

步骤函数

  "wait_ten_seconds" : {
    "Type" : "Wait",
    "Seconds" : 10,
    "Next": "NextState"
  }

工作流 YAML

  - someSleep:
      call: sys.sleep
      args:
          seconds: 10

工作流 JSON

  [
    {
      "someSleep": {
        "call": "sys.sleep",
        "args": {
          "seconds": 10
        }
      }
    }
  ]

示例:编排微服务

下面的步骤函数示例检查股票价格,确定是否 买卖并报告结果示例中的状态机通过传递参数与 AWS Lambda 集成,使用 Amazon SQS 队列请求人工审批,并使用 Amazon SNS 主题返回查询结果。

{
      "StartAt": "Check Stock Price",
      "Comment": "An example of integrating Lambda functions in Step Functions state machine",
      "States": {
          "Check Stock Price": {
              "Type": "Task",
              "Resource": "CHECK_STOCK_PRICE_LAMBDA_ARN",
              "Next": "Generate Buy/Sell recommendation"
          },
          "Generate Buy/Sell recommendation": {
              "Type": "Task",
              "Resource": "GENERATE_BUY_SELL_RECOMMENDATION_LAMBDA_ARN",
              "ResultPath": "$.recommended_type",
              "Next": "Request Human Approval"
          },
          "Request Human Approval": {
              "Type": "Task",
              "Resource": "arn:PARTITION:states:::sqs:sendMessage.waitForTaskToken",
              "Parameters": {
                  "QueueUrl": "REQUEST_HUMAN_APPROVAL_SQS_URL",
                  "MessageBody": {
                      "Input.$": "$",
                      "TaskToken.$": "$$.Task.Token"
                  }
              },
              "ResultPath": null,
              "Next": "Buy or Sell?"
          },
          "Buy or Sell?": {
              "Type": "Choice",
              "Choices": [
                  {
                      "Variable": "$.recommended_type",
                      "StringEquals": "buy",
                      "Next": "Buy Stock"
                  },
                  {
                      "Variable": "$.recommended_type",
                      "StringEquals": "sell",
                      "Next": "Sell Stock"
                  }
              ]
          },
          "Buy Stock": {
              "Type": "Task",
              "Resource": "BUY_STOCK_LAMBDA_ARN",
              "Next": "Report Result"
          },
          "Sell Stock": {
              "Type": "Task",
              "Resource": "SELL_STOCK_LAMBDA_ARN",
              "Next": "Report Result"
          },
          "Report Result": {
              "Type": "Task",
              "Resource": "arn:PARTITION:states:::sns:publish",
              "Parameters": {
                  "TopicArn": "REPORT_RESULT_SNS_TOPIC_ARN",
                  "Message": {
                      "Input.$": "$"
                  }
              },
              "End": true
          }
      }
  }

迁移到 Workflows

如需将前面的步骤函数示例迁移到 Workflows,请执行以下操作: 您可以创建等效的 Workflows 步骤,只需使用 Cloud Run 函数,支持 回调端点,它会等待 HTTP 请求到达该端点,并使用 Workflows 发布到 Pub/Sub 主题的连接器 Amazon SNS 主题:

  1. 完成创建工作流程的步骤 先不要部署

  2. 在工作流的定义中,添加一个步骤来创建等待人工输入的回调端点,以及一个使用 Workflows 连接器发布到 Pub/Sub 主题的步骤。例如:

    Workflows YAML

      ---
      main:
        steps:
          - init:
              assign:
                - projectId: '${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}'
                - region: LOCATION
                - topic: PUBSUB_TOPIC_NAME
          - Check Stock Price:
              call: http.get
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/CheckStockPrice"}
                auth:
                  type: OIDC
              result: stockPriceResponse
          - Generate Buy/Sell Recommendation:
              call: http.get
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/BuySellRecommend"}
                auth:
                  type: OIDC
                query:
                  price: ${stockPriceResponse.body.stock_price}
              result: recommendResponse
          - Create Approval Callback:
              call: events.create_callback_endpoint
              args:
                  http_callback_method: "GET"
              result: callback_details
          - Print Approval Callback Details:
              call: sys.log
              args:
                  severity: "INFO"
                  text: ${"Listening for callbacks on " + callback_details.url}
          - Await Approval Callback:
              call: events.await_callback
              args:
                  callback: ${callback_details}
                  timeout: 3600
              result: approvalResponse
          - Approval?:
              try:
                switch:
                  - condition: ${approvalResponse.http_request.query.response[0] == "yes"}
                    next: Buy or Sell?
              except:
                as: e
                steps:
                  - unknown_response:
                      raise: ${"Unknown response:" + e.message}
                      next: end
          - Buy or Sell?:
              switch:
                - condition: ${recommendResponse.body == "buy"}
                  next: Buy Stock
                - condition: ${recommendResponse.body == "sell"}
                  next: Sell Stock
                - condition: true
                  raise: ${"Unknown recommendation:" + recommendResponse.body}
          - Buy Stock:
              call: http.post
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/BuyStock"}
                auth:
                  type: OIDC
                body:
                  action: ${recommendResponse.body}
              result: message
          - Sell Stock:
              call: http.post
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/SellStock"}
                auth:
                  type: OIDC
                body:
                  action: ${recommendResponse.body}
              result: message
          - Report Result:
              call: googleapis.pubsub.v1.projects.topics.publish
              args:
                topic: ${"projects/" + projectId + "/topics/" + topic}
                body:
                  messages:
                  - data: '${base64.encode(json.encode(message))}'
              next: end
      ...

    工作流 JSON

      {
        "main": {
          "steps": [
            {
              "init": {
                "assign": [
                  {
                    "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
                  },
                  {
                    "region": "LOCATION"
                  },
                  {
                    "topic": [
                      "PUBSUB_TOPIC_NAME"
                    ]
                  }
                ]
              }
            },
            {
              "Check Stock Price": {
                "call": "http.get",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/CheckStockPrice\"}",
                  "auth": {
                    "type": "OIDC"
                  }
                },
                "result": "stockPriceResponse"
              }
            },
            {
              "Generate Buy/Sell Recommendation": {
                "call": "http.get",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/BuySellRecommend\"}",
                  "auth": {
                    "type": "OIDC"
                  },
                  "query": {
                    "price": "${stockPriceResponse.body.stock_price}"
                  }
                },
                "result": "recommendResponse"
              }
            },
            {
              "Create Approval Callback": {
                "call": "events.create_callback_endpoint",
                "args": {
                  "http_callback_method": "GET"
                },
                "result": "callback_details"
              }
            },
            {
              "Print Approval Callback Details": {
                "call": "sys.log",
                "args": {
                  "severity": "INFO",
                  "text": "${\"Listening for callbacks on \" + callback_details.url}"
                }
              }
            },
            {
              "Await Approval Callback": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "approvalResponse"
              }
            },
            {
              "Approval?": {
                "try": {
                  "switch": [
                    {
                      "condition": "${approvalResponse.http_request.query.response[0] == \"yes\"}",
                      "next": "Buy or Sell?"
                    }
                  ]
                },
                "except": {
                  "as": "e",
                  "steps": [
                    {
                      "unknown_response": {
                        "raise": "${\"Unknown response:\" + e.message}",
                        "next": "end"
                      }
                    }
                  ]
                }
              }
            },
            {
              "Buy or Sell?": {
                "switch": [
                  {
                    "condition": "${recommendResponse.body == \"buy\"}",
                    "next": "Buy Stock"
                  },
                  {
                    "condition": "${recommendResponse.body == \"sell\"}",
                    "next": "Sell Stock"
                  },
                  {
                    "condition": true,
                    "raise": "${\"Unknown recommendation:\" + recommendResponse.body}"
                  }
                ]
              }
            },
            {
              "Buy Stock": {
                "call": "http.post",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/BuyStock\"}",
                  "auth": {
                    "type": "OIDC"
                  },
                  "body": {
                    "action": "${recommendResponse.body}"
                  }
                },
                "result": "message"
              }
            },
            {
              "Sell Stock": {
                "call": "http.post",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/SellStock\"}",
                  "auth": {
                    "type": "OIDC"
                  },
                  "body": {
                    "action": "${recommendResponse.body}"
                  }
                },
                "result": "message"
              }
            },
            {
              "Report Result": {
                "call": "googleapis.pubsub.v1.projects.topics.publish",
                "args": {
                  "topic": "${\"projects/\" + projectId + \"/topics/\" + topic}",
                  "body": {
                    "messages": [
                      {
                        "data": "${base64.encode(json.encode(message))}"
                      }
                    ]
                  }
                },
                "next": "end"
              }
            }
          ]
        }
      }

    替换以下内容:

    • LOCATION:受支持的 Google Cloud 区域。例如 us-central1
    • PUBSUB_TOPIC_NAME:您的 Pub/Sub 主题的名称。例如 my_stock_example
  3. 部署,然后执行工作流

  4. 在工作流执行期间,它会暂停并等待您调用 回调端点。您可以使用 curl 命令执行此操作。例如:

    curl -H "Authorization: Bearer $(gcloud auth print-access-token)"
    https://workflowexecutions.googleapis.com/v1/projects/CALLBACK_URL?response=yes
    

    CALLBACK_URL 替换为 回调端点的路径。

  5. 工作流程成功完成后,您可以从 Pub/Sub 订阅接收消息。例如:

    gcloud pubsub subscriptions pull stock_example-sub  --format="value(message.data)" | jq
    

    输出消息应类似于以下内容(buysell):

    {
      "body": "buy",
      "code": 200,
      "headers": {
      [...]
      }
    

后续步骤