为了帮助您做好从 Amazon Web Services (AWS) Step Functions 迁移到 Google Cloud 上的 Workflows,本页介绍了主要的相似之处 两款产品之间的差异本文旨在帮助已熟悉 Step Functions 的用户使用 Workflow 实现类似的架构。
与 Step Functions 一样,Workflows 是一个全代管式、基于状态的编排平台,该平台会按照您定义的顺序执行服务:工作流。这些工作流可以结合使用服务(包括自定义服务) 托管在 Cloud Run 或 Cloud Run 函数上、Google Cloud 服务 例如 Cloud Vision AI 和 BigQuery,以及任何基于 HTTP 的 API。
请注意,Step Functions Express Workflows 是 AWS Step Functions 工作流 类型,则此处不作考虑,因为 Express Workflow 的持续时间为 受限,并且不支持“正好一次”工作流执行。
Hello World
在步骤函数中,状态机是一种工作流,而任务则是指 代表其他 AWS 服务执行的单个工作单元的工作流。 步骤函数要求每个状态都定义下一个状态。
在 Workflows 中,一系列使用 Workflows 语法的步骤用于描述要执行的任务。Workflows 会将步骤视为有序列表并逐个执行,直到所有步骤都运行完毕。
以下“Hello World”示例演示了如何在步骤函数中使用状态,以及如何在工作流中使用步骤:
步骤函数
{ "Comment": "Hello world example of Pass states in Amazon States Language", "StartAt": "Hello", "States": { "Hello": { "Type": "Pass", "Result": "Hello", "Next": "World" }, "World": { "Type": "Pass", "Result": "World", "End": true } } }
工作流 YAML
--- # Hello world example of steps using Google Cloud Workflows syntax main: steps: - Hello: next: World - World: next: end ...
工作流 JSON
{ "main": { "steps": [ { "Hello": { "next": "World" } }, { "World": { "next": "end" } } ] } }
对比概览
本部分将对这两款产品进行更详细的比较。
步骤函数 | Workflows | |
---|---|---|
语法 | JSON(工具中的 YAML) | YAML 或 JSON |
控制流 | 在状态之间转换 | 包含步骤的命令式流控制 |
Worker | 资源 (ARN) | HTTP 请求和连接器 |
可靠性 | 捕获/重试 | 捕获/重试 |
最大并行数量 | 支持 | 支持 |
状态数据 | 状态通过 | Workflows 变量 |
Authentication | IAM | IAM |
用户体验 | Workflow Studio、CLI、SDK、IaC | Google Cloud 控制台、CLI、SDK、IaC |
价格 | Step Functions 价格 | Workflows 价格 |
- 语法
Step Functions 主要使用 JSON 来定义函数,不支持 YAML;但在 AWS Toolkit for Visual Studio Code 和 AWS 中 CloudFormation 格式,您可以使用 YAML 定义步骤函数。
您可以使用 Workflows 来描述 Workflows 步骤。 语法,可以使用 YAML 或 JSON 编写。大部分 使用 YAML 格式本页中的示例展示了 YAML 的优势,包括易于读写以及对注释的原生支持。有关 Workflows 语法的详细说明,请参阅 语法参考。
- 控制流
Workflows 和 Step Functions 都将工作流建模为一系列任务:Workflows 中的步骤,Step Functions 中的状态。这两种方式都允许任务指明下一步要执行哪项任务,并且支持类似于开关的条件,以便根据当前状态选择下一个工作单元。一把钥匙 不同之处在于,步进函数要求每种状态定义下一个状态, 而 Workflows 会按照 (包括其他后续步骤)。如需了解详情,请参阅条件和步骤。
- 工作器
这两款产品都可以编排函数、容器和 其他网络服务来完成任务在步骤函数中,工作器 由
Resource
字段标识,该字段在语法上是一个 URI。用于标识工作器资源的 URI 采用 Amazon 资源名称 (ARN) 格式,并且用户无法直接调用任意 HTTP 端点。Workflows 可以发送 HTTP 请求, 任意 HTTP 端点并获取响应。连接器 可以更轻松地在工作流中连接到其他 Google Cloud API, 并可将您的工作流与 Google Cloud 产品 Pub/Sub、BigQuery 或 Cloud Build。连接器 简化服务的调用,因为它们会为您处理请求的格式, 提供方法和参数,这样您就无需了解 Google Cloud API。您还可以配置超时和轮询 政策。
- 可靠性
如果任务失败,您的工作流必须能够适当地重试、在必要时捕获异常,并根据需要重新路由工作流。步骤函数和工作流都使用类似的机制来实现可靠性:使用重试捕获异常,并最终将其分派到工作流中的其他位置。如需了解详情,请参阅工作流错误。
- 最大并行数量
您可能希望工作流程并行协调多个任务。第 函数提供了两种实现方法:您可以获取一个数据项并传递 可以并行执行多个不同的任务;或者,您可以使用数组并传递 将其元素添加到同一任务中。
在工作流中,您可以定义工作流的某个部分,让两个或更多步骤可以并发执行。你可以定义 或者迭代同时运行的循环。如需了解详情,请参阅并行执行工作流步骤。
- 状态数据
工作流引擎的一个好处是,它会为您维护状态数据,而无需外部数据存储区。在 Step Functions 中,状态数据会以 JSON 结构的形式从一个状态传递到另一个状态。
在 Workflows 中,您可以在全局变量中保存状态数据。由于您最多可以设置一年的执行时长,因此只要实例仍在执行,您就可以保留状态数据。
- 身份验证
这两款产品都依赖于底层 Identity and Access Management (IAM) 系统进行身份验证和访问权限控制。例如,您可以使用 IAM 角色调用 Step Functions。
在 Workflows 中,您可以使用服务账号来调用 workflow;您可以使用 OAuth 2.0 或 OIDC 连接到 Google Cloud API;您可以使用授权请求标头 第三方 API。如需了解详情,请参阅向工作流授予访问 Google Cloud 资源的权限和通过工作流发出经过身份验证的请求。
- 用户体验
您可以使用命令行工具或 Terraform 等基础架构即代码 (IaC) 来定义和管理 Step Functions 和工作流。
此外,Workflows 支持使用客户端库在 Google Cloud 控制台中执行工作流,也可以使用 Google Cloud CLI 或向 Workflows REST API 发送请求来执行工作流。如需了解详情,请参阅执行工作流。
- 价格
这两种产品都有免费层级。如需了解详情,请参阅相应价格 页面:Step Functions 价格 和 Workflows 价格。
将状态类型映射到步骤
Step Functions 中有八种状态类型。状态是指 可以基于输入做出决策、执行操作,并通过 输出到其他状态。从步骤函数迁移到 Workflows,请确保您了解如何翻译 状态类型转换为 Workflows 步骤。
Choice
Choice
状态会向状态机添加分支逻辑。
在工作流中,您可以使用 switch
块作为选择机制,以允许表达式的值控制工作流的执行流。如果某个值匹配,系统会执行该条件的语句。如需了解详情,请参阅条件。
步骤函数
"ChoiceState": { "Type": "Choice", "Choices": [ { "Variable": "$.foo", "NumericEquals": 1, "Next": "FirstMatchState" }, { "Variable": "$.foo", "NumericEquals": 2, "Next": "SecondMatchState" } ], "Default": "DefaultState" }
工作流 YAML
switch: - condition: ${result.body.SomeField < 10} next: small - condition: ${result.body.SomeField < 100} next: medium - condition: true next: default_step
工作流 JSON
{ "switch": [ { "condition": "${result.body.SomeField < 10}", "next": "small" }, { "condition": "${result.body.SomeField < 100}", "next": "medium" }, { "condition": true, "next": "default_step" } ] }
Fail
Fail
状态会停止状态机的执行,并将其标记为失败。
在 Workflows 中,您可以使用 raise
来报告自定义错误,
语法,您可以使用 try/except
代码块捕获并处理错误。如需了解详情,请参阅抛出错误。
步骤函数
"FailState": { "Type": "Fail", "Error": "ErrorA", "Cause": "Kaiju attack" }
工作流 YAML
raise: code: 55 message: "Something went wrong."
工作流 JSON
{ "raise": { "code": 55, "message": "Something went wrong." } }
Map
Map
状态可用于为输入中的每个元素运行一组步骤
数组。
在工作流中,您可以使用 for
循环进行迭代。
步骤函数
{ "StartAt": "ExampleMapState", "States": { "ExampleMapState": { "Type": "Map", "Iterator": { "StartAt": "CallLambda", "States": { "CallLambda": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction", "End": true } } }, "End": true } } }
工作流 YAML
- assignStep: assign: - map: 1: 10 2: 20 3: 30 - sum: 0 - loopStep: for: value: key in: ${keys(map)} steps: - sumStep: assign: - sum: ${sum + map[key]} - returnStep: return: ${sum}
工作流 JSON
[ { "assignStep": { "assign": [ { "map": { "1": 10, "2": 20, "3": 30 } }, { "sum": 0 } ] } }, { "loopStep": { "for": { "value": "key", "in": "${keys(map)}", "steps": [ { "sumStep": { "assign": [ { "sum": "${sum + map[key]}" } ] } } ] } } }, { "returnStep": { "return": "${sum}" } } ]
Parallel
Parallel
状态可用于在状态机中创建并行执行分支。在以下 Step Functions 示例中,系统会并行执行地址和电话号码查询。
在工作流中,您可以使用 parallel
步骤来定义工作流的某个部分,其中两个或更多步骤可以并发执行。有关
相关信息,请参阅并行步骤。
步骤函数
{ "StartAt": "LookupCustomerInfo", "States": { "LookupCustomerInfo": { "Type": "Parallel", "End": true, "Branches": [ { "StartAt": "LookupAddress", "States": { "LookupAddress": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:123456789012:function:AddressFinder", "End": true } } }, { "StartAt": "LookupPhone", "States": { "LookupPhone": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:123456789012:function:PhoneFinder", "End": true } } } ] } } }
工作流 YAML
main: params: [args] steps: - init: assign: - workflow_id: "lookupAddress" - customer_to_lookup: - address: ${args.customerId} - phone: ${args.customerId} - addressed: ["", ""] # to write to this variable, you must share it - parallel_address: parallel: shared: [addressed] for: in: ${customer_to_lookup} index: i # optional, use if index is required value: arg steps: - address: call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run args: workflow_id: ${workflow_id} argument: ${arg} result: r - set_result: assign: - addressed[i]: ${r} - return: return: ${addressed}
工作流 JSON
{ "main": { "params": [ "args" ], "steps": [ { "init": { "assign": [ { "workflow_id": "lookupAddress" }, { "customer_to_lookup": [ { "address": "${args.customerId}" }, { "phone": "${args.customerId}" } ] }, { "addressed": [ "", "" ] } ] } }, { "parallel_address": { "parallel": { "shared": [ "addressed" ], "for": { "in": "${customer_to_lookup}", "index": "i", "value": "arg", "steps": [ { "address": { "call": "googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run", "args": { "workflow_id": "${workflow_id}", "argument": "${arg}" }, "result": "r" } }, { "set_result": { "assign": [ { "addressed[i]": "${r}" } ] } } ] } } } }, { "return": { "return": "${addressed}" } } ] } }
Pass
Pass
状态会将其输入传递给其输出,而不执行工作。这是
通常用于操作 JSON 中的状态数据。
由于工作流不会以这种方式传递数据,因此您可以将状态保留为无操作,也可以使用分配步骤修改变量。如需了解详情,请参阅 分配变量。
步骤函数
"No-op": { "Type": "Pass", "Result": { "x-datum": 0.38, "y-datum": 622.22 }, "ResultPath": "$.coords", "Next": "End" }
工作流 YAML
assign: - number: 5 - number_plus_one: ${number+1} - other_number: 10 - string: "hello"
工作流 JSON
{ "assign": [ { "number": 5 }, { "number_plus_one": "${number+1}" }, { "other_number": 10 }, { "string": "hello" } ] }
成功
Succeed
状态表示成功停止执行。
在 Workflows 中,您可以在主工作流中使用 return
来停止
工作流执行情况您也可以通过完成
最后一步(假设该步骤不会跳转到其他步骤),也可以使用
next: end
来停止工作流的执行。
如需了解详情,请参阅
完成工作流的执行过程。
步骤函数
"SuccessState": { "Type": "Succeed" }
工作流 YAML
return: "Success!" next: end
工作流 JSON
{ "return": "Success!", "next": "end" }
Task
Task
状态表示状态机执行的单个工作单元。在
下面的步骤函数示例中,它会调用 Lambda 函数。
(activity 是 AWS 步骤函数的一项功能,可让您在状态机中创建任务,以便在其他位置执行工作。)
在工作流示例中,系统会向 HTTP 端点发出调用来调用 Cloud Run 函数。您还可以使用连接器,轻松访问其他 Google Cloud 产品。此外, 您可以暂停工作流和轮询数据。或者,您也可以使用回调端点告知工作流指定事件已发生,然后等待该事件而不进行轮询。
步骤函数
"HelloWorld": { "Type": "Task", "Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction", "End": true }
工作流 YAML
- HelloWorld: call: http.get args: url: https://REGION-PROJECT_ID.cloudfunctions.net/helloworld result: helloworld_result
工作流 JSON
[ { "HelloWorld": { "call": "http.get", "args": { "url": "https://REGION-PROJECT_ID.cloudfunctions.net/helloworld" }, "result": "helloworld_result" } } ]
Wait
Wait
状态会延迟状态机在指定时间内继续运行。
您可以使用 Workflows sys.sleep
标准库函数在指定的秒数内暂停执行,最长持续 31536000(一年)。
步骤函数
"wait_ten_seconds" : { "Type" : "Wait", "Seconds" : 10, "Next": "NextState" }
工作流 YAML
- someSleep: call: sys.sleep args: seconds: 10
工作流 JSON
[ { "someSleep": { "call": "sys.sleep", "args": { "seconds": 10 } } } ]
示例:编排微服务
下面的步骤函数示例检查股票价格,确定是否 买卖并报告结果示例中的状态机通过传递参数与 AWS Lambda 集成,使用 Amazon SQS 队列请求人工审批,并使用 Amazon SNS 主题返回查询结果。
{
"StartAt": "Check Stock Price",
"Comment": "An example of integrating Lambda functions in Step Functions state machine",
"States": {
"Check Stock Price": {
"Type": "Task",
"Resource": "CHECK_STOCK_PRICE_LAMBDA_ARN",
"Next": "Generate Buy/Sell recommendation"
},
"Generate Buy/Sell recommendation": {
"Type": "Task",
"Resource": "GENERATE_BUY_SELL_RECOMMENDATION_LAMBDA_ARN",
"ResultPath": "$.recommended_type",
"Next": "Request Human Approval"
},
"Request Human Approval": {
"Type": "Task",
"Resource": "arn:PARTITION:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "REQUEST_HUMAN_APPROVAL_SQS_URL",
"MessageBody": {
"Input.$": "$",
"TaskToken.$": "$$.Task.Token"
}
},
"ResultPath": null,
"Next": "Buy or Sell?"
},
"Buy or Sell?": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.recommended_type",
"StringEquals": "buy",
"Next": "Buy Stock"
},
{
"Variable": "$.recommended_type",
"StringEquals": "sell",
"Next": "Sell Stock"
}
]
},
"Buy Stock": {
"Type": "Task",
"Resource": "BUY_STOCK_LAMBDA_ARN",
"Next": "Report Result"
},
"Sell Stock": {
"Type": "Task",
"Resource": "SELL_STOCK_LAMBDA_ARN",
"Next": "Report Result"
},
"Report Result": {
"Type": "Task",
"Resource": "arn:PARTITION:states:::sns:publish",
"Parameters": {
"TopicArn": "REPORT_RESULT_SNS_TOPIC_ARN",
"Message": {
"Input.$": "$"
}
},
"End": true
}
}
}
迁移到 Workflows
如需将前面的步骤函数示例迁移到 Workflows,请执行以下操作: 您可以创建等效的 Workflows 步骤,只需使用 Cloud Run 函数,支持 回调端点,它会等待 HTTP 请求到达该端点,并使用 Workflows 发布到 Pub/Sub 主题的连接器 Amazon SNS 主题:
完成创建工作流程的步骤 先不要部署
在工作流的定义中,添加一个步骤来创建等待人工输入的回调端点,以及一个使用 Workflows 连接器发布到 Pub/Sub 主题的步骤。例如:
Workflows YAML
--- main: steps: - init: assign: - projectId: '${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}' - region: LOCATION - topic: PUBSUB_TOPIC_NAME - Check Stock Price: call: http.get args: url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/CheckStockPrice"} auth: type: OIDC result: stockPriceResponse - Generate Buy/Sell Recommendation: call: http.get args: url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/BuySellRecommend"} auth: type: OIDC query: price: ${stockPriceResponse.body.stock_price} result: recommendResponse - Create Approval Callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - Print Approval Callback Details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - Await Approval Callback: call: events.await_callback args: callback: ${callback_details} timeout: 3600 result: approvalResponse - Approval?: try: switch: - condition: ${approvalResponse.http_request.query.response[0] == "yes"} next: Buy or Sell? except: as: e steps: - unknown_response: raise: ${"Unknown response:" + e.message} next: end - Buy or Sell?: switch: - condition: ${recommendResponse.body == "buy"} next: Buy Stock - condition: ${recommendResponse.body == "sell"} next: Sell Stock - condition: true raise: ${"Unknown recommendation:" + recommendResponse.body} - Buy Stock: call: http.post args: url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/BuyStock"} auth: type: OIDC body: action: ${recommendResponse.body} result: message - Sell Stock: call: http.post args: url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/SellStock"} auth: type: OIDC body: action: ${recommendResponse.body} result: message - Report Result: call: googleapis.pubsub.v1.projects.topics.publish args: topic: ${"projects/" + projectId + "/topics/" + topic} body: messages: - data: '${base64.encode(json.encode(message))}' next: end ...
工作流 JSON
{ "main": { "steps": [ { "init": { "assign": [ { "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}" }, { "region": "LOCATION" }, { "topic": [ "PUBSUB_TOPIC_NAME" ] } ] } }, { "Check Stock Price": { "call": "http.get", "args": { "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/CheckStockPrice\"}", "auth": { "type": "OIDC" } }, "result": "stockPriceResponse" } }, { "Generate Buy/Sell Recommendation": { "call": "http.get", "args": { "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/BuySellRecommend\"}", "auth": { "type": "OIDC" }, "query": { "price": "${stockPriceResponse.body.stock_price}" } }, "result": "recommendResponse" } }, { "Create Approval Callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "Print Approval Callback Details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "Await Approval Callback": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 3600 }, "result": "approvalResponse" } }, { "Approval?": { "try": { "switch": [ { "condition": "${approvalResponse.http_request.query.response[0] == \"yes\"}", "next": "Buy or Sell?" } ] }, "except": { "as": "e", "steps": [ { "unknown_response": { "raise": "${\"Unknown response:\" + e.message}", "next": "end" } } ] } } }, { "Buy or Sell?": { "switch": [ { "condition": "${recommendResponse.body == \"buy\"}", "next": "Buy Stock" }, { "condition": "${recommendResponse.body == \"sell\"}", "next": "Sell Stock" }, { "condition": true, "raise": "${\"Unknown recommendation:\" + recommendResponse.body}" } ] } }, { "Buy Stock": { "call": "http.post", "args": { "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/BuyStock\"}", "auth": { "type": "OIDC" }, "body": { "action": "${recommendResponse.body}" } }, "result": "message" } }, { "Sell Stock": { "call": "http.post", "args": { "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/SellStock\"}", "auth": { "type": "OIDC" }, "body": { "action": "${recommendResponse.body}" } }, "result": "message" } }, { "Report Result": { "call": "googleapis.pubsub.v1.projects.topics.publish", "args": { "topic": "${\"projects/\" + projectId + \"/topics/\" + topic}", "body": { "messages": [ { "data": "${base64.encode(json.encode(message))}" } ] } }, "next": "end" } } ] } }
替换以下内容:
LOCATION
:受支持的 Google Cloud 区域。例如us-central1
。PUBSUB_TOPIC_NAME
:您的 Pub/Sub 主题的名称。例如my_stock_example
。
部署,然后执行工作流。
在工作流执行期间,它会暂停并等待您调用 回调端点。您可以使用 curl 命令执行此操作。例如:
curl -H "Authorization: Bearer $(gcloud auth print-access-token)" https://workflowexecutions.googleapis.com/v1/projects/CALLBACK_URL?response=yes
将
CALLBACK_URL
替换为 回调端点的路径。工作流程成功完成后,您可以从 Pub/Sub 订阅接收消息。例如:
gcloud pubsub subscriptions pull stock_example-sub --format="value(message.data)" | jq
输出消息应类似于以下内容(
buy
或sell
):{ "body": "buy", "code": 200, "headers": { [...] }