租用拉取任务

任务进入拉取队列后,就可供工作器租用。处理完任务后,工作器必须将其删除。

准备工作

重要的上下文信息

  • 此方法仅适用于在标准环境下的服务中运行的工作器。
  • 使用拉取队列时,您需要根据处理量来增减工作器。

租用任务

任务进入队列后,工作器可以使用 taskqueue.Lease 方法租用其中的一项或多项任务。对于最近使用 taskqueue.Add 添加的任务,可能要延迟一小段时间才可通过 taskqueue.Lease 租用。

在申请租用时,您可以指定要租用的任务数(最多 1000 个任务)和租用时长(以秒为单位,最长一周)。租用时长必须足够长,以确保速度最慢的任务也能在租约到期前完成。您可以使用 taskqueue.ModifyLease 修改任务租用。

某一任务被租用后,其他工作器就无法再对其进行处理;在租期到期之前,该任务将始终不可用。

以下代码示例从队列 pull-queue 中租用了 100 个任务,租用期为 1 小时。

tasks, err := taskqueue.Lease(ctx, 100, "pull-queue", 3600)

使用任务标记执行批处理

并非所有任务都是一样的;您的代码可以“标记”任务,然后按标记选择要租用的任务。标记可以充当过滤条件。以下代码示例演示了如何标记任务并随后按标记租用任务:

_, err = taskqueue.Add(ctx, &taskqueue.Task{
	Payload: []byte("parse"), Method: "PULL", Tag: "parse",
}, "pull-queue")
_, err = taskqueue.Add(ctx, &taskqueue.Task{
	Payload: []byte("render"), Method: "PULL", Tag: "render",
}, "pull-queue")

// leases render tasks, but not parse
tasks, err = taskqueue.LeaseByTag(ctx, 100, "pull-queue", 3600, "render")

// Leases up to 100 tasks that have same tag.
// Tag is that of "oldest" task by ETA.
tasks, err = taskqueue.LeaseByTag(ctx, 100, "pull-queue", 3600, "")

调节轮询速率

如果工作器轮询队列以获取要租用的任务,则应检测尝试租用任务的速度是否快于队列提供任务的速度。如果发生此故障,taskqueue.Lease 将返回一个退避错误。

您的代码必须处理这些错误,停止调用 taskqueue.Lease,然后再重试。要避免此问题,请考虑在调用 taskqueue.Lease 时设置较长的 RPC 截止时间。如果租用请求返回的任务列表为空,您也应执行退避。 如果每个队列每秒生成 10 个以上的 LeaseTasks 请求,那么只有前 10 个请求将返回结果。如果请求数超过此限值,系统将返回 OK,且结果为零。

在 Google Cloud 控制台中监控任务

要查看有关应用中所有任务和队列的信息,请执行以下操作:

  1. 打开 Google Cloud 控制台中的 Cloud Tasks 页面,在类型列中查找拉取值。

    转至 Cloud Tasks

  2. 点击您想了解的队列的名称,以打开队列详情页面。该页面会显示所选队列中的所有任务。

删除任务

工作器完成某项任务后,需要将该任务从队列中删除。如果您发现工作器已完成处理的任务仍停留在队列中,则说明工作器可能发生故障;在这种情况下,任务将由另一个工作器处理。

通过将任务列表传递给 taskqueue.DeleteMulti,您可以删除此列表(例如,由 taskqueue.Lease 返回的任务列表):

tasks, err = taskqueue.Lease(ctx, 100, "pull-queue", 3600)
// Perform some work with the tasks here

taskqueue.DeleteMulti(ctx, tasks, "pull-queue")