google.appengine.ext.ndb.tasklets module

Summary

A tasklet decorator.

Tasklets are a way to write concurrently running functions without threads; tasklets are executed by an event loop and can suspend themselves blocking for I/O or some other operation using a yield statement. The notion of a blocking operation is abstracted into the Future class, but a tasklet may also yield an RPC in order to wait for that RPC to complete.

The @tasklet decorator wraps generator function so that when it is called, a Future is returned while the generator is executed by the event loop. Within the tasklet, any yield of a Future waits for and returns the Future’s result. For example:

@tasklet
def foo():
  a = yield <some Future>
  b = yield <another Future>
  raise Return(a + b)

def main():
  f = foo()
  x = f.get_result()
  print x

Note that blocking until the Future’s result is available using get_result() is somewhat inefficient (though not vastly – it is not busy-waiting). In most cases such code should be rewritten as a tasklet instead:

@tasklet
def main_tasklet():
  f = foo()
  x = yield f
  print x

Calling a tasklet automatically schedules it with the event loop:

def main():
  f = main_tasklet()
  eventloop.run()  # Run until no tasklets left to do
  f.done()  # Returns True

As a special feature, if the wrapped function is not a generator function, its return value is returned via the Future. This makes the following two equivalent:

@tasklet
def foo():
  return 42

@tasklet
def foo():
  if False: yield  # The presence of 'yield' makes foo a generator
  raise Return(42)  # Or, after PEP 380, return 42

This feature (inspired by Monocle) is handy in case you are implementing an interface that expects tasklets but you have no need to suspend – there’s no need to insert a dummy yield in order to make the tasklet into a generator.

Contents

google.appengine.ext.ndb.tasklets.Return

alias of StopIteration

google.appengine.ext.ndb.tasklets.tasklet(func)source
google.appengine.ext.ndb.tasklets.synctasklet(func)source

Decorator to run a function as a tasklet when called.

Use this to wrap a request handler function that will be called by some web application framework (e.g. a Django view function or a webapp.RequestHandler.get method).

google.appengine.ext.ndb.tasklets.toplevel(func)source

A sync tasklet that sets a fresh default Context.

Use this for toplevel view functions such as webapp.RequestHandler.get() or Django view functions.

google.appengine.ext.ndb.tasklets.sleep(dt)source

Public function to sleep some time.

Example

yield tasklets.sleep(0.5) # Sleep for half a sec.

google.appengine.ext.ndb.tasklets.add_flow_exception(exc)source

Add an exception that should not be logged.

The argument must be a subclass of Exception.

google.appengine.ext.ndb.tasklets.get_return_value(err)source
google.appengine.ext.ndb.tasklets.get_context()source
google.appengine.ext.ndb.tasklets.set_context(new_context)source
google.appengine.ext.ndb.tasklets.make_default_context()source
google.appengine.ext.ndb.tasklets.make_context(*args, **kwds)source
class google.appengine.ext.ndb.tasklets.Future(info=None)source

Bases: object

A Future has 0 or more callbacks.

The callbacks will be called when the result is ready.

NOTE: This is somewhat inspired but not conformant to the Future interface defined by PEP 3148. It is also inspired (and tries to be somewhat compatible with) the App Engine specific UserRPC and MultiRpc classes.

FINISHING = 2
IDLE = 0
RUNNING = 1
add_callback(callback, *args, **kwds)source
add_immediate_callback(callback, *args, **kwds)source
check_success()source
done()source
dump()source
dump_stack()source
get_exception()source
get_result()source
get_traceback()source
set_exception(exc, tb=None)source
set_result(result)source
state
wait()source
classmethod wait_all(futures)source
classmethod wait_any(futures)source
class google.appengine.ext.ndb.tasklets.MultiFuture(info=None)source

Bases: google.appengine.ext.ndb.tasklets.Future

A Future that depends on multiple other Futures.

This is used internally by ‘v1, v2, … = yield f1, f2, …’; the semantics (e.g. error handling) are constrained by that use case.

The protocol from the caller’s POV is:

mf = MultiFuture()
mf.add_dependent(<some other Future>)  -OR- mf.putq(<some value>)
mf.add_dependent(<some other Future>)  -OR- mf.putq(<some value>)
  .
  . (More mf.add_dependent() and/or mf.putq() calls)
  .
mf.complete()  # No more dependents will be added.
  .
  . (Time passes)
  .
results = mf.get_result()

Now, results is a list of results from all dependent Futures in the order in which they were added.

It is legal to add the same dependent multiple times.

Callbacks can be added at any point.

From a dependent Future POV, there’s nothing to be done: a callback is automatically added to each dependent Future which will signal its completion to the MultiFuture.

Error handling: if any dependent future raises an error, it is propagated to mf. To force an early error, you can call mf.set_exception() instead of mf.complete(). After this you can’t call mf.add_dependent() or mf.putq() any more.

add_dependent(fut)source
complete()source
putq(value)source
set_exception(exc, tb=None)source
class google.appengine.ext.ndb.tasklets.QueueFuture(info=None)source

Bases: google.appengine.ext.ndb.tasklets.Future

A Queue following the same protocol as MultiFuture.

However, instead of returning results as a list, it lets you retrieve results as soon as they are ready, one at a time, using getq(). The Future itself finishes with a result of None when the last result is ready (regardless of whether it was retrieved).

The getq() method returns a Future which blocks until the next result is ready, and then returns that result. Each getq() call retrieves one unique result. Extra getq() calls after the last result is already returned return EOFError as their Future’s exception. (I.e., q.getq() returns a Future as always, but yieding that Future raises EOFError.)

NOTE: Values can also be pushed directly via .putq(value). However there is no flow control – if the producer is faster than the consumer, the queue will grow unbounded.

add_dependent(fut)source
complete()source
getq()source
putq(value)source
set_exception(exc, tb=None)source
class google.appengine.ext.ndb.tasklets.SerialQueueFuture(info=None)source

Bases: google.appengine.ext.ndb.tasklets.Future

Like QueueFuture but maintains the order of insertion.

This class is used by Query operations.

Invariants:

  • At least one of _queue and _waiting is empty.

  • The Futures in _waiting are always pending.

(The Futures in _queue may be pending or completed.)

In the discussion below, add_dependent() is treated the same way as putq().

If putq() is ahead of getq(), the situation is like this:

putq() v

_queue: [f1, f2, …]; _waiting: [] ^ getq()

Here, putq() appends a Future to the right of _queue, and getq() removes one from the left.

If getq() is ahead of putq(), it’s like this:

putq() v

_queue: []; _waiting: [f1, f2, …]

^ getq()

Here, putq() removes a Future from the left of _waiting, and getq() appends one to the right.

When both are empty, putq() appends a Future to the right of _queue, while getq() appends one to the right of _waiting.

The _full flag means that no more calls to putq() will be made; it is set by calling either complete() or set_exception().

Calling complete() signals that no more putq() calls will be made. If getq() is behind, subsequent getq() calls will eat up _queue until it is empty, and after that will return a Future that passes EOFError (note that getq() itself never raises EOFError). If getq() is ahead when complete() is called, the Futures in _waiting are all passed an EOFError exception (thereby eating up _waiting).

If, instead of complete(), set_exception() is called, the exception and traceback set there will be used instead of EOFError.

add_dependent(fut)source
complete()source
getq()source
putq(value)source
set_exception(exc, tb=None)source
class google.appengine.ext.ndb.tasklets.ReducingFuture(reducer, info=None, batch_size=20)source

Bases: google.appengine.ext.ndb.tasklets.Future

A Queue following the same protocol as MultiFuture.

However the result, instead of being a list of results of dependent Futures, is computed by calling a ‘reducer’ tasklet. The reducer tasklet takes a list of values and returns a single value. It may be called multiple times on sublists of values and should behave like e.g. sum().

NOTE: The reducer input values may be reordered compared to the order in which they were added to the queue.

add_dependent(fut)source
complete()source
putq(value)source
set_exception(exc, tb=None)source