回调允许工作流执行操作等待其他服务向回调端点发出请求;该请求将继续执行工作流。
借助回调,您可以告知工作流指定事件已发生,然后等待该事件而不进行轮询。例如,您可以 创建工作流程,以便在商品补货或 商品已发货;或等待进行人际互动(例如审核 订单或验证翻译。
本页面介绍了如何创建支持回调端点的工作流, 该过程会等待来自外部进程的 HTTP 请求到达 端点。您还可以 使用回调和 Eventarc 触发器等待事件。
回调需要使用两个标准库内置函数:
events.create_callback_endpoint
- 创建需要指定 HTTP 方法的回调端点events.await_callback
- 等待在给定端点接收回调
创建接收回调请求的端点
创建一个可以接收到达相应端点的 HTTP 请求的回调端点。
- 按照相应步骤创建新工作流,或选择现有工作流进行更新,但请勿进行部署。
- 在工作流的定义中,添加一个步骤来创建回调端点:
YAML
- create_callback: call: events.create_callback_endpoint args: http_callback_method: "METHOD" result: callback_details
JSON
[ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "METHOD" }, "result": "callback_details" } } ]
将
METHOD
替换为所需的 HTTP 方法,即GET
、HEAD
、POST
、PUT
、DELETE
、OPTIONS
或PATCH
之一。默认值为POST
。结果是映射
callback_details
,其中包含一个用于存储已创建端点的网址的url
字段。回调端点现在可以使用指定的 HTTP 方法接收传入请求。创建的端点的网址可用于触发 来自工作流外部的进程的回调;例如,通过传递 Cloud Run 函数的网址。
- 在工作流的定义中,添加一个步骤来等待回调请求:
YAML
- await_callback: call: events.await_callback args: callback: ${callback_details} timeout: TIMEOUT result: callback_request
JSON
[ { "await_callback": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": TIMEOUT }, "result": "callback_request" } } ]
将
TIMEOUT
替换为工作流应等待请求的秒数上限。默认值为 43200(12 小时)。如果在收到请求之前超过此时间,则会引发TimeoutError
。请注意,执行时长存在上限。如需了解详情,请参阅 请求限制。
前面
create_callback
步骤中的callback_details
映射作为参数传递。 - 部署您的工作流以完成创建或更新。
收到请求后,请求的所有详细信息都会存储在
callback_request
映射中。然后,您可以访问整个 HTTP 请求,包括其标头、正文以及任何查询参数的query
映射。例如:YAML
http_request: body: headers: {...} method: GET query: {} url: "/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2" received_time: 2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667 type: HTTP
JSON
{ "http_request":{ "body":null, "headers":{ ... }, "method":"GET", "query":{ }, "url":"/v1/projects/350446661175/locations/us-central1/workflows/workflow-1/executions/46804f42-dc83-46d6-87e4-93962866ed81/callbacks/49c80102-74d2-49cd-a70e-805a9fded94f_2de9b413-6332-412d-99c3-d7e9b6eeeda2" }, "received_time":"2021-06-24 12:49:16.988072651 -0700 PDT m=+742581.005780667", "type":"HTTP" }
如果 HTTP 正文是文本或 JSON,则 Workflows 将尝试对正文进行解码;否则,系统会返回原始字节。
对向回调端点发出的请求进行授权
如需向回调端点发送请求,您必须通过具备适当的 Identity and Access Management (IAM) 权限(尤其是 workflows.callbacks.send
,包含在 Workflows Invoker 角色中)来授权 Google Cloud 服务(例如 Cloud Run 和 Cloud Run Functions)以及第三方服务执行此操作。
发出直接请求
为服务账号创建短期有效凭据的最简单方法是发出直接请求。此流程涉及两种身份:调用者以及为其创建凭据的服务账号。本页面上的基本工作流程调用是一个直接请求示例。如需了解详情,请参阅使用 IAM 控制访问权限和直接请求权限。
生成 OAuth 2.0 访问令牌
如需授权应用调用回调端点,您可以为与工作流关联的服务账号生成 OAuth 2.0 访问令牌。假设您拥有所需的权限(Workflows Editor
或 Workflows Admin
和 Service Account Token Creator
角色对应的权限),您还可以通过运行 generateAccessToken
方法自行生成令牌。
如果 generateAccessToken
请求成功,则返回的响应正文会包含 OAuth 2.0 访问令牌和到期时间。(默认情况下,OAuth 2.0 访问令牌的有效期最长为 1 小时。)例如:
{ "accessToken": "eyJ0eXAi...NiJ9", "expireTime": "2020-04-07T15:01:23.045123456Z" }
accessToken
代码,如以下示例所示:
curl -X GET -H "Authorization: Bearer ACCESS_TOKEN_STRING" CALLBACK_URL
curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer ACCESS_TOKEN_STRING" -d '{"foo" : "bar"}' CALLBACK_URL
为 Cloud Run 函数生成 OAuth 令牌
如果您要使用与工作流相同的服务账号并在同一项目中从 Cloud Run 函数调用回调,则可以在该函数本身中生成 OAuth 访问令牌。例如:
如需了解更多背景信息,请参阅有关使用回调创建人机协同工作流的教程。
请求离线访问权限
访问令牌会定期过期,并成为相关 API 请求的无效凭据。如果您请求离线访问与令牌关联的范围,则可以刷新访问令牌,而不提示用户授予权限。在用户离线时,所有需要访问 Google API 的应用都必须请求离线访问。如需了解详情,请参阅刷新访问令牌(离线访问)。
使用回调调用工作流一次
回调是完全幂等的,这意味着,如果回调失败,您可以重试,而不会产生意外结果或副作用。
在您创建回调端点后,该网址就可以接收传入了
请求,并且通常会在相应调用之前返回给调用方
await_callback
创建完成。不过,如果在执行 await_callback
步骤时尚未收到回调网址,则工作流执行将被阻塞,直到收到端点(或发生超时)为止。收到回调后,工作流执行会恢复,并处理回调。
执行 create_callback_endpoint
步骤并创建回调后
端点,那么工作流只可使用一个回调槽。收到回调请求后,此槽会填充回调载荷,直到可以处理回调为止。执行 await_callback
步骤时,
回调得到处理,并且相应槽位已清空,以供其他
回调。然后,您可以重复使用回调端点并再次调用 await_callback
。
如果 await_callback
仅被调用一次,但收到第二个回调,则将返回一个回调
会出现以下情况,相应的 HTTP 状态代码就是
返回:
HTTP
429: Too Many Requests
表示成功收到了第一个回调,但尚未处理;它仍在等待处理。第二个回调会被工作流程拒绝。HTTP
200: Success
表示已收到第一个回调 并且返回了响应系统会存储第二个回调, 可能永远不会被处理,除非再次调用await_callback
。如果工作流在此之前结束,则系统将永远不会处理第二个回调请求,并将其舍弃。HTTP
404: Page Not Found
表示工作流不再运行。 第一个回调已处理完毕且工作流已完成,或者 工作流失败。要确定这一点,您需要查询 执行状态。
并行回调
当步骤并行执行且父线程创建回调并在子步骤中等待时,会遵循与前面所述相同的模式。
在以下示例中,执行 create_callback_endpoint
步骤时,
一个回调槽位。后续每次调用 await_callback
都会打开一个
新的回调槽位。可以同时进行十个回调,前提是所有线程都
运行并等待,然后再发出回调请求。可以发出其他回调,但这些回调会被存储,而不会被处理。
YAML
- createCallbackInParent: call: events.create_callback_endpoint args: http_callback_method: "POST" result: callback_details - parallelStep: parallel: for: range: [1, 10] value: loopValue steps: - waitForCallbackInChild: call: events.await_callback args: callback: ${callback_details}
JSON
[ { "createCallbackInParent": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "POST" }, "result": "callback_details" } }, { "parallelStep": { "parallel": { "for": { "range": [ 1, 10 ], "value": "loopValue", "steps": [ { "waitForCallbackInChild": { "call": "events.await_callback", "args": { "callback": "${callback_details}" } } } ] } } } } ]
请注意,回调的处理顺序与
分支到 await_callback
。不过,分支的执行顺序是不确定的,可以使用各种路径得出结果。有关
请参阅
并行步骤。
试用基本的回调工作流
您可以创建基本工作流,然后使用 curl 测试对该工作流回调端点的调用。您必须对工作流所在的项目具有必要的 Workflows Editor
或 Workflows Admin
权限。
- 创建和部署以下工作流,然后执行该工作流。
YAML
- create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: call: events.await_callback args: callback: ${callback_details} timeout: 3600 result: callback_request - print_callback_request: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)} - return_callback_result: return: ${callback_request.http_request}
JSON
[ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 3600 }, "result": "callback_request" } }, { "print_callback_request": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\\"Received \\" + json.encode_to_string(callback_request.http_request)}" } } }, { "return_callback_result": { "return": "${callback_request.http_request}" } } ]
执行工作流后,工作流执行状态为
ACTIVE
,直到收到回调请求或超时为止。 - 确认执行状态并检索回调网址:
控制台
-
在 Google Cloud 控制台中,前往 工作流程页面:
转到 Workflows -
点击您刚刚执行的工作流的名称。
随即将显示工作流执行的状态。
- 点击日志标签页。
查找如下日志条目:
Listening for callbacks on https://workflowexecutions.googleapis.com/v1/projects/...
- 复制回调网址,以便在下一个命令中使用。
gcloud
- 首先,检索执行 ID:
将gcloud logging read "Listening for callbacks" --freshness=DURATION
DURATION
替换为适当的金额 限制返回的日志条目(如果您已执行 工作流)。例如,
--freshness=t10m
会返回不早于 10 分钟的日志条目。如需了解详情,请参阅gcloud topic datetimes
。系统会返回执行 ID。请注意,系统还会在
textPayload
字段中返回回调网址。复制这两个值以在以下步骤中使用。 - 运行以下命令:
返回工作流执行的状态。gcloud workflows executions describe WORKFLOW_EXECUTION_ID --workflow=WORKFLOW_NAME
-
- 您现在可以使用 curl 命令调用回调端点:
curl -X GET -H "Authorization: Bearer $(gcloud auth print-access-token)" CALLBACK_URL
请注意,对于
POST
端点,您必须使用Content-Type
表示法标头。例如:curl -X POST -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" -d '{"foo" : "bar"}' CALLBACK_URL
将
CALLBACK_URL
替换为您在上一步中复制的网址。 - 通过 Google Cloud 控制台或使用 Google Cloud CLI,确认工作流执行状态现在是否为
SUCCEEDED
。 - 查找返回
textPayload
的日志条目,类似于以下内容:Received {"body":null,"headers":...
示例
这些示例演示了语法。
捕获超时错误
此示例通过捕获任何超时错误并将错误写入系统日志来添加到上一个示例中。
YAML
main: steps: - create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: try: call: events.await_callback args: callback: ${callback_details} timeout: 3600 result: callback_request except: as: e steps: - log_error: call: sys.log args: severity: "ERROR" text: ${"Received error " + e.message} next: end - print_callback_result: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)}
JSON
{ "main": { "steps": [ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "try": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 3600 }, "result": "callback_request" }, "except": { "as": "e", "steps": [ { "log_error": { "call": "sys.log", "args": { "severity": "ERROR", "text": "${\"Received error \" + e.message}" }, "next": "end" } } ] } } }, { "print_callback_result": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}" } } } ] } }
在重试循环中等待回调
此示例通过实现重试步骤来修改前面的示例。使用自定义重试谓词,工作流会在出现超时时记录警告,然后在回调端点上重试等待,最多重试五次。如果在接收回调之前重试配额用尽,则最终超时错误会导致工作流失败。
YAML
main: steps: - create_callback: call: events.create_callback_endpoint args: http_callback_method: "GET" result: callback_details - print_callback_details: call: sys.log args: severity: "INFO" text: ${"Listening for callbacks on " + callback_details.url} - await_callback: try: call: events.await_callback args: callback: ${callback_details} timeout: 60.0 result: callback_request retry: predicate: ${log_timeout} max_retries: 5 backoff: initial_delay: 1 max_delay: 10 multiplier: 2 - print_callback_result: call: sys.log args: severity: "INFO" text: ${"Received " + json.encode_to_string(callback_request.http_request)} log_timeout: params: [e] steps: - when_to_repeat: switch: - condition: ${"TimeoutError" in e.tags} steps: - log_error_and_retry: call: sys.log args: severity: "WARNING" text: "Timed out waiting for callback, retrying" - exit_predicate: return: true - otherwise: return: false
JSON
{ "main": { "steps": [ { "create_callback": { "call": "events.create_callback_endpoint", "args": { "http_callback_method": "GET" }, "result": "callback_details" } }, { "print_callback_details": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Listening for callbacks on \" + callback_details.url}" } } }, { "await_callback": { "try": { "call": "events.await_callback", "args": { "callback": "${callback_details}", "timeout": 60 }, "result": "callback_request" }, "retry": { "predicate": "${log_timeout}", "max_retries": 5, "backoff": { "initial_delay": 1, "max_delay": 10, "multiplier": 2 } } } }, { "print_callback_result": { "call": "sys.log", "args": { "severity": "INFO", "text": "${\"Received \" + json.encode_to_string(callback_request.http_request)}" } } } ] }, "log_timeout": { "params": [ "e" ], "steps": [ { "when_to_repeat": { "switch": [ { "condition": "${\"TimeoutError\" in e.tags}", "steps": [ { "log_error_and_retry": { "call": "sys.log", "args": { "severity": "WARNING", "text": "Timed out waiting for callback, retrying" } } }, { "exit_predicate": { "return": true } } ] } ] } }, { "otherwise": { "return": false } } ] } }