Using Pull Queues in Java

Pull queues allow you to design your own system to consume App Engine tasks. The task consumer can be part of your App Engine app (such as a module) or a system outside of App Engine (using the Task Queue REST API). The task consumer leases a specific number of tasks for a specific duration, then processes and deletes them before the lease ends.

Using pull queues requires your application to handle some functions that are automated in push queues:

  • Your application needs to scale the number of workers based on processing volume. If your application does not handle scaling, you risk wasting computing resources when there are no tasks to process, and you risk increased latency when there are more tasks than your workers can handle.
  • Your application also needs to explicitly delete tasks after processing. In push queues, App Engine deletes the tasks for you. If your application does not delete pull queue tasks after processing, another worker might re-process the task. This wastes computing resources and risks errors if tasks are not idempotent.

Pull queues require a specific configuration in queue.xml. For more information, see Defining Pull Queues.

The following sections describe the process of enqueuing, leasing, and deleting tasks using pull queues.

  1. Pull queue overview
  2. Pulling tasks within App Engine
  3. Pulling tasks to a module
  4. Pulling tasks from outside App Engine
  5. Defining pull queue processing rate
  6. Setting storage limits for all queues
  7. Configuring the maximum number of concurrent requests
  8. Configuring retry attempts for failed tasks
  9. Quotas and limits for pull queues

Pull queue overview

Pull queues allow a task consumer to process tasks outside of App Engine's default task processing system. If the task consumer is a part of your App Engine app, you can manipulate tasks using simple API calls from the com.google.appengine.api.taskqueue package. Task consumers outside of App Engine can pull tasks using the Task Queue REST API.

The process works like this:

  1. The task consumer leases tasks, either via the Task Queue API (if the consumer is internal to App Engine) or the Task Queue REST API (if the consumer is external to App Engine).
  2. App Engine sends task data to the consumer.
  3. The consumer processes the tasks. If the task fails to execute before the lease expires, the consumer can lease it again. This counts as a retry attempt, and you can configure the maximum number of retry attempts before the system deletes the task.
  4. After a task executes successfully, the task consumer must delete it.
  5. The task consumer is responsible for scaling instances based on processing volume.

Pulling tasks within App Engine

You can use pull queues within the App Engine environment using simple API calls to add tasks to a pull queue, lease them, and delete them after processing.

Defining pull queues

You can specify any named queue as a pull queue by adding the <mode>pull</mode> directive to the queue.xml configuration file.

<?xml version="1.0" encoding="UTF-8"?>
<queue-entries>
  <queue>
    <name>pull-queue</name>
    <mode>pull</mode>
  </queue>
</queue-entries>

If you are using the Task Queue REST API, you also need to create an access control list (ACL) using the acl directive. This directive allows you to restrict access to user email addresses corresponding to an account hosted by Google.

The acl element has two available child elements:

  • user-email: enables the user to list, get, lease, delete, and update tasks.
  • writer-email: enables the user to insert tasks.

To access all functions of the API, a developer's email address must be specified both as a user-email and a writer-email. The following code snippet creates a pull queue named pull-queue with two users in the ACL. The email account bar@foo.com can access all API calls, while user@gmail.com can only insert tasks:

<?xml version="1.0" encoding="UTF-8"?>
<queue-entries>
  <queue>
    <name>pull-queue</name>
    <mode>pull</mode>
    <acl>
      <user-email>bar@foo.com</user-email>
      <writer-email>user@gmail.com</writer-email>
      <writer-email>bar@foo.com</writer-email>
    </acl>
  </queue>
</queue-entries>

Adding tasks to a pull queue

To add tasks to a pull queue, first get the queue with QueueFactory.getQueue(), using the queue name defined in queue.xml (for example, pull-queue). Then call the queue's add() method with task options whose method is TaskOptions.Method.PULL, such as TaskOptions.Builder.withMethod(TaskOptions.Method.PULL), and attach the task data as the payload.

Leasing tasks

Once you have added tasks to a pull queue, you can lease one or more tasks using leaseTasks(). There may be a short delay before tasks recently added using add() become available via leaseTasks(). When you request a lease, you specify the number of tasks to lease (up to a maximum of 1,000 tasks) and the duration of the lease in seconds (up to a maximum of one week). The lease duration needs to be long enough to ensure that the slowest task will have time to finish before the lease period expires. You can extend a task lease using modifyTaskLease().
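As a rough sizing aid, the sketch below computes a lease length from the number of tasks and the slowest task's duration, and rejects requests beyond the limits stated above (1,000 tasks, one week). It assumes one worker processes the leased tasks sequentially; the class name LeasePlanner and the safety factor are hypothetical choices for illustration, not App Engine settings.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not part of any App Engine API) for sizing a pull-queue lease. */
final class LeasePlanner {
    static final long MAX_TASKS_PER_LEASE = 1_000;                      // limit stated above
    static final long MAX_LEASE_SECONDS = TimeUnit.DAYS.toSeconds(7);   // one week

    /**
     * Lease length in seconds for working through numTasks one after another,
     * assuming each takes at most slowestTaskSeconds, padded by safetyFactor.
     */
    static long leaseSeconds(long numTasks, long slowestTaskSeconds, double safetyFactor) {
        if (numTasks < 1 || numTasks > MAX_TASKS_PER_LEASE) {
            throw new IllegalArgumentException("numTasks must be between 1 and 1000");
        }
        long seconds = (long) Math.ceil(numTasks * slowestTaskSeconds * safetyFactor);
        if (seconds > MAX_LEASE_SECONDS) {
            // Too long to lease in one call; lease fewer tasks instead.
            throw new IllegalArgumentException("lease would exceed one week: " + seconds + " s");
        }
        return seconds;
    }

    public static void main(String[] args) {
        // 100 tasks, slowest takes 20 s, 50% margin
        System.out.println(leaseSeconds(100, 20, 1.5)); // 3000
    }
}
```

If the result would exceed a week, leasing fewer tasks per call is usually the better fix, since it also limits how many tasks are stranded if the worker crashes.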

Leasing a task makes it unavailable for processing by another worker, and it remains unavailable until the lease expires. If you lease an individual task, the API selects the task from the front of the queue. If no such task is available, an empty list is returned.
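The leasing rules above can be modeled with a small in-memory queue. This is a hypothetical sketch, not the App Engine API (where the corresponding calls are the queue's add(), leaseTasks(), and deleteTask() methods): InMemoryPullQueue, its task-name format, and the explicit millisecond clock are assumptions made so the behavior is easy to test. It leases from the front of the queue, hides leased tasks until their lease expires, and returns an empty list when nothing is available; it does not model the short delay before newly added tasks become leasable.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of pull-queue leasing; time is passed in as milliseconds. */
final class InMemoryPullQueue {
    private static final class Task {
        final String name;
        final byte[] payload;
        long leaseExpiresAt; // 0 means "never leased"
        Task(String name, byte[] payload) { this.name = name; this.payload = payload; }
    }

    // LinkedHashMap keeps enqueue order, so iteration starts at the front of the queue.
    private final Map<String, Task> tasks = new LinkedHashMap<>();
    private int nextId = 0;

    String add(byte[] payload) {
        String name = "task-" + (nextId++);
        tasks.put(name, new Task(name, payload));
        return name;
    }

    /** Leases up to count tasks whose previous lease (if any) has expired. */
    List<String> leaseTasks(long nowMillis, long leaseMillis, int count) {
        List<String> leased = new ArrayList<>();
        for (Task t : tasks.values()) {
            if (leased.size() == count) break;
            if (t.leaseExpiresAt <= nowMillis) {          // available again once the lease runs out
                t.leaseExpiresAt = nowMillis + leaseMillis;
                leased.add(t.name);
            }
        }
        return leased;                                     // empty when nothing is available
    }

    boolean deleteTask(String name) { return tasks.remove(name) != null; }

    int size() { return tasks.size(); }

    public static void main(String[] args) {
        InMemoryPullQueue q = new InMemoryPullQueue();
        q.add("a".getBytes());
        q.add("b".getBytes());
        System.out.println(q.leaseTasks(0, 60_000, 1)); // [task-0]
        System.out.println(q.leaseTasks(0, 60_000, 1)); // [task-1]
        System.out.println(q.leaseTasks(0, 60_000, 1)); // []
    }
}
```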

For example, to lease 100 tasks from the queue pull-queue for one hour, call leaseTasks(3600, TimeUnit.SECONDS, 100) on that queue; the call returns a list of TaskHandle objects.

Not all tasks are alike; your code can tag tasks when it adds them (using the tag() option of TaskOptions) and then lease only tasks with a given tag using leaseTasksByTag(). The tag acts as a filter.
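Tag filtering can be pictured as a pass over the queue that skips tasks whose tag does not match. The sketch below shows only that filtering step, assuming tasks are kept in queue order; TagFilterLease and its Task class are hypothetical stand-ins, and lease expiry is left out. In the App Engine API, leaseTasksByTag() performs the lease and the filtering in one call.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of leasing by tag: the tag filters which queued tasks are eligible. */
final class TagFilterLease {
    static final class Task {
        final String name;
        final String tag;   // may be null for untagged tasks
        Task(String name, String tag) { this.name = name; this.tag = tag; }
    }

    /** Names of up to count tasks carrying the given tag, taken from the front of the queue. */
    static List<String> leaseByTag(List<Task> queue, String tag, int count) {
        List<String> leased = new ArrayList<>();
        for (Task t : queue) {
            if (leased.size() == count) break;
            if (tag.equals(t.tag)) leased.add(t.name);
        }
        return leased;
    }

    public static void main(String[] args) {
        List<Task> queue = List.of(
                new Task("t1", "resize"), new Task("t2", "email"),
                new Task("t3", "resize"), new Task("t4", null));
        System.out.println(leaseByTag(queue, "resize", 10)); // [t1, t3]
    }
}
```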

Deleting tasks

In general, once a worker completes a task, it needs to delete the task from the queue. If you see tasks remaining in a queue after a worker finishes processing them, it is likely that the worker failed; in this case, the tasks need to be processed by another worker.

You can delete an individual task or a list of tasks using deleteTask(). You must know the name of a task in order to delete it. If you are deleting tasks from a pull queue, you can find task names in the Task object returned by leaseTasks(). If you are deleting tasks from a push queue, you need to know the task name through some other means (for example, if you created the task explicitly).

For example, to delete a leased task from the queue named pull-queue, pass its TaskHandle (or its name, from getName()) to the queue's deleteTask() method once processing succeeds.
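The usual worker pattern is to lease, process, and then delete only what succeeded. This minimal sketch uses hypothetical names (LeaseProcessDelete, processBatch); the delete callback is where a real worker would call the queue's deleteTask(). Tasks that fail, or throw, are deliberately not deleted, so they become leasable again when their lease expires and another worker can retry them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical lease-process-delete loop over a batch of leased task names. */
final class LeaseProcessDelete {
    /**
     * Processes each leased task; successful tasks are deleted immediately, failed ones
     * are left in the queue. Returns the names that were not deleted.
     */
    static List<String> processBatch(List<String> leased, Predicate<String> process, Consumer<String> delete) {
        List<String> notDeleted = new ArrayList<>();
        for (String name : leased) {
            boolean ok;
            try {
                ok = process.test(name);
            } catch (RuntimeException e) {
                ok = false;                       // treat exceptions as failures
            }
            if (ok) {
                delete.accept(name);              // e.g. queue.deleteTask(name) in a real worker
            } else {
                notDeleted.add(name);             // re-leasable after the lease expires
            }
        }
        return notDeleted;
    }

    public static void main(String[] args) {
        List<String> deleted = new ArrayList<>();
        List<String> left = processBatch(List.of("t1", "t2", "t3"),
                name -> !name.equals("t2"),       // pretend t2 fails
                deleted::add);
        System.out.println("deleted " + deleted + ", left for re-lease " + left);
    }
}
```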

Pulling tasks to a module

You can use App Engine Modules as workers to lease and process pull queue tasks. Modules allow you to process more work without having to worry about request deadlines and other restrictions normally imposed by App Engine. Using modules with pull queues gives you processing efficiencies by allowing you to batch task processing using leases.

For more information about using modules, check out the Modules documentation.

Pulling tasks from outside App Engine

If you need to use pull queues from outside App Engine, you must use the Task Queue REST API. The REST API is a Google web service accessible at a globally-unique URI of the form:

https://www.googleapis.com/taskqueue/v1beta2/projects/taskqueues

Google provides the following client libraries that you can use to call the Task Queue methods remotely:

In the tables below, the first column shows each library's stage of development (note that some are in early stages), and links to documentation for the library. The second column links to available samples for each library.

Documentation | Samples
Google API Client Library for Java | Java samples
Google API Client Library for JavaScript (beta) | JavaScript samples
Google API Client Library for .NET | .NET samples
Google API Client Library for Objective-C | Objective-C samples
Google API Client Library for PHP (beta) | PHP samples
Google API Client Library for Python | Python samples

These early-stage libraries are also available:

Documentation | Samples
Google APIs Client Libraries for Dart (beta) | Dart samples
Google API Client Library for Go (alpha) | Go samples
Google API Client Library for Node.js (alpha) | Node.js samples
Google API Client Library for Ruby (alpha) | Ruby samples

Prerequisites

The REST API uses OAuth as the authorization mechanism. When you configure your pull queue, make sure that your queue.xml file supplies the email addresses of the users that can access the queue using the REST API. The OAuth scope for all methods is https://www.googleapis.com/auth/taskqueue.

Using the task queue REST API with the Java Google API library

This section demonstrates the use of the REST API in an application. The sample allows you to interact with the REST API via the command line and also provides an example of how to lease and delete tasks. The sections below show the basics of using the Client Library with relevant code snippets.

Importing the client library for Java

Most of the functionality of the client library is encapsulated in an HttpTransport object. This object stores your authentication header and the default headers to be used with every request. After authentication is complete, the sample sets up a single HttpTransport instance that is used for all requests.

The HttpTransport instance can then be used to make requests to the REST API. For example, TaskQueueSample.java calls TaskQueues.get and displays information about the queue.
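To show what a request such as TaskQueues.get looks like on the wire, this sketch builds it with the JDK's java.net.http classes (Java 11 or later) instead of the Google API Client Library that TaskQueueSample.java uses. It assumes you already have an OAuth 2.0 access token for the https://www.googleapis.com/auth/taskqueue scope and that the project and queue names need no URL escaping; the path follows the REST API's projects/{project}/taskqueues/{taskqueue} pattern. TaskQueueGetRequest and the placeholder values are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical builder for a TaskQueues.get request; it builds the request but does not send it. */
final class TaskQueueGetRequest {
    static final String BASE = "https://www.googleapis.com/taskqueue/v1beta2/projects/";

    static HttpRequest build(String project, String queue, String accessToken) {
        return HttpRequest.newBuilder(URI.create(BASE + project + "/taskqueues/" + queue))
                .header("Authorization", "Bearer " + accessToken)   // OAuth 2.0 access token
                .GET()
                .build();
    }

    public static void main(String[] args) {
        HttpRequest req = build("your-project-id", "pull-queue", "ACCESS_TOKEN");
        System.out.println(req.method() + " " + req.uri());
        // Sending it would use HttpClient.newHttpClient().send(req, HttpResponse.BodyHandlers.ofString()).
    }
}
```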

The following sections describe the two most common functions used with the Task Queue API, allowing you to lease and delete tasks.

Leasing tasks

The Java library provides methods that invoke the Tasks.lease method in the REST API. When you create a lease, you specify the number of tasks to lease (up to a maximum of 1,000 tasks); the API returns up to that many tasks, ordered by oldest ETA first.

You also need to specify the duration of the lease in seconds (up to a maximum of one week). The lease must be long enough so you can finish all the leased tasks, yet short enough so that tasks can be made available for leasing if your consumer crashes. Similarly, if you lease too many tasks at once and your client crashes, a large number of tasks will become unavailable until the lease expires.

The sample's lease code lets the command-line tool lease a specified number of tasks for a set duration. For example, the following command leases 100 tasks from pull-queue for 30 seconds:

mvn -q exec:java -Dexec.args="your-project-id pull-queue 30 100"

When run, this command-line tool constructs the following URI call to the REST API:

POST https://www.googleapis.com/taskqueue/v1beta2/projects/your-project-id/taskqueues/pull-queue/tasks/lease?leaseSecs=30&numTasks=100
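The lease URI above can be assembled programmatically. This sketch again uses the JDK's java.net.http types rather than the Google API Client Library; LeaseRequest is a hypothetical name, and the argument checks mirror the 1,000-task and one-week limits described earlier. It only builds the request; sending it requires a valid access token.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical builder for the Tasks.lease call; it builds the request but does not send it. */
final class LeaseRequest {
    static final String BASE = "https://www.googleapis.com/taskqueue/v1beta2/projects/";

    static URI leaseUri(String project, String queue, int leaseSecs, int numTasks) {
        if (numTasks < 1 || numTasks > 1_000) throw new IllegalArgumentException("numTasks: 1..1000");
        if (leaseSecs < 1 || leaseSecs > 7 * 24 * 3600) throw new IllegalArgumentException("leaseSecs: at most one week");
        return URI.create(BASE + project + "/taskqueues/" + queue
                + "/tasks/lease?leaseSecs=" + leaseSecs + "&numTasks=" + numTasks);
    }

    static HttpRequest build(String project, String queue, int leaseSecs, int numTasks, String accessToken) {
        return HttpRequest.newBuilder(leaseUri(project, queue, leaseSecs, numTasks))
                .header("Authorization", "Bearer " + accessToken)   // OAuth 2.0 access token
                .POST(HttpRequest.BodyPublishers.noBody())           // parameters travel in the query string
                .build();
    }

    public static void main(String[] args) {
        HttpRequest req = build("your-project-id", "pull-queue", 30, 100, "ACCESS_TOKEN");
        System.out.println(req.method() + " " + req.uri());
    }
}
```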

This request returns up to 100 tasks with the following JSON structure:

{
  "kind": "taskqueues#tasks",
  "items": [
    {
      "kind": "taskqueues#task",
      "id": string,
      "queueName": string,
      "payloadBase64": string,
      "enqueueTimestamp": number,
      "leaseTimestamp": number,
      "tag": string
    }
    ...
  ]
}
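Each leased task carries its data in payloadBase64, so a consumer has to decode it before processing. This sketch assumes the standard Base64 alphabet and a UTF-8 text payload; if your payloads are binary, keep the byte[] instead, and if the service returns URL-safe Base64, swap in Base64.getUrlDecoder(). PayloadDecoder is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper for decoding the payloadBase64 field of a leased task. */
final class PayloadDecoder {
    static byte[] decode(String payloadBase64) {
        return Base64.getDecoder().decode(payloadBase64);   // assumes the standard alphabet
    }

    static String decodeUtf8(String payloadBase64) {
        return new String(decode(payloadBase64), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(decodeUtf8("aGVsbG8gd29ybGQ=")); // hello world
    }
}
```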

After processing each task, you need to delete it, as described in the following section.

Deleting tasks

In general, once a worker completes a task, it needs to delete the task from the queue. If you see tasks remaining in a queue after a worker finishes processing, it is likely that the worker failed; in this case, the tasks need to be processed by another worker.

You can delete an individual task or a list of tasks using the REST method Tasks.delete. You must know the name of a task in order to delete it; you can get it from the id field of each task returned by Tasks.lease.

Call delete if you have finished a task, even if you have exceeded the lease time. Tasks should be idempotent, so even if a task lease expires and another client leases the task, performing the same task twice should not cause an error.

The sample deletes each processed task by calling Tasks.delete with the project, the queue name, and the task's id.

If the delete command is successful, the REST API returns an HTTP 200 response. If deletion fails, the API returns an HTTP failure code.
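The Tasks.delete call can be built the same way as the lease request. This sketch uses the JDK's java.net.http types instead of the client library; DeleteRequest is hypothetical, the taskId is the id field from the lease response, and the task-name path segment follows the REST API's .../tasks/{task} pattern. The success check accepts any 2xx status as a defensive assumption, even though the text above names HTTP 200.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical builder for a Tasks.delete request; it builds the request but does not send it. */
final class DeleteRequest {
    static final String BASE = "https://www.googleapis.com/taskqueue/v1beta2/projects/";

    static HttpRequest build(String project, String queue, String taskId, String accessToken) {
        return HttpRequest.newBuilder(URI.create(BASE + project + "/taskqueues/" + queue + "/tasks/" + taskId))
                .header("Authorization", "Bearer " + accessToken)   // OAuth 2.0 access token
                .DELETE()
                .build();
    }

    /** True for a successful delete; any non-2xx status is treated as a failure. */
    static boolean succeeded(int statusCode) {
        return statusCode >= 200 && statusCode < 300;
    }

    public static void main(String[] args) {
        HttpRequest req = build("your-project-id", "pull-queue", "task-id-123", "ACCESS_TOKEN");
        System.out.println(req.method() + " " + req.uri());
    }
}
```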

Defining the pull queue processing rate

You can control the rate at which tasks are processed in each of your queues by defining other directives, such as <rate>, <bucket-size>, and <max-concurrent-requests>.

The task queue uses token buckets to control the rate of task execution. Each named queue has a token bucket that holds a certain number of tokens, defined by the <bucket-size> directive. Each time your application executes a task, it uses a token. Your app continues processing tasks in the queue until the queue's bucket runs out of tokens. App Engine refills the bucket with new tokens continuously based on the <rate> that you specified for the queue.

If your queue contains tasks to process, and the queue's bucket contains tokens, App Engine processes as many tasks as there are tokens remaining in the bucket. This can lead to bursts of processing, consuming system resources and competing with user-serving requests.
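The token bucket behavior described above can be simulated in a few lines. This is a simplified, hypothetical discrete-time model (TokenBucket is not App Engine's implementation): the bucket starts full, refills at the queue's rate up to its size, and each started task spends one token. With a rate of 20/s and a bucket size of 40, a backlog first drains in a burst of 40 tasks and then proceeds at about 20 tasks per second.

```java
/** Hypothetical discrete-time model of a queue's token bucket. */
final class TokenBucket {
    private final double ratePerSecond;   // corresponds to <rate>
    private final double bucketSize;      // corresponds to <bucket-size>
    private double tokens;

    TokenBucket(double ratePerSecond, double bucketSize) {
        this.ratePerSecond = ratePerSecond;
        this.bucketSize = bucketSize;
        this.tokens = bucketSize;         // bucket starts full
    }

    /** Adds tokens for the elapsed time, never exceeding the bucket size. */
    void refill(double seconds) {
        tokens = Math.min(bucketSize, tokens + ratePerSecond * seconds);
    }

    /** Starts as many waiting tasks as there are whole tokens; returns how many started. */
    int dispatch(int waiting) {
        int started = (int) Math.min(waiting, Math.floor(tokens));
        tokens -= started;
        return started;
    }

    public static void main(String[] args) {
        TokenBucket bucket = new TokenBucket(20, 40);
        System.out.println(bucket.dispatch(100)); // 40: initial burst
        bucket.refill(1.0);
        System.out.println(bucket.dispatch(100)); // 20: one second of refill
    }
}
```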

If you want to prevent too many tasks from running at once or to prevent datastore contention, use <max-concurrent-requests>.

The following sample shows how to set <max-concurrent-requests> to limit concurrent tasks, and how to adjust the bucket size and rate based on your application's needs and available resources:

<?xml version="1.0" encoding="UTF-8"?>
<queue-entries>
  <queue>
    <name>optimize-queue</name>
    <rate>20/s</rate>
    <bucket-size>40</bucket-size>
    <max-concurrent-requests>10</max-concurrent-requests>
  </queue>
</queue-entries>

Setting storage limits for all queues

You can use queue.xml to define the total amount of storage that task data can consume over all queues. To define the total storage limit, include an element named <total-storage-limit> at the top level:

<?xml version="1.0" encoding="UTF-8"?>
<queue-entries>
  <total-storage-limit>120M</total-storage-limit>
  <queue>
    <name>foo</name>
    <rate>35/s</rate>
  </queue>
</queue-entries>

The value is a number followed by a unit: B for bytes, K for kilobytes, M for megabytes, G for gigabytes, T for terabytes. For example, 100K specifies a limit of 100 kilobytes. If adding a task would cause the total storage used by all queues to exceed this limit, the call to add the task fails. The default limit is 500M (500 megabytes) for free apps; for billed apps there is no limit until you explicitly set one.

You can use this limit to protect your app from a fork bomb programming error, in which each task adds multiple other tasks during its execution. If your app receives errors for insufficient quota when adding tasks, increasing the total storage limit can help. If you use this feature, we strongly recommend setting a limit that corresponds to the storage required for several days' worth of tasks. That way, your app can tolerate temporarily backed-up queues and keep accepting new tasks while it works through the backlog, and it is still protected from a fork bomb.
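To check a <total-storage-limit> value against an expected backlog, you can convert it to bytes and compare it with several days of task data. This sketch treats K, M, G, and T as powers of 1024 and accepts only whole numbers; both are assumptions, so check the queue.xml reference if the exact byte count matters. StorageLimit and the traffic figures in the usage are hypothetical.

```java
import java.util.Map;

/** Hypothetical helper for sizing <total-storage-limit> against a backlog. */
final class StorageLimit {
    // Assumption: K, M, G, T are powers of 1024.
    private static final Map<Character, Long> UNIT = Map.of(
            'B', 1L, 'K', 1L << 10, 'M', 1L << 20, 'G', 1L << 30, 'T', 1L << 40);

    /** Parses values such as "120M" or "100K" (whole number + unit letter) into bytes. */
    static long toBytes(String limit) {
        String s = limit.trim().toUpperCase();
        Long multiplier = UNIT.get(s.charAt(s.length() - 1));
        if (multiplier == null) throw new IllegalArgumentException("missing unit: " + limit);
        return Math.multiplyExact(Long.parseLong(s.substring(0, s.length() - 1)), multiplier);
    }

    /** True if the given number of days of backlog fits under the limit. */
    static boolean backlogFits(String limit, long tasksPerDay, long avgTaskBytes, int days) {
        long needed = Math.multiplyExact(Math.multiplyExact(tasksPerDay, avgTaskBytes), days);
        return needed <= toBytes(limit);
    }

    public static void main(String[] args) {
        System.out.println(toBytes("120M")); // 125829120
        // Hypothetical traffic: 50,000 tasks/day of about 512 bytes, three days of backlog.
        System.out.println(backlogFits("120M", 50_000, 512, 3));
    }
}
```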

Configuring the maximum number of concurrent requests

You can further control the processing rate by setting <max-concurrent-requests>, which limits the number of tasks that can execute simultaneously.

If your application queue has a rate of 20/s and a bucket size of 40, tasks in that queue execute at a rate of 20/s, and a full bucket lets up to 40 tasks start at once in a brief burst. These settings work fine if task latency is relatively low; however, if latency increases significantly, you'll end up processing significantly more concurrent tasks. This extra processing load can consume extra instances and slow down your application.

For example, let's assume that your normal task latency is 0.3 seconds. At this latency, you'll process at most around 40 tasks simultaneously. But if your task latency increases to 5 seconds, you could easily have over 100 tasks processing at once. This increase forces your application to consume more instances to process the extra tasks, potentially slowing down the entire application and interfering with user requests.

You can avoid this possibility by setting <max-concurrent-requests> to a lower value. For example, if you set <max-concurrent-requests> to 10, our example queue maintains about 20 tasks/second when latency is 0.3 seconds, because 10 concurrent tasks at 0.3 seconds each could sustain about 33 tasks/second, more than the 20/s rate allows. However, when latency rises above 0.5 seconds (the point at which 10 concurrent tasks complete exactly 20 tasks/second), this setting throttles the processing rate to ensure that no more than 10 tasks run simultaneously.
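The numbers in this example follow from Little's law: tasks in flight ≈ throughput × latency. With <max-concurrent-requests> set, steady-state throughput is capped at max-concurrent-requests ÷ latency, so 10 ÷ 0.3 s ≈ 33/s (the 20/s rate is the binding limit) and 10 ÷ 0.5 s = 20/s (the crossover point). Without the cap, 20/s at 5 s latency means about 100 tasks in flight. The sketch ignores bucket bursts and queueing delays; ConcurrencyMath is a hypothetical name.

```java
/** Hypothetical steady-state arithmetic for <rate> and <max-concurrent-requests>. */
final class ConcurrencyMath {
    /** Little's law: tasks in flight = throughput (tasks/s) x latency (s). */
    static double inFlight(double tasksPerSecond, double latencySeconds) {
        return tasksPerSecond * latencySeconds;
    }

    /** Throughput when both the rate and the concurrency cap apply. */
    static double throughput(double rate, int maxConcurrent, double latencySeconds) {
        return Math.min(rate, maxConcurrent / latencySeconds);
    }

    public static void main(String[] args) {
        System.out.println(inFlight(20, 5.0));          // uncapped: about 100 tasks at once
        System.out.println(throughput(20, 10, 0.3));    // rate-limited
        System.out.println(throughput(20, 10, 5.0));    // concurrency-limited
    }
}
```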

<?xml version="1.0" encoding="utf-8"?>
<queue-entries>
  <queue>
    <name>optimize-queue</name>
    <rate>20/s</rate>
    <bucket-size>40</bucket-size>
    <max-concurrent-requests>10</max-concurrent-requests>
  </queue>
</queue-entries>

Configuring retry attempts for failed tasks

Tasks executing in the task queue can fail for many reasons. In push queues, if a task fails to execute by returning any HTTP status code outside the range 200–299, App Engine retries the task until it succeeds. By default, the system gradually reduces the retry rate to avoid flooding your application with too many requests, but the interval between attempts grows to at most one hour, so retries continue at least once per hour until the task succeeds.

In pull queues, you can specify the number of times to retry a task using the <retry-parameters> element with the <task-retry-limit> field. The system counts each time you lease a task using leaseTasks(). When the count exceeds the task-retry-limit, the system deletes the task automatically. If you don't specify a task-retry-limit, the system never deletes a task automatically.

The following code sample shows how to specify a pull queue limited to seven retry attempts:

<?xml version="1.0" encoding="UTF-8"?>
<queue-entries>
  <queue>
    <name>pull-queue</name>
    <mode>pull</mode>
    <retry-parameters>
      <task-retry-limit>7</task-retry-limit>
    </retry-parameters>
  </queue>
</queue-entries>
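The retry rule above can be read as a counter on each task. This hypothetical model (RetryLimitModel) increments the count on every lease attempt and deletes the task once the count would exceed the retry limit. It is a simplified reading of the rule: whether the system grants or refuses the lease at the exact boundary is an assumption here, and lease expiry is not modeled.

```java
/** Hypothetical model of retry-limit bookkeeping for a single pull task. */
final class RetryLimitModel {
    private final int taskRetryLimit;   // e.g. 7, from <task-retry-limit>
    private int leaseCount = 0;
    private boolean deleted = false;

    RetryLimitModel(int taskRetryLimit) { this.taskRetryLimit = taskRetryLimit; }

    /** Returns true if the lease is granted; deletes the task once the count exceeds the limit. */
    boolean tryLease() {
        if (deleted) return false;
        leaseCount++;
        if (leaseCount > taskRetryLimit) {
            deleted = true;             // assumption: the over-limit lease is refused
            return false;
        }
        return true;
    }

    boolean isDeleted() { return deleted; }

    public static void main(String[] args) {
        RetryLimitModel task = new RetryLimitModel(7);
        for (int i = 1; i <= 8; i++) {
            System.out.println("lease " + i + ": " + task.tryLease());
        }
        System.out.println("deleted: " + task.isDeleted());
    }
}
```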

Quotas and limits for pull queues

Enqueuing a task counts toward the following quotas:

  • Task Queue Stored Task Count
  • Task Queue API Calls
  • Task Queue Stored Task Bytes

Leasing a task counts toward the following quotas:

  • Task Queue API Calls
  • Outgoing Bandwidth (if using the REST API)

The Task Queue Stored Task Bytes quota is configurable in queue.xml by setting the <total-storage-limit>. This quota counts towards your Stored Data (billable) quota.

The following limits apply to the use of pull queues:

Pull Queue Limits

Limit | Value
Maximum task size | 1MB
Maximum countdown/ETA for a task | 30 days from the current date and time
Maximum number of tasks that can be added in a batch | 100 tasks
Maximum number of tasks that can be added in a transaction | 5 tasks
Maximum number of tasks that you can lease in a single operation | 1,000 tasks
Maximum payload size when leasing a batch of tasks | 32MB (1MB when using the REST API)
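Because at most 100 tasks can be added in a batch, a producer with more tasks than that has to split them up. This sketch does the splitting generically; Batches is a hypothetical name. With the App Engine API, each batch of TaskOptions could then be passed to the queue's add() overload that accepts an Iterable<TaskOptions>; note that a transaction is limited further, to 5 tasks.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that splits tasks into batches no larger than the batch-add limit. */
final class Batches {
    static final int MAX_TASKS_PER_BATCH = 100;   // "Maximum number of tasks that can be added in a batch"

    /** Splits items into consecutive batches of at most MAX_TASKS_PER_BATCH elements. */
    static <T> List<List<T>> split(List<T> items) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += MAX_TASKS_PER_BATCH) {
            batches.add(items.subList(i, Math.min(i + MAX_TASKS_PER_BATCH, items.size())));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 250; i++) items.add(i);
        for (List<Integer> batch : split(items)) {
            System.out.println("batch of " + batch.size());
        }
    }
}
```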
