google.appengine.ext.ndb.tasklets 模块

摘要

tasklet 修饰器。

Tasklet 用于在没有线程的情况下编写并发运行的函数;tasklet 由事件循环执行,可以使用 yield 语句暂停自身,以阻止 I/O 或一些其他操作。阻止操作的概念抽象为 Future 类,但是 tasklet 也可能产生远程过程调用 (RPC),以等待该 RPC 完成。

@tasklet 修饰器封装生成器函数,以便在调用它时,在事件循环执行生成器时返回 Future。在 tasklet 中,Future 的任何 yield 都会等待并返回 Future 的结果。例如:

@tasklet
def foo():
  a = yield <some Future>
  b = yield <another Future>
  raise Return(a + b)

def main():
  f = foo()
  x = f.get_result()
  print x

请注意,使用 get_result() 获取 Future 的结果之前执行阻塞的效率有些低下(但不是很显著,它不会忙于等待)。在大多数情况下,应将此类代码重写为 tasklet:

@tasklet
def main_tasklet():
  f = foo()
  x = yield f
  print x

调用 tasklet 会使用事件循环自动安排该任务:

def main():
  f = main_tasklet()
  eventloop.run()  # Run until no tasklets left to do
  f.done()  # Returns True

作为一项特殊功能,如果封装的函数不是生成器函数,则其返回值通过 Future 返回。这使得以下两者是等效的:

@tasklet
def foo():
  return 42

@tasklet
def foo():
  if False: yield  # The presence of 'yield' makes foo a generator
  raise Return(42)  # Or, after PEP 380, return 42

如果您要实现接受 tasklet 的接口但不需要暂停,那么这个功能(受 Monocle 启发)非常好用。您无需插入虚拟 yield 即可使 tasklet 成为生成器。

目录

google.appengine.ext.ndb.tasklets.Return

StopIteration 的别名

google.appengine.ext.ndb.tasklets.tasklet(func)源代码
google.appengine.ext.ndb.tasklets.synctasklet(func)源代码

调用时将函数作为 tasklet 运行的修饰器。

使用它可封装将由某个 Web 应用框架(例如 Django 视图函数或 webapp.RequestHandler.get 方法)调用的请求处理程序函数。

google.appengine.ext.ndb.tasklets.toplevel(func)源代码

用于设置全新默认上下文的同步 tasklet。

将其用于顶级视图函数(例如 webapp.RequestHandler.get())或 Django 视图函数。

google.appengine.ext.ndb.tasklets.sleep(dt)源代码

用于休眠一段时间的公共函数。

示例

yield tasklets.sleep(0.5) # 休眠半秒。

google.appengine.ext.ndb.tasklets.add_flow_exception(exc)源代码

添加不应记录的异常。

该参数必须是 Exception 的子类。

google.appengine.ext.ndb.tasklets.get_return_value(err)源代码
google.appengine.ext.ndb.tasklets.get_context()源代码
google.appengine.ext.ndb.tasklets.set_context(new_context)源代码
google.appengine.ext.ndb.tasklets.make_default_context()源代码
google.appengine.ext.ndb.tasklets.make_context(*args, **kwds)源代码
class google.appengine.ext.ndb.tasklets.Future(info=None)源代码

代码库:对象

一个 Future 有 0 个或 0 个以上的回调。

结果准备就绪后,系统会调用回调。

注意:这受到一些启发,但不符合 PEP 3148 定义的 Future 接口。它还受到 App Engine 特定 UserRPC 和 MultiRpc 类的启发(并试图在某种程度上兼容)。

FINISHING = 2
IDLE = 0
RUNNING = 1
add_callback(callback, *args, **kwds)源代码
add_immediate_callback(callback, *args, **kwds)源代码
check_success()源代码
done()源代码
dump()源代码
dump_stack()源代码
get_exception()源代码
get_result()源代码
get_traceback()源代码
set_exception(exc, tb=None)源代码
set_result(result)源代码
state
wait()源代码
classmethod wait_all(futures)源代码
classmethod wait_any(futures)源代码
class google.appengine.ext.ndb.tasklets.MultiFuture(info=None)源代码

基类:google.appengine.ext.ndb.tasklets.Future

依赖于多个其他 Future 的一个 Future。

它由“v1, v2, … = yield f1, f2, …”在内部使用;语义(例如错误处理)受该用例的限制。

调用方 POV 中的协议为:

mf = MultiFuture()
mf.add_dependent(<some other Future>)  -OR- mf.putq(<some value>)
mf.add_dependent(<some other Future>)  -OR- mf.putq(<some value>)
  .
  . (More mf.add_dependent() and/or mf.putq() calls)
  .
mf.complete()  # No more dependents will be added.
  .
  . (Time passes)
  .
results = mf.get_result()

现在,results 是所有作为依赖项的 Future 中按照添加顺序显示的结果列表。

多次添加相同的依赖项属于合法做法。

您可以随时添加回调。

作为依赖项的 Future POV 无需执行任何操作:系统会自动向每个作为依赖项的 Future 添加回调,这将向 MultiFuture 发出完成的信号。

错误处理:如果任何作为依赖项的 future 引发错误,则它会传播到 mf。要强制引发早期错误,您可以调用 mf.set_exception() 而非 mf.complete()。在此之后,您将无法再调用 mf.add_dependenciesent() 或 mf.putq()。

add_dependent(fut)源代码
complete()源代码
putq(value)源代码
set_exception(exc, tb=None)源代码
class google.appengine.ext.ndb.tasklets.QueueFuture(info=None)源代码

基类:google.appengine.ext.ndb.tasklets.Future

与 MultiFuture 遵循相同协议的队列。

不过,您无需将结果作为列表返回,而是可以在结果准备就绪后立即使用 getq() 检索结果。当最后一个结果准备就绪时(无论是否被检索),Future 本身的完成结果为 None。

getq() 方法会返回一个 Future,其会在下一个结果准备就绪之前阻塞,然后返回该结果。每次 getq() 调用都会检索一个唯一结果。最后一个结果之后的额外 getq() 调用已返回 EOFError 作为其 Future 的异常。(即 q.getq() 像往常一样返回 Future,但产生该 Future 会引发 EOFError。)

注意:值也可以通过 .putq(value) 直接推送。但是,没有流控制;如果生产方比使用方快,则队列将无限增长。

add_dependent(fut)源代码
complete()源代码
getq()源代码
putq(value)源代码
set_exception(exc, tb=None)源代码
class google.appengine.ext.ndb.tasklets.SerialQueueFuture(info=None)源代码

基类:google.appengine.ext.ndb.tasklets.Future

此方法与 QueueFuture 类似,但它会保持插入顺序。

此类供查询操作使用。

不变量:

  • _queue 和 _waiting 中至少有一个为空。

  • _waiting 中的 Future 始终处于待处理状态。

(_queue 中的 Future 可能处于待处理或已完成状态。)

在下面的讨论中,add_dependent() 的处理方式与 putq() 相同。

如果 putq() 放在 getq() 之前,则情况如下:

putq() v

_queue: [f1, f2, …]; _waiting: [] ^ getq()

此处,putq() 会将一个 Future 附加到 _queue 的右侧,并且 getq() 会从左侧移除一个 Future。

如果 getq() 放在 putq() 之前,则情况如下:

putq() v

_queue: []; _waiting: [f1, f2, …]

^ getq()

此处,putq() 会从 _waiting 的左侧移除一个 Future,并且 getq() 会在右侧附加一个 Future。

当两者都为空时,putq() 会将一个 Future 附加到 _queue 的右侧,并且 getq() 会在 _waiting 的右侧附加一个 Future。

_full 标志表示不再进行 putq() 调用;可通过调用 complete() 或 set_exception() 来设置此标志。

调用 complete() 表示不再进行 putq() 调用。 如果 getq() 滞后,后续的 getq() 调用将消耗 _queue,直到其为空,之后便会返回用于传递 EOFError 的 Future(请注意,getq() 本身绝不会引发 EOFError)。如果在调用 complete() 时 getq() 提前,则 _waiting 中的 Future 都将被传递一个 EOFError 异常(因此会消耗 _waiting)。

如果调用了 set_exception(),而不是 complete(),则系统将使用该处设置的异常和 traceback,而不是引发 EOFError。

add_dependent(fut)源代码
complete()源代码
getq()源代码
putq(value)源代码
set_exception(exc, tb=None)源代码
类 google.appengine.ext.ndb.tasklets.ReducingFuture(reducer, info=None, batch_size=20)源代码

基类:google.appengine.ext.ndb.tasklets.Future

与 MultiFuture 遵循相同协议的队列。

但是,结果(并非相关 Future 的结果列表)将通过调用“缩减器”tasklet 计算得出。缩减器 tasklet 接受值列表并返回单个值。可以对值的子列表多次调用此操作,其行为应类似于 sum()。

注意:与缩减器输入值添加到队列的顺序相比,系统可能会对其重新排序。

add_dependent(fut)源代码
complete()源代码
putq(value)源代码
set_exception(exc, tb=None)源代码