Using Pull Queues in Go

Pull queues allow you to design your own system to consume App Engine tasks. The task consumer can be part of your App Engine app (such as a module) or a system outside of App Engine (using the Task Queue REST API). The task consumer leases a specific number of tasks for a specific duration, then processes and deletes them before the lease ends.

Using pull queues requires your application to handle some functions that are automated in push queues:

  • Your application needs to scale the number of workers based on processing volume. If your application does not handle scaling, you risk wasting computing resources if there are no tasks to process; you also risk latency if you have too many tasks to process.
  • Your application also needs to explicitly delete tasks after processing. In push queues, App Engine deletes the tasks for you. If your application does not delete pull queue tasks after processing, another worker might re-process the task. This wastes computing resources and risks errors if tasks are not idempotent.

Pull queues require a specific configuration in queue.yaml. For more information, see Defining Pull Queues.

The following sections describe the process of enqueuing, leasing, and deleting tasks using pull queues.

Pull queue overview

Pull queues allow a task consumer to process tasks outside of App Engine's default task processing system. If the task consumer is a part of your App Engine app, you can manipulate tasks using simple API calls from the appengine/taskqueue package.

The process works like this:

  1. The task consumer leases tasks, either via the Task Queue API (if the consumer is internal to App Engine) or the Task Queue REST API (if the consumer is external to App Engine).
  2. App Engine sends task data to the consumer.
  3. The consumer processes the tasks. If the task fails to execute before the lease expires, the consumer can lease it again. This counts as a retry attempt, and you can configure the maximum number of retry attempts before the system deletes the task.
  4. After a task executes successfully, the task consumer must delete it.
  5. The task consumer is responsible for scaling instances based on processing volume.
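
The lease and delete cycle described above can be sketched with a small in-memory model. This is an illustration only, not the App Engine implementation: the memQueue type, its lease and remove methods, and the integer clock are hypothetical stand-ins chosen so that the example runs without the App Engine SDK.

```go
package main

import "fmt"

// task is a hypothetical stand-in for a pull queue task.
type task struct {
	name        string
	leasedUntil int // clock second at which the lease ends; 0 means never leased
}

// memQueue is an in-memory model of a pull queue, for illustration only.
type memQueue struct {
	tasks []*task
}

// lease hands out up to max tasks that have no active lease at second now,
// and marks each one as leased for the given number of seconds.
func (q *memQueue) lease(max, seconds, now int) []*task {
	var out []*task
	for _, t := range q.tasks {
		if len(out) == max {
			break
		}
		if now >= t.leasedUntil {
			t.leasedUntil = now + seconds
			out = append(out, t)
		}
	}
	return out
}

// remove deletes a task by name, as a consumer must do after it succeeds.
func (q *memQueue) remove(name string) {
	kept := q.tasks[:0]
	for _, t := range q.tasks {
		if t.name != name {
			kept = append(kept, t)
		}
	}
	q.tasks = kept
}

func main() {
	q := &memQueue{tasks: []*task{{name: "a"}, {name: "b"}}}

	fmt.Println(len(q.lease(2, 3600, 0))) // both tasks are leased for one hour

	q.remove("a") // "a" was processed; "b" was never deleted

	fmt.Println(len(q.lease(2, 3600, 1800))) // "b" is still under lease
	fmt.Println(len(q.lease(2, 3600, 7200))) // the lease on "b" has expired
}
```

In this model, a task that is leased but never deleted becomes visible again once its lease expires, which is why a consumer that skips the delete step risks processing the same task twice.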

Pulling tasks within App Engine

You can use pull queues within the App Engine environment using simple API calls to add tasks to a pull queue, lease them, and delete them after processing.

Defining pull queues

You can specify any named queue as a pull queue by adding the mode: pull directive to the queue.yaml configuration file.

queue:
- name: pull-queue
  mode: pull

If you are using the Task Queue REST API, you also need to create an access control list (ACL) using the acl directive. This directive allows you to restrict access to user email addresses corresponding to an account hosted by Google.

The acl element has two available parameters:

  • user_email: enables the user to list, get, lease, delete, and update tasks.
  • writer_email: enables the user to insert tasks.

In order to access all functions of the API, a developer's email address must be specified both as a user_email and a writer_email. The following code snippet creates a pull queue named pull-queue with two users in the ACL. The email account bar@foo.com can access all API calls:

queue:
- name: pull-queue
  mode: pull
  acl:
  - user_email: bar@foo.com      # can list, get, lease, delete, and update tasks
  - writer_email: user@gmail.com # can insert tasks
  - writer_email: bar@foo.com    # can insert tasks, in addition to rights granted by being a user_email above

Adding tasks to a pull queue

To add tasks to a pull queue, create a taskqueue.Task with its Method set to PULL and pass it to taskqueue.Add together with the queue name defined in queue.yaml. The following example enqueues a task in a pull queue named pull-queue:

t := &taskqueue.Task{
	Payload: []byte("hello world"),
	Method:  "PULL",
}
_, err := taskqueue.Add(ctx, t, "pull-queue")

Leasing tasks

Once you have added tasks to a pull queue, you can lease one or more tasks using taskqueue.Lease. There may be a short delay before tasks recently added using taskqueue.Add become available via taskqueue.Lease. When you request a lease, you specify the number of tasks to lease (up to a maximum of 1,000 tasks) and the duration of the lease in seconds (up to a maximum of one week). The lease duration needs to be long enough to ensure that the slowest task will have time to finish before the lease period expires. You can extend a task lease using taskqueue.ModifyLease.

Leasing a task makes it unavailable for processing by another worker, and it remains unavailable until the lease expires. If you lease an individual task, the API selects the task from the front of the queue. If no such task is available, an empty list is returned.

The following code sample leases 100 tasks from the queue pull-queue for one hour:

tasks, err := taskqueue.Lease(ctx, 100, "pull-queue", 3600)
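
One way to choose a lease duration is to multiply the batch size by the time the slowest task is expected to take, then check the result against the limits mentioned above (1,000 tasks per lease, one week per lease). The following is a minimal sketch under that assumption. The leaseSeconds helper and its constants are hypothetical and are not part of the taskqueue package.

```go
package main

import "fmt"

const (
	maxLeaseTasks   = 1000             // most tasks a single lease may request
	maxLeaseSeconds = 7 * 24 * 60 * 60 // one week, expressed in seconds
)

// leaseSeconds is a hypothetical helper. It returns a lease duration long
// enough for one worker to process n tasks one after another when the slowest
// task takes slowestSeconds, or an error if the request exceeds either limit.
func leaseSeconds(n, slowestSeconds int) (int, error) {
	if n < 1 || n > maxLeaseTasks {
		return 0, fmt.Errorf("cannot lease %d tasks: want 1 to %d", n, maxLeaseTasks)
	}
	if slowestSeconds < 1 {
		return 0, fmt.Errorf("slowest task time must be at least 1 second, got %d", slowestSeconds)
	}
	total := n * slowestSeconds
	if total > maxLeaseSeconds {
		return 0, fmt.Errorf("lease of %d seconds exceeds the one-week maximum", total)
	}
	return total, nil
}

func main() {
	// 100 tasks at up to 36 seconds each fit in a one-hour lease.
	fmt.Println(leaseSeconds(100, 36))
}
```

A worker that processes tasks in parallel could use a shorter lease. If a batch turns out to need more time, the lease can be extended with taskqueue.ModifyLease instead of being oversized up front.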

Not all tasks are alike; your code can "tag" tasks and then choose tasks to lease by tag. The tag acts as a filter. The following code sample demonstrates how to tag tasks and then lease by tags:

_, err = taskqueue.Add(ctx, &taskqueue.Task{
	Payload: []byte("parse"), Method: "PULL", Tag: "parse",
}, "pull-queue")
_, err = taskqueue.Add(ctx, &taskqueue.Task{
	Payload: []byte("render"), Method: "PULL", Tag: "render",
}, "pull-queue")

// leases render tasks, but not parse
tasks, err = taskqueue.LeaseByTag(ctx, 100, "pull-queue", 3600, "render")

// Leases up to 100 tasks that share the same tag.
// The tag is that of the "oldest" task by ETA.
tasks, err = taskqueue.LeaseByTag(ctx, 100, "pull-queue", 3600, "")
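
The tag filter, including the empty-tag case, can be modelled in a few lines. This sketch only imitates the selection rule described in the comments above. It does not track leases, and the task type and leaseByTag function here are hypothetical stand-ins rather than the taskqueue API.

```go
package main

import (
	"fmt"
	"sort"
)

// task is a hypothetical stand-in for a tagged pull queue task.
type task struct {
	payload string
	tag     string
	eta     int // hypothetical ETA, as a count of seconds
}

// leaseByTag models the filter only: it returns up to max tasks that carry
// tag, oldest ETA first. An empty tag selects the tag of the task with the
// oldest ETA.
func leaseByTag(tasks []task, max int, tag string) []task {
	sorted := append([]task(nil), tasks...) // copy so the caller's slice is untouched
	sort.SliceStable(sorted, func(i, j int) bool { return sorted[i].eta < sorted[j].eta })
	if tag == "" && len(sorted) > 0 {
		tag = sorted[0].tag
	}
	var out []task
	for _, t := range sorted {
		if len(out) == max {
			break
		}
		if t.tag == tag {
			out = append(out, t)
		}
	}
	return out
}

func main() {
	tasks := []task{
		{"p1", "parse", 10},
		{"r1", "render", 5},
		{"r2", "render", 20},
	}
	fmt.Println(leaseByTag(tasks, 100, "parse")) // only the parse task
	fmt.Println(leaseByTag(tasks, 100, ""))      // render tasks, because r1 has the oldest ETA
}
```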

Deleting tasks

In general, once a worker completes a task, it needs to delete the task from the queue. If you see tasks remaining in a queue after a worker finishes processing them, it is likely that the worker failed; in this case, the tasks need to be processed by another worker.

You can delete a list of tasks, such as that returned by taskqueue.Lease, by passing it to taskqueue.DeleteMulti:

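tasks, err = taskqueue.Lease(ctx, 100, "pull-queue", 3600)
// Handle err, then perform some work with the tasks here

// Delete the processed tasks. Check err, because tasks that are not
// deleted become available to other workers when the lease expires.
err = taskqueue.DeleteMulti(ctx, tasks, "pull-queue")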
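
When some tasks in a batch fail, a worker can delete only the tasks it processed successfully and leave the rest for a later lease. The following sketch shows one way to structure that loop. The queue interface, the task type, and fakeQueue are hypothetical and exist only so the example runs on its own. In an App Engine app, calls such as taskqueue.Lease and taskqueue.DeleteMulti would sit behind the interface.

```go
package main

import (
	"errors"
	"fmt"
)

// task is a hypothetical stand-in for *taskqueue.Task.
type task struct{ payload string }

// queue is a hypothetical interface over the lease and delete calls.
type queue interface {
	lease(max, seconds int) ([]*task, error)
	deleteMulti(tasks []*task) error
}

var errBad = errors.New("cannot process task")

// drainOnce leases one batch, runs handle on each task, and deletes only the
// tasks that were handled without error. It returns the number deleted.
func drainOnce(q queue, max, seconds int, handle func(*task) error) (int, error) {
	tasks, err := q.lease(max, seconds)
	if err != nil {
		return 0, fmt.Errorf("lease: %w", err)
	}
	var done []*task
	for _, t := range tasks {
		if err := handle(t); err != nil {
			continue // leave the task in the queue so it can be leased again
		}
		done = append(done, t)
	}
	if len(done) == 0 {
		return 0, nil
	}
	if err := q.deleteMulti(done); err != nil {
		return 0, fmt.Errorf("delete: %w", err)
	}
	return len(done), nil
}

// fakeQueue is an in-memory queue used only to exercise drainOnce.
type fakeQueue struct{ tasks []*task }

func (f *fakeQueue) lease(max, seconds int) ([]*task, error) {
	if max > len(f.tasks) {
		max = len(f.tasks)
	}
	return f.tasks[:max], nil
}

func (f *fakeQueue) deleteMulti(done []*task) error {
	gone := make(map[*task]bool, len(done))
	for _, t := range done {
		gone[t] = true
	}
	var kept []*task
	for _, t := range f.tasks {
		if !gone[t] {
			kept = append(kept, t)
		}
	}
	f.tasks = kept
	return nil
}

func main() {
	q := &fakeQueue{tasks: []*task{{"ok"}, {"bad"}, {"ok"}}}
	n, err := drainOnce(q, 100, 3600, func(t *task) error {
		if t.payload == "bad" {
			return errBad
		}
		return nil
	})
	fmt.Println(n, err, len(q.tasks))
}
```

Deleting per batch rather than per task keeps the number of API calls low. The trade-off is that a failed delete call leaves every task in the batch eligible for reprocessing, so handlers should be idempotent.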

Pulling tasks to a module

You can use App Engine Modules as workers to lease and process pull queue tasks. Modules allow you to process more work without having to worry about request deadlines and other restrictions normally imposed by App Engine. Using modules with pull queues gives you processing efficiencies by allowing you to batch task processing using leases.

For more information about using modules, check out the Modules documentation.

Pulling tasks from outside App Engine

If you need to use pull queues from outside App Engine, you must use the Task Queue REST API. The REST API is a Google web service accessible at a globally-unique URI of the form:

https://www.googleapis.com/taskqueue/v1beta2/projects/taskqueues

Google provides the following client libraries that you can use to call the Task Queue methods remotely:

In the tables below, the first column shows each library's stage of development (note that some are in early stages), and links to documentation for the library. The second column links to available samples for each library.

Documentation                                       Samples
Google API Client Library for Java                  Java samples
Google API Client Library for JavaScript (beta)     JavaScript samples
Google API Client Library for .NET                  .NET samples
Google API Client Library for Objective-C for REST  Objective-C samples
Google API Client Library for PHP (beta)            PHP samples
Google API Client Library for Python                Python samples

These early-stage libraries are also available:

Documentation                                  Samples
Google APIs Client Libraries for Dart (beta)   Dart samples
Google API Client Library for Go (alpha)       Go samples
Google API Client Library for Node.js (alpha)  Node.js samples
Google API Client Library for Ruby (alpha)     Ruby samples

Prerequisites

The REST API uses OAuth as the authorization mechanism. When you configure your pull queue, make sure that your queue.yaml file supplies the email addresses of the users that can access the queue using the REST API. The OAuth scope for all methods is https://www.googleapis.com/auth/taskqueue.

Setting storage limits for all queues

You can use queue.yaml to define the total amount of storage that task data can consume over all queues. To define the total storage limit, include an element named total_storage_limit at the top level:

# Set the total storage limit for all queues to 120MB
total_storage_limit: 120M
queue:
- name: foo
  rate: 35/s

The value is a number followed by a unit: B for bytes, K for kilobytes, M for megabytes, G for gigabytes, T for terabytes. For example, 100K specifies a limit of 100 kilobytes. If adding a task would cause stored task data to exceed the storage limit, the call to add the task fails. The default limit is 500M (500 megabytes) for free apps. For billed apps there is no limit until you explicitly set one.

You can use this limit to protect your app from a fork bomb programming error, in which each task adds multiple other tasks during its execution. If your app is receiving errors for insufficient quota when adding tasks, increasing the total storage limit can help.

If you are using this feature, we strongly recommend setting a limit that corresponds to the storage required for several days' worth of tasks. That way, your app tolerates temporarily backed-up queues and can keep accepting new tasks while it works through the backlog, yet it remains protected from a fork bomb.
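
To size the limit for several days' worth of tasks, you can multiply the expected daily task count by the average task size and the number of days of backlog to tolerate. The sketch below does that arithmetic. The workload figures are hypothetical, and the code assumes that M means 1024 × 1024 bytes, which this page does not specify.

```go
package main

import "fmt"

// bytesPerM assumes that "M" means 1024*1024 bytes. Whether the units are
// binary or decimal is an assumption made for this sketch.
const bytesPerM int64 = 1024 * 1024

// storageLimit returns a total_storage_limit value, in whole megabytes
// rounded up, that holds the given number of days of tasks.
func storageLimit(tasksPerDay, avgTaskBytes, days int64) string {
	total := tasksPerDay * avgTaskBytes * days
	megabytes := (total + bytesPerM - 1) / bytesPerM // round up
	return fmt.Sprintf("%dM", megabytes)
}

func main() {
	// Hypothetical workload: 50,000 tasks per day, 2,048 bytes each,
	// with room for three days of backlog.
	fmt.Println(storageLimit(50000, 2048, 3)) // prints "293M"
}
```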

Configuring retry attempts for failed tasks

Tasks executing in the task queue can fail for many reasons. In push queues, if a task fails to execute by returning any HTTP status code outside of the range 200–299, App Engine retries the task until it succeeds. By default, the system gradually reduces the retry rate to avoid flooding your application with too many requests, until the interval between retry attempts reaches a maximum of one hour, and it keeps retrying at that interval until the task succeeds.

In pull queues, you can specify the number of times to retry a task using the retry_parameters directive with the task_retry_limit field. The system counts each time you lease a task using taskqueue.Lease. When the count exceeds the task_retry_limit, the system deletes the task automatically. If you don't specify a task_retry_limit, the system never deletes a task automatically.

The following code sample shows how to specify a pull queue limited to seven retry attempts:

queue:
- name: pull-queue
  mode: pull
  retry_parameters:
    task_retry_limit: 7
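
Because the system deletes a task automatically once its lease count exceeds task_retry_limit, a worker may want to notice tasks that have already been retried many times, for example to log them before they disappear. The following is a minimal sketch of that idea. The task type and splitByRetries function are hypothetical; retryCount stands in for the RetryCount field of taskqueue.Task. How that counter lines up with the limit is not covered here, so the threshold is a value you choose.

```go
package main

import "fmt"

// task is a hypothetical stand-in for *taskqueue.Task; retryCount mirrors
// that type's RetryCount field.
type task struct {
	name       string
	retryCount int32
}

// splitByRetries separates tasks whose retry count has reached threshold
// (worn) from tasks whose retry count is still below it (fresh).
func splitByRetries(tasks []task, threshold int32) (fresh, worn []task) {
	for _, t := range tasks {
		if t.retryCount >= threshold {
			worn = append(worn, t)
		} else {
			fresh = append(fresh, t)
		}
	}
	return fresh, worn
}

func main() {
	leased := []task{{"a", 0}, {"b", 6}, {"c", 2}}
	fresh, worn := splitByRetries(leased, 6)
	fmt.Println(len(fresh), len(worn))
	for _, t := range worn {
		fmt.Printf("task %s has retry count %d\n", t.name, t.retryCount)
	}
}
```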

Quotas and limits for pull queues

Enqueuing a task counts toward the following quotas:

  • Task Queue Stored Task Count
  • Task Queue API Calls
  • Task Queue Stored Task Bytes

Leasing a task counts toward the following quotas:

  • Task Queue API Calls
  • Outgoing Bandwidth (if using the REST API)

The Task Queue Stored Task Bytes quota is configurable in queue.yaml by setting the total_storage_limit. This quota counts towards your Stored Data (billable) quota.

The following limits apply to the use of pull queues:

Pull Queue Limits
Maximum task size                                                 1MB
Maximum countdown/ETA for a task                                  30 days from the current date and time
Maximum number of tasks that can be added in a batch              100 tasks
Maximum number of tasks that can be added in a transaction        5 tasks
Maximum number of tasks that you can lease in a single operation  1000 tasks
Maximum payload size when leasing a batch of tasks                32MB (1MB when using the REST API)
Default maximum number of task queues                             100 queues. Contact Support to request an increase.
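
To stay within the limit of 100 tasks per batch, an application that enqueues many tasks at once can split them into groups first. The following is a minimal sketch. The batches helper is hypothetical, and string payloads are used only to keep the example short. Each group could then be passed to a batch call such as taskqueue.AddMulti.

```go
package main

import "fmt"

// maxBatch is the largest number of tasks that can be added in one batch.
const maxBatch = 100

// batches splits items into consecutive groups of at most size elements.
// It returns nil if size is less than 1.
func batches(items []string, size int) [][]string {
	if size < 1 {
		return nil
	}
	var out [][]string
	for len(items) > size {
		out = append(out, items[:size])
		items = items[size:]
	}
	if len(items) > 0 {
		out = append(out, items)
	}
	return out
}

func main() {
	payloads := make([]string, 250)
	for i := range payloads {
		payloads[i] = fmt.Sprintf("payload-%d", i)
	}
	for i, b := range batches(payloads, maxBatch) {
		fmt.Printf("batch %d holds %d tasks\n", i, len(b))
	}
}
```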
