Pull queues allow you to design your own system to consume App Engine tasks. The task consumer can be part of your App Engine app (such as a module) or a system outside of App Engine (using the Task Queue REST API). The task consumer leases a specific number of tasks for a specific duration, then processes and deletes them before the lease ends.
Using pull queues requires your application to handle some functions that are automated in push queues:
- Your application needs to scale the number of workers based on processing volume. If your application does not handle scaling, you risk wasting computing resources if there are no tasks to process; you also risk latency if you have too many tasks to process.
- Your application also needs to explicitly delete tasks after processing. In push queues, App Engine deletes the tasks for you. If your application does not delete pull queue tasks after processing, another worker might re-process the task. This wastes computing resources and risks errors if tasks are not idempotent.
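Because a task that is not deleted can be leased and processed again, it helps to make the processing step idempotent. The following is a minimal in-memory sketch of that idea and is not part of the App Engine API: the IdempotentProcessor class and its processOnce method are hypothetical names, and a real application would presumably record completed task names in durable storage rather than in a set held in memory.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical helper: runs the work for a given task name at most once. */
final class IdempotentProcessor {
    // Names of tasks whose work already completed (in memory only; an assumption).
    private final Set<String> completed = ConcurrentHashMap.newKeySet();

    /**
     * Runs the work for the given task name unless it already ran.
     * Returns true if the work ran now, false if it was skipped as a duplicate.
     */
    boolean processOnce(String taskName, Consumer<String> work) {
        // add() returns false when the name was already present.
        if (!completed.add(taskName)) {
            return false;
        }
        try {
            work.accept(taskName);
            return true;
        } catch (RuntimeException e) {
            // The work failed, so forget the name to allow a later retry.
            completed.remove(taskName);
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotentProcessor processor = new IdempotentProcessor();
        int[] runs = {0};
        processor.processOnce("task-1", name -> runs[0]++);
        processor.processOnce("task-1", name -> runs[0]++); // duplicate delivery, skipped
        System.out.println("work ran " + runs[0] + " time(s)");
    }
}
```

With a guard like this, a second worker that leases the same task after a missed delete skips the work instead of repeating it.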
Pull queues require a specific configuration in queue.xml. For more information, see Defining Pull Queues.
The following sections describe the process of enqueuing, leasing, and deleting tasks using pull queues.
Pull queue overview
Pull queues allow a task consumer to process tasks outside of App Engine's default task processing system. If the task consumer is a part of your App Engine app, you can manipulate tasks using simple API calls from the com.google.appengine.api.taskqueue package. Task consumers outside of App Engine can pull tasks using the Task Queue REST API.
The process works like this:
- The task consumer leases tasks, either via the Task Queue API (if the consumer is internal to App Engine) or the Task Queue REST API (if the consumer is external to App Engine).
- App Engine sends task data to the consumer.
- The consumer processes the tasks. If the task fails to execute before the lease expires, the consumer can lease it again. This counts as a retry attempt, and you can configure the maximum number of retry attempts before the system deletes the task.
- After a task executes successfully, the task consumer must delete it.
- The task consumer is responsible for scaling instances based on processing volume.
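The lease cycle described above can be illustrated with a small in-memory model. This is only a sketch of the semantics, not the App Engine implementation: the ToyPullQueue class, its method names, and the explicit clock argument are all assumptions made for the example, and the cap on leases stands in for the configurable maximum number of retry attempts.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of pull-queue leasing; every name here is hypothetical. */
final class ToyPullQueue {
    private static final class Entry {
        long leasedUntilMillis = 0; // 0 means the task has never been leased
        int leaseCount = 0;         // leases handed out for this task so far
    }

    private final Map<String, Entry> tasks = new LinkedHashMap<>(); // keeps insertion order
    private final int maxLeases;

    ToyPullQueue(int maxLeases) { this.maxLeases = maxLeases; }

    void add(String name) { tasks.put(name, new Entry()); }

    /** Leases up to maxTasks tasks whose previous lease (if any) has expired. */
    List<String> lease(int maxTasks, long leaseMillis, long nowMillis) {
        // Drop tasks that used up their leases and whose last lease has expired.
        tasks.values().removeIf(
            e -> e.leaseCount >= maxLeases && e.leasedUntilMillis <= nowMillis);
        List<String> leased = new ArrayList<>();
        for (Map.Entry<String, Entry> e : tasks.entrySet()) {
            if (leased.size() >= maxTasks) break;
            Entry t = e.getValue();
            if (t.leasedUntilMillis <= nowMillis) {
                t.leasedUntilMillis = nowMillis + leaseMillis;
                t.leaseCount++;
                leased.add(e.getKey());
            }
        }
        return leased;
    }

    /** The consumer calls this after processing a task successfully. */
    void delete(String name) { tasks.remove(name); }

    int size() { return tasks.size(); }

    public static void main(String[] args) {
        ToyPullQueue q = new ToyPullQueue(2);
        q.add("t1");
        q.add("t2");
        List<String> first = q.lease(10, 1000, 0);     // both tasks are leased
        q.delete("t1");                                // t1 was processed, so delete it
        List<String> during = q.lease(10, 1000, 500);  // t2 is still under lease
        List<String> after = q.lease(10, 1000, 1000);  // t2's lease expired, lease again
        System.out.println(first + " " + during + " " + after);
    }
}
```

In this model a task that is never deleted becomes leasable again as soon as its lease expires, which is why the consumer has to delete each task it finishes.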
Pulling tasks within App Engine
Within the App Engine environment, you work with pull queues through simple API calls that add tasks to a pull queue, lease them, and delete them after processing.
Defining pull queues
You can specify any named queue as a pull queue by adding the <mode>pull</mode> directive to the queue.xml configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<queue-entries>
  <queue>
    <name>pull-queue</name>
    <mode>pull</mode>
  </queue>
</queue-entries>
If you are using the Task Queue REST API, you also need to create an access control list (ACL) using the acl directive. This directive allows you to restrict access to user email addresses corresponding to an account hosted by Google.
The acl element has two available parameters:
- user-email: enables the user to list, get, lease, delete, and update tasks.
- writer-email: enables the user to insert tasks.
To access all functions of the API, a developer's email address must be specified as both a user-email and a writer-email. The following snippet creates a pull queue named pull-queue with two users in the ACL. Only email@example.com, which is listed under both elements, can access all API calls; firstname.lastname@example.org is listed only as a writer and can only insert tasks:
<?xml version="1.0" encoding="UTF-8"?>
<queue-entries>
  <queue>
    <name>pull-queue</name>
    <mode>pull</mode>
    <acl>
      <user-email>email@example.com</user-email>
      <writer-email>firstname.lastname@example.org</writer-email>
      <writer-email>email@example.com</writer-email>
    </acl>
  </queue>
</queue-entries>
Adding tasks to a pull queue
First, get the queue using the queue name defined in the queue.xml file:
Queue q = QueueFactory.getQueue("pull-queue");
Then call the queue's add() method with task options built by TaskOptions.Builder.withMethod(TaskOptions.Method.PULL) to enqueue tasks in the pull queue named pull-queue.
Leasing tasks
Once you have added tasks to a pull queue, you can lease one or more tasks using leaseTasks(). There may be a short delay before tasks recently added using add() become available via leaseTasks(). When you request a lease, you specify the number of tasks to lease (up to a maximum of 1,000 tasks) and the duration of the lease in seconds (up to a maximum of one week). The lease duration needs to be long enough to ensure that the slowest task will have time to finish before the lease period expires. You can extend a task lease using
Leasing a task makes it unavailable for processing by another worker, and it remains unavailable until the lease expires. If you lease an individual task, the API selects the task from the front of the queue. If no such task is available, an empty list is returned.
The following code sample leases up to 100 tasks from the queue pull-queue for one hour, assuming numberOfTasksToLease is set to 100:
List<TaskHandle> tasks = q.leaseTasks(3600, TimeUnit.SECONDS, numberOfTasksToLease);
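The limits mentioned above (at most 1,000 tasks per request and a lease of at most one week) can be checked before a lease request is made. The sketch below shows one possible pre-flight check; the LeaseLimits class and its check method are hypothetical helpers and not part of the App Engine API, and only the two upper bounds stated in this section are enforced.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical pre-flight check for lease requests; not part of the App Engine API. */
final class LeaseLimits {
    // Upper bounds taken from the limits stated in this section.
    static final long MAX_TASKS_PER_LEASE = 1000;
    static final long MAX_LEASE_SECONDS = TimeUnit.DAYS.toSeconds(7); // one week

    /** Throws IllegalArgumentException if the request exceeds either limit. */
    static void check(long duration, TimeUnit unit, long taskCount) {
        if (taskCount > MAX_TASKS_PER_LEASE) {
            throw new IllegalArgumentException(
                "cannot lease more than " + MAX_TASKS_PER_LEASE + " tasks at once");
        }
        if (unit.toSeconds(duration) > MAX_LEASE_SECONDS) {
            throw new IllegalArgumentException(
                "lease duration cannot exceed " + MAX_LEASE_SECONDS + " seconds");
        }
    }

    public static void main(String[] args) {
        check(3600, TimeUnit.SECONDS, 100); // within both limits, returns normally
        try {
            check(8, TimeUnit.DAYS, 100);   // longer than one week
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Validating locally like this is optional; it simply surfaces an oversized request before any call to the queue is attempted.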
Not all tasks are alike; your code can "tag" tasks and then choose tasks to lease by tag. The tag acts as a filter.
q.add(TaskOptions.Builder.withMethod(TaskOptions.Method.PULL)
    .payload(content.toString())
    .tag("process".getBytes()));
// Lease only tasks tagged with "process"
List<TaskHandle> tasks = q.leaseTasksByTag(3600, TimeUnit.SECONDS, numberOfTasksToLease, "process");
// You can also specify a tag to lease via LeaseOptions passed to leaseTasks.
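To make the filtering behavior of a tag concrete, here is a toy in-memory illustration. It does not use the Task Queue API; the TagFilterDemo class, its Task type, and the selectByTag method are hypothetical names chosen for the example, and actual lease bookkeeping is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy illustration of tag filtering; class and method names are hypothetical. */
final class TagFilterDemo {
    static final class Task {
        final String name;
        final String tag;
        Task(String name, String tag) { this.name = name; this.tag = tag; }
    }

    /** Returns the names of up to limit tasks carrying the given tag, in queue order. */
    static List<String> selectByTag(List<Task> queue, String tag, int limit) {
        List<String> selected = new ArrayList<>();
        for (Task t : queue) {
            if (selected.size() >= limit) break;
            if (tag.equals(t.tag)) selected.add(t.name);
        }
        return selected;
    }

    public static void main(String[] args) {
        List<Task> queue = List.of(
            new Task("t1", "process"),
            new Task("t2", "archive"),
            new Task("t3", "process"));
        // Only tasks tagged "process" are selected; t2 is passed over.
        System.out.println(selectByTag(queue, "process", 10));
    }
}
```

Tasks with other tags stay in the queue untouched, so separate workers can each lease only the kind of task they know how to handle.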