コールバックを使用して待機する

コールバックを使用すると、ワークフローの実行では、別のサービスがコールバック エンドポイントにリクエストを行うことを待ち、そのリクエストによってワークフローの実行を再開できます。

コールバックを使用すると、ポーリングせずに指定したイベントを待機して、その発生をワークフローに知らせることが可能です。たとえば、商品の在庫が補充されたときや商品が発送されたときに通知を受け取るワークフローを作成できます。あるいは、注文の確認や翻訳の確認など、人間による操作ができるように待機することも可能です。

このページでは、コールバック エンドポイントをサポートし、外部プロセスからの HTTP リクエストがそのエンドポイントに到着するまで待機するワークフローを作成する方法について説明します。コールバックと Eventarc トリガーを使用してイベントを待機することもできます。

コールバックでは、次の 2 つの標準ライブラリ組み込み関数を使用する必要があります。

  • events.create_callback_endpoint - 指定された HTTP メソッドを想定するコールバック エンドポイントを作成します。
  • events.await_callback - コールバックが指定されたエンドポイントで受信されるのを待ちます。

コールバック リクエストを受信するエンドポイントを作成する

そのエンドポイントに到着する HTTP リクエストを受信できるコールバック エンドポイントを作成します。

  1. 手順に沿って新しいワークフローを作成するか、既存のワークフローを選択して更新しますが、まだデプロイはしないでください。
  2. ワークフローの定義で、コールバック エンドポイントを作成するステップを追加します。

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "METHOD"
            result: callback_details
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "METHOD"
              },
              "result": "callback_details"
            }
          }
        ]
          

    METHOD は、予想される HTTP メソッド(GETHEADPOSTPUTDELETEOPTIONSPATCH のいずれか)に置き換えます。デフォルト値は POST です。

    結果のマップは callback_details で、作成したエンドポイントの URL を格納する url フィールドが含まれます。

    これで、コールバック エンドポイントは、指定された HTTP メソッドを使用して受信リクエストを受信できるようになりました。作成したエンドポイントの URL を使用して、ワークフロー外部のプロセスからのコールバックをトリガーできます。たとえば、URL を Cloud Functions の関数に渡します。

  3. ワークフローの定義で、コールバック リクエストを待つステップを追加します。

    YAML

        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: TIMEOUT
            result: callback_request
        

    JSON

        [
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": TIMEOUT
              },
              "result": "callback_request"
            }
          }
        ]
          

    TIMEOUT は、ワークフローがリクエストを待機する最大秒数に置き換えます。デフォルトは 43200 (12 時間) です。リクエストを受信する前に時間が経過した場合は、TimeoutError が発生します。

    最大実行時間があるので注意してください。詳細については、リクエストの上限をご覧ください。

    引数には、以前の create_callback ステップの callback_details マップが渡されます。

  4. ワークフローをデプロイして、作成または更新を完了します。

    リクエストを受信すると、リクエストのすべての詳細が callback_request マップに保存されます。そうすると、ヘッダー、本文、クエリ パラメータの query マップなど、HTTP リクエスト全体にアクセスできます。例:

    YAML

        http_request:
          body:
          headers: {...}
          method: GET
          query: {}
          url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
        received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667
        type: HTTP
        

    JSON

        {
           "http_request":{
              "body":null,
              "headers":{
                 ...
              },
              "method":"GET",
              "query":{
              },
              "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2"
           },
           "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667",
           "type":"HTTP"
        }
          

    HTTP 本文がテキストまたは JSON の場合、ワークフローは本文のデコードを試みます。それ以外の場合は、生のバイトが返されます。

コールバック エンドポイントへのリクエストを承認する

リクエストをコールバック エンドポイントに送信するには、Cloud Run、Cloud Functions、Google Cloud サービスなどの Google Cloud サービス、サードパーティのサービスに対して、適切な Identity and Access Management(IAM)権限(具体的には Workflows 起動元ロールに含まれている workflows.callbacks.send)を設定して、処理を行うことを許可する必要があります。

直接リクエストを行う

有効期間が短いサービス アカウントの認証情報を作成する最も簡単な方法は、直接リクエストを作成することです。このフローには、呼び出し元と認証情報が作成されるサービス アカウントの 2 つの ID が含まれます。このページの基本的なワークフローの呼び出しは、直接リクエストの例です。詳細については、IAM を使用してアクセスを制御する直接リクエスト権限をご覧ください。

OAuth 2.0 アクセス トークンを生成する

アプリケーションがコールバック エンドポイントを呼び出すことを承認するには、ワークフローに関連付けられたサービス アカウントの OAuth 2.0 アクセス トークンを生成します。必要な権限(Workflows EditorWorkflows Admin と、Service Account Token Creator ロール)が付与されている場合は、generateAccessToken メソッドを実行して自分でトークンを生成することもできます。

generateAccessToken リクエストが成功すると、返されたレスポンス本文に OAuth 2.0 アクセス トークンと有効期限が含まれます(デフォルトでは、OAuth 2.0 アクセス トークンは最大 1 時間有効です)。次に例を示します。

  {
  "accessToken": "eyJ0eXAi...NiJ9",
  "expireTime": "2020-04-07T15:01:23.045123456Z"
  }
次の例のように、accessToken コードは、コールバック エンドポイント URL への curl 呼び出しで使用できます。
  curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
  curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL

Cloud Functions の関数の OAuth トークンを生成する

ワークフローと同じプロジェクト内で同じサービス アカウントを使用して Cloud Functions からコールバックを呼び出す場合は、関数自体で OAuth アクセス トークンを生成できます。次に例を示します。

const {GoogleAuth} = require('google-auth-library');
const auth = new GoogleAuth();
const token = await auth.getAccessToken();
console.log("Token", token);

try {
  const resp = await fetch(url, {
      method: 'POST',
      headers: {
          'accept': 'application/json',
          'content-type': 'application/json',
          'authorization': `Bearer ${token}`
      },
      body: JSON.stringify({ approved })
  });
  console.log("Response = ", JSON.stringify(resp));

  const result = await resp.json();
  console.log("Outcome = ", JSON.stringify(result));

さらに詳しい内容については、コールバックを使用した人間参加型ワークフローの作成に関するチュートリアルをご覧ください。

オフライン アクセスをリクエストする

アクセス トークンは定期的に期限が切れ、関連する API リクエストでは無効な認証情報になります。トークンに関連付けられたスコープへのオフライン アクセスをリクエストした場合は、ユーザーに権限を求めることなくアクセス トークンを更新できます。ユーザーがいないときに Google API にアクセスする必要があるアプリケーションには、オフライン アクセスのリクエストが必須です。詳細については、アクセス トークンの更新(オフライン アクセス)をご覧ください。

基本的なコールバック ワークフローを試す

基本的なワークフローを作成し、そのワークフローのコールバック エンドポイントの呼び出しを curl を使用してテストできます。ワークフローが存在するプロジェクトに対して、Workflows Editor または Workflows Admin の権限を持っている必要があります。

  1. 次のワークフローを作成してデプロイした後、それを実行します。

    YAML

        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            call: events.await_callback
            args:
                callback: ${callback_details}
                timeout: 3600
            result: callback_request
        - print_callback_request:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
        - return_callback_result:
            return: ${callback_request.http_request}
        

    JSON

        [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "call": "events.await_callback",
              "args": {
                "callback": "${callback_details}",
                "timeout": 3600
              },
              "result": "callback_request"
            }
          },
          {
            "print_callback_request": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          },
          {
            "return_callback_result": {
              "return": "${callback_request.http_request}"
            }
          }
        ]
          

    ワークフローの実行後、コールバック リクエストを受信するかタイムアウトが経過するまで、ワークフローの実行状態は ACTIVE になります。

  2. 実行状態を確認して、コールバック URL を取得します。

    コンソール

    1. Google Cloud コンソールの [ワークフロー] ページに移動します。

      [ワークフロー] に移動
    2. 実行したワークフローの名前をクリックします。

      ワークフローの実行状態が表示されます。

    3. [Logs] タブをクリックします。
    4. 次のようなログエントリを探します。

      Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
      
    5. 次のコマンドで使用するコールバック URL をコピーします。

    gcloud

    1. まず、実行 ID を取得します。
      gcloud logging read "Listening for callbacks" --freshness=DURATION
      
      DURATION は、返されるログエントリを制限する適切な時間に置き換えます(ワークフローを複数回実行している場合)。

      たとえば、--freshness=t10m は、10 分以内のログエントリを返します。詳細については、gcloud topic datetimes を参照してください:

      実行 ID が返されます。コールバック URL も textPayload フィールドで返されます。次の手順で使用するために両方の値をコピーします。

    2. 次のコマンドを実行します。
      
                    ワークフロー実行状態が返されます。
  3. これで、curl コマンドを使用してコールバック エンドポイントを呼び出せます。
    curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL
    

    なお、POST エンドポイントには、Content-Type 表現ヘッダーを使用する必要があります。次に例を示します。

    curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL
    

    CALLBACK_URL を前のステップでコピーした URL に置き換えます。

  4. Google Cloud Console または Google Cloud CLI を使用して、ワークフローの実行の状態が SUCCEEDED になったことを確認します。
  5. 次のような textPayload が返されているログエントリを探します。
    Received {"body":null,"headers":...
    

ここでは、構文の例を示します。

タイムアウト エラーをキャッチする

このサンプルでは、タイムアウト エラーをキャッチしてシステムログにエラーを書き込むコードを、前のサンプルに追加します。

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 3600
                result: callback_request
            except:
                as: e
                steps:
                    - log_error:
                        call: sys.log
                        args:
                            severity: "ERROR"
                            text: ${"Received error " + e.message}
                        next: end
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 3600
                },
                "result": "callback_request"
              },
              "except": {
                "as": "e",
                "steps": [
                  {
                    "log_error": {
                      "call": "sys.log",
                      "args": {
                        "severity": "ERROR",
                        "text": "${\"Received error \" + e.message}"
                      },
                      "next": "end"
                    }
                  }
                ]
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      }
    }
      

再試行ループでコールバックを待機する

このサンプルは、再試行ステップを実装して前のサンプルを変更します。 ワークフローは、カスタム再試行述語を使用して、タイムアウトが発生したときに警告をログに記録し、コールバック エンドポイントで待機を再試行します(最大 5 回)。 コールバックを受信する前に再試行割り当てを使い切ると、最後のタイムアウト エラーによりワークフローは失敗します。

YAML

    main:
      steps:
        - create_callback:
            call: events.create_callback_endpoint
            args:
                http_callback_method: "GET"
            result: callback_details
        - print_callback_details:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Listening for callbacks on " + callback_details.url}
        - await_callback:
            try:
                call: events.await_callback
                args:
                    callback: ${callback_details}
                    timeout: 60.0
                result: callback_request
            retry:
                predicate: ${log_timeout}
                max_retries: 5
                backoff:
                    initial_delay: 1
                    max_delay: 10
                    multiplier: 2
        - print_callback_result:
            call: sys.log
            args:
                severity: "INFO"
                text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    log_timeout:
        params: [e]
        steps:
          - when_to_repeat:
              switch:
                - condition: ${"TimeoutError" in e.tags}
                  steps:
                      - log_error_and_retry:
                          call: sys.log
                          args:
                              severity: "WARNING"
                              text: "Timed out waiting for callback, retrying"
                      - exit_predicate:
                          return: true
          - otherwise:
              return: false
    

JSON

    {
      "main": {
        "steps": [
          {
            "create_callback": {
              "call": "events.create_callback_endpoint",
              "args": {
                "http_callback_method": "GET"
              },
              "result": "callback_details"
            }
          },
          {
            "print_callback_details": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Listening for callbacks on \" + callback_details.url}"
              }
            }
          },
          {
            "await_callback": {
              "try": {
                "call": "events.await_callback",
                "args": {
                  "callback": "${callback_details}",
                  "timeout": 60
                },
                "result": "callback_request"
              },
              "retry": {
                "predicate": "${log_timeout}",
                "max_retries": 5,
                "backoff": {
                  "initial_delay": 1,
                  "max_delay": 10,
                  "multiplier": 2
                }
              }
            }
          },
          {
            "print_callback_result": {
              "call": "sys.log",
              "args": {
                "severity": "INFO",
                "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}"
              }
            }
          }
        ]
      },
      "log_timeout": {
        "params": [
          "e"
        ],
        "steps": [
          {
            "when_to_repeat": {
              "switch": [
                {
                  "condition": "${\"TimeoutError\" in e.tags}",
                  "steps": [
                    {
                      "log_error_and_retry": {
                        "call": "sys.log",
                        "args": {
                          "severity": "WARNING",
                          "text": "Timed out waiting for callback, retrying"
                        }
                      }
                    },
                    {
                      "exit_predicate": {
                        "return": true
                      }
                    }
                  ]
                }
              ]
            }
          },
          {
            "otherwise": {
              "return": false
            }
          }
        ]
      }
    }
      

次のステップ