工作流程執行作業可透過回呼,等待其他服務向回呼端點傳送要求;該要求會繼續執行工作流程。
透過回呼,您可以向工作流程發出信號,表示已發生指定事件,並等待該事件,不必輪詢。舉例來說,您可以建立工作流程,在產品補貨或商品出貨時收到通知;也可以建立工作流程,等待人工介入,例如審查訂單或驗證翻譯內容。
本頁面說明如何建立支援回呼端點的工作流程,並等待外部程序傳送 HTTP 要求至該端點。您也可以使用回呼和 Eventarc 觸發條件等待事件。
回呼需要使用兩個標準程式庫內建函式:
events.create_callback_endpoint
:建立回呼端點,預期使用指定的 HTTP 方法events.await_callback
:等待在指定端點接收回呼
建立接收回呼要求的端點
建立可接收 HTTP 要求的端點。
- 按照步驟建立新工作流程,或選擇現有工作流程更新,但請勿部署。
- 在工作流程定義中,新增建立回呼端點的步驟:
YAML
- create_callback: call: events.create_callback_endpoint args: http_callback_method: "METHOD" result: callback_details
JSON
[ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "METHOD" }, "result": "callback_details" } } ]
將
METHOD
替換為預期的 HTTP 方法,可以是GET
、HEAD
、POST
、PUT
、DELETE
、OPTIONS
或PATCH
。預設值為POST
。結果會是地圖
callback_details
,其中包含儲存所建立端點網址的url
欄位。回呼端點現在已準備好接收使用指定 HTTP 方法傳入的要求。建立的端點網址可用於從工作流程外部的程序觸發回呼,例如將網址傳遞至 Cloud Run 函式。
- 在工作流程的定義中,新增等待回呼要求的步驟:
YAML
- await_callback: call: events.await_callback args: callback: ${callback_details} timeout: TIMEOUT result: callback_request
JSON
[ { "await_callback": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": TIMEOUT }, "result": "callback_request" } } ]
將
TIMEOUT
替換為工作流程等待要求的秒數上限。預設值為 43200 (12 小時)。如果在收到要求前經過這段時間,系統會引發TimeoutError
。請注意,執行時間有上限。詳情請參閱要求限制。
先前步驟中的
callback_details
地圖會以引數形式傳遞。create_callback
- 部署工作流程,即可完成建立或更新程序。
收到要求後,系統會將要求的所有詳細資料儲存在
callback_request
對應中。接著,您就能存取整個 HTTP 要求,包括標頭、主體,以及任何查詢參數的query
對應。例如:YAML
http_request: body: headers: {...} method: GET query: {} url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2" received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667 type: HTTP
JSON
{ "http_request":{ "body":null, "headers":{ ... }, "method":"GET", "query":{ }, "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2" }, "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667", "type":"HTTP" }
如果 HTTP 主體是文字或 JSON,工作流程會嘗試解碼主體;否則會傳回原始位元組。
授權對回呼端點的要求
如要將要求傳送至回呼端點,Cloud Run 和 Cloud Run 函式等服務,以及第三方服務,必須具備適當的 Identity and Access Management (IAM) 權限 (具體來說,是 workflows.callbacks.send
,包含在 Workflows 叫用者角色中),才能獲得授權。 Google Cloud
直接提出要求
為服務帳戶建立短期憑證最簡單的方法,就是提出直接要求。本流程涉及兩個身分:呼叫者和建立憑證的服務帳戶。這個頁面上的「基本工作流程」呼叫就是直接要求的範例。詳情請參閱「使用 IAM 控管存取權」和「直接要求權限」。
產生 OAuth 2.0 存取權杖
如要授權應用程式呼叫回呼端點,可以為與工作流程相關聯的服務帳戶產生 OAuth 2.0 存取權杖。假設您具備必要權限 (適用於 Workflows Editor
或 Workflows Admin
和 Service Account Token Creator
角色),您也可以執行 generateAccessToken
方法,自行產生權杖。
如果 generateAccessToken
要求成功,傳回的回應主體會包含 OAuth 2.0 存取權杖和到期時間。(根據預設,OAuth 2.0 存取權杖的有效期最長為 1 小時)。例如:
{ "accessToken": "eyJ0eXAi...NiJ9", "expireTime": "2020-04-07T15:01:23.045123456Z" }
accessToken
程式碼,呼叫回呼端點網址,如下列範例所示:curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL
產生 Cloud Run 函式的 OAuth 權杖
如果您是從 Cloud Run 函式 (使用與工作流程相同的服務帳戶,且位於相同專案中) 叫用回呼,則可以在函式本身產生 OAuth 存取權杖。例如:
如需更多背景資訊,請參閱使用回呼建立人為介入工作流程的教學課程。
要求離線存取權
存取權杖會定期過期,並成為相關 API 要求的無效憑證。如果您要求離線存取與權杖相關聯的範圍,即可重新整理存取權杖,不必提示使用者提供權限。如果應用程式需要在使用者不在時存取 Google API,就必須要求離線存取權。詳情請參閱「重新整理存取權杖 (離線存取)」。
使用回呼準確叫用工作流程一次
回呼完全等冪,也就是說,如果回呼失敗,您可以重試回呼,不會產生非預期的結果或副作用。
建立回呼端點後,網址即可接收傳入的要求,且通常會在對 await_callback
進行相應的呼叫前,傳回給呼叫端。不過,如果在執行 await_callback
步驟時尚未收到回呼網址,工作流程執行作業就會遭到封鎖,直到收到端點 (或發生逾時) 為止。收到後,工作流程執行作業就會繼續,並處理回呼。
執行 create_callback_endpoint
步驟並建立回呼端點後,工作流程會提供單一回呼位置。收到回呼要求時,系統會將回呼酬載填入這個時段,直到可以處理回呼為止。執行 await_callback
步驟時,系統會處理回呼,並清空插槽,以供其他回呼使用。接著,您可以重複使用回呼端點,並再次呼叫 await_callback
。
如果 await_callback
只呼叫一次,但收到第二次回呼,就會發生下列其中一種情況,並傳回適當的 HTTP 狀態碼:
HTTP
429: Too Many Requests
表示系統已順利收到第一個回呼,但尚未處理,仍處於等待處理狀態。工作流程拒絕了第二個回呼。HTTP
200: Success
表示已成功收到第一個回呼並傳回回應。第二個回呼會儲存起來,除非再次呼叫await_callback
,否則可能永遠不會處理。如果工作流程在此之前結束,系統就不會處理第二個回呼要求,並會捨棄該要求。HTTP
404: Page Not Found
表示工作流程已停止執行。 第一個回呼已處理完畢,工作流程也已完成,或是工作流程失敗。如要判斷這點,您需要查詢工作流程執行狀態。
平行回呼
如果步驟是平行執行,且父項執行緒建立回呼,並在子項步驟中等待,則會遵循先前所述的相同模式。
在以下範例中,執行 create_callback_endpoint
步驟時,會建立一個回呼插槽。後續每次呼叫 await_callback
都會開啟新的回呼時段。如果所有執行緒都在執行並等待,則在提出回呼要求前,最多可同時進行十次回呼。可能會建立其他回呼,但會儲存這些回呼,且絕不會處理。
YAML
- createCallbackInParent: call: events.create_callback_endpoint args: http_callback_method: "POST" result: callback_details - parallelStep: parallel: for: range: [1, 10] value: loopValue steps: - waitForCallbackInChild: call: events.await_callback args: callback: ${callback_details}
JSON
[ { "createCallbackInParent": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "POST" }, "result": "callback_details" } }, { "parallelStep": { "parallel": { "for": { "range": [ 1, 10 ], "value": "loopValue", "steps": [ { "waitForCallbackInChild": { "call": "events.await_callback", "args": { "callback": "${callback_details}" } } } ] } } } } ]
請注意,回呼的處理順序與分支對 await_callback
進行的呼叫順序相同。不過,分支的執行順序不確定,且可透過各種路徑得出結果。詳情請參閱平行步驟。
試用基本回呼工作流程
您可以建立基本工作流程,然後使用 curl 測試對該工作流程回呼端點的呼叫。您必須具備工作流程所在專案的必要 Workflows Editor
或 Workflows Admin
權限。
-
建立及部署下列工作流程,然後執行該工作流程。
YAML
- create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: call: events.await_callback args: callback: ${callback_details} timeout: 3600 result: callback_request - print_callback_request: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)} - return_callback_result: return: ${callback_request.http_request}
JSON
[ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 3600 }, "result": "callback_request" } }, { "print_callback_request": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}" } } }, { "return_callback_result": { "return": "${callback_request.http_request}" } } ]
執行工作流程後,工作流程執行的狀態會是
ACTIVE
,直到收到回呼要求或逾時為止。 - 確認執行狀態並擷取回呼網址:
控制台
-
前往 Google Cloud 控制台的「Workflows」頁面:
前往「Workflows」頁面 -
按一下您剛執行的工作流程名稱。
畫面上會顯示工作流程的執行狀態。
- 按一下「記錄」分頁標籤。
尋找類似下列內容的記錄項目:
Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
- 複製回呼網址,以便在下一個指令中使用。
gcloud
- 首先,請擷取執行作業 ID:
將gcloud logging read "Listening for callbacks" --freshness=DURATION
DURATION
替換成適當的時間量,限制傳回的記錄項目 (如果您已多次執行工作流程)。舉例來說,
--freshness=t10m
會傳回不超過 10 分鐘的記錄項目。詳情請參閱gcloud topic datetimes
。系統會傳回執行作業 ID。請注意,回呼網址也會在
textPayload
欄位中傳回。複製這兩個值,以供後續步驟使用。 - 請執行下列指令:
傳回工作流程執行的狀態。gcloud workflows executions describe WORKFLOW_EXECUTION_ID --workflow=WORKFLOW_NAME
-
- 現在可以使用 curl 指令呼叫回呼端點:
curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL
請注意,如果是
POST
端點,您必須使用Content-Type
表示法標頭。例如:curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL
將
CALLBACK_URL
替換為上一個步驟複製的網址。 - 透過 Google Cloud 控制台或使用 Google Cloud CLI,確認工作流程執行的狀態現在為
SUCCEEDED
。 - 尋找傳回的
textPayload
記錄項目,該項目類似於下列內容:Received {"body":null,"headers":...
範例
這些範例說明語法。
擷取逾時錯誤
這個範例會擷取任何逾時錯誤,並將錯誤寫入系統記錄,是上一個範例的延伸版本。
YAML
main: steps: - create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: try: call: events.await_callback args: callback: ${callback_details} timeout: 3600 result: callback_request except: as: e steps: - log_error: call: sys.log args: severity: "ERROR" text: ${"Received error " + e.message} next: end - print_callback_result: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)}
JSON
{ "main": { "steps": [ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "try": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 3600 }, "result": "callback_request" }, "except": { "as": "e", "steps": [ { "log_error": { "call": "sys.log", "args": { "severity": "ERROR", "text": "${\"Received error \" + e.message}" }, "next": "end" } } ] } } }, { "print_callback_result": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}" } } } ] } }
在重試迴圈中等待回呼
這個範例會實作重試步驟,修改先前的範例。工作流程會使用自訂重試述詞,在發生逾時時記錄警告,然後重試等待回呼端點,最多五次。如果在收到回呼之前用盡重試配額,最終逾時錯誤會導致工作流程失敗。
YAML
main: steps: - create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: try: call: events.await_callback args: callback: ${callback_details} timeout: 60.0 result: callback_request retry: predicate: ${log_timeout} max_retries: 5 backoff: initial_delay: 1 max_delay: 10 multiplier: 2 - print_callback_result: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)} log_timeout: params: [e] steps: - when_to_repeat: switch: - condition: ${"TimeoutError" in e.tags} steps: - log_error_and_retry: call: sys.log args: severity: "WARNING" text: "Timed out waiting for callback, retrying" - exit_predicate: return: true - otherwise: return: false
JSON
{ "main": { "steps": [ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "try": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 60 }, "result": "callback_request" }, "retry": { "predicate": "${log_timeout}", "max_retries": 5, "backoff": { "initial_delay": 1, "max_delay": 10, "multiplier": 2 } } } }, { "print_callback_result": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}" } } } ] }, "log_timeout": { "params": [ "e" ], "steps": [ { "when_to_repeat": { "switch": [ { "condition": "${\"TimeoutError\" in e.tags}", "steps": [ { "log_error_and_retry": { "call": "sys.log", "args": { "severity": "WARNING", "text": "Timed out waiting for callback, retrying" } } }, { "exit_predicate": { "return": true } } ] } ] } }, { "otherwise": { "return": false } } ] } }