콜백을 사용하여 대기

콜백을 사용하면 다른 서비스가 콜백 엔드포인트에 워크플로 실행을 재개하는 요청을 보낼 때까지 워크플로 실행을 대기시킬 수 있습니다.

콜백을 통해 지정된 이벤트가 발생했음을 워크플로에 알리고 폴링 없이 이벤트를 기다릴 수 있습니다. 예를 들어 제품이 다시 입고되거나 상품이 배송되었을 때 알림을 제공하거나 주문 검토, 번역 검증과 같은 인적 상호작용을 위해 대기하는 워크플로를 만들 수 있습니다.

이 페이지에서는 콜백 엔드포인트를 지원하여 외부 프로세스의 HTTP 요청이 해당 엔드포인트에 도달할 때까지 기다리는 워크플로를 만드는 방법을 보여줍니다. 또한 콜백 및 Eventarc 트리거를 사용하여 이벤트를 대기할 수 있습니다.

콜백에는 2가지 표준 라이브러리 기본 제공 함수를 사용해야 합니다.

콜백 요청을 수신하는 엔드포인트 만들기

이 엔드포인트에 도달하라는 HTTP 요청을 수신할 수 있는 콜백 엔드포인트를 만듭니다.

  1. 새 워크플로 생성 단계를 따르거나 업데이트할 기존 워크플로를 선택합니다. 하지만 아직 배포하지는 않습니다.
  2. 워크플로 정의에서 콜백 엔드포인트를 만드는 단계를 추가합니다.

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "METHOD"
            result: callback_details
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "METHOD"
              },
              "result": "callback_details"
            }
          }
        ]
          

    METHOD를 예상되는 HTTP 메서드(GET, HEAD, POST, PUT, DELETE, OPTIONS 또는 PATCH 중 하나)로 바꿉니다. 기본값은 POST입니다.

    생성된 엔드포인트의 URL을 저장하는 url 필드가 있는 callback_details 맵이 결과로 반환됩니다.

    이제 콜백 엔드포인트에서 지정된 HTTP 메서드로 수신 요청을 받을 준비가 되었습니다. URL을 Cloud Run 함수로 전달하는 등 생성된 엔드포인트의 URL을 사용하여 워크플로 외부의 프로세스에서 콜백을 트리거할 수 있습니다.

  3. 워크플로 정의에서 콜백 요청을 기다리는 단계를 추가합니다.

    YAML

        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: TIMEOUT
            result: callback_request
        

    JSON

        [
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": TIMEOUT
              },
              "result": "callback_request"
            }
          }
        ]
          

    TIMEOUT을 워크플로가 요청을 대기해야 하는 최대 시간(초)으로 바꿉니다. 기본값은 43200(12시간)입니다. 요청이 수신되기 전에 시간이 경과하면 TimeoutError가 발생합니다.

    최대 실행 기간이 있습니다. 자세한 내용은 요청 한도를 참조하세요.

    이전 create_callback 단계의 callback_details 맵이 인수로 전달됩니다.

  4. 워크플로를 배포하여 생성 또는 업데이트를 완료합니다.

    요청이 수신되면 요청의 모든 세부정보가 callback_request 맵에 저장됩니다. 그런 다음 헤더, 본문, 쿼리 매개변수의 query 맵을 포함한 전체 HTTP 요청에 액세스할 수 있습니다. 예를 들면 다음과 같습니다.

    YAML

        http_request:
          body:
          headers: {...}
          method: GET
          query: {}
          url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
        received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667
        type: HTTP
        

    JSON

        {
           "http_request":{
              "body":null,
              "headers":{
                 ...
              },
              "method":"GET",
              "query":{
              },
              "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
           },
           "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667",
           "type":"HTTP"
        }
          

    HTTP 본문이 텍스트 또는 JSON이면 Workflows에서 본문 디코딩을 시도합니다. 그 밖의 경우에는 원시 바이트가 반환됩니다.

콜백 엔드포인트에 대한 요청 승인

콜백 엔드포인트로 요청을 전송하려면 Cloud Run 및 Cloud Run 함수와 같은 Google Cloud 서비스는 물론 타사 서비스에서 Identity and Access Management(IAM) 권한, 특히 workflows.callbacks.send(워크플로 호출자 역할에 포함)를 보유하여 요청에 대한 승인을 받아야 합니다.

직접 요청 보내기

서비스 계정에 단기 사용자 인증 정보를 만드는 가장 간단한 방법은 직접 요청하는 것입니다. 이 흐름에서는 2개의 ID, 즉 호출자와 사용자 인증 정보를 생성할 서비스 계정이 필요합니다. 이 페이지의 기본 워크플로 호출은 직접 요청의 예시입니다. 자세한 내용은 IAM을 사용하여 액세스 제어직접 요청 권한을 참조하세요.

OAuth 2.0 액세스 토큰 생성

애플리케이션이 콜백 엔드포인트를 호출하도록 승인하려면 워크플로와 연결된 서비스 계정의 OAuth 2.0 액세스 토큰을 생성하면 됩니다. (Workflows Editor 또는 Workflows AdminService Account Token Creator 역할에 대해) 필요한 권한이 있다면 generateAccessToken 메서드를 실행하여 직접 토큰을 생성할 수도 있습니다.

generateAccessToken 요청이 성공하면 반환된 응답 본문에 OAuth 2.0 액세스 토큰과 만료 시간이 포함됩니다. (기본적으로 OAuth 2.0 액세스 토큰은 최대 1시간 동안 유효합니다.) 예를 들면 다음과 같습니다.

  {
  "accessToken": "eyJ0eXAi...NiJ9",
  "expireTime": "2020-04-07T15:01:23.045123456Z"
  }
그런 다음 아래 예시와 같이 콜백 엔드포인트 URL에 대한 curl 호출에 accessToken 코드를 사용할 수 있습니다.
  curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
  curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL

Cloud Run 함수의 OAuth 토큰 생성

워크플로와 동일한 서비스 계정을 사용해 Cloud Run 함수에서 콜백을 호출할 때 동일 프로젝트에서 OAuth 액세스 토큰이 함수에서 생성됩니다. 예를 들면 다음과 같습니다.

const {GoogleAuth} = require('google-auth-library');
const auth = new GoogleAuth();
const token = await auth.getAccessToken();
console.log("Token", token);

try {
  const resp = await fetch(url, {
      method: 'POST',
      headers: {
          'accept': 'application/json',
          'content-type': 'application/json',
          'authorization': `Bearer ${token}`
      },
      body: JSON.stringify({ approved })
  });
  console.log("Response = ", JSON.stringify(resp));

  const result = await resp.json();
  console.log("Outcome = ", JSON.stringify(result));

자세한 내용은 콜백을 사용한 인간 참여형(Human-In-The-Loop) 워크플로 만들기 튜토리얼을 참조하세요.

오프라인 액세스 요청

액세스 토큰은 주기적으로 만료되어 관련 API 요청에 대한 잘못된 사용자 인증 정보가 됩니다. 토큰과 연결된 범위의 오프라인 액세스를 요청한 경우에는 사용자에게 권한을 요청하지 않고 액세스 토큰을 갱신할 수 있습니다. 오프라인 액세스 요청은 사용자가 없을 때 Google API에 액세스해야 하는 모든 애플리케이션의 요구사항입니다. 자세한 내용은 액세스 토큰 갱신(오프라인 액세스)을 참조하세요.

콜백을 사용하여 단 한 번에 워크플로 호출

콜백은 완전히 멱등적입니다. 즉, 예기치 않은 결과나 부작용이 발생하지 않고 콜백이 실패하면 재시도할 수 있습니다.

콜백 엔드포인트를 만들면 URL이 수신 요청을 받을 준비가 되며 일반적으로 상응하는 await_callback 호출이 실행되기 전에 호출자에게 반환됩니다. 그러나 await_callback 단계가 실행될 때 콜백 URL이 아직 수신되지 않은 경우 엔드포인트가 수신될 때까지(또는 시간 초과가 발생할 때까지) 워크플로 실행이 차단됩니다. 응답을 수신하면 워크플로 실행이 재개되고 콜백이 처리됩니다.

create_callback_endpoint 단계를 실행하고 콜백 엔드포인트를 만든 후에는 워크플로에서 단일 콜백 슬롯을 사용할 수 있습니다. 콜백 요청이 수신되면 콜백이 처리될 수 있을 때까지 이 슬롯은 콜백 페이로드로 채워집니다. await_callback 단계가 실행되면 콜백이 처리되고 슬롯이 비워지고 다른 콜백에 사용할 수 있게 됩니다. 그런 다음 콜백 엔드포인트를 재사용하고 await_callback을 다시 호출할 수 있습니다.

await_callback이 한 번만 호출되고 두 번째 콜백이 수신되면 다음 시나리오 중 하나가 발생하고 적절한 HTTP 상태 코드가 반환됩니다.

  • HTTP 429: Too Many Requests는 첫 번째 콜백이 성공적으로 수신되었지만 처리되지 않았음을 나타냅니다. 처리 대기 중입니다. 두 번째 콜백은 워크플로에서 거부됩니다.

  • HTTP 200: Success는 첫 번째 콜백이 성공적으로 수신되었고 응답이 반환되었음을 나타냅니다. await_callback이 두 번째 호출되지 않는 한 두 번째 콜백이 저장만 되고 처리되지 않을 수 있습니다. 그 전에 워크플로가 종료되면 두 번째 콜백 요청은 처리되지 않고 삭제됩니다.

  • HTTP 404: Page Not Found는 워크플로가 더 이상 실행되고 있지 않음을 나타냅니다. 첫 번째 콜백이 처리되고 워크플로가 완료되었거나 워크플로가 실패했습니다. 이를 확인하려면 워크플로 실행 상태를 쿼리해야 합니다.

병렬 콜백

단계가 동시에 실행되고 상위 스레드에서 콜백이 생성되고 하위 단계에서 대기되는 경우 앞에서 설명한 것과 동일한 패턴이 적용됩니다.

다음 예에서는 create_callback_endpoint 단계가 실행될 때 콜백 슬롯이 하나 생성됩니다. await_callback에 대한 이후의 각 호출은 새 콜백 슬롯을 엽니다. 콜백 요청이 실행되기 전에 모든 스레드가 실행되고 대기 중이면 10개의 콜백을 동시에 실행할 수 있습니다. 추가 콜백을 수행할 수 있지만 저장만 되고 처리되지 않습니다.

YAML

  - createCallbackInParent:
    call: events.create_callback_endpoint
    args:
      http_callback_method: "POST"
    result: callback_details
  - parallelStep:
    parallel:
        for:
            range: [1, 10]
            value: loopValue
            steps:
              - waitForCallbackInChild:
                  call: events.await_callback
                  args:
                      callback: ${callback_details}

JSON

  [
    {
      "createCallbackInParent": {
        "call": "events.create_callback_endpoint",
        "args": {
          "http_callback_method": "POST"
        },
        "result": "callback_details"
      }
    },
    {
      "parallelStep": {
        "parallel": {
          "for": {
            "range": [
              1,
              10
            ],
            "value": "loopValue",
            "steps": [
              {
                "waitForCallbackInChild": {
                  "call": "events.await_callback",
                  "args": {
                    "callback": "${callback_details}"
                  }
                }
              }
            ]
          }
        }
      }
    }
  ]

콜백은 각 호출이 await_callback에 대한 브랜치에 의해 수행된 순서와 동일한 순서로 처리됩니다. 그러나 브랜치의 실행 순서는 확정적이지 않으며 다양한 경로를 사용하여 결과에 도달할 수 있습니다. 자세한 내용은 병렬 단계를 참조하세요.

기본 콜백 워크플로 사용해 보기

기본 워크플로를 만든 후 curl을 사용하여 워크플로의 콜백 엔드포인트에 대한 호출을 테스트할 수 있습니다. 워크플로가 있는 프로젝트에 필요한 Workflows Editor 또는 Workflows Admin 권한이 있어야 합니다.

  1. 다음 워크플로를 만들고 배포한 후 실행합니다.

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: 3600
            result: callback_request
        - print_callback_request:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
        - return_callback_result:
            return: ${callback_request.http_request}
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": 3600
              },
              "result": "callback_request"
            }
          },
          {
            "print_callback_request": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          },
          {
            "return_callback_result": {
              "return": "${callback_request.http_request}"
            }
          }
        ]
          

    워크플로를 실행한 후 콜백 요청을 수신하거나 제한 시간이 경과할 때까지 워크플로 실행은 ACTIVE 상태입니다.

  2. 실행 상태를 확인하고 콜백 URL을 가져옵니다.

    콘솔

    1. Google Cloud 콘솔에서 Workflows 페이지로 이동합니다.

      Workflows로 이동
    2. 방금 실행한 워크플로의 이름을 클릭합니다.

      워크플로 실행 상태가 표시됩니다.

    3. 로그 탭을 클릭합니다.
    4. 다음과 같은 로그 항목을 찾습니다.

      Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
    5. 콜백 URL을 복사해 다음 명령어에 사용합니다.

    gcloud

    1. 먼저 실행 ID를 가져옵니다.
      gcloud logging read "Listening for callbacks" --freshness=DURATION
      DURATION을 반환된 로그 항목을 제한할 적절한 시간으로 바꿉니다(워크플로를 여러 번 실행한 경우).

      예를 들어 --freshness=t10m은 10분 이전의 로그 항목을 반환합니다. 자세한 내용은 gcloud topic datetimes를 참조하세요.

      실행 ID가 반환됩니다. 콜백 URL도 textPayload 필드에 반환됩니다. 다음 단계에서 사용할 수 있도록 두 값을 모두 복사합니다.

    2. 다음 명령어를 실행합니다.
      gcloud workflows executions describe WORKFLOW_EXECUTION_ID --workflow=WORKFLOW_NAME
      워크플로 실행 상태가 반환됩니다.
  3. 이제 curl 명령어를 사용하여 콜백 엔드포인트를 호출하면 됩니다.
    curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL

    POST 엔드포인트의 경우 Content-Type 표현 헤더를 사용해야 합니다. 예를 들면 다음과 같습니다.

    curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL

    CALLBACK_URL을 이전 단계에서 복사한 URL로 바꿉니다.

  4. Google Cloud Console 또는 Google Cloud CLI를 통해 워크플로 실행 상태가 현재 SUCCEEDED인지 확인합니다.
  5. 반환된 textPayload가 있는 다음과 비슷한 로그 항목을 찾습니다.
    Received {"body":null,"headers":...

샘플

이러한 샘플은 문법을 보여줍니다.

시간 초과 오류 포착

시간 초과 오류를 포착하고 시스템 로그에 오류를 작성하여 이 샘플을 이전 샘플에 추가합니다.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 3600
                result: callback_request
            except:
                as: e
                steps:
                    - log_error:
                        call: sys.log
                        args:
                            severity: "ERROR"
                            text: ${"Received error " + e.message}
                        next: end
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "callback_request"
              },
              "except": {
                "as": "e",
                "steps": [
                  {
                    "log_error": {
                      "call": "sys.log",
                      "args": {
                        "severity": "ERROR",
                        "text": "${\"Received error \" + e.message}"
                      },
                      "next": "end"
                    }
                  }
                ]
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      }
    }
      

재시도 루프에서 콜백 대기

이 샘플에서는 재시도 단계를 구현하여 이전 샘플을 수정합니다. 커스텀 재시도 조건자를 사용하면 워크플로에서 시간 초과 발생 시 경고를 로깅한 후 콜백 엔드포인트에서 대기를 최대 5회까지 재시도합니다. 콜백을 수신하기 전에 재시도 할당량이 소진되면 최종 시간 초과 오류로 인해 워크플로가 실패합니다.

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 60.0
                result: callback_request
            retry:
                predicate: ${log_timeout}
                max_retries: 5
                backoff:
                    initial_delay: 1
                    max_delay: 10
                    multiplier: 2
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    log_timeout:
        params: [e]
        steps:
          - when_to_repeat:
              switch:
                - condition: ${"TimeoutError" in e.tags}
                  steps:
                      - log_error_and_retry:
                          call: sys.log
                          args:
                              severity: "WARNING"
                              text: "Timed out waiting for callback, retrying"
                      - exit_predicate:
                          return: true
          - otherwise:
              return: false
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 60
                },
                "result": "callback_request"
              },
              "retry": {
                "predicate": "${log_timeout}",
                "max_retries": 5,
                "backoff": {
                  "initial_delay": 1,
                  "max_delay": 10,
                  "multiplier": 2
                }
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      },
      "log_timeout": {
        "params": [
          "e"
        ],
        "steps": [
          {
            "when_to_repeat": {
              "switch": [
                {
                  "condition": "${\"TimeoutError\" in e.tags}",
                  "steps": [
                    {
                      "log_error_and_retry": {
                        "call": "sys.log",
                        "args": {
                          "severity": "WARNING",
                          "text": "Timed out waiting for callback, retrying"
                        }
                      }
                    },
                    {
                      "exit_predicate": {
                        "return": true
                      }
                    }
                  ]
                }
              ]
            }
          },
          {
            "otherwise": {
              "return": false
            }
          }
        ]
      }
    }
      

다음 단계