为帮助您准备好从 Amazon Web Services (AWS) Step Functions 迁移到 Google Cloud上的 Workflows,本页介绍了这两种产品之间的主要相似之处和不同之处。本文旨在帮助已熟悉 Step Functions 的用户使用 Workflow 实现类似的架构。
与 Step Functions 一样,Workflows 是一个全代管式、基于状态的编排平台,该平台会按照您定义的顺序执行服务:工作流。这些工作流可以组合各种服务,包括 Cloud Run 或 Cloud Run functions 上托管的自定义服务、 Google Cloud Cloud Vision AI 和 BigQuery 等服务以及任何基于 HTTP 的 API。
请注意,由于极速工作流的持续时间有限,并且不支持精确一次工作流执行,因此我们在此处未考虑 Step Functions 极速工作流这一 AWS Step Functions 工作流类型。
Hello World
在 Step Functions 中,状态机就是工作流,任务是工作流中的一种状态,表示其他 AWS 服务执行的单个工作单元。步骤函数要求每个状态都定义下一个状态。
在 Workflows 中,一系列使用 Workflows 语法的步骤用于描述要执行的任务。Workflows 会将步骤视为有序列表并逐个执行,直到所有步骤都运行完毕。
以下“Hello World”示例演示了如何在步骤函数中使用状态,以及如何在工作流中使用步骤:
步骤函数
{ "Comment": "Hello world example of Pass states in Amazon States Language", "StartAt": "Hello", "States": { "Hello": { "Type": "Pass", "Result": "Hello", "Next": "World" }, "World": { "Type": "Pass", "Result": "World", "End": true } } }
工作流 YAML
--- # Hello world example of steps using Google Cloud Workflows syntax main: steps: - Hello: next: World - World: next: end ...
工作流 JSON
{ "main": { "steps": [ { "Hello": { "next": "World" } }, { "World": { "next": "end" } } ] } }
比较概览
本部分将更详细地比较这两款产品。
步骤函数 | Workflows | |
---|---|---|
语法 | JSON(在工具中为 YAML) | YAML 或 JSON |
控制流 | 在状态之间转换 | 包含步骤的命令式流控制 |
Worker | 资源 (ARN) 和 HTTP 任务 | HTTP 请求和连接器 |
可靠性 | 捕获/重试 | 捕获/重试 |
最大并行数量 | 支持 | 支持 |
状态数据 | 传递状态 | 工作流变量 |
Authentication | IAM | IAM |
用户体验 | Workflow Studio、CLI、SDK、IaC | Google Cloud 控制台、CLI、SDK、IaC |
价格 | 步骤函数价格 | Workflows 价格 |
- 语法
Step Functions 主要使用 JSON 来定义函数,不直接支持 YAML;不过,在适用于 Visual Studio Code 的 AWS 工具包和 AWS CloudFormation 中,您可以使用 YAML 来定义 Step Functions。
您可以使用 Workflows 语法描述 Workflows 步骤,这些步骤可以使用 YAML 或 JSON 编写。大多数工作流都是采用 YAML 格式。本页中的示例展示了 YAML 的优势,包括易于读写以及对注释的原生支持。如需详细了解 Workflows 语法,请参阅语法参考文档。
- 控制流
Workflows 和 Step Functions 都将工作流建模为一系列任务:Workflows 中的步骤,Step Functions 中的状态。这两种方式都允许任务指明下一步要执行哪项任务,并且支持类似于开关的条件,以便根据当前状态选择下一个工作单元。一个关键区别是,Step Functions 要求每个状态都定义下一个状态,而 Workflows 会按照指定的顺序(包括备选后续步骤)执行步骤。如需了解详情,请参阅条件和步骤。
- 工作器
这两款产品都可以编排函数、容器和其他 Web 服务等计算资源,以便完成各种任务。在 Step Functions 中,工作器由
Resource
字段标识,该字段在语法上是 URI。用于标识工作器资源的 URI 采用 Amazon 资源名称 (ARN) 格式。如需直接调用任意 HTTP 端点,您可以定义 HTTP 任务。工作流可以向任意 HTTP 端点发送 HTTP 请求并获取响应。借助连接器,您可以更轻松地连接到工作流中的其他 Google Cloud API,并将工作流与 Pub/Sub、BigQuery 或 Cloud Build 等其他 Google Cloud 产品集成。连接器会为您处理请求的格式,从而为您处理请求的格式设置并提供方法和参数,这样您就无需了解Google Cloud API 的详细信息。您还可以配置超时和轮询政策。
- 可靠性
如果任务失败,您的工作流必须能够适当地重试、在必要时捕获异常,并根据需要重新路由工作流。步骤函数和工作流都使用类似的机制来实现可靠性:使用重试捕获异常,并最终将其分派到工作流中的其他位置。如需了解详情,请参阅工作流错误。
- 最大并行数量
您可能希望工作流程并行协调多个任务。步骤函数提供了两种实现方式:您可以获取一个数据项,并将其并行传递给多个不同的任务;或者,您可以使用数组,并将其元素传递给同一任务。
在工作流中,您可以定义工作流的某个部分,让两个或更多步骤可以并发执行。您可以定义并发运行的分支,也可以定义迭代并发运行的循环。如需了解详情,请参阅并行执行工作流步骤。
- 状态数据
工作流引擎的一个好处是,它会为您维护状态数据,而无需外部数据存储区。在 Step Functions 中,状态数据会以 JSON 结构的形式从一个状态传递到另一个状态。
在 Workflows 中,您可以在全局变量中保存状态数据。由于您最多可以设置一年的执行时长,因此只要实例仍在执行,您就可以保留状态数据。
- 身份验证
这两款产品都依赖于底层 Identity and Access Management (IAM) 系统进行身份验证和访问权限控制。例如,您可以使用 IAM 角色调用 Step Functions。
在 Workflows 中,您可以使用服务账号调用工作流;您可以使用 OAuth 2.0 或 OIDC 与 Google CloudAPI 连接;您还可以使用授权请求标头对第三方 API 进行身份验证。如需了解详情,请参阅向工作流授予访问 Google Cloud 资源的权限和通过工作流发出经过身份验证的请求。
- 用户体验
您可以使用命令行工具或 Terraform 等基础架构即代码 (IaC) 来定义和管理 Step Functions 和工作流。
此外,Workflows 支持使用客户端库在 Google Cloud 控制台中执行工作流,也可以使用 Google Cloud CLI 或向 Workflows REST API 发送请求来执行工作流。如需了解详情,请参阅执行工作流。
- 价格
将状态类型映射到步骤
Step Functions 中有八种状态类型。状态是状态机中的元素,可以根据其输入做出决策、执行操作,并将输出传递给其他状态。在从 Step Functions 迁移到 Workflows 之前,请确保您了解如何将每种状态类型转换为 Workflows 步骤。
Choice
Choice
状态会向状态机添加分支逻辑。
在工作流中,您可以使用 switch
块作为选择机制,以允许表达式的值控制工作流的执行流。如果某个值匹配,系统会执行该条件的语句。如需了解详情,请参阅条件。
步骤函数
"ChoiceState": { "Type": "Choice", "Choices": [ { "Variable": "$.foo", "NumericEquals": 1, "Next": "FirstMatchState" }, { "Variable": "$.foo", "NumericEquals": 2, "Next": "SecondMatchState" } ], "Default": "DefaultState" }
工作流 YAML
switch: - condition: ${result.body.SomeField < 10} next: small - condition: ${result.body.SomeField < 100} next: medium - condition: true next: default_step
工作流 JSON
{ "switch": [ { "condition": "${result.body.SomeField < 10}", "next": "small" }, { "condition": "${result.body.SomeField < 100}", "next": "medium" }, { "condition": true, "next": "default_step" } ] }
Fail
Fail
状态会停止状态机的执行,并将其标记为失败。
在 Workflows 中,您可以使用 raise
语法引发自定义错误,还可以使用 try/except
块捕获和处理错误。如需了解详情,请参阅抛出错误。
步骤函数
"FailState": { "Type": "Fail", "Error": "ErrorA", "Cause": "Kaiju attack" }
工作流 YAML
raise: code: 55 message: "Something went wrong."
工作流 JSON
{ "raise": { "code": 55, "message": "Something went wrong." } }
Map
Map
状态可用于针对输入数组的每个元素运行一组步骤。
在 Workflows 中,您可以使用 for
循环进行迭代。
步骤函数
{ "StartAt": "ExampleMapState", "States": { "ExampleMapState": { "Type": "Map", "Iterator": { "StartAt": "CallLambda", "States": { "CallLambda": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction", "End": true } } }, "End": true } } }
工作流 YAML
- assignStep: assign: - map: 1: 10 2: 20 3: 30 - sum: 0 - loopStep: for: value: key in: ${keys(map)} steps: - sumStep: assign: - sum: ${sum + map[key]} - returnStep: return: ${sum}
工作流 JSON
[ { "assignStep": { "assign": [ { "map": { "1": 10, "2": 20, "3": 30 } }, { "sum": 0 } ] } }, { "loopStep": { "for": { "value": "key", "in": "${keys(map)}", "steps": [ { "sumStep": { "assign": [ { "sum": "${sum + map[key]}" } ] } } ] } } }, { "returnStep": { "return": "${sum}" } } ]
Parallel
Parallel
状态可用于在状态机中创建并行执行分支。在以下 Step Functions 示例中,系统会并行执行地址和电话号码查询。
在工作流中,您可以使用 parallel
步骤来定义工作流的某个部分,其中两个或更多步骤可以并发执行。如需了解详情,请参阅并行步骤。
步骤函数
{ "StartAt": "LookupCustomerInfo", "States": { "LookupCustomerInfo": { "Type": "Parallel", "End": true, "Branches": [ { "StartAt": "LookupAddress", "States": { "LookupAddress": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:123456789012:function:AddressFinder", "End": true } } }, { "StartAt": "LookupPhone", "States": { "LookupPhone": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:123456789012:function:PhoneFinder", "End": true } } } ] } } }
工作流 YAML
main: params: [args] steps: - init: assign: - workflow_id: "lookupAddress" - customer_to_lookup: - address: ${args.customerId} - phone: ${args.customerId} - addressed: ["", ""] # to write to this variable, you must share it - parallel_address: parallel: shared: [addressed] for: in: ${customer_to_lookup} index: i # optional, use if index is required value: arg steps: - address: call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run args: workflow_id: ${workflow_id} argument: ${arg} result: r - set_result: assign: - addressed[i]: ${r} - return: return: ${addressed}
工作流 JSON
{ "main": { "params": [ "args" ], "steps": [ { "init": { "assign": [ { "workflow_id": "lookupAddress" }, { "customer_to_lookup": [ { "address": "${args.customerId}" }, { "phone": "${args.customerId}" } ] }, { "addressed": [ "", "" ] } ] } }, { "parallel_address": { "parallel": { "shared": [ "addressed" ], "for": { "in": "${customer_to_lookup}", "index": "i", "value": "arg", "steps": [ { "address": { "call": "googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run", "args": { "workflow_id": "${workflow_id}", "argument": "${arg}" }, "result": "r" } }, { "set_result": { "assign": [ { "addressed[i]": "${r}" } ] } } ] } } } }, { "return": { "return": "${addressed}" } } ] } }
Pass
Pass
状态会将其输入传递给其输出,而不会执行工作。这通常用于操作 JSON 中的状态数据。
由于 Workflow 不会以这种方式传递数据,因此您可以将状态保留为无操作,也可以使用分配步骤来修改变量。如需了解详情,请参阅分配变量。
步骤函数
"No-op": { "Type": "Pass", "Result": { "x-datum": 0.38, "y-datum": 622.22 }, "ResultPath": "$.coords", "Next": "End" }
工作流 YAML
assign: - number: 5 - number_plus_one: ${number+1} - other_number: 10 - string: "hello"
工作流 JSON
{ "assign": [ { "number": 5 }, { "number_plus_one": "${number+1}" }, { "other_number": 10 }, { "string": "hello" } ] }
成功
Succeed
状态表示成功停止执行。
在工作流中,您可以在主工作流中使用 return
来停止工作流的执行。您还可以通过完成最终步骤来完成工作流(假设该步骤不会跳转到其他步骤),或者如果您不需要返回值,也可以使用 next: end
停止工作流的执行。如需了解详情,请参阅完成工作流的执行。
步骤函数
"SuccessState": { "Type": "Succeed" }
工作流 YAML
return: "Success!" next: end
工作流 JSON
{ "return": "Success!", "next": "end" }
Task
Task
状态表示状态机执行的单个工作单元。在以下 Step Functions 示例中,它会调用 Lambda 函数。(activity 是 AWS 步骤函数的一项功能,可让您在状态机中创建任务,以便在其他位置执行工作。)
在工作流示例中,系统会调用 HTTP 端点来调用 Cloud Run 函数。您还可以使用连接器,轻松访问其他 Google Cloud 产品。此外,您还可以暂停工作流并轮询数据。或者,您也可以使用回调端点告知工作流指定事件已发生,然后等待该事件而不进行轮询。
步骤函数
"HelloWorld": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction", "End": true }
工作流 YAML
- HelloWorld: call: http.get args: url: https://REGION-PROJECT_ID.cloudfunctions.net/helloworld result: helloworld_result
工作流 JSON
[ { "HelloWorld": { "call": "http.get", "args": { "url": "https://REGION-PROJECT_ID.cloudfunctions.net/helloworld" }, "result": "helloworld_result" } } ]
Wait
Wait
状态会延迟状态机在指定时间内继续运行。
您可以使用 Workflows sys.sleep
标准库函数在指定的秒数内暂停执行,最长持续 31536000(一年)。
步骤函数
"wait_ten_seconds" : { "Type" : "Wait", "Seconds" : 10, "Next": "NextState" }
工作流 YAML
- someSleep: call: sys.sleep args: seconds: 10
工作流 JSON
[ { "someSleep": { "call": "sys.sleep", "args": { "seconds": 10 } } } ]
示例:编排微服务
以下 Step Functions 示例会检查股票价格、确定是买入还是卖出,并报告结果。示例中的状态机通过传递参数与 AWS Lambda 集成,使用 Amazon SQS 队列请求人工审批,并使用 Amazon SNS 主题返回查询结果。
{
"StartAt": "Check Stock Price",
"Comment": "An example of integrating Lambda functions in Step Functions state machine",
"States": {
"Check Stock Price": {
"Type": "Task",
"Resource": "CHECK_STOCK_PRICE_LAMBDA_ARN",
"Next": "Generate Buy/Sell recommendation"
},
"Generate Buy/Sell recommendation": {
"Type": "Task",
"Resource": "GENERATE_BUY_SELL_RECOMMENDATION_LAMBDA_ARN",
"ResultPath": "$.recommended_type",
"Next": "Request Human Approval"
},
"Request Human Approval": {
"Type": "Task",
"Resource": "arn:PARTITION:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "REQUEST_HUMAN_APPROVAL_SQS_URL",
"MessageBody": {
"Input.$": "$",
"TaskToken.$": "$$.Task.Token"
}
},
"ResultPath": null,
"Next": "Buy or Sell?"
},
"Buy or Sell?": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.recommended_type",
"StringEquals": "buy",
"Next": "Buy Stock"
},
{
"Variable": "$.recommended_type",
"StringEquals": "sell",
"Next": "Sell Stock"
}
]
},
"Buy Stock": {
"Type": "Task",
"Resource": "BUY_STOCK_LAMBDA_ARN",
"Next": "Report Result"
},
"Sell Stock": {
"Type": "Task",
"Resource": "SELL_STOCK_LAMBDA_ARN",
"Next": "Report Result"
},
"Report Result": {
"Type": "Task",
"Resource": "arn:PARTITION:states:::sns:publish",
"Parameters": {
"TopicArn": "REPORT_RESULT_SNS_TOPIC_ARN",
"Message": {
"Input.$": "$"
}
},
"End": true
}
}
}
迁移到 Workflows
如需将上述 Step Functions 示例迁移到 Workflows,您可以通过集成 Cloud Run 函数、支持等待 HTTP 请求到达该端点的回调端点,以及使用 Workflows 连接器发布到 Pub/Sub 主题(而非 Amazon SNS 主题)来创建等效的 Workflows 步骤:
完成创建工作流的步骤,但请勿进行部署。
在工作流的定义中,添加一个步骤来创建等待人工输入的回调端点,以及一个步骤来使用 Workflows 连接器发布到 Pub/Sub 主题。例如:
工作流 YAML
--- main: steps: - init: assign: - projectId: '${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}' - region: LOCATION - topic: PUBSUB_TOPIC_NAME - Check Stock Price: call: http.get args: url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/CheckStockPrice"} auth: type: OIDC result: stockPriceResponse - Generate Buy/Sell Recommendation: call: http.get args: url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/BuySellRecommend"} auth: type: OIDC query: price: ${stockPriceResponse.body.stock_price} result: recommendResponse - Create Approval Callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - Print Approval Callback Details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - Await Approval Callback: call: events.await_callback args: callback: ${callback_details} timeout: 3600 result: approvalResponse - Approval?: try: switch: - condition: ${approvalResponse.http_request.query.response[0] == "yes"} next: Buy or Sell? except: as: e steps: - unknown_response: raise: ${"Unknown response:" + e.message} next: end - Buy or Sell?: switch: - condition: ${recommendResponse.body == "buy"} next: Buy Stock - condition: ${recommendResponse.body == "sell"} next: Sell Stock - condition: true raise: ${"Unknown recommendation:" + recommendResponse.body} - Buy Stock: call: http.post args: url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/BuyStock"} auth: type: OIDC body: action: ${recommendResponse.body} result: message - Sell Stock: call: http.post args: url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/SellStock"} auth: type: OIDC body: action: ${recommendResponse.body} result: message - Report Result: call: googleapis.pubsub.v1.projects.topics.publish args: topic: ${"projects/" + projectId + "/topics/" + topic} body: messages: - data: '${base64.encode(json.encode(message))}' next: end ...
工作流 JSON
{ "main": { "steps": [ { "init": { "assign": [ { "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}" }, { "region": "LOCATION" }, { "topic": [ "PUBSUB_TOPIC_NAME" ] } ] } }, { "Check Stock Price": { "call": "http.get", "args": { "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/CheckStockPrice\"}", "auth": { "type": "OIDC" } }, "result": "stockPriceResponse" } }, { "Generate Buy/Sell Recommendation": { "call": "http.get", "args": { "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/BuySellRecommend\"}", "auth": { "type": "OIDC" }, "query": { "price": "${stockPriceResponse.body.stock_price}" } }, "result": "recommendResponse" } }, { "Create Approval Callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "Print Approval Callback Details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "Await Approval Callback": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 3600 }, "result": "approvalResponse" } }, { "Approval?": { "try": { "switch": [ { "condition": "${approvalResponse.http_request.query.response[0] == \"yes\"}", "next": "Buy or Sell?" } ] }, "except": { "as": "e", "steps": [ { "unknown_response": { "raise": "${\"Unknown response:\" + e.message}", "next": "end" } } ] } } }, { "Buy or Sell?": { "switch": [ { "condition": "${recommendResponse.body == \"buy\"}", "next": "Buy Stock" }, { "condition": "${recommendResponse.body == \"sell\"}", "next": "Sell Stock" }, { "condition": true, "raise": "${\"Unknown recommendation:\" + recommendResponse.body}" } ] } }, { "Buy Stock": { "call": "http.post", "args": { "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/BuyStock\"}", "auth": { "type": "OIDC" }, "body": { "action": "${recommendResponse.body}" } }, "result": "message" } }, { "Sell Stock": { "call": "http.post", "args": { "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/SellStock\"}", "auth": { "type": "OIDC" }, "body": { "action": "${recommendResponse.body}" } }, "result": "message" } }, { "Report Result": { "call": "googleapis.pubsub.v1.projects.topics.publish", "args": { "topic": "${\"projects/\" + projectId + \"/topics/\" + topic}", "body": { "messages": [ { "data": "${base64.encode(json.encode(message))}" } ] } }, "next": "end" } } ] } }
替换以下内容:
LOCATION
:受支持的 Google Cloud 区域。例如us-central1
。PUBSUB_TOPIC_NAME
:您的 Pub/Sub 主题的名称。例如my_stock_example
。
部署,然后执行工作流。
在工作流执行期间,它会暂停并等待您调用回调端点。您可以使用 curl 命令执行此操作。例如:
curl -H "Authorization: Bearer $(gcloud auth print-access-token)" https://workflowexecutions.googleapis.com/v1/projects/CALLBACK_URL?response=yes
将
CALLBACK_URL
替换为回调端点的路径的其余部分。工作流程成功完成后,您可以从 Pub/Sub 订阅接收消息。例如:
gcloud pubsub subscriptions pull stock_example-sub --format="value(message.data)" | jq
输出消息应类似如下所示(
buy
或sell
):{ "body": "buy", "code": 200, "headers": { [...] }