租用拉取任务

任务进入拉取队列后,就可供工作器租用。处理完任务后,工作器必须将其删除。

准备工作

重要的上下文信息

  • 此方法仅适用于在标准环境下的服务中运行的工作器。
  • 使用拉取队列时,您需要根据处理量来增减工作器。

租用任务

任务进入队列后,工作器可以使用 leaseTasks() 方法租用其中的一项或多项任务。对于最近使用 add() 添加的任务,可能要延迟一小段时间才可通过 leaseTasks() 租用。

在申请租用时,您可以指定要租用的任务数(最多 1000 个任务)和租用时长(以秒为单位,最长一周)。租用时长必须足够长,以确保速度最慢的任务也能在租约到期前完成。您可以使用 modifyTaskLease() 修改任务租用。

某一任务被租用后,其他工作器就无法再对其进行处理;在租期到期之前,该任务将始终不可用。

以下代码示例从队列 pull-queue 中租用了任务,租用期为 1 小时。

List<TaskHandle> tasks = q.leaseTasks(3600, TimeUnit.SECONDS, numberOfTasksToLease);

使用任务标记执行批处理

并非所有任务都是一样的;您的代码可以“标记”任务,然后按标记选择要租用的任务。标记可以充当过滤条件。

q.add(
    TaskOptions.Builder.withMethod(TaskOptions.Method.PULL)
        .payload(content.toString())
        .tag("process".getBytes()));

然后,租用过滤后的任务:

// Lease only tasks tagged with "process"
List<TaskHandle> tasks =
    q.leaseTasksByTag(3600, TimeUnit.SECONDS, numberOfTasksToLease, "process");
// You can also specify a tag to lease via LeaseOptions passed to leaseTasks.

调节轮询速率

如果工作器轮询队列以获取要租用的任务,则应检测尝试租用任务的速度是否快于队列提供任务的速度。如果发生此故障,则 leaseTasks() 可能会出现以下异常:


您的代码必须捕获这些异常,通过调用 leaseTasks() 来实现退避,然后再重试。为了避免此问题,请考虑在调用 leaseTasks() 时设置较长的 RPC 截止时间。如果租用请求返回的任务列表为空,您也应执行退避。

如果每个队列每秒生成 10 个以上的 LeaseTasks 请求,那么只有前 10 个请求将返回结果。如果请求数超过此限值,系统将返回 OK,且结果为零。

在 Cloud Console 中监控任务

要查看有关应用中所有任务和队列的信息,请执行以下操作:

  1. 打开 Cloud Console 中的任务队列页面,然后在页面顶部的菜单栏中选择拉取队列标签页。

    转到拉取队列标签页

  2. 点击您想了解的队列的名称,以打开队列详情页面。该页面会显示所选队列中的所有任务。

删除任务

工作器完成某项任务后,需要将该任务从队列中删除。如果您发现工作器已完成处理的任务仍停留在队列中,则说明工作器可能发生故障;在这种情况下,任务将由另一个工作器处理。

您可以使用 deleteTask() 删除单个任务或一列任务。必须知道任务的名称才能将其删除。您可以在由 leaseTasks() 返回的 Task object 中找到任务名称。

以下代码示例演示了如何删除队列中的任务:

q.deleteTask(task);