October 15, 2009
Thanks to the Task Queue API released in SDK 1.2.3, it's easier than ever to do work 'offline', separate from user serving requests. In some cases, however, setting up a handler for each distinct task you want to run can be cumbersome, as can serializing and deserializing complex arguments for the task - particularly if you have many diverse but small tasks that you want to run on the queue.
Fortunately, a new library in release 1.2.5 of the SDK makes these ad-hoc
tasks much easier to write and execute. This library is found in
google.appengine.ext.deferred, and from here on in we'll refer to it as the
'deferred' library. The deferred library lets you bypass all the work of
setting up dedicated task handlers and serializing and deserializing your
parameters by exposing a simple function, deferred.defer. To
call a function later, simply pass the function and its arguments to
deferred.defer, like this:
import logging

from google.appengine.ext import deferred


def do_something_expensive(a, b, c=None):
    logging.info("Doing something expensive!")
    # Do your work here


# Somewhere else
deferred.defer(do_something_expensive, "Hello, world!", 42, c=True)
That's all there is to it. The deferred library will package up your
function call and its arguments, and add it to the task queue. When the task
gets executed, the deferred library will execute
do_something_expensive("Hello, world!", 42, c=True). There are two more
things you need to do to get this working - you need to add the deferred
library's task and URL handlers to your
app.yaml. Just add the following entry to the builtins
section of your app.yaml:
- deferred: on
and the following entry to the handlers section of the same file:
- url: /_ah/queue/deferred
  script: google.appengine.ext.deferred.deferred.application
  login: admin
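To make the "package up your function call" step more concrete, here is a minimal sketch of the general idea using only the standard pickle module. It is a simplified model, not the deferred library's actual implementation; serialize_call and run_serialized are hypothetical names invented for this illustration.

```python
import pickle


def serialize_call(func, *args, **kwargs):
    """Pickle a callable together with its arguments into one byte string."""
    return pickle.dumps((func, args, kwargs), protocol=pickle.HIGHEST_PROTOCOL)


def run_serialized(payload):
    """Unpickle a payload produced by serialize_call and make the call."""
    func, args, kwargs = pickle.loads(payload)
    return func(*args, **kwargs)


# A standard-library function is used so the sketch runs anywhere. pickle
# stores a function as a reference (module plus name), not as code, so the
# function must be importable wherever the payload is later executed.
payload = serialize_call(sorted, [3, 1, 2], reverse=True)
result = run_serialized(payload)
print(result)  # [3, 2, 1]
```

In this model the byte string plays the role of the task body: it is created at enqueue time and only turned back into a call when the task handler runs.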
The capabilities of the deferred library aren't limited to simple function
calls with straightforward arguments. In fact, you can use nearly any Python
'callable', including functions, methods, class methods and callable objects.
deferred.defer supports task arguments used in the task queue API, such as
countdown, eta, and name, but in order to use them, you need to prefix them
with an underscore, to prevent them conflicting with keyword arguments to your
own function. For example, to run do_something_expensive in 30
seconds, on a custom queue named myqueue, you could do this:
deferred.defer(do_something_expensive, "Foobie bletch", 12, _countdown=30, _queue="myqueue")
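The underscore convention can be pictured as a simple split of the keyword arguments. The sketch below only illustrates the naming rule described above; split_task_options is a hypothetical helper, and the real library may well recognize a fixed set of option names rather than any underscore-prefixed keyword.

```python
def split_task_options(kwargs):
    """Separate underscore-prefixed task options from the function's own kwargs."""
    task_options = {}
    call_kwargs = {}
    for name, value in kwargs.items():
        if name.startswith("_"):
            # Strip the leading underscore to recover the task option name.
            task_options[name[1:]] = value
        else:
            call_kwargs[name] = value
    return task_options, call_kwargs


options, call_kwargs = split_task_options(
    {"c": True, "_countdown": 30, "_queue": "myqueue"})
print(options)      # task options, e.g. countdown and queue
print(call_kwargs)  # keyword arguments passed on to your function
```

Because of this split, a function whose own keyword argument is called countdown can still be deferred without a naming clash.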
Example: A datastore mapper
To demonstrate how powerful the deferred library can be, we're going to revisit an earlier example: the Mapper class. This class makes it easy to iterate over a large set of entities, making changes or calculating totals, without requiring an external computer to run it on.
Here's our example Mapper implementation:
from google.appengine.ext import deferred
from google.appengine.ext import ndb
from google.appengine.runtime import DeadlineExceededError


class Mapper(object):
    # Subclasses should replace this with a model class (eg, model.Person).
    KIND = None

    # Subclasses can replace this with a list of (property, value) tuples to filter by.
    FILTERS = []

    def __init__(self):
        self.to_put = []
        self.to_delete = []

    def map(self, entity):
        """Updates a single entity.

        Implementers should return a tuple containing two iterables (to_update, to_delete).
        """
        return ([], [])

    def finish(self):
        """Called when the mapper has finished, to allow for any final work to be done."""
        pass

    def get_query(self):
        """Returns a query over the specified kind, with any appropriate filters applied."""
        q = self.KIND.query()
        for prop, value in self.FILTERS:
            q = q.filter(prop == value)
        # Order by key so that a later task can resume after start_key.
        q = q.order(self.KIND._key)
        return q

    def run(self, batch_size=100):
        """Starts the mapper running."""
        self._continue(None, batch_size)

    def _batch_write(self):
        """Writes updates and deletes entities in a batch."""
        if self.to_put:
            ndb.put_multi(self.to_put)
            self.to_put = []
        if self.to_delete:
            # ndb.delete_multi expects keys, so to_delete should hold keys.
            ndb.delete_multi(self.to_delete)
            self.to_delete = []

    def _continue(self, start_key, batch_size):
        q = self.get_query()
        # If we're resuming, pick up where we left off last time.
        if start_key:
            key_prop = getattr(self.KIND, '_key')
            q = q.filter(key_prop > start_key)
        # Keep updating records until we run out of time.
        try:
            # Steps over the results, returning each entity and its index.
            for i, entity in enumerate(q):
                map_updates, map_deletes = self.map(entity)
                self.to_put.extend(map_updates)
                self.to_delete.extend(map_deletes)
                # Do updates and deletes in batches.
                if (i + 1) % batch_size == 0:
                    self._batch_write()
                # Record the last entity we processed.
                start_key = entity.key
            self._batch_write()
        except DeadlineExceededError:
            # Write any unfinished updates to the datastore.
            self._batch_write()
            # Queue a new task to pick up where we left off.
            deferred.defer(self._continue, start_key, batch_size)
            return
        self.finish()
The main loop is wrapped in a try/except clause, catching
DeadlineExceededError. This allows us to process as many results
as we can in the time we have available; when we're out of time, the runtime
throws a DeadlineExceededError, which gives us enough time to
queue the next task before returning. We're also no longer fetching results in
batches - instead, we iterate over the query, which lets us fetch as many
results as we have time to process. Updates and deletes are still batched,
however, and whenever we've processed enough records, we update the datastore,
and record the current entity as the point to continue from.
The other difference is that we've added a finish method.
This gets called when every matching entity has been processed, and allows us
to use the Mapper class to do things like calculate totals.
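The resume-where-we-left-off pattern described above can be tried without App Engine. The following sketch is a toy simulation under stated assumptions: OutOfTime stands in for DeadlineExceededError, budget stands in for the time available to one task, and the while loop plays the part of the task queue re-running the continuation. All names are hypothetical.

```python
class OutOfTime(Exception):
    """Stand-in for DeadlineExceededError in this simulation."""


def run_slice(items, start, budget, handle):
    """Process items from start until the simulated deadline is hit.

    Returns the position to resume from, or None when every item is done.
    """
    position = start
    try:
        for index in range(start, len(items)):
            if index - start >= budget:
                raise OutOfTime()
            handle(items[index])
            # Record how far we got, like start_key in the Mapper.
            position = index + 1
    except OutOfTime:
        return position
    return None


def run_to_completion(items, budget, handle):
    """Re-run slices until done, as the task queue would; returns the task count."""
    tasks = 0
    start = 0
    while start is not None:
        tasks += 1
        start = run_slice(items, start, budget, handle)
    return tasks


seen = []
tasks = run_to_completion(list(range(10)), 4, seen.append)
print(tasks, seen)
```

With ten items and a budget of four per task, the work is spread over three simulated tasks and every item is handled exactly once.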
Using the mapper
Here's the example mapper that adds 'bar' to every guestbook entry containing 'foo', rewritten for our new mapper class:
class GuestbookUpdater(Mapper):
    KIND = Greeting

    def map(self, entity):
        if entity.content.lower().find('foo') != -1:
            entity.content += ' Bar!'
            return ([entity], [])
        return ([], [])


mapper = GuestbookUpdater()
deferred.defer(mapper.run)
As you can see, the mapper subclass itself is unchanged; the only change is how we invoke it - by 'deferring' the mapper.run method.
The new finish method makes it easy to calculate totals, too. Suppose we have a class that records files and how many times they've been downloaded, and we want to count up the total number of files and the total number of downloads. We could implement it like this:
class File(ndb.Model):
    name = ndb.StringProperty(required=True)
    download_count = ndb.IntegerProperty(required=True, default=0)


class DailyTotal(ndb.Model):
    date = ndb.DateProperty(required=True, auto_now_add=True)
    file_count = ndb.IntegerProperty(required=True)
    download_count = ndb.IntegerProperty(required=True)


class DownloadCountMapper(Mapper):
    KIND = File

    def __init__(self):
        # Initialize the base class so that to_put and to_delete exist.
        super(DownloadCountMapper, self).__init__()
        self.file_count = 0
        self.download_count = 0

    def map(self, file):
        self.file_count += 1
        self.download_count += file.download_count
        # Nothing to update or delete; we only accumulate totals.
        return ([], [])

    def finish(self):
        total = DailyTotal(file_count=self.file_count,
                           download_count=self.download_count)
        total.put()


mapper = DownloadCountMapper()
deferred.defer(mapper.run)
In just a few lines of code, we've written a mapper that can be run from a cron job each day, and will count up the total number of files and downloads, storing them to another entity. This will still run even if we have many more File entities than we could count up in a single request, and it'll automatically retry in the event of timeouts and other transient errors. And at no point did we have to write a task queue handler, or map URLs. Our Mapper class can be put in a library and used in multiple applications, even if they use different frameworks!
The Mapper framework we wrote above isn't full-featured, of course, and it could do with a few improvements to make it more robust and versatile, but it serves to demonstrate the power of the deferred API.
Deferred tips and tricks
As always, there are a few tips and tricks you should be aware of in order to make optimal use of the deferred library.
Make tasks as small as possible
Task Queue items are limited to 100 KB of associated data. This means that when the deferred library serializes the details of your call, it must amount to less than 100 KB in order to fit on the Task Queue directly. No need to panic, though: if you try to enqueue a task that is too big to fit on the queue by itself, the deferred library will automatically create a new Entity in the datastore to hold information about the task, and will delete the entity once the task has been run. This means that, in practice, your function call can be up to 1 MB once serialized.
As you might expect, however, inserting and deleting datastore entities for each task adds some overhead, so, if you can, make the arguments to your call (and the object itself, if you're calling a method) as small as possible to avoid the extra overhead.
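If you want a rough idea of whether your arguments are likely to fit on the queue directly, you can measure their pickled size. This is only an estimate under stated assumptions: the real task body also contains a reference to the callable and whatever framing the library adds, and payload_size and fits_on_queue are hypothetical helpers, not part of the deferred API. The 100 KB figure is the limit quoted above.

```python
import pickle

# Limit quoted in this article for data attached to a Task Queue item.
TASK_PAYLOAD_LIMIT = 100 * 1024


def payload_size(*args, **kwargs):
    """Approximate the serialized size, in bytes, of a call's arguments."""
    return len(pickle.dumps((args, kwargs), protocol=pickle.HIGHEST_PROTOCOL))


def fits_on_queue(*args, **kwargs):
    """Return True if the pickled arguments are under the queue limit."""
    return payload_size(*args, **kwargs) < TASK_PAYLOAD_LIMIT


print(fits_on_queue("Hello, world!", 42, c=True))
print(fits_on_queue("x" * (200 * 1024)))
```

A check like this can be useful in tests to catch calls whose arguments have quietly grown large enough to incur the datastore round trip.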
Don't pass entities to deferred.defer
If you're working with datastore entities, you might be tempted to do something like this:
def do_something(entity):
    # Do something with entity
    entity.put()


entity = MyModel.get_by_id(123)
deferred.defer(do_something, entity, _countdown=60)
This is a bad idea, because the deferred library will package up the entity itself and store it to the task queue; when the time comes to execute your task, it will deserialize the entity and run the code. When your code writes the entity back to the datastore, it'll overwrite any changes that have happened since you enqueued the task with deferred.defer!
Instead of passing entities around, then, you should pass keys. For example:
def do_something_with_key(k):
    entity = k.get()
    # Do something with entity
    entity.put()


k = ndb.Key('MyModel', 123)
deferred.defer(do_something_with_key, k, _countdown=60)
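The difference between the two approaches is easy to see in a toy model. In the sketch below a plain dict stands in for the datastore and a deep copy stands in for pickling the entity at enqueue time; simulate and the field names are hypothetical and exist only for this illustration.

```python
import copy


def simulate(pass_snapshot):
    """Return the stored entity after a deferred task runs in this toy model."""
    # A dict stands in for the datastore; 123 is the entity's ID.
    datastore = {123: {"content": "hello", "votes": 1}}

    # Enqueue time: pickling an entity freezes a copy of its current state.
    snapshot = copy.deepcopy(datastore[123])

    # Before the task runs, another request updates the stored entity.
    datastore[123]["votes"] = 2

    # Task execution: either write back the stale copy or re-fetch by key.
    entity = snapshot if pass_snapshot else datastore[123]
    entity["content"] = "hello, world"
    datastore[123] = entity
    return datastore[123]


print(simulate(pass_snapshot=True))   # the votes update is lost
print(simulate(pass_snapshot=False))  # the votes update survives
```

Passing the snapshot silently reverts votes to 1, while re-fetching by key keeps the intervening update.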
Dealing with task failures
Tasks created using deferred.defer get retried in case of failure just like regular Task Queue tasks. 'Failure' in the case of a deferred task is defined as your task throwing any uncaught exception. Normally, automatic retries are what you want - for example, if the task does datastore operations, and the datastore threw a Timeout exception. If you want to deliberately cause your task to fail so it will be retried, you can simply throw an exception.
Sometimes, though, you know your task will never succeed, and you want it to fail permanently. In such situations, you have two options:
- Return from the deferred function normally. The handler will think the task executed successfully, and will not try again. This is best in the case of non-critical failures - for example, when you deferred something for later, and discovered it's already been done.
- Raise a deferred.PermanentTaskFailure exception. This is a special exception, and is treated as a permanent failure. It is logged as an exception, so it will show up in the Google Cloud Platform Console as one, but the task will not be retried.
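The three outcomes above (success, retry, permanent failure) can be modelled in a few lines. This is a simulation only: the local PermanentTaskFailure class is a stand-in for deferred.PermanentTaskFailure, and max_attempts is an assumption of the sketch, since real retry behaviour is governed by the Task Queue.

```python
class PermanentTaskFailure(Exception):
    """Stand-in for deferred.PermanentTaskFailure in this simulation."""


def run_with_retries(task, max_attempts=5):
    """Run task until it succeeds, fails permanently, or attempts run out."""
    for attempt in range(1, max_attempts + 1):
        try:
            task()
            return ("succeeded", attempt)
        except PermanentTaskFailure:
            # Never retried, no matter how many attempts remain.
            return ("failed permanently", attempt)
        except Exception:
            # Any other uncaught exception counts as a retryable failure.
            continue
    return ("gave up", max_attempts)


def make_flaky(failures):
    """Build a task that raises a transient error the first `failures` times."""
    state = {"left": failures}

    def task():
        if state["left"] > 0:
            state["left"] -= 1
            raise RuntimeError("transient error")

    return task


def hopeless():
    raise PermanentTaskFailure("this will never work")


print(run_with_retries(make_flaky(2)))
print(run_with_retries(hopeless))
```

The flaky task succeeds on its third attempt, while the hopeless one stops after a single attempt.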
Handling import path manipulation
Some applications, or the frameworks they use, rely on manipulating the Python import path in order to make all the libraries they need available. While this is a perfectly legitimate technique, the deferred library has no way of knowing what path manipulations you've engaged in, so if the task you're deferring relies on modules that aren't on the import path by default, you need to give it a helping hand. Failing to do this can result in your tasks failing to run - or worse, only failing intermittently.
Fortunately, handling this is easy. Make sure your code that changes the
import path is in a module all of its own, such as 'fix_path.py'.
Such a module might look like this:
import os
import sys

sys.path.append(os.path.join(os.path.dirname(__file__), 'lib'))
Then, import the 'fix_path' module along with your usual
imports, anywhere you rely on the modified path, such as in the module where
you defined the functions you're calling with deferred.defer.
Limitations of the deferred library
There are a few limitations on what you can pass to deferred.defer:
- All arguments must be picklable. That means you can't pass exotic things like instances of nested classes as function arguments. If you're calling an instance method, the instance must be picklable too. If you're not familiar with what 'pickling' is, don't worry - most stuff that you're likely to be passing to deferred.defer will work just fine.
- You can't pass nested functions, or methods of nested classes.
- You can't pass lambda functions (but you probably wouldn't want to anyway).
- You can't pass a static method.
- You can't pass a method defined in the request handler module.
The last point above deserves special attention: passing a method defined in the request handler module - the module specified as a request handler in app.yaml - will not work. You can call deferred.defer from the request handler module, but the function you are passing to it must be defined elsewhere!
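Several of these limitations follow from how pickle treats functions: it stores a reference by module and name rather than the code itself. A quick way to check an object ahead of time is to try pickling it. The helper below is a hypothetical convenience for illustration, not part of the deferred library, and it only tests picklability, not the request handler module restriction.

```python
import pickle


def is_picklable(obj):
    """Return True if obj can be serialized with pickle."""
    try:
        pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


def make_nested():
    """Return a nested function, which pickle cannot find by name."""
    def nested():
        return "nested"
    return nested


print(is_picklable(("Hello, world!", 42, {"c": True})))  # plain data
print(is_picklable(len))                                 # importable function
print(is_picklable(lambda x: x))                         # lambda
print(is_picklable(make_nested()))                       # nested function
```

Plain data and importable functions pass the check, while the lambda and the nested function do not.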
When to use ext.deferred
You may be wondering when to use ext.deferred, and when to stick with the built-in task queue API. Here are our suggestions.
You may want to use the deferred library if:
- You only use the task queue lightly.
- You want to refactor existing code to run on the Task Queue with a minimum of changes.
- You're writing a one-off maintenance task, such as schema migration.
- Your app has many different types of background tasks, and writing a separate handler for each would be burdensome.
- Your task requires complex arguments that aren't easily serialized without using Pickle.
- You are writing a library for other apps that needs to do background work.
You may want to use the Task Queue API if:
- You need complete control over how tasks are queued and executed.
- You need better queue management or monitoring than deferred provides.
- You have high throughput, and overhead is important.
- You are building larger abstractions and need direct control over tasks.
- You like the webhook model better than the RPC model.
Naturally, you can use both the Task Queue API and the deferred library side-by-side, if your app has requirements that fit into both groups.
If you've come up with your own uses for the deferred API, please let us and the rest of the community know about them in the discussion groups.