为帮助您准备好从 Amazon Web Services (AWS) 步骤函数迁移到 Google Cloud 上的 Workflows,本页面介绍了这两种产品之间的主要相似之处和不同之处。此信息旨在帮助已经熟悉步骤函数的用户使用 Workflows 实现类似的架构。
与 Step Functions 一样,Workflows 是一个基于状态的全代管式编排平台,可以按您定义的顺序执行服务,即工作流。这些工作流可以组合使用如下服务:托管在 Cloud Run 或 Cloud Functions 上的自定义服务、Cloud Vision AI 和 BigQuery 等 Google Cloud 服务,以及任何基于 HTTP 的 API。
请注意,步骤函数 Express Workflows 是一种 AWS 步骤函数工作流类型,此处不考虑该类型,因为 Express 工作流的时长有限,并且不支持“正好一次”工作流执行。
Hello World
在步骤函数中,状态机是一种工作流,任务是工作流中的一种状态,表示另一 AWS 服务执行的单个工作单元。 单步函数要求每个状态都定义下一个状态。
在 Workflows 中,一系列使用 Workflows 语法的步骤描述了要执行的任务。Workflows 会将步骤视为位于有序列表中,并一次执行一个步骤,直到所有步骤都运行完毕。
以下“Hello world”示例演示了如何在步骤函数和 Workflows 的步骤中使用 states:
步数函数
{ "Comment": "Hello world example of Pass states in Amazon States Language", "StartAt": "Hello", "States": { "Hello": { "Type": "Pass", "Result": "Hello", "Next": "World" }, "World": { "Type": "Pass", "Result": "World", "End": true } } }
工作流 YAML
--- # Hello world example of steps using Google Cloud Workflows syntax main: steps: - Hello: next: World - World: next: end ...
Workflows JSON
{ "main": { "steps": [ { "Hello": { "next": "World" } }, { "World": { "next": "end" } } ] } }
对比情况概览
本部分将更详细地比较这两种产品。
步数函数 | Workflows | |
---|---|---|
语法 | JSON(工具中的 YAML) | YAML 或 JSON |
控制流 | 状态之间的转换 | 命令式流控制与步骤 |
工作器 | 资源 (ARN) | HTTP 请求和连接器 |
可靠性 | 捕获/重试 | 捕获/重试 |
最大并行数量 | 受支持 | 受支持 |
状态数据 | 状态会传递 | Workflows 变量 |
Authentication | IAM | IAM |
用户体验 | Workflow Studio、CLI、SDK、IaC | Google Cloud 控制台、CLI、SDK、IaC |
价格 | Step Functions 价格 | Workflows 价格 |
- 语法
Step Functions 主要使用 JSON 来定义函数,不直接支持 YAML;但在适用于 Visual Studio Code 的 AWS Toolkit 和 AWS CloudFormation 中,您可以使用 YAML 定义步骤函数。
您可以使用 Workflows 语法描述 Workflows 步骤,并且可以使用 YAML 或 JSON 编写这些步骤。大多数工作流都采用 YAML。本页中的示例展示了 YAML 的优势,包括易于读取和写入,以及对注释的原生支持。如需详细了解 Workflows 语法,请参阅语法参考文档。
- 控制流
Workflows 和步骤函数会将工作流建模为一系列任务:Workflows 中的 steps 和步骤函数中的 states。两者都允许任务指示下一个要执行的任务,并且支持类似开关的条件,以便根据当前状态选择下一个工作单元。一个关键的区别在于,单步函数要求每个状态定义下一个状态,而 Workflows 会按照指定顺序执行步骤(包括其他后续步骤)。如需了解详情,请参阅条件和步骤。
- 工作器
这两种产品都可以编排函数、容器和其他网络服务等计算资源来完成任务。在步骤函数中,工作器由
Resource
字段标识,该字段在语法上是 URI。用于标识工作器资源的 URI 采用 Amazon 资源名称 (ARN) 格式,用户无法直接调用任意 HTTP 端点。Workflows 可以向任意 HTTP 端点发送 HTTP 请求并获取响应。借助连接器,您可以更轻松地在工作流中连接到其他 Google Cloud API,并将工作流与 Pub/Sub、BigQuery 或 Cloud Build 等其他 Google Cloud 产品集成。连接器简化了调用服务,因为它们会为您处理请求的格式设置,提供方法和参数,因此您无需了解 Google Cloud API 的详细信息。您还可以配置超时和轮询政策。
- 可靠性
如果任务失败,您的工作流必须能够适当地重试,在必要时捕获异常,并根据需要重新路由工作流。步骤函数和 Workflows 都使用类似的机制来实现可靠性:通过重试捕获异常,以及最终分派到工作流中的其他位置。有关详情,请参阅工作流程错误。
- 并行数量
您可能希望工作流并行编排多个任务。步骤函数提供了两种方法来实现这一点:您可以获取一个数据项并将其并行传递到多个不同的任务;或者,您可以使用数组并将其元素传递给同一任务。
在 Workflows 中,您可以定义工作流中可以同时执行两个或多个步骤的部分。您可以定义并发运行的分支,也可以定义并发运行的循环。如需了解详情,请参阅并行执行工作流步骤。
- 状态数据
工作流引擎的一个好处是,无需外部数据存储区即可为您维护状态数据。在步骤函数中,状态数据以 JSON 结构从一个状态传递到另一个状态。
在 Workflows 中,您可以将状态数据保存在全局变量中。由于最长可设为一年的执行时长,因此只要实例仍在执行,您就可以保留状态数据。
- Authentication
这两种产品都依赖于底层 Identity and Access Management (IAM) 系统进行身份验证和访问权限控制。例如,您可以使用 IAM 角色来调用 Step Functions。
在 Workflows 中,您可以使用服务帐号调用工作流;可以使用 OAuth 2.0 或 OIDC 连接 Google Cloud API;也可以使用授权请求标头进行第三方 API 身份验证。如需了解详情,请参阅授予工作流访问 Google Cloud 资源的权限和从工作流发出经过身份验证的请求。
- 用户体验
您可以使用命令行工具或基础架构即代码 (IaC)(如 Terraform)来定义和管理步骤函数及 Workflows。
此外,Workflows 还支持使用客户端库、在 Google Cloud 控制台中、使用 Google Cloud CLI 或通过向 Workflows REST API 发送请求来执行工作流。如需了解详情,请参阅执行工作流。
- 价格
这两种产品都有一个免费层级。如需了解详情,请参阅相应的价格页面:步骤函数价格和 Workflows 价格。
将状态类型映射到步骤
步骤函数中有八种状态类型。状态是状态机中的元素,这些元素可以根据其输入做出决策、执行操作,并将输出传递给其他状态。在从步骤函数迁移到 Workflows 之前,请确保您了解如何将每种状态类型转换为 Workflows 步骤。
Choice
Choice
状态会向状态机添加分支逻辑。
在 Workflows 中,您可以使用 switch
块作为选择机制,以允许表达式的值控制工作流的执行流程。如果某个值匹配,系统会执行该条件的语句。
如需了解详情,请参阅条件。
步数函数
"ChoiceState": { "Type": "Choice", "Choices": [ { "Variable": "$.foo", "NumericEquals": 1, "Next": "FirstMatchState" }, { "Variable": "$.foo", "NumericEquals": 2, "Next": "SecondMatchState" } ], "Default": "DefaultState" }
工作流 YAML
switch: - condition: ${result.body.SomeField < 10} next: small - condition: ${result.body.SomeField < 100} next: medium - condition: true next: default_step
Workflows JSON
{ "switch": [ { "condition": "${result.body.SomeField < 10}", "next": "small" }, { "condition": "${result.body.SomeField < 100}", "next": "medium" }, { "condition": true, "next": "default_step" } ] }
Fail
Fail
状态会停止状态机的执行,并将其标记为失败。
在 Workflows 中,您可以使用 raise
语法引发自定义错误,也可以使用 try/except
代码块捕获和处理错误。如需了解详情,请参阅引发错误。
步数函数
"FailState": { "Type": "Fail", "Error": "ErrorA", "Cause": "Kaiju attack" }
工作流 YAML
raise: code: 55 message: "Something went wrong."
Workflows JSON
{ "raise": { "code": 55, "message": "Something went wrong." } }
Map
Map
状态可用于为输入数组的每个元素运行一组步骤。
在 Workflows 中,您可以将 for
循环用于迭代。
步数函数
{ "StartAt": "ExampleMapState", "States": { "ExampleMapState": { "Type": "Map", "Iterator": { "StartAt": "CallLambda", "States": { "CallLambda": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction", "End": true } } }, "End": true } } }
工作流 YAML
- assignStep: assign: - map: 1: 10 2: 20 3: 30 - sum: 0 - loopStep: for: value: key in: ${keys(map)} steps: - sumStep: assign: - sum: ${sum + map[key]} - returnStep: return: ${sum}
Workflows JSON
[ { "assignStep": { "assign": [ { "map": { "1": 10, "2": 20, "3": 30 } }, { "sum": 0 } ] } }, { "loopStep": { "for": { "value": "key", "in": "${keys(map)}", "steps": [ { "sumStep": { "assign": [ { "sum": "${sum + map[key]}" } ] } } ] } } }, { "returnStep": { "return": "${sum}" } } ]
Parallel
Parallel
状态可用于在状态机中创建并行执行分支。在下列步骤函数示例中,地址和手机号码查找是并行执行的。
在 Workflows 中,您可以使用 parallel
步骤来定义可以并发执行两个或多个步骤的工作流的一部分。如需了解详情,请参阅并行步骤。
步数函数
{ "StartAt": "LookupCustomerInfo", "States": { "LookupCustomerInfo": { "Type": "Parallel", "End": true, "Branches": [ { "StartAt": "LookupAddress", "States": { "LookupAddress": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:123456789012:function:AddressFinder", "End": true } } }, { "StartAt": "LookupPhone", "States": { "LookupPhone": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:123456789012:function:PhoneFinder", "End": true } } } ] } } }
工作流 YAML
main: params: [args] steps: - init: assign: - workflow_id: "lookupAddress" - customer_to_lookup: - address: ${args.customerId} - phone: ${args.customerId} - addressed: ["", ""] # to write to this variable, you must share it - parallel_address: parallel: shared: [addressed] for: in: ${customer_to_lookup} index: i # optional, use if index is required value: arg steps: - address: call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run args: workflow_id: ${workflow_id} argument: ${arg} result: r - set_result: assign: - addressed[i]: ${r} - return: return: ${addressed}
Workflows JSON
{ "main": { "params": [ "args" ], "steps": [ { "init": { "assign": [ { "workflow_id": "lookupAddress" }, { "customer_to_lookup": [ { "address": "${args.customerId}" }, { "phone": "${args.customerId}" } ] }, { "addressed": [ "", "" ] } ] } }, { "parallel_address": { "parallel": { "shared": [ "addressed" ], "for": { "in": "${customer_to_lookup}", "index": "i", "value": "arg", "steps": [ { "address": { "call": "googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run", "args": { "workflow_id": "${workflow_id}", "argument": "${arg}" }, "result": "r" } }, { "set_result": { "assign": [ { "addressed[i]": "${r}" } ] } } ] } } } }, { "return": { "return": "${addressed}" } } ] } }
Pass
Pass
状态会将其输入传递到其输出,而不执行操作。这通常用于操作 JSON 中的状态数据。
由于 Workflows 不会以这种方式传递数据,因此您可以将状态保留为空操作,也可以使用分配步骤来修改变量。如需了解详情,请参阅分配变量。
步数函数
"No-op": { "Type": "Pass", "Result": { "x-datum": 0.38, "y-datum": 622.22 }, "ResultPath": "$.coords", "Next": "End" }
工作流 YAML
assign: - number: 5 - number_plus_one: ${number+1} - other_number: 10 - string: "hello"
Workflows JSON
{ "assign": [ { "number": 5 }, { "number_plus_one": "${number+1}" }, { "other_number": 10 }, { "string": "hello" } ] }
成功
Succeed
状态会成功停止执行。
在 Workflows 中,您可以在主工作流中使用 return
来停止工作流的执行。您还可以通过完成最后一步(假设该步骤不会跳转到另一个步骤)来完成工作流;或者,如果不需要返回值,您可以使用 next: end
来停止工作流的执行。如需了解详情,请参阅完成工作流的执行。
步数函数
"SuccessState": { "Type": "Succeed" }
工作流 YAML
return: "Success!" next: end
Workflows JSON
{ "return": "Success!", "next": "end" }
Task
Task
状态表示状态机执行的单个工作单元。在下面的“步骤函数”示例中,它调用了 Lambda 函数。(Activity 是一项 AWS Step Functions 功能,让您可以通过状态机在其他位置执行工作的任务)。
在 Workflows 示例中,对 HTTP 端点发出调用以调用 Cloud Function。您还可以使用连接器,以便轻松访问其他 Google Cloud 产品。此外,您还可以暂停工作流并轮询数据。或者,您也可以使用回调端点向您的工作流发出指定事件已发生的信号,并等待该事件,但不进行轮询。
步数函数
"HelloWorld": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction", "End": true }
工作流 YAML
- HelloWorld: call: http.get args: url: https://REGION-PROJECT_ID.cloudfunctions.net/helloworld result: helloworld_result
Workflows JSON
[ { "HelloWorld": { "call": "http.get", "args": { "url": "https://REGION-PROJECT_ID.cloudfunctions.net/helloworld" }, "result": "helloworld_result" } } ]
Wait
Wait
状态会延迟状态机继续运行指定的时间。
您可以使用 Workflows sys.sleep
标准库函数将执行暂停给定的秒数,上限为 31536000(一年)。
步数函数
"wait_ten_seconds" : { "Type" : "Wait", "Seconds" : 10, "Next": "NextState" }
工作流 YAML
- someSleep: call: sys.sleep args: seconds: 10
Workflows JSON
[ { "someSleep": { "call": "sys.sleep", "args": { "seconds": 10 } } } ]
示例:编排微服务
下面的步骤函数示例会检查股票价格,确定是买入还是卖出,并报告结果。示例中的状态机通过传递参数与 AWS Lambda 集成,使用 Amazon SQS 队列请求人工批准,并使用 Amazon SNS 主题返回查询结果。
{
"StartAt": "Check Stock Price",
"Comment": "An example of integrating Lambda functions in Step Functions state machine",
"States": {
"Check Stock Price": {
"Type": "Task",
"Resource": "CHECK_STOCK_PRICE_LAMBDA_ARN",
"Next": "Generate Buy/Sell recommendation"
},
"Generate Buy/Sell recommendation": {
"Type": "Task",
"Resource": "GENERATE_BUY_SELL_RECOMMENDATION_LAMBDA_ARN",
"ResultPath": "$.recommended_type",
"Next": "Request Human Approval"
},
"Request Human Approval": {
"Type": "Task",
"Resource": "arn:PARTITION:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "REQUEST_HUMAN_APPROVAL_SQS_URL",
"MessageBody": {
"Input.$": "$",
"TaskToken.$": "$$.Task.Token"
}
},
"ResultPath": null,
"Next": "Buy or Sell?"
},
"Buy or Sell?": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.recommended_type",
"StringEquals": "buy",
"Next": "Buy Stock"
},
{
"Variable": "$.recommended_type",
"StringEquals": "sell",
"Next": "Sell Stock"
}
]
},
"Buy Stock": {
"Type": "Task",
"Resource": "BUY_STOCK_LAMBDA_ARN",
"Next": "Report Result"
},
"Sell Stock": {
"Type": "Task",
"Resource": "SELL_STOCK_LAMBDA_ARN",
"Next": "Report Result"
},
"Report Result": {
"Type": "Task",
"Resource": "arn:PARTITION:states:::sns:publish",
"Parameters": {
"TopicArn": "REPORT_RESULT_SNS_TOPIC_ARN",
"Message": {
"Input.$": "$"
}
},
"End": true
}
}
}
Migrate to Workflows
如需将前面的步骤函数示例迁移到 Workflows,您可以创建等效的 Workflows 步骤,方法是集成 Cloud Functions,支持等待 HTTP 请求到达该端点的回调端点,并使用 Workflows 连接器发布到 Pub/Sub 主题来代替 Amazon SNS 主题:
完成创建工作流的步骤,但先不要部署。
在工作流的定义中,添加一个步骤来创建等待人工输入的回调端点,以及一个使用 Workflows 连接器发布到 Pub/Sub 主题的步骤。例如:
工作流 YAML
--- main: steps: - init: assign: - projectId: '${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}' - region: LOCATION - topic: PUBSUB_TOPIC_NAME - Check Stock Price: call: http.get args: url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/CheckStockPrice"} auth: type: OIDC result: stockPriceResponse - Generate Buy/Sell Recommendation: call: http.get args: url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/BuySellRecommend"} auth: type: OIDC query: price: ${stockPriceResponse.body.stock_price} result: recommendResponse - Create Approval Callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - Print Approval Callback Details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - Await Approval Callback: call: events.await_callback args: callback: ${callback_details} timeout: 3600 result: approvalResponse - Approval?: try: switch: - condition: ${approvalResponse.http_request.query.response[0] == "yes"} next: Buy or Sell? except: as: e steps: - unknown_response: raise: ${"Unknown response:" + e.message} next: end - Buy or Sell?: switch: - condition: ${recommendResponse.body == "buy"} next: Buy Stock - condition: ${recommendResponse.body == "sell"} next: Sell Stock - condition: true raise: ${"Unknown recommendation:" + recommendResponse.body} - Buy Stock: call: http.post args: url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/BuyStock"} auth: type: OIDC body: action: ${recommendResponse.body} result: message - Sell Stock: call: http.post args: url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/SellStock"} auth: type: OIDC body: action: ${recommendResponse.body} result: message - Report Result: call: googleapis.pubsub.v1.projects.topics.publish args: topic: ${"projects/" + projectId + "/topics/" + topic} body: messages: - data: '${base64.encode(json.encode(message))}' next: end ...
Workflows JSON
{ "main": { "steps": [ { "init": { "assign": [ { "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}" }, { "region": "LOCATION" }, { "topic": [ "PUBSUB_TOPIC_NAME" ] } ] } }, { "Check Stock Price": { "call": "http.get", "args": { "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/CheckStockPrice\"}", "auth": { "type": "OIDC" } }, "result": "stockPriceResponse" } }, { "Generate Buy/Sell Recommendation": { "call": "http.get", "args": { "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/BuySellRecommend\"}", "auth": { "type": "OIDC" }, "query": { "price": "${stockPriceResponse.body.stock_price}" } }, "result": "recommendResponse" } }, { "Create Approval Callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "Print Approval Callback Details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "Await Approval Callback": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 3600 }, "result": "approvalResponse" } }, { "Approval?": { "try": { "switch": [ { "condition": "${approvalResponse.http_request.query.response[0] == \"yes\"}", "next": "Buy or Sell?" } ] }, "except": { "as": "e", "steps": [ { "unknown_response": { "raise": "${\"Unknown response:\" + e.message}", "next": "end" } } ] } } }, { "Buy or Sell?": { "switch": [ { "condition": "${recommendResponse.body == \"buy\"}", "next": "Buy Stock" }, { "condition": "${recommendResponse.body == \"sell\"}", "next": "Sell Stock" }, { "condition": true, "raise": "${\"Unknown recommendation:\" + recommendResponse.body}" } ] } }, { "Buy Stock": { "call": "http.post", "args": { "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/BuyStock\"}", "auth": { "type": "OIDC" }, "body": { "action": "${recommendResponse.body}" } }, "result": "message" } }, { "Sell Stock": { "call": "http.post", "args": { "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/SellStock\"}", "auth": { "type": "OIDC" }, "body": { "action": "${recommendResponse.body}" } }, "result": "message" } }, { "Report Result": { "call": "googleapis.pubsub.v1.projects.topics.publish", "args": { "topic": "${\"projects/\" + projectId + \"/topics/\" + topic}", "body": { "messages": [ { "data": "${base64.encode(json.encode(message))}" } ] } }, "next": "end" } } ] } }
替换以下内容:
LOCATION
:受支持的 Google Cloud 区域。例如us-central1
。PUBSUB_TOPIC_NAME
:您的 Pub/Sub 主题的名称。例如my_stock_example
。
部署,然后执行工作流。
在工作流执行期间,它会暂停并等待您调用回调端点。您可以使用 curl 命令执行此操作。例如:
curl -H "Authorization: Bearer $(gcloud auth print-access-token)" https://workflowexecutions.googleapis.com/v1/projects/CALLBACK_URL?response=yes
将
CALLBACK_URL
替换为回调端点的路径的其余部分。工作流成功完成后,您可以接收来自 Pub/Sub 订阅的消息。例如:
gcloud pubsub subscriptions pull stock_example-sub --format="value(message.data)" | jq
输出消息应类似于以下内容(
buy
或sell
):{ "body": "buy", "code": 200, "headers": { [...] }