从 AWS Step Functions 迁移到 Workflows

为帮助您从 Amazon Web Services (AWS) Step Functions 迁移到 Google Cloud 上的工作流,本页面将介绍这两款产品的主要相似之处和不同之处。此信息旨在帮助已熟悉步骤函数的用户使用工作流实现类似的架构。

与 Step Functions 类似,Workflows 是一种基于状态的全代管式编排平台,该平台按照您指定的顺序执行服务:工作流程。这些工作流可以组合各种服务,包括托管在 Cloud Run 或 Cloud Functions 上的自定义服务、Google Cloud 服务(例如 Cloud Vision AI 和 BigQuery)以及任何基于 HTTP 的 API。

请注意,Step Functions Express Workflows 是一种 AWS Step Functions 工作流类型,因为 Express Workflow 工作流的时长有限,并且不支持执行一次性工作流。

Hello World

在步骤函数中,状态机是工作流,而任务是工作流中的状态,表示其他 AWS 服务执行的单个工作单元。Step Functions 要求每个状态定义下一个状态。

在 Workflows 中,使用 Workflows 语法的一系列步骤描述了要执行的任务。工作流会将步骤视为有序列表进行处理,并逐个执行,直到所有步骤都运行完毕。

以下“Hello world”示例演示了如何在步骤函数和工作流的步骤中使用状态

步骤函数

  {
    "Comment": "Hello world example of Pass states in Amazon States Language",
    "StartAt": "Hello",
    "States": {
      "Hello": {
        "Type": "Pass",
        "Result": "Hello",
        "Next": "World"
      },
      "World": {
        "Type": "Pass",
        "Result": "World",
        "End": true
      }
    }
  }

工作流 YAML

  ---
  # Hello world example of steps using Google Cloud Workflows syntax
  main:
      steps:
      - Hello:
          next: World
      - World:
          next: end
  ...

工作流 JSON

  {
    "main": {
      "steps": [
        {
          "Hello": {
            "next": "World"
          }
        },
        {
          "World": {
            "next": "end"
          }
        }
      ]
    }
  }

比较概览

本部分对这两种产品进行了更详细的比较。

步骤函数Workflows
语法JSON(工具中的 YAML) YAML 或 JSON
控制流状态之间的转换 包含步骤的命令式流控制
工作器资源 (ARN) HTTP 请求和连接器
可靠性捕获/重试 捕获/重试
并行处理支持 有效期至:experimental.executions.map
状态数据状态沿袭 工作流变量
AuthenticationIAM IAM
用户体验Workflow Studio、CLI、SDK、IaC Google Cloud Console、CLI、SDK、IaC
价格 Step Functions 价格 工作流价格
Syntax

Step Functions 主要使用 JSON 定义函数,不直接支持 YAML;但是,在适用于 Visual Studio Code 的 AWS 工具包和 AWS CloudFormation 中,您可以使用 YAML 来定义 Step Functions。

您可以使用 Workflows 语法描述 Workflows 步骤,它们可以使用 YAML 或 JSON 编写。大多数工作流都采用 YAML。本页面上的示例展示了 YAML 的优点(包括易于读写),以及对注释的原生支持。 如需详细了解 Workflows 语法,请参阅语法参考文档

控制流

Workflows 和 Step Functions 对工作流进行建模为一系列任务:Workflows 中的步骤以及步骤函数中的状态。两者都允许任务指示下一个要执行的任务,并支持类似开关的条件,以根据当前状态选择下一个工作单元。一个主要的区别在于,Step Functions 要求每个状态定义下一个状态,而 Workflows 则按照其指定的顺序(包括备用的后续步骤)执行步骤。如需了解详情,请参阅条件步骤

工作器

这两种产品都会编排函数、容器和其他网络服务以执行任务。在步骤函数中,工作器由在语法上是 URI 的 Resource 字段标识。用于标识工作器资源的 URI 采用 Amazon 资源名称 (ARN) 格式,用户无法直接调用任意 HTTP 端点。

工作流可以将 HTTP 请求发送到任意 HTTP 端点并获得响应。借助连接器,您可以更轻松地在工作流中连接到其他 Google Cloud API,并将您的工作流与其他 Google Cloud 产品集成,如 Pub/Sub、BigQuery 或 Cloud Build。连接器可为您处理请求的格式,提供调用方法和参数,让您不必知道 Google Cloud API 的详细信息,从而简化调用服务。您还可以配置超时和轮询政策。

可靠性

如果任务失败,您的工作流必须能够进行适当的重试,必要时捕获异常并在必要时重新路由工作流。步骤函数和工作流都使用类似的机制来实现可靠性:通过重试捕获异常,以及最终分派给工作流中的其他位置。有关详情,请参阅工作流程错误

并行处理

您可能希望工作流并行编排多个任务。步骤函数提供两种方法来实现这一目标:可以获取一个数据项,并将其与多个不同的任务并行传递;或者,您也可以使用数组并将其元素传递给同一任务。

您可以使用 Workflows 实验性功能 experimental.executions.map 来支持并行工作。

状态数据

工作流引擎的一项好处是,系统会为您维护状态数据,而无需外部数据存储区。在步骤函数中,状态数据以 JSON 结构从一种状态传递给另一种状态。

在 Workflows 中,您可以将状态数据保存在全局变量中。由于您最多可以执行一年的执行时长,因此只要实例仍在运行,就可以保留状态数据。

Authentication

这两种产品都依赖于底层 Identity and Access Management (IAM) 系统进行身份验证和访问权限控制。例如,您可以使用 IAM 角色来调用 Step Functions。

在 Workflows 中,您可以使用服务帐号来调用工作流;您可以使用 OAuth 2.0 或 OIDC 来连接 Google Cloud API;也可以使用授权请求标头来与第三方 API 进行身份验证。如需了解详情,请参阅授予工作流访问 Google Cloud 资源的权限从工作流发出经过身份验证的请求

用户体验

您可以使用命令行工具或基础架构即代码 (IaC)(例如 Terraform)来定义和管理步骤函数和工作流。

此外,Workflows 支持使用客户端库、在 Cloud Console 中、通过 Google Cloud CLI,或者通过向 Workflows REST API 发送请求来执行工作流。如需了解详情,请参阅执行工作流

价格

这两种产品均享有免费层级。如需了解详情,请参阅它们各自的价格页面:Step Functions 价格Workflows 价格

将状态类型映射到步骤

Step Functions 中有八种状态类型。状态是状态机中的各种元素,这些元素可以根据输入做出决策,执行操作,并将输出传递给其他状态。在从 Step Functions 迁移到 Workflows 之前,请确保您了解如何将每种状态类型转换为 Workflows 步骤。

Choice

Choice 状态用于向状态机添加分支逻辑。

在工作流中,您可以将 switch 代码块作为一种选择机制,以允许表达式的值控制工作流的执行流程。如果值匹配,则执行该条件的语句。 如需了解详情,请参阅条件

步骤函数

  "ChoiceState": {
    "Type": "Choice",
    "Choices": [
      {
        "Variable": "$.foo",
        "NumericEquals": 1,
        "Next": "FirstMatchState"
      },
      {
        "Variable": "$.foo",
        "NumericEquals": 2,
        "Next": "SecondMatchState"
      }
    ],
    "Default": "DefaultState"
  }

工作流 YAML

  switch:
    - condition: ${result.body.SomeField < 10}
      next: small
    - condition: ${result.body.SomeField < 100}
      next: medium
    - condition: true
      next: default_step

工作流 JSON

  {
    "switch": [
      {
        "condition": "${result.body.SomeField < 10}",
        "next": "small"
      },
      {
        "condition": "${result.body.SomeField < 100}",
        "next": "medium"
      },
      {
        "condition": true,
        "next": "default_step"
      }
    ]
  }

Fail

Fail 状态会停止状态机的执行,并将其标记为失败。

在 Workflows 中,您可以使用 raise 语法来引发自定义错误,并可以使用 try/except 代码块捕捉并处理错误。如需了解详情,请参阅引发错误

步骤函数

  "FailState": {
      "Type": "Fail",
      "Error": "ErrorA",
      "Cause": "Kaiju attack"
  }

工作流 YAML

  raise:
      code: 55
      message: "Something went wrong."

工作流 JSON

  {
    "raise": {
      "code": 55,
      "message": "Something went wrong."
    }
  }

Map

Map 状态可用于为输入数组的每个元素运行一组步骤。

在 Workflows 中,您可以使用 for 循环进行迭代

步骤函数

  { "StartAt": "ExampleMapState",
    "States": {
      "ExampleMapState": {
        "Type": "Map",
        "Iterator": {
           "StartAt": "CallLambda",
           "States": {
             "CallLambda": {
               "Type": "Task",
               "Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction",
               "End": true
             }
           }
        }, "End": true
      }
    }
  }

工作流 YAML

  - assignStep:
      assign:
        - map:
            1: 10
            2: 20
            3: 30
        - sum: 0
  - loopStep:
      for:
          value: key
          in: ${keys(map)}
          steps:
            - sumStep:
                assign:
                  - sum: ${sum + map[key]}
  - returnStep:
      return: ${sum}

工作流 JSON

  [
    {
      "assignStep": {
        "assign": [
          {
            "map": {
              "1": 10,
              "2": 20,
              "3": 30
            }
          },
          {
            "sum": 0
          }
        ]
      }
    },
    {
      "loopStep": {
        "for": {
          "value": "key",
          "in": "${keys(map)}",
          "steps": [
            {
              "sumStep": {
                "assign": [
                  {
                    "sum": "${sum + map[key]}"
                  }
                ]
              }
            }
          ]
        }
      }
    },
    {
      "returnStep": {
        "return": "${sum}"
      }
    }
  ]

Parallel

Parallel 状态可用于在状态机中创建并行执行分支。在以下 Step Functions 示例中,地址和电话查询是并行执行的。

在 Workflows 中,您可以使用实验性功能 experimental.executions.map 来支持并行工作。

步骤函数

  { "StartAt": "LookupCustomerInfo",
    "States": {
      "LookupCustomerInfo": {
        "Type": "Parallel",
        "End": true,
        "Branches": [
          {
           "StartAt": "LookupAddress",
           "States": {
             "LookupAddress": {
               "Type": "Task",
               "Resource": "arn:aws:lambda:us-east-1:123456789012:function:AddressFinder",
               "End": true
             }
           }
         },
         {
           "StartAt": "LookupPhone",
           "States": {
             "LookupPhone": {
               "Type": "Task",
               "Resource": "arn:aws:lambda:us-east-1:123456789012:function:PhoneFinder",
               "End": true
             }
           }
         }
        ]
      }
    }
  }

工作流 YAML

  - parallel-executor:
      call: experimental.executions.map
      args:
        workflow_id: LookupCustomerInfo
        arguments: [{"LookupAddress":"${cust_id}"},{"LookupPhone":"${cust_id}"}]
      result: result

工作流 JSON

  [
    {
      "parallel-executor": {
        "call": "experimental.executions.map",
        "args": {
          "workflow_id": "LookupCustomerInfo",
          "arguments": [
            {
              "LookupAddress": "${cust_id}"
            },
            {
              "LookupPhone": "${cust_id}"
            }
          ]
        },
        "result": "result"
      }
    }
  ]

Pass

Pass 状态会将其输入传递到输出,而不执行工作。这通常用于操控 JSON 中的状态数据。

由于 Workflows 不会以这种方式传递数据,因此您可以将状态保留为空操作,也可以使用分配步骤来修改变量。如需了解详情,请参阅分配变量

步骤函数

  "No-op": {
    "Type": "Pass",
    "Result": {
      "x-datum": 0.38,
      "y-datum": 622.22
    },
    "ResultPath": "$.coords",
    "Next": "End"
  }

工作流 YAML

  assign:
      - number: 5
      - number_plus_one: ${number+1}
      - other_number: 10
      - string: "hello"

工作流 JSON

  {
    "assign": [
      {
        "number": 5
      },
      {
        "number_plus_one": "${number+1}"
      },
      {
        "other_number": 10
      },
      {
        "string": "hello"
      }
    ]
  }

成功

Succeed 状态可成功停止执行。

在 Workflows 中,您可以在主工作流中使用 return 来停止工作流的执行。您还可通过完成最后一步(假设相应步骤不跳到其他步骤)来完成工作流;或者,如果您不想返回任何值,也可以使用 next: end 停止工作流的执行。如需了解详情,请参阅完成工作流的执行

步骤函数

  "SuccessState": {
    "Type": "Succeed"
  }

工作流 YAML

  return: "Success!"
  next: end

工作流 JSON

  {
    "return": "Success!",
    "next": "end"
  }

Task

Task 状态表示状态机执行的单个工作单元。在以下 Step Functions 示例中,它会调用 Lambda 函数。 (activity 是一种 AWS Step Functions 功能,可让您在状态机中执行在其他位置执行任务的任务。)

在 Workflows 示例中,调用 HTTP 端点以调用 Cloud Functions 函数。您还可以使用连接器轻松访问其他 Google Cloud 产品。此外,您还可以暂停工作流并轮询数据。或者,您也可以使用回调端点向工作流发出通知,指明发生了特定事件,并在不进行轮询的情况下等待该事件。

步骤函数

  "HelloWorld": {
    "Type": "Task",
    "Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction",
    "End": true
  }

工作流 YAML

  - HelloWorld:
      call: http.get
      args:
          url: https://REGION-PROJECT_ID.cloudfunctions.net/helloworld
      result: helloworld_result

工作流 JSON

  [
    {
      "HelloWorld": {
        "call": "http.get",
        "args": {
          "url": "https://REGION-PROJECT_ID.cloudfunctions.net/helloworld"
        },
        "result": "helloworld_result"
      }
    }
  ]

Wait

Wait 状态会使状态机延迟指定时间继续运行。

您可以使用 Workflows sys.sleep 标准库函数将执行的给定秒数暂停到最多 31536000(一年)。

步骤函数

  "wait_ten_seconds" : {
    "Type" : "Wait",
    "Seconds" : 10,
    "Next": "NextState"
  }

工作流 YAML

  - someSleep:
      call: sys.sleep
      args:
          seconds: 10

工作流 JSON

  [
    {
      "someSleep": {
        "call": "sys.sleep",
        "args": {
          "seconds": 10
        }
      }
    }
  ]

示例:编排微服务

以下“步骤函数”示例会检查股价、确定买入或卖出以及报告结果。该示例中的状态机通过传递参数与 AWS Lambda 集成,使用 Amazon SQS 队列请求人工审批,并使用 Amazon SNS 主题返回查询结果。

{
      "StartAt": "Check Stock Price",
      "Comment": "An example of integrating Lambda functions in Step Functions state machine",
      "States": {
          "Check Stock Price": {
              "Type": "Task",
              "Resource": "CHECK_STOCK_PRICE_LAMBDA_ARN",
              "Next": "Generate Buy/Sell recommendation"
          },
          "Generate Buy/Sell recommendation": {
              "Type": "Task",
              "Resource": "GENERATE_BUY_SELL_RECOMMENDATION_LAMBDA_ARN",
              "ResultPath": "$.recommended_type",
              "Next": "Request Human Approval"
          },
          "Request Human Approval": {
              "Type": "Task",
              "Resource": "arn:PARTITION:states:::sqs:sendMessage.waitForTaskToken",
              "Parameters": {
                  "QueueUrl": "REQUEST_HUMAN_APPROVAL_SQS_URL",
                  "MessageBody": {
                      "Input.$": "$",
                      "TaskToken.$": "$$.Task.Token"
                  }
              },
              "ResultPath": null,
              "Next": "Buy or Sell?"
          },
          "Buy or Sell?": {
              "Type": "Choice",
              "Choices": [
                  {
                      "Variable": "$.recommended_type",
                      "StringEquals": "buy",
                      "Next": "Buy Stock"
                  },
                  {
                      "Variable": "$.recommended_type",
                      "StringEquals": "sell",
                      "Next": "Sell Stock"
                  }
              ]
          },
          "Buy Stock": {
              "Type": "Task",
              "Resource": "BUY_STOCK_LAMBDA_ARN",
              "Next": "Report Result"
          },
          "Sell Stock": {
              "Type": "Task",
              "Resource": "SELL_STOCK_LAMBDA_ARN",
              "Next": "Report Result"
          },
          "Report Result": {
              "Type": "Task",
              "Resource": "arn:PARTITION:states:::sns:publish",
              "Parameters": {
                  "TopicArn": "REPORT_RESULT_SNS_TOPIC_ARN",
                  "Message": {
                      "Input.$": "$"
                  }
              },
              "End": true
          }
      }
  }

迁移到 Workflows

如需将上述步骤函数示例迁移到工作流,您可以通过以下方式创建等效的工作流步骤:集成 Cloud Functions,支持等待 HTTP 请求到达该端点的回调端点,然后使用工作流连接器发布到 Pub/Sub 主题来代替 Amazon SNS 主题:

  1. 完成创建工作流的步骤,但先不要部署。

  2. 在工作流的定义中,添加一个用于创建人工输入的回调端点的步骤,以及使用 Workflows 连接器发布到 Pub/Sub 主题的步骤。例如:

    工作流 YAML

      ---
      main:
        steps:
          - init:
              assign:
                - projectId: '${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}'
                - region: LOCATION
                - topic: PUBSUB_TOPIC_NAME
          - Check Stock Price:
              call: http.get
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/CheckStockPrice"}
                auth:
                  type: OIDC
              result: stockPriceResponse
          - Generate Buy/Sell Recommendation:
              call: http.get
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/BuySellRecommend"}
                auth:
                  type: OIDC
                query:
                  price: ${stockPriceResponse.body.stock_price}
              result: recommendResponse
          - Create Approval Callback:
              call: events.create_callback_endpoint
              args:
                  http_callback_method: "GET"
              result: callback_details
          - Print Approval Callback Details:
              call: sys.log
              args:
                  severity: "INFO"
                  text: ${"Listening for callbacks on " + callback_details.url}
          - Await Approval Callback:
              call: events.await_callback
              args:
                  callback: ${callback_details}
                  timeout: 3600
              result: approvalResponse
          - Approval?:
              try:
                switch:
                  - condition: ${approvalResponse.http_request.query.response[0] == "yes"}
                    next: Buy or Sell?
              except:
                as: e
                steps:
                  - unknown_response:
                      raise: ${"Unknown response:" + e.message}
                      next: end
          - Buy or Sell?:
              switch:
                - condition: ${recommendResponse.body == "buy"}
                  next: Buy Stock
                - condition: ${recommendResponse.body == "sell"}
                  next: Sell Stock
                - condition: true
                  raise: ${"Unknown recommendation:" + recommendResponse.body}
          - Buy Stock:
              call: http.post
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/BuyStock"}
                auth:
                  type: OIDC
                body:
                  action: ${recommendResponse.body}
              result: message
          - Sell Stock:
              call: http.post
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/SellStock"}
                auth:
                  type: OIDC
                body:
                  action: ${recommendResponse.body}
              result: message
          - Report Result:
              call: googleapis.pubsub.v1.projects.topics.publish
              args:
                topic: ${"projects/" + projectId + "/topics/" + topic}
                body:
                  messages:
                  - data: '${base64.encode(json.encode(message))}'
              next: end
      ...

    工作流 JSON

      {
        "main": {
          "steps": [
            {
              "init": {
                "assign": [
                  {
                    "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
                  },
                  {
                    "region": "LOCATION"
                  },
                  {
                    "topic": [
                      "PUBSUB_TOPIC_NAME"
                    ]
                  }
                ]
              }
            },
            {
              "Check Stock Price": {
                "call": "http.get",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/CheckStockPrice\"}",
                  "auth": {
                    "type": "OIDC"
                  }
                },
                "result": "stockPriceResponse"
              }
            },
            {
              "Generate Buy/Sell Recommendation": {
                "call": "http.get",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/BuySellRecommend\"}",
                  "auth": {
                    "type": "OIDC"
                  },
                  "query": {
                    "price": "${stockPriceResponse.body.stock_price}"
                  }
                },
                "result": "recommendResponse"
              }
            },
            {
              "Create Approval Callback": {
                "call": "events.create_callback_endpoint",
                "args": {
                  "http_callback_method": "GET"
                },
                "result": "callback_details"
              }
            },
            {
              "Print Approval Callback Details": {
                "call": "sys.log",
                "args": {
                  "severity": "INFO",
                  "text": "${\"Listening for callbacks on \" + callback_details.url}"
                }
              }
            },
            {
              "Await Approval Callback": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "approvalResponse"
              }
            },
            {
              "Approval?": {
                "try": {
                  "switch": [
                    {
                      "condition": "${approvalResponse.http_request.query.response[0] == \"yes\"}",
                      "next": "Buy or Sell?"
                    }
                  ]
                },
                "except": {
                  "as": "e",
                  "steps": [
                    {
                      "unknown_response": {
                        "raise": "${\"Unknown response:\" + e.message}",
                        "next": "end"
                      }
                    }
                  ]
                }
              }
            },
            {
              "Buy or Sell?": {
                "switch": [
                  {
                    "condition": "${recommendResponse.body == \"buy\"}",
                    "next": "Buy Stock"
                  },
                  {
                    "condition": "${recommendResponse.body == \"sell\"}",
                    "next": "Sell Stock"
                  },
                  {
                    "condition": true,
                    "raise": "${\"Unknown recommendation:\" + recommendResponse.body}"
                  }
                ]
              }
            },
            {
              "Buy Stock": {
                "call": "http.post",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/BuyStock\"}",
                  "auth": {
                    "type": "OIDC"
                  },
                  "body": {
                    "action": "${recommendResponse.body}"
                  }
                },
                "result": "message"
              }
            },
            {
              "Sell Stock": {
                "call": "http.post",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/SellStock\"}",
                  "auth": {
                    "type": "OIDC"
                  },
                  "body": {
                    "action": "${recommendResponse.body}"
                  }
                },
                "result": "message"
              }
            },
            {
              "Report Result": {
                "call": "googleapis.pubsub.v1.projects.topics.publish",
                "args": {
                  "topic": "${\"projects/\" + projectId + \"/topics/\" + topic}",
                  "body": {
                    "messages": [
                      {
                        "data": "${base64.encode(json.encode(message))}"
                      }
                    ]
                  }
                },
                "next": "end"
              }
            }
          ]
        }
      }

    替换以下内容:

    • LOCATION:受支持的 Google Cloud 区域。 例如 us-central1
    • PUBSUB_TOPIC_NAME:Pub/Sub 主题的名称。例如 my_stock_example
  3. 部署并执行工作流

  4. 在工作流执行期间,它会暂停并等待您调用回调端点。您可以使用 curl 命令执行此操作。例如:

    curl -H "Authorization: Bearer $(gcloud auth print-access-token)"
    https://workflowexecutions.googleapis.com/v1/projects/CALLBACK_URL?response=yes
    

    CALLBACK_URL 替换为回调端点的剩余路径。

  5. 工作流成功完成后,您可以接收来自 Pub/Sub 订阅的消息。例如:

    gcloud pubsub subscriptions pull stock_example-sub  --format="value(message.data)" | jq
    

    输出消息应类似于以下内容(buysell):

    {
      "body": "buy",
      "code": 200,
      "headers": {
      [...]
      }
    

后续步骤