Once tasks are in a pull queue, a worker can lease them. After the tasks are processed, the worker must delete them.
Before you begin
What you need to understand
- This method is only applicable to workers that are running within a service in the standard environment. To use a worker service not in the standard environment to process your tasks, see the docs for the Cloud Tasks API.
- When you use pull queues, you are responsible for scaling your workers based on your processing volume.
After the tasks are in the queue, a worker can lease one or more of them using the lease_tasks() method. There may be a short delay before tasks recently added using add() become available via lease_tasks().
When you request a lease, you specify the number of tasks to lease (up to a maximum of 1,000 tasks) and the duration of the lease in seconds (up to a maximum of one week). The lease duration must be long enough for the slowest task to finish before the lease expires. You can also modify a task's lease after it has been granted.
Leasing a task makes it unavailable for processing by another worker, and it remains unavailable until the lease expires.
The following example leases up to 100 tasks from the queue pull-queue for one hour:
    from google.appengine.api import taskqueue

    q = taskqueue.Queue('pull-queue')
    # Lease up to 100 tasks for 3600 seconds (one hour).
    q.lease_tasks(3600, 100)
Batching with task tags
Not all tasks are alike; your code can "tag" tasks and then choose tasks to lease by tag. The tag acts as a filter.
    from google.appengine.api import taskqueue

    q = taskqueue.Queue('pull-queue')
    q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
    q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
    q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
    q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))

    # Leases render tasks, but not parse tasks.
    q.lease_tasks_by_tag(3600, 100, 'render')

    # Leases up to 100 tasks that have the same tag.
    # The tag is that of the "oldest" task by eta.
    q.lease_tasks_by_tag(3600, 100)
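To illustrate how a tag can act as a filter inside a worker, here is a minimal sketch that leases one batch per tag and routes each task to a handler for that tag. The function name process_by_tag and the handlers mapping are hypothetical, introduced only for this example. The queue argument is assumed to be any object that offers lease_tasks_by_tag() and delete_tasks() with the call shapes used in this document.

```python
def process_by_tag(queue, handlers, lease_seconds=3600, max_tasks=100):
    """Lease one batch per tag and pass each task to that tag's handler.

    `handlers` is a hypothetical mapping of tag string -> callable that
    takes a single task. Returns a dict of tag -> number of tasks handled.
    """
    handled = {}
    for tag, handler in handlers.items():
        # Only tasks carrying this tag are returned by the lease call.
        tasks = queue.lease_tasks_by_tag(lease_seconds, max_tasks, tag)
        for task in tasks:
            handler(task)
        if tasks:
            # Every handler call returned normally, so remove the batch.
            queue.delete_tasks(tasks)
        handled[tag] = len(tasks)
    return handled
```

A worker might call this as process_by_tag(q, {'parse': parse_task, 'render': render_task}), where parse_task and render_task are placeholders for your own functions. If a handler raises, the batch is not deleted and its tasks stay leased until the lease expires.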
Regulating polling rates
Workers that poll the queue for tasks to lease should detect whether they are attempting to lease tasks faster than the queue can supply them. If this happens, lease_tasks() can raise exceptions. Your code must catch these exceptions, back off from calling lease_tasks(), and then try again later. To avoid this problem, consider setting a higher RPC deadline when calling lease_tasks(). You should also back off when a lease request returns an empty list of tasks.
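The back-off advice above can be sketched as a small retry loop with exponential delays. This is an illustration under stated assumptions, not the library's own mechanism. The helper name lease_with_backoff is hypothetical, the retryable argument is a tuple of exception classes that you choose to treat as transient, and the keyword name deadline for the RPC deadline is an assumption.

```python
import time


def lease_with_backoff(queue, lease_seconds, max_tasks, retryable,
                       max_attempts=5, base_delay=1.0, max_delay=60.0,
                       deadline=None, sleep=time.sleep):
    """Lease tasks, backing off after a transient error or an empty result.

    `retryable` is a tuple of exception classes supplied by the caller.
    Returns the leased tasks, or [] if no attempt produced any.
    """
    delay = base_delay
    for attempt in range(max_attempts):
        try:
            if deadline is None:
                tasks = queue.lease_tasks(lease_seconds, max_tasks)
            else:
                # Assumption: the RPC deadline keyword is named `deadline`.
                tasks = queue.lease_tasks(lease_seconds, max_tasks,
                                          deadline=deadline)
        except retryable:
            tasks = []
        if tasks:
            return tasks
        if attempt < max_attempts - 1:
            # Wait before polling again, doubling the delay up to a cap.
            sleep(delay)
            delay = min(delay * 2, max_delay)
    return []
```

The sleep parameter is injectable so the loop can be exercised without real waiting. Errors and empty results are treated the same way here, which keeps the polling rate low in both cases.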
If you generate more than 10 LeaseTasks requests per queue per second, only the first 10 requests will return results. Requests that exceed this limit return OK with zero results.
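One way to stay under the limit of 10 lease requests per queue per second is to space out calls on the client side. The sketch below is a simple minimum-interval throttle. The class name LeaseThrottle is hypothetical, and it only limits a single worker process, so several workers sharing one queue would need to divide the budget among themselves.

```python
import time


class LeaseThrottle(object):
    """Spaces out calls so at most `max_per_second` happen each second."""

    def __init__(self, max_per_second=10, clock=time.time, sleep=time.sleep):
        self.min_interval = 1.0 / max_per_second
        self.clock = clock
        self.sleep = sleep
        self.next_allowed = None  # Earliest time the next call may go out.

    def wait(self):
        """Block until the next request may be sent."""
        now = self.clock()
        if self.next_allowed is not None and now < self.next_allowed:
            self.sleep(self.next_allowed - now)
            now = self.next_allowed
        self.next_allowed = now + self.min_interval
```

A worker would call throttle.wait() immediately before each lease request. The clock and sleep parameters are injectable so the behavior can be checked without real delays.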
Monitoring tasks in the GCP Console
To view information about all the tasks and queues in your application:
Open the Task Queues page in the GCP Console and select the Pull Queues tab in the menu bar at the top of the page.
Click on the name of the queue in which you are interested, opening the queue details page. It displays all of the tasks in the selected queue.
Once a worker completes a task, it needs to delete the task from the queue. If you see tasks remaining in a queue after a worker finishes processing them, it is likely that the worker failed; in this case, the tasks will be processed by another worker.
You can delete a list of tasks, such as the list returned by lease_tasks(), by passing it to delete_tasks():
    from google.appengine.api import taskqueue

    q = taskqueue.Queue('pull-queue')
    tasks = q.lease_tasks(3600, 100)
    # Perform some work with the tasks here
    q.delete_tasks(tasks)
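Because tasks left in the queue are picked up by another worker, a worker may prefer to delete only the tasks it actually finished. The following sketch shows one possible shape for that. The names process_and_delete and handle are hypothetical, and the queue argument is assumed to offer the lease_tasks() and delete_tasks() calls shown above.

```python
import logging


def process_and_delete(queue, handle, lease_seconds=3600, max_tasks=100):
    """Lease a batch, process it, and delete only the tasks that succeeded.

    `handle` is a hypothetical callable that takes a single task.
    Returns a (succeeded, failed) pair of counts.
    """
    tasks = queue.lease_tasks(lease_seconds, max_tasks)
    done = []
    for task in tasks:
        try:
            handle(task)
        except Exception:
            # Leave the task in the queue; it becomes available to
            # another worker once the lease expires.
            logging.exception('Task failed; leaving it in the queue')
            continue
        done.append(task)
    if done:
        queue.delete_tasks(done)
    return len(done), len(tasks) - len(done)
```

Catching every Exception is a deliberate simplification for this sketch. A real worker would likely distinguish errors worth retrying from tasks that can never succeed.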
An end-to-end example of pull queues
For a simple but complete end-to-end example of using pull queues in Python, see appengine-pullqueue-counter.