Once tasks are in a pull queue, a worker can lease them. After the tasks are processed the worker must delete them.
Before you begin
- Create a pull queue.
- Create tasks and add them to the pull queue.
Important context
- This method is only applicable to workers that are running within a service in the standard environment.
- When you use pull queues, you are responsible for scaling your workers based on your processing volume.
Leasing tasks
After the tasks are in the queue, a worker can lease one or more of them using the
lease_tasks()
method. There may be a short delay before tasks recently added
using
add()
become available via
lease_tasks()
.
When you request a lease, you specify the number of tasks to lease (up to a maximum
of 1,000 tasks) and the duration of the lease in seconds (up to a maximum of one
week). The lease duration needs to be long enough to ensure that the slowest task
will have time to finish before the lease period expires. You can modify a task
lease using
modify_task_lease()
.
Leasing a task makes it unavailable for processing by another worker, and it remains unavailable until the lease expires.
The
lease_tasks()
method returns a Task object containing a list of tasks leased from the queue.
pull-queue
for one hour:
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)
Batching with task tags
Not all tasks are alike; your code can "tag" tasks and then choose tasks to lease by tag. The tag acts as a filter.
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))
q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse
q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.
Regulating polling rates
Workers that poll the queue for tasks to lease should detect whether they are
attempting to lease tasks faster than the queue can supply them. If this failure
occurs, the following exceptions from
lease_tasks()
can be generated:
google.appengine.api.taskqueue.TransientError
google.appengine.runtime.apiproxy_errors.DeadlineExceededError
Your code must catch these exceptions, back off from calling
lease_tasks()
,
and then try again later. To avoid this problem, consider setting a higher RPC
deadline when calling
lease_tasks()
. You should also back off when a lease
request returns an empty list of tasks.
If you generate more than 10
LeaseTasks requests per queue per second, only the first 10 requests will return
results. If requests exceed this limit, OK
is returned with zero results.
Monitoring tasks in the Google Cloud console
To view information about all the tasks and queues in your application:
Open the Cloud Tasks page in the Google Cloud console and look for the Pull value in column Type.
Click on the name of the queue in which you are interested, opening the queue details page. It displays all of the tasks in the selected queue.
Deleting tasks
Once a worker completes a task, it needs to delete the task from the queue. If you see tasks remaining in a queue after a worker finishes processing them, it is likely that the worker failed; in this case, the tasks will be processed by another worker.
You can delete a list of tasks, such as that returned by
lease_tasks()
,
simply by passing it to delete_tasks()
:
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)
An end-to-end example of pull queues
For a simple but complete end-to-end example of using pull queues in Python, see appengine-pullqueue-counter.