AWS Step Functions에서 Workflows로 마이그레이션

이 페이지에서는 Amazon Web Services(AWS) Step Functions에서 Google Cloud의 Workflows로 마이그레이션을 준비할 수 있도록 두 제품 간의 중요한 유사점과 차이점에 대해 설명합니다. 이 정보는 Step Functions에 이미 익숙한 사용자가 Workflows를 사용해서 비슷한 아키텍처를 구현하는 것을 돕기 위한 것입니다.

Step Functions와 마찬가지로 Workflows는 완전 관리형 상태 기반 조정 플랫폼이며, 사용자가 정의한 순서인 워크플로에 따라 서비스를 실행합니다. 이러한 워크플로는 Cloud Run 또는 Cloud Run 함수에서 호스팅되는 커스텀 서비스, Cloud Vision AI 및 BigQuery와 같은 Google Cloud 서비스, 모든 HTTP 기반 API를 포함한 서비스를 결합할 수 있습니다.

Step Functions Express Workflows는 Express Workflow 기간이 제한적이고 워크플로를 정확히 한 번만 실행하는 것이 지원되지 않기 때문에 여기에서 고려되지 않는 AWS Step Functions 워크플로 유형입니다.

Hello World

Step Functions에서 상태 머신은 워크플로이고, 태스크는 다른 AWS 서비스가 수행하는 단일 작업 단위를 나타내는 워크플로의 상태입니다. Step Functions에서는 모든 상태에서 다음 상태를 정의해야 합니다.

Workflows에서는 Workflows 문법을 사용하는 일련의 단계로 실행할 태스크를 기술합니다. Workflows는 단계가 순서가 지정된 목록에 있는 것처럼 단계를 처리하고 모든 단계가 실행될 때까지 한 번에 하나씩 실행합니다.

다음 'Hello World' 샘플은 Step Functions에서의 상태 및 Workflows에서의 단계 사용을 보여줍니다.

Step Functions

  {
    "Comment": "Hello world example of Pass states in Amazon States Language",
    "StartAt": "Hello",
    "States": {
      "Hello": {
        "Type": "Pass",
        "Result": "Hello",
        "Next": "World"
      },
      "World": {
        "Type": "Pass",
        "Result": "World",
        "End": true
      }
    }
  }

Workflows YAML

  ---
  # Hello world example of steps using Google Cloud Workflows syntax
  main:
      steps:
      - Hello:
          next: World
      - World:
          next: end
  ...

Workflows JSON

  {
    "main": {
      "steps": [
        {
          "Hello": {
            "next": "World"
          }
        },
        {
          "World": {
            "next": "end"
          }
        }
      ]
    }
  }

비교 개요

이 섹션에서는 두 제품을 더 자세히 비교해서 보여줍니다.

Step Functions워크플로
구문JSON(도구 내 YAML) YAML 또는 JSON
제어 흐름상태 간 전환 단계가 포함된 필수 흐름 제어
작업자리소스(ARN) HTTP 요청 및 커넥터
안정성캐치/재시도 캐치/재시도
동시 로드지원됨 지원됨
상태 데이터상태가 함께 전달됨 Workflows 변수
인증IAM IAM
사용자 환경Workflow Studio, CLI, SDK, IaC Google Cloud 콘솔, CLI, SDK, IaC
가격 책정 Step Functions 가격 책정 Workflows 가격 책정
구문

Step Functions는 주로 JSON을 사용해서 함수를 정의하며, YAML을 직접 지원하지 않습니다. 하지만 Visual Studio Code용 AWS Toolkit 및 AWS CloudFormation에서 Step Functions 정의에 YAML을 사용할 수 있습니다.

Workflows 문법을 사용해서 Workflows 단계를 설명할 수 있으며, YAML 또는 JSON으로 기록될 수 있습니다. 대부분의 워크플로는 YAML 형식입니다. 이 페이지의 예시에서는 간편한 읽기 및 쓰기와 주석 고유 지원을 포함한 YAML의 장점을 보여줍니다. Workflows 문법에 대한 자세한 설명은 문법 참조를 참조하세요.

제어 흐름

Workflows 및 Step Functions 모두 워크플로를 일련의 태스크로 모델링합니다. Workflows에서는 단계이고 Step Functions에서는 상태입니다. 두 제품 모두 다음에 실행할 태스크를 나타낼 수 있고 현재 상태를 기준으로 다음 작업 단위를 선택하는 스위치와 비슷한 조건이 지원됩니다. 한 가지 중요한 차이점은 Step Functions의 경우 모든 상태에서 다음 상태를 정의해야 하지만, Workflows의 경우 다음 대체 단계를 포함해서 지정된 순서로 단계를 실행한다는 것입니다(다음 대체 단계 포함). 자세한 내용은 조건단계를 참조하세요.

작업자

두 제품 모두 함수, 컨테이너, 기타 웹 서비스와 같은 컴퓨팅 리소스를 조정하여 작업을 수행합니다. Step Functions에서 작업자는 문법적으로 URI인 Resource 필드로 식별됩니다. 작업자 리소스 식별에 사용되는 URI는 Amazon 리소스 이름(ARN) 형식이며 사용자가 임의로 HTTP 엔드포인트를 직접 호출할 수 없습니다.

Workflows는 임의 HTTP 엔드포인트로 HTTP 요청을 전송하고 응답을 받을 수 있습니다. 커넥터는 워크플로 내에서 다른 Google Cloud API에 쉽게 연결하고 Pub/Sub, BigQuery, Cloud Build와 같은 다른 Google Cloud 제품에 워크플로를 통합할 수 있게 해줍니다. 커넥터는 Google Cloud API의 세부정보를 알 필요가 없도록 요청의 형식 지정을 처리하여 메서드와 인수를 제공하므로 호출 서비스를 단순화합니다. 또한 제한 시간 및 폴링 정책을 구성할 수 있습니다.

안정성

태스크가 실패하는 경우 워크플로가 적절하게 재시도를 수행하고, 필요할 때 예외를 포착하고, 필요에 따라 워크플로 경로를 재지정할 수 있어야 합니다. Step Functions 및 Workflows 모두 재시도를 사용한 예외 포착, 워크플로 내 모든 장소로의 결과적인 디스패치와 같은 비슷한 메커니즘을 통해 안정성을 확보합니다. 자세한 내용은 워크플로 오류를 참조하세요.

동시 로드

워크플로에서 여러 태스크를 동시에 조정해야 할 수 있습니다. Step Functions는 이를 위해 두 가지 방법을 제공합니다. 데이터 항목 하나를 가져와서 다른 여러 태스크에 병렬로 전달하거나, 배열을 사용해서 해당 요소를 동일한 태스크에 전달할 수 있습니다.

Workflows에서 두 개 이상의 단계가 동시에 실행될 수 있는 워크플로 부분을 정의할 수 있습니다. 동시에 실행되는 브랜치를 정의하거나 반복이 동시에 실행되는 루프를 정의할 수 있습니다. 자세한 내용은 워크플로 단계 병렬 실행을 참조하세요.

상태 데이터

워크플로 엔진의 한 가지 이점은 외부 Datastore 없이도 상태 데이터가 유지된다는 것입니다. Step Functions에서 상태 데이터는 하나의 상태에서 다른 상태로 JSON 구조로 전달됩니다.

Workflows에서는 상태 데이터를 전역 변수에 저장할 수 있습니다. 최대 1년의 실행 기간이 허용되므로 인스턴스가 실행 중이기만 하면 상태 데이터를 유지할 수 있습니다.

인증

두 제품 모두 인증 및 액세스 제어를 위해 기본 Identity and Access Management(IAM) 시스템이 사용됩니다. 예를 들어 IAM 역할을 사용해서 Step Functions를 호출할 수 있습니다.

Workflows에서는 서비스 계정을 사용해서 워크플로를 호출할 수 있습니다. OAuth 2.0 또는 OIDC를 사용해서 Google Cloud API에 연결할 수 있고, 승인 요청 헤더를 사용해서 타사 API에 인증할 수 있습니다. 자세한 내용은 워크플로에 Google Cloud 리소스 액세스 권한 부여워크플로에서 인증된 요청 수행을 참조하세요.

사용자 환경

명령줄 도구 또는 Terraform과 같은 코드형 인프라(IaC)를 사용해서 Step Functions 및 Workflows를 모두 정의하고 관리할 수 있습니다.

또한 Workflows는 클라이언트 라이브러리, Google Cloud 콘솔, Google Cloud CLI를 사용하거나 Workflows REST API에 요청을 전송하는 방식의 워크플로 실행을 지원합니다. 자세한 내용은 워크플로 실행을 참조하세요.

가격 책정

두 제품 모두 무료 등급이 있습니다. 자세한 내용은 해당 가격 책정 페이지 Step Functions 가격 책정Workflows 가격 책정을 참조하세요.

단계에 상태 유형 매핑

Step Functions에는 8가지 상태 유형이 있습니다. 상태는 상태 머신에서 입력,에 따라 결정을 내리고, 작업을 수행하고, 출력을 다른 상태로 전달할 수 있는 요소입니다. Step Functions에서 Workflows로 마이그레이션하기 전 각 상태 유형을 Workflows 단계로 변환하는 방법을 확인해야 합니다.

Choice

Choice 상태는 상태 머신에 분기 논리를 추가합니다.

Workflows에서는 표현식의 값을 허용하는 선택 메커니즘으로 switch 블록을 사용하여 워크플로의 실행 흐름을 제어할 수 있습니다. 값이 일치하면 조건 문이 실행됩니다. 자세한 내용은 조건을 참조하세요.

Step Functions

  "ChoiceState": {
    "Type": "Choice",
    "Choices": [
      {
        "Variable": "$.foo",
        "NumericEquals": 1,
        "Next": "FirstMatchState"
      },
      {
        "Variable": "$.foo",
        "NumericEquals": 2,
        "Next": "SecondMatchState"
      }
    ],
    "Default": "DefaultState"
  }

Workflows YAML

  switch:
    - condition: ${result.body.SomeField < 10}
      next: small
    - condition: ${result.body.SomeField < 100}
      next: medium
    - condition: true
      next: default_step

Workflows JSON

  {
    "switch": [
      {
        "condition": "${result.body.SomeField < 10}",
        "next": "small"
      },
      {
        "condition": "${result.body.SomeField < 100}",
        "next": "medium"
      },
      {
        "condition": true,
        "next": "default_step"
      }
    ]
  }

Fail

Fail 상태는 상태 머신의 실행을 중지하고 이를 실패로 표시합니다.

Workflows에서는 raise 문법을 사용해서 커스텀 오류를 일으키고, try/except 블록을 사용해서 오류를 포착 및 처리할 수 있습니다. 자세한 내용은 오류 발생을 참조하세요.

Step Functions

  "FailState": {
      "Type": "Fail",
      "Error": "ErrorA",
      "Cause": "Kaiju attack"
  }

Workflows YAML

  raise:
      code: 55
      message: "Something went wrong."

Workflows JSON

  {
    "raise": {
      "code": 55,
      "message": "Something went wrong."
    }
  }

Map

Map 상태는 입력 배열의 각 요소에 대해 단계 집합을 실행하도록 사용될 수 있습니다.

Workflows에서 iterations에 대해 for 루프를 사용할 수 있습니다.

Step Functions

  { "StartAt": "ExampleMapState",
    "States": {
      "ExampleMapState": {
        "Type": "Map",
        "Iterator": {
           "StartAt": "CallLambda",
           "States": {
             "CallLambda": {
               "Type": "Task",
               "Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction",
               "End": true
             }
           }
        }, "End": true
      }
    }
  }

Workflows YAML

  - assignStep:
      assign:
        - map:
            1: 10
            2: 20
            3: 30
        - sum: 0
  - loopStep:
      for:
          value: key
          in: ${keys(map)}
          steps:
            - sumStep:
                assign:
                  - sum: ${sum + map[key]}
  - returnStep:
      return: ${sum}

Workflows JSON

  [
    {
      "assignStep": {
        "assign": [
          {
            "map": {
              "1": 10,
              "2": 20,
              "3": 30
            }
          },
          {
            "sum": 0
          }
        ]
      }
    },
    {
      "loopStep": {
        "for": {
          "value": "key",
          "in": "${keys(map)}",
          "steps": [
            {
              "sumStep": {
                "assign": [
                  {
                    "sum": "${sum + map[key]}"
                  }
                ]
              }
            }
          ]
        }
      }
    },
    {
      "returnStep": {
        "return": "${sum}"
      }
    }
  ]

Parallel

Parallel 상태는 상태 머신에서 실행의 병렬 분기를 만드는 데 사용될 수 있습니다. 다음 Step Functions 예시에서 주소 및 전화 번호 조회는 병렬로 수행됩니다.

Workflows에서 parallel 단계를 사용하여 두 개 이상의 단계가 동시에 실행될 수 있는 워크플로 부분을 정의할 수 있습니다. 자세한 내용은 병렬 단계를 참조하세요.

Step Functions

  { "StartAt": "LookupCustomerInfo",
    "States": {
      "LookupCustomerInfo": {
        "Type": "Parallel",
        "End": true,
        "Branches": [
          {
           "StartAt": "LookupAddress",
           "States": {
             "LookupAddress": {
               "Type": "Task",
               "Resource": "arn:aws:lambda:us-east-1:123456789012:function:AddressFinder",
               "End": true
             }
           }
         },
         {
           "StartAt": "LookupPhone",
           "States": {
             "LookupPhone": {
               "Type": "Task",
               "Resource": "arn:aws:lambda:us-east-1:123456789012:function:PhoneFinder",
               "End": true
             }
           }
         }
        ]
      }
    }
  }

Workflows YAML

  main:
     params: [args]
     steps:
     - init:
         assign:
         - workflow_id: "lookupAddress"
         - customer_to_lookup:
             - address: ${args.customerId}
             - phone: ${args.customerId}
         - addressed: ["", ""] # to write to this variable, you must share it
     - parallel_address:
         parallel:
             shared: [addressed]
             for:
                 in: ${customer_to_lookup}
                 index: i # optional, use if index is required
                 value: arg
                 steps:
                 - address:
                     call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run
                     args:
                         workflow_id: ${workflow_id}
                         argument: ${arg}
                     result: r
                 - set_result:
                     assign:
                     - addressed[i]: ${r}
     - return:
             return: ${addressed}

Workflows JSON

  {
    "main": {
      "params": [
        "args"
      ],
      "steps": [
        {
          "init": {
            "assign": [
              {
                "workflow_id": "lookupAddress"
              },
              {
                "customer_to_lookup": [
                  {
                    "address": "${args.customerId}"
                  },
                  {
                    "phone": "${args.customerId}"
                  }
                ]
              },
              {
                "addressed": [
                  "",
                  ""
                ]
              }
            ]
          }
        },
        {
          "parallel_address": {
            "parallel": {
              "shared": [
                "addressed"
              ],
              "for": {
                "in": "${customer_to_lookup}",
                "index": "i",
                "value": "arg",
                "steps": [
                  {
                    "address": {
                      "call": "googleapis.workflowexecutions.v1.projects.locations.workflows.executions.run",
                      "args": {
                        "workflow_id": "${workflow_id}",
                        "argument": "${arg}"
                      },
                      "result": "r"
                    }
                  },
                  {
                    "set_result": {
                      "assign": [
                        {
                          "addressed[i]": "${r}"
                        }
                      ]
                    }
                  }
                ]
              }
            }
          }
        },
        {
          "return": {
            "return": "${addressed}"
          }
        }
      ]
    }
  }

Pass

Pass 상태는 작업을 수행하지 않고 입력을 해당 출력으로 전달합니다. 이것은 일반적으로 JSON으로 상태 데이터를 조작하기 위해 사용됩니다.

Workflows가 이 방식으로 데이터를 전달하지 않으므로, 상태를 no-op으로 두거나, 할당 단계를 사용해서 변수를 수정할 수 있습니다. 자세한 내용은 변수 할당을 참조하세요.

Step Functions

  "No-op": {
    "Type": "Pass",
    "Result": {
      "x-datum": 0.38,
      "y-datum": 622.22
    },
    "ResultPath": "$.coords",
    "Next": "End"
  }

Workflows YAML

  assign:
      - number: 5
      - number_plus_one: ${number+1}
      - other_number: 10
      - string: "hello"

Workflows JSON

  {
    "assign": [
      {
        "number": 5
      },
      {
        "number_plus_one": "${number+1}"
      },
      {
        "other_number": 10
      },
      {
        "string": "hello"
      }
    ]
  }

Succeed

Succeed 상태는 실행을 성공적으로 중지합니다.

Workflows에서는 기본 워크플로에서 return을 사용하여 워크플로 실행을 중지할 수 있습니다. 또한 마지막 단계를 수행하여 워크플로를 완료하거나(단계가 다른 단계로 이동되지 않는 경우), 값을 반환할 필요가 없으면 next: end를 사용해서 워크플로 실행을 중지할 수 있습니다. 자세한 내용은 워크플로 실행 완료를 참조하세요.

Step Functions

  "SuccessState": {
    "Type": "Succeed"
  }

Workflows YAML

  return: "Success!"
  next: end

Workflows JSON

  {
    "return": "Success!",
    "next": "end"
  }

Task

Task 상태는 상태 머신에서 수행되는 단일 작업 단위를 나타냅니다. 다음 Step Functions 예시에서는 Lambda 함수를 호출합니다. (활동은 작업이 수행되는 상태 머신에서 태스크를 지정할 수 있게 해주는 AWS Step Functions 기능입니다.)

Workflows 예시에서는 Cloud Run 함수 호출을 위해 HTTP 엔드포인트 호출이 수행됩니다. 또한 다른 Google Cloud 제품에 쉽게 액세스할 수 있게 해주는 커넥터를 사용할 수 있습니다. 그리고 워크플로를 일시중지하고 데이터를 폴링할 수 있습니다. 또는 콜백 엔드포인트를 사용해서 지정된 이벤트가 발생했음을 워크플로에 신호하고 폴링 없이 해당 이벤트로 대기할 수 있습니다.

Step Functions

  "HelloWorld": {
    "Type": "Task",
    "Resource": "arn:aws:lambda:us-east-1:123456789012:function:HelloFunction",
    "End": true
  }

Workflows YAML

  - HelloWorld:
      call: http.get
      args:
          url: https://REGION-PROJECT_ID.cloudfunctions.net/helloworld
      result: helloworld_result

Workflows JSON

  [
    {
      "HelloWorld": {
        "call": "http.get",
        "args": {
          "url": "https://REGION-PROJECT_ID.cloudfunctions.net/helloworld"
        },
        "result": "helloworld_result"
      }
    }
  ]

Wait

Wait 상태는 지정된 시간 동안 지속되도록 상태 머신을 지연시킵니다.

Workflows sys.sleep 표준 라이브러리 함수를 사용해서 지정된 시간(초)부터 최대 31,536,000(1년)까지 실행을 일시중지할 수 있습니다.

Step Functions

  "wait_ten_seconds" : {
    "Type" : "Wait",
    "Seconds" : 10,
    "Next": "NextState"
  }

Workflows YAML

  - someSleep:
      call: sys.sleep
      args:
          seconds: 10

Workflows JSON

  [
    {
      "someSleep": {
        "call": "sys.sleep",
        "args": {
          "seconds": 10
        }
      }
    }
  ]

예시: 마이크로서비스 조정

다음 Step Functions 예시에서는 주식 가격을 확인하고 매수 또는 매도 여부를 결정하고 결과를 보고합니다. 이 샘플에서 상태 머신은 매개변수를 전달하여 AWS Lambda와 통합되고, Amazon SQS 큐를 사용해서 사용자 승인을 요청하고, Amazon SNS 주제를 사용해서 쿼리 결과를 반환합니다.

{
      "StartAt": "Check Stock Price",
      "Comment": "An example of integrating Lambda functions in Step Functions state machine",
      "States": {
          "Check Stock Price": {
              "Type": "Task",
              "Resource": "CHECK_STOCK_PRICE_LAMBDA_ARN",
              "Next": "Generate Buy/Sell recommendation"
          },
          "Generate Buy/Sell recommendation": {
              "Type": "Task",
              "Resource": "GENERATE_BUY_SELL_RECOMMENDATION_LAMBDA_ARN",
              "ResultPath": "$.recommended_type",
              "Next": "Request Human Approval"
          },
          "Request Human Approval": {
              "Type": "Task",
              "Resource": "arn:PARTITION:states:::sqs:sendMessage.waitForTaskToken",
              "Parameters": {
                  "QueueUrl": "REQUEST_HUMAN_APPROVAL_SQS_URL",
                  "MessageBody": {
                      "Input.$": "$",
                      "TaskToken.$": "$$.Task.Token"
                  }
              },
              "ResultPath": null,
              "Next": "Buy or Sell?"
          },
          "Buy or Sell?": {
              "Type": "Choice",
              "Choices": [
                  {
                      "Variable": "$.recommended_type",
                      "StringEquals": "buy",
                      "Next": "Buy Stock"
                  },
                  {
                      "Variable": "$.recommended_type",
                      "StringEquals": "sell",
                      "Next": "Sell Stock"
                  }
              ]
          },
          "Buy Stock": {
              "Type": "Task",
              "Resource": "BUY_STOCK_LAMBDA_ARN",
              "Next": "Report Result"
          },
          "Sell Stock": {
              "Type": "Task",
              "Resource": "SELL_STOCK_LAMBDA_ARN",
              "Next": "Report Result"
          },
          "Report Result": {
              "Type": "Task",
              "Resource": "arn:PARTITION:states:::sns:publish",
              "Parameters": {
                  "TopicArn": "REPORT_RESULT_SNS_TOPIC_ARN",
                  "Message": {
                      "Input.$": "$"
                  }
              },
              "End": true
          }
      }
  }

Workflows로 마이그레이션

앞의 Step Functions 예시를 Workflows로 마이그레이션하기 위해서는 Cloud Run 함수를 통합하고, 해당 엔드포인트에서 HTTP 요청이 도착하기를 기다리는 콜백 엔드포인트를 지원하고, Amazon SNS 주제 대신 Workflows 커넥터를 사용하여 Pub/Sub 주제로 게시하도록 해서 상응하는 Workflows 단계를 만들 수 있습니다.

  1. 워크플로 만들기 단계를 수행하지만 아직 배포하지 마세요.

  2. 워크플로 정의에서 사용자 입력을 기다리는 콜백 엔드포인트를 만드는 단계와 Workflows 커넥터를 사용해서 Pub/Sub 주제에 게시하는 단계를 추가합니다. 예를 들면 다음과 같습니다.

    Workflows YAML

      ---
      main:
        steps:
          - init:
              assign:
                - projectId: '${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}'
                - region: LOCATION
                - topic: PUBSUB_TOPIC_NAME
          - Check Stock Price:
              call: http.get
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/CheckStockPrice"}
                auth:
                  type: OIDC
              result: stockPriceResponse
          - Generate Buy/Sell Recommendation:
              call: http.get
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/BuySellRecommend"}
                auth:
                  type: OIDC
                query:
                  price: ${stockPriceResponse.body.stock_price}
              result: recommendResponse
          - Create Approval Callback:
              call: events.create_callback_endpoint
              args:
                  http_callback_method: "GET"
              result: callback_details
          - Print Approval Callback Details:
              call: sys.log
              args:
                  severity: "INFO"
                  text: ${"Listening for callbacks on " + callback_details.url}
          - Await Approval Callback:
              call: events.await_callback
              args:
                  callback: ${callback_details}
                  timeout: 3600
              result: approvalResponse
          - Approval?:
              try:
                switch:
                  - condition: ${approvalResponse.http_request.query.response[0] == "yes"}
                    next: Buy or Sell?
              except:
                as: e
                steps:
                  - unknown_response:
                      raise: ${"Unknown response:" + e.message}
                      next: end
          - Buy or Sell?:
              switch:
                - condition: ${recommendResponse.body == "buy"}
                  next: Buy Stock
                - condition: ${recommendResponse.body == "sell"}
                  next: Sell Stock
                - condition: true
                  raise: ${"Unknown recommendation:" + recommendResponse.body}
          - Buy Stock:
              call: http.post
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/BuyStock"}
                auth:
                  type: OIDC
                body:
                  action: ${recommendResponse.body}
              result: message
          - Sell Stock:
              call: http.post
              args:
                url: ${"https://" + region + "-" + projectId + ".cloudfunctions.net/SellStock"}
                auth:
                  type: OIDC
                body:
                  action: ${recommendResponse.body}
              result: message
          - Report Result:
              call: googleapis.pubsub.v1.projects.topics.publish
              args:
                topic: ${"projects/" + projectId + "/topics/" + topic}
                body:
                  messages:
                  - data: '${base64.encode(json.encode(message))}'
              next: end
      ...

    Workflows JSON

      {
        "main": {
          "steps": [
            {
              "init": {
                "assign": [
                  {
                    "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
                  },
                  {
                    "region": "LOCATION"
                  },
                  {
                    "topic": [
                      "PUBSUB_TOPIC_NAME"
                    ]
                  }
                ]
              }
            },
            {
              "Check Stock Price": {
                "call": "http.get",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/CheckStockPrice\"}",
                  "auth": {
                    "type": "OIDC"
                  }
                },
                "result": "stockPriceResponse"
              }
            },
            {
              "Generate Buy/Sell Recommendation": {
                "call": "http.get",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/BuySellRecommend\"}",
                  "auth": {
                    "type": "OIDC"
                  },
                  "query": {
                    "price": "${stockPriceResponse.body.stock_price}"
                  }
                },
                "result": "recommendResponse"
              }
            },
            {
              "Create Approval Callback": {
                "call": "events.create_callback_endpoint",
                "args": {
                  "http_callback_method": "GET"
                },
                "result": "callback_details"
              }
            },
            {
              "Print Approval Callback Details": {
                "call": "sys.log",
                "args": {
                  "severity": "INFO",
                  "text": "${\"Listening for callbacks on \" + callback_details.url}"
                }
              }
            },
            {
              "Await Approval Callback": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "approvalResponse"
              }
            },
            {
              "Approval?": {
                "try": {
                  "switch": [
                    {
                      "condition": "${approvalResponse.http_request.query.response[0] == \"yes\"}",
                      "next": "Buy or Sell?"
                    }
                  ]
                },
                "except": {
                  "as": "e",
                  "steps": [
                    {
                      "unknown_response": {
                        "raise": "${\"Unknown response:\" + e.message}",
                        "next": "end"
                      }
                    }
                  ]
                }
              }
            },
            {
              "Buy or Sell?": {
                "switch": [
                  {
                    "condition": "${recommendResponse.body == \"buy\"}",
                    "next": "Buy Stock"
                  },
                  {
                    "condition": "${recommendResponse.body == \"sell\"}",
                    "next": "Sell Stock"
                  },
                  {
                    "condition": true,
                    "raise": "${\"Unknown recommendation:\" + recommendResponse.body}"
                  }
                ]
              }
            },
            {
              "Buy Stock": {
                "call": "http.post",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/BuyStock\"}",
                  "auth": {
                    "type": "OIDC"
                  },
                  "body": {
                    "action": "${recommendResponse.body}"
                  }
                },
                "result": "message"
              }
            },
            {
              "Sell Stock": {
                "call": "http.post",
                "args": {
                  "url": "${\"https://\" + region + \"-\" + projectId + \".cloudfunctions.net/SellStock\"}",
                  "auth": {
                    "type": "OIDC"
                  },
                  "body": {
                    "action": "${recommendResponse.body}"
                  }
                },
                "result": "message"
              }
            },
            {
              "Report Result": {
                "call": "googleapis.pubsub.v1.projects.topics.publish",
                "args": {
                  "topic": "${\"projects/\" + projectId + \"/topics/\" + topic}",
                  "body": {
                    "messages": [
                      {
                        "data": "${base64.encode(json.encode(message))}"
                      }
                    ]
                  }
                },
                "next": "end"
              }
            }
          ]
        }
      }

    다음을 바꿉니다.

    • LOCATION: 지원되는 Google Cloud 리전입니다. 예를 들면 us-central1입니다.
    • PUBSUB_TOPIC_NAME: Pub/Sub 주제의 이름입니다. 예를 들면 my_stock_example입니다.
  3. 배포 후 워크플로를 실행합니다.

  4. 워크플로를 실행하는 동안 일시중지하고 사용자가 콜백 엔드포인트를 호출하도록 기다립니다. 이렇게 하려면 curl 명령어를 사용할 수 있습니다. 예를 들면 다음과 같습니다.

    curl -H "Authorization: Bearer $(gcloud auth print-access-token)"
    https://workflowexecutions.googleapis.com/v1/projects/CALLBACK_URL?response=yes
    

    CALLBACK_URL을 콜백 엔드포인트의 나머지 경로로 바꿉니다.

  5. 워크플로가 성공적으로 완료되면 Pub/Sub 구독에서 메시지를 수신할 수 있습니다. 예를 들면 다음과 같습니다.

    gcloud pubsub subscriptions pull stock_example-sub  --format="value(message.data)" | jq
    

    출력 메시지는 다음과 비슷합니다(buy 또는 sell).

    {
      "body": "buy",
      "code": 200,
      "headers": {
      [...]
      }
    

다음 단계