Leasing Pull Tasks

Once tasks are in a pull queue, a worker can lease them. After the tasks are processed the worker must delete them.

Before you begin

Important context

  • This method is only applicable to workers that are running within a service in the standard environment.
  • When you use pull queues, you are responsible for scaling your workers based on your processing volume.

Leasing tasks

After the tasks are in the queue, a worker can lease one or more of them using the taskqueue.Lease method. There may be a short delay before tasks recently added using taskqueue.Add become available via taskqueue.Lease.

When you request a lease, you specify the number of tasks to lease (up to a maximum of 1,000 tasks) and the duration of the lease in seconds (up to a maximum of one week). The lease duration needs to be long enough to ensure that the slowest task will have time to finish before the lease period expires. You can modify a task lease using taskqueue.ModifyLease.

Leasing a task makes it unavailable for processing by another worker, and it remains unavailable until the lease expires.

The following code sample leases 100 tasks from the queue pull-queue for one hour:

tasks, err := taskqueue.Lease(ctx, 100, "pull-queue", 3600)

Batching with task tags

Not all tasks are alike; your code can "tag" tasks and then choose tasks to lease by tag. The tag acts as a filter. The following code sample demonstrates how to tag tasks and then lease by tags:

_, err = taskqueue.Add(ctx, &taskqueue.Task{
	Payload: []byte("parse"), Method: "PULL", Tag: "parse",
}, "pull-queue")
_, err = taskqueue.Add(ctx, &taskqueue.Task{
	Payload: []byte("render"), Method: "PULL", Tag: "render",
}, "pull-queue")

// leases render tasks, but not parse
tasks, err = taskqueue.LeaseByTag(ctx, 100, "pull-queue", 3600, "render")

// Leases up to 100 tasks that have same tag.
// Tag is that of "oldest" task by ETA.
tasks, err = taskqueue.LeaseByTag(ctx, 100, "pull-queue", 3600, "")

Regulating polling rates

Workers that poll the queue for tasks to lease should detect whether they are attempting to lease tasks faster than the queue can supply them. If this failure occurs, a back-off error will be returned from taskqueue.Lease.

Your code must handle these errors, back off from calling taskqueue.Lease, and then try again later. To avoid this problem, consider setting a higher RPC deadline when calling taskqueue.Lease. You should also back off when a lease request returns an empty list of tasks. If you generate more than 10 LeaseTasks requests per queue per second, only the first 10 requests will return results. If requests exceed this limit, OK is returned with zero results.

Monitoring tasks in the Google Cloud console

To view information about all the tasks and queues in your application:

  1. Open the Cloud Tasks page in the Google Cloud console and look for the Pull value in column Type.

    Go to Cloud Tasks

  2. Click on the name of the queue in which you are interested, opening the queue details page. It displays all of the tasks in the selected queue.

Deleting tasks

Once a worker completes a task, it needs to delete the task from the queue. If you see tasks remaining in a queue after a worker finishes processing them, it is likely that the worker failed; in this case, the tasks will be processed by another worker.

You can delete a list of tasks, such as that returned by taskqueue.Lease, by passing it to taskqueue.DeleteMulti:

tasks, err = taskqueue.Lease(ctx, 100, "pull-queue", 3600)
// Perform some work with the tasks here

taskqueue.DeleteMulti(ctx, tasks, "pull-queue")