租用拉取任务

任务进入拉取队列后,就可供工作器租用。处理完任务后,工作器必须将其删除。

准备工作

重要的上下文信息

  • 此方法仅适用于在标准环境下的服务中运行的工作器。
  • 使用拉取队列时,您需要根据处理量来增减工作器。

租用任务

任务进入队列后,工作器可以使用 lease_tasks() 方法租用其中的一项或多项任务。对于最近使用 add() 添加的任务,可能要延迟一小段时间才可通过 lease_tasks() 租用。

在申请租用时,您可以指定要租用的任务数(最多 1000 个任务)和租用时长(以秒为单位,最长一周)。租用时长必须足够长,以确保速度最慢的任务也能在租约到期前完成。您可以使用 modify_task_lease() 修改任务租用。

某一任务被租用后,其他工作器就无法再对其进行处理;在租期到期之前,该任务将始终不可用。

lease_tasks() 方法会返回一个 Task 对象,该对象包含从队列中租用的一系列任务。

以下代码示例从队列 pull-queue 中租用了 100 个任务,租用期为 1 小时。

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)

使用任务标记执行批处理

并非所有任务都是一样的;您的代码可以“标记”任务,然后按标记选择要租用的任务。标记可以充当过滤条件。

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))

q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse

q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.

调节轮询速率

如果工作器轮询队列以获取要租用的任务,则应检测尝试租用任务的速度是否快于队列提供任务的速度。如果发生此故障,则 lease_tasks() 可能会出现以下异常:

  • google.appengine.api.taskqueue.TransientError
  • google.appengine.runtime.apiproxy_errors.DeadlineExceededError


您的代码必须捕获这些异常,通过调用 lease_tasks() 来实现退避,然后再重试。要避免此问题,请考虑在调用 lease_tasks() 时设置较长的 RPC 截止时间。如果租用请求返回的任务列表为空,您也应执行退避。

如果每个队列每秒生成 10 个以上的 LeaseTasks 请求,那么只有前 10 个请求将返回结果。如果请求数超过此限值,系统将返回 OK,且结果为零。

在 Google Cloud 控制台中监控任务

要查看有关应用中所有任务和队列的信息,请执行以下操作:

  1. 打开 Google Cloud 控制台中的 Cloud Tasks 页面,在类型列中查找拉取值。

    转至 Cloud Tasks

  2. 点击您想了解的队列的名称,以打开队列详情页面。该页面会显示所选队列中的所有任务。

删除任务

工作器完成某项任务后,需要将该任务从队列中删除。如果您发现工作器已完成处理的任务仍停留在队列中,则说明工作器可能发生故障;在这种情况下,任务将由另一个工作器处理。

要删除一系列任务(例如 lease_task() 返回的一系列任务),只需将其传递到 delete_tasks() 即可:

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)

拉取队列的端到端示例

请参阅 appengine-pullqueue-counter,通过一个简单但全面的端到端示例来了解如何在 Python 中使用拉取队列。