NDB 异步操作

在优化应用性能时,请考虑 NDB 的使用。 例如,如果应用读取的值不在缓存中,则读取需要一段时间。 您可以通过并行执行 Datastore 操作与其他操作,或者并行执行一些 Datastore 操作来加快应用的速度。

NDB 客户端库提供多种异步函数。 各种函数允许应用向 Datastore 发送请求。 函数会立即返回,并返回 Future 对象。当 Datastore 处理请求时,应用可以执行其他操作。Datastore 处理请求后,应用可以从 Future 对象获取结果。

简介

假设某个应用的请求处理程序需要使用 NDB 来写入内容,可能是为了记录请求。它还需要执行一些其他 NDB 操作,可能是为了获取一些数据。

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

将对 put() 的调用替换为对等效异步函数 put_async() 的调用,如此应用便可立即执行其他操作,而不会在 put() 上发生阻塞。

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        future = acct.put_async()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')
        future.get_result()

这允许在 Datastore 写入数据时进行其他 NDB 函数和模板渲染。从 Datastore 获取数据之前,应用不会阻塞 Datastore。

在在本示例中,调用 future.get_result 没太大意义,因为应用从不使用 NDB 的结果。该代码在此处的作用只是确保请求处理程序在 NDB put 完成前不会退出;如果请求处理程序过早退出,可能永远都不会执行到 put。为方便起见,您可以使用 @ndb.toplevel 修饰请求处理程序。这会指示处理程序在异步请求完成之前不要退出。反过来说,这允许您发送请求,而且不用担心结果。

您可以将整个 WSGIApplication 指定为 ndb.toplevel。这可确保 WSGIApplication 的每个处理程序等待所有异步请求完成后再返回。(它不会将 WSGIApplication 的所有处理程序都设置为“toplevel”。)


app = ndb.toplevel(webapp2.WSGIApplication([('/', MyRequestHandler)]))

使用 toplevel 应用比使用其所有处理程序函数更为方便。但是如果一个处理程序方法使用 yield,那么该方法仍然需要封装在另一个修饰器 @ndb.synctasklet 中;否则,执行到 yield 将停止,且无法完成。

class MyRequestHandler(webapp2.RequestHandler):
    @ndb.toplevel
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put_async()  # Ignoring the Future this returns

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

使用异步 API 和 Future

几乎每个同步 NDB 函数都有一个对应的 _async 函数。例如,put()put_async() 对应。异步函数的参数始终与其对应的同步函数的参数相同。 异步方法的返回值始终是一个 FutureFuture 列表(对于“多个”函数)。

Future 是一个对象,用于保持已启动但可能尚未完成的操作的状态;所有异步 API 都会返回一个或多个 Futures。您可以调用 Futureget_result() 函数,来请求其操作的结果;然后如有必要,Future 将阻止处理,直到获得结果,再将结果提供给您。get_result() 返回的值与同步 API 返回的值相同。

注意:如果您在某些其他编程语言中使用过 Future,可能以为 Future 可以直接作为结果。但这在此并不适用。 其他语言使用隐式 Future;而 NDB 使用显式 Future。调用 get_result() 即可获取 NDB Future 的结果。

如果此操作引发异常会怎样?这要看出现异常的时间。如果 NDB 在发出请求时发现问题(例如,参数类型错误),则 _async() 方法会引发异常。但如果是由 Datastore 服务器等检测到的异常,_async() 方法会返回 Future,并且在应用调用其 get_result() 时引发异常。您对此不必过于担心,因为这些最终都会顺利执行;最大的区别在于,如果打印了回溯,您会看到一些低级别的异步机器暴露出来。

例如,假设您正在编写留言板应用。 如果用户已登录,您希望显示展示最新留言板帖子的页面。 此页面还应显示用户的昵称。该应用需要两种信息:已登录用户的帐户信息和留言板帖子内容。此应用的“同步”版本可能如下所示:

uid = users.get_current_user().user_id()
acct = Account.get_by_id(uid)  # I/O action 1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries = qry.fetch(10)  # I/O action 2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

此处有两个独立的 I/O 操作:获取 Account 实体和获取最近的 Guestbook 实体。如果使用同步 API,这些操作会依次执行;我们先等待检索帐户信息,然后再获取留言板实体。但是应用不需要立即使用帐户信息。我们可以利用这一点并使用异步 API:

uid = users.get_current_user().user_id()
acct_future = Account.get_by_id_async(uid)  # Start I/O action #1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries_future = qry.fetch_async(10)  # Start I/O action #2
acct = acct_future.get_result()  # Complete #1
recent_entries = recent_entries_future.get_result()  # Complete #2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

此版本的代码首先创建两个 Futuresacct_futurerecent_entries_future),然后等待它们运行。服务器会并行处理这两个请求。 每次调用 _async() 函数都会创建一个 Future 对象,并向 Datastore 服务器发送一个请求。服务器可以立即开始处理请求。服务器响应可能以任意顺序返回;Future 对象会将响应与对应的请求关联起来。

同步请求不可重叠,但异步请求可以重叠。
同步与异步请求

异步版本中花费的总(实际)时间约等于所有操作中最耗时操作花费的时间。同步版本中花费的总时间超过操作时间的总和。 如果您可以并行执行更多操作,那么异步操作更实用。

要了解您的应用的查询需要多长时间或每次请求执行多少次 I/O 操作,请考虑使用 AppStats。 此工具可以根据实时应用的仪表检测显示与上图类似的图表。

使用 Tasklet

NDB tasklet 代码可与其他代码并发运行。如果您编写一个 tasklet,您的应用可以像使用异步 NDB 函数一样使用它:应用调用该 tasklet,该 tasklet 返回一个 Future;然后调用 Futureget_result() 方法获取结果。

Tasklet 用于在没有线程的情况下编写并发函数;tasklet 由事件循环执行,可以使用 yield 语句暂停自身,以阻止 I/O 或一些其他操作。阻止操作的概念抽象为 Future 类,但是 tasklet 也可能 yield 远程过程调用 (RPC),以等待该 RPC 完成。当该 tasklet 有结果时,它会 raise ndb.Return 异常;然后,NDB 会将结果与先前 yieldFuture 关联。

当您编写 NDB tasklet 时,您会以非常规方式使用 yieldraise。因此,如果您查找如何使用这些的示例,可能找不到 NDB tasklet 这样的代码。

要将函数转换为 NDB tasklet:

  • @ndb.tasklet 修饰函数,
  • 用异步数据存储区调用的 yield 替换所有同步数据存储区调用,
  • 让函数用 raise ndb.Return(retval)“返回”其返回值(如果函数不返回任何值则不需要)。

应用可以使用 tasklet 更好地控制异步 API。例如,请考虑以下架构:

class Account(ndb.Model):
    email = ndb.StringProperty()
    nickname = ndb.StringProperty()

    def nick(self):
        return self.nickname or self.email  # Whichever is non-empty
...
class Message(ndb.Model):
    text = ndb.StringProperty()
    when = ndb.DateTimeProperty(auto_now_add=True)
    author = ndb.KeyProperty(kind=Account)  # references Account

显示消息时,显示作者的昵称意义重大。 获取数据以显示消息列表的“同步”方式可能如下所示:

qry = Message.query().order(-Message.when)
for msg in qry.fetch(20):
    acct = msg.author.get()
    self.response.out.write(
        '<p>On {}, {} wrote:'.format(msg.when, acct.nick()))
    self.response.out.write('<p>{}'.format(msg.text))

遗憾的是,这种方法效率低下。如果您在 Appstats 中查看上面的代码,会看到“Get”请求是按逐个执行的。您可能会看到以下“阶梯”模式。

同步“Get”逐个执行
同步“Get”逐个执行。

如果上述“Get”可以重叠,那么本部分程序的执行速度会更快。 您可能会重写上面的代码以使用 get_async,但很难跟踪哪些异步请求与哪些消息相关联。

应用可以通过将“异步”函数作为 tasklet 来定义该“异步”函数。 这让您能够以不易混淆的方式组织代码。

此外,该函数应使用 acct = yield key.get_async(),而非 acct = key.get()acct = key.get_async().get_result()。此 yield 指示 NDB 此处适合暂停此 tasklet 并让其他 tasklet 运行。

@ndb.tasklet 修饰生成器函数使函数返回 Future 而非生成器对象。在 tasklet 中,Future 的任何 yield 都会等待并返回 Future 的结果。

例如:

@ndb.tasklet
def callback(msg):
    acct = yield msg.author.get_async()
    raise ndb.Return('On {}, {} wrote:\n{}'.format(
        msg.when, acct.nick(), msg.text))

qry = Message.query().order(-Message.when)
outputs = qry.map(callback, limit=20)
for output in outputs:
    self.response.out.write('<p>{}</p>'.format(output))

请注意,尽管 get_async() 返回 Future,但 tasklet 框架会使 yield 表达式将 Future 的结果返回给变量 acct

map() 多次调用 callback()。 但是 callback() 中的 yield ..._async() 允许 NDB 的调度器在等待任何异步请求完成之前发送多个异步请求。

重叠异步“Get”
重叠异步“Get”

如果您在 AppStats 中查看上面的代码,可能会惊讶地发现多个“Get”不只是重叠—它们均在同一请求中完成。NDB 实施了“自动批处理程序”。自动批处理程序将多个请求捆绑到针对服务器的单个批处理 RPC;其执行该操作的方式为:只要还有更多工作要做(可能运行另一个 callback),它就会收集键。只要需要其中一个结果,自动批处理程序就会发送批处理 RPC。与大多数请求不同,查询不是“批量处理”。

当 tasklet 运行时,它会从生成 tasklet 时的默认命名空间或运行时 tasklet 为其更改的命名空间获取默认命名空间。换言之,默认命名空间与上下文不关联,也不存储在上下文中,并且更改一个 tasklet 中的默认命名空间不会影响其他 tasklet 中的默认命名空间,该 tasklet 生成的除外。

Tasklet、并行查询、并行 Yield

您可以使用 tasklet,方便多个查询同时获取记录。 例如,假设您的应用有一个页面显示购物车内容和特别优惠列表。 架构可能如下所示:

class Account(ndb.Model):
    pass

class InventoryItem(ndb.Model):
    name = ndb.StringProperty()

class CartItem(ndb.Model):
    account = ndb.KeyProperty(kind=Account)
    inventory = ndb.KeyProperty(kind=InventoryItem)
    quantity = ndb.IntegerProperty()

class SpecialOffer(ndb.Model):
    inventory = ndb.KeyProperty(kind=InventoryItem)

获取购物车商品和特别优惠的“同步”函数可能如下所示:

def get_cart_plus_offers(acct):
    cart = CartItem.query(CartItem.account == acct.key).fetch()
    offers = SpecialOffer.query().fetch(10)
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

本示例使用查询来获取购物车商品和优惠列表;然后使用 get_multi() 获取有关库存商品的详细信息。(此函数不直接使用 get_multi() 的返回值。它会调用 get_multi() 将所有库存详细信息提取到缓存中,以便稍后快速读取。)get_multi 将多个“Get”合并到一个请求中。但查询会逐个获取结果。要同时获取查询结果,可将两个查询重叠:

def get_cart_plus_offers_async(acct):
    cart_future = CartItem.query(CartItem.account == acct.key).fetch_async()
    offers_future = SpecialOffer.query().fetch_async(10)
    cart = cart_future.get_result()
    offers = offers_future.get_result()
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

get_multi() 调用仍然是独立进行的:它取决于查询结果,因此您无法将其与查询合并。

假设此应用有时需要购物车,有时需要优惠,有时两者都需要。您希望组织代码,编写一个获取购车的函数和一个获取优惠的函数。如果您的应用同时调用这些函数,理想情况下其查询可以“重叠”。 要执行此操作,请将这些函数设置为 tasklet:

@ndb.tasklet
def get_cart_tasklet(acct):
    cart = yield CartItem.query(CartItem.account == acct.key).fetch_async()
    yield ndb.get_multi_async([item.inventory for item in cart])
    raise ndb.Return(cart)

@ndb.tasklet
def get_offers_tasklet(acct):
    offers = yield SpecialOffer.query().fetch_async(10)
    yield ndb.get_multi_async([offer.inventory for offer in offers])
    raise ndb.Return(offers)

@ndb.tasklet
def get_cart_plus_offers_tasklet(acct):
    cart, offers = yield get_cart_tasklet(acct), get_offers_tasklet(acct)
    raise ndb.Return((cart, offers))

yield xy 很重要,但很容易被忽视。如果这是两个单独的 yield 语句,则将逐个执行。但是 yield tasklet 元组为并行的 yield:tasklet 可以并行运行,yield 会等待所有 tasklet 完成,然后返回结果。(在某些编程语言中,这称为栅栏。)

当您将一段代码转换为 tasklet 后,您可能会迫不及待地进行更多转换。如果您发现可以与 tasklet 并行运行的“同步”代码,那么最好也将该代码转换为 tasklet。 然后,您可以用并行 yield 并行执行该代码。

如果您编写请求函数(webapp2 请求函数、Django view 函数等)作为 tasklet,函数无法按你的期望运行:该函数会生成结果,但之后会停止运行。在这种情况下,您需要使用 @ndb.synctasklet 修饰该函数。@ndb.synctasklet@ndb.tasklet 类似,但更改为在 tasklet 上调用 get_result()。这会将您的 tasklet 转换为以常规方式返回结果的函数。

Tasklet 中的查询迭代器

要在 tasklet 中循环访问查询结果,请使用以下模式:

qry = Model.query()
qit = qry.iter()
while (yield qit.has_next_async()):
    entity = qit.next()
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

这是 tasklet 友好型代码,等效于以下代码:

# DO NOT DO THIS IN A TASKLET
qry = Model.query()
for entity in qry:
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

第一个版本中的三行粗体是 tasklet 友好型代码,等效于第二个版本中的单行粗体代码。 Tasklet 只能在 yield 关键字处暂停。 无 yield 的 for 循环不允许其他 tasklet 运行。

您可能想知道为什么此代码使用查询迭代器而不用 qry.fetch_async() 获取所有实体。该应用可能有很多实体,而这些实体不适合 RAM。也许您正在寻找一个实体,一旦找到就可以停止循环访问;但是仅使用查询语言无法表达搜索条件。您可以使用迭代器加载要检查的实体,然后在找到所需内容时终止循环。

使用 NDB 执行异步 Urlfetch

NDB Context 有一个异步 urlfetch() 函数,该函数可以很好地与 NDB tasklet 并行执行,例如:

@ndb.tasklet
def get_google():
    context = ndb.get_context()
    result = yield context.urlfetch("http://www.google.com/")
    if result.status_code == 200:
        raise ndb.Return(result.content)

网址提取服务有自己的异步请求 API。虽然也很实用,但有时无法很好地与 NDB tasklet 一起使用。

使用异步事务

事务也可以采用异步方式完成。您可以将现有函数传递给 ndb.transaction_async(),或使用 @ndb.transactional_async 修饰器。与其他异步函数一样,这将返回 NDB Future

@ndb.transactional_async
def update_counter(counter_key):
    counter = counter_key.get()
    counter.value += 1
    counter.put()

事务也可以用 tasklet 完成。例如,我们可以在等待阻止 RPC 时将 update_counter 代码更改为 yield

@ndb.transactional_tasklet
def update_counter(counter_key):
    counter = yield counter_key.get_async()
    counter.value += 1
    yield counter.put_async()

使用 Future.wait_any()

有时您希望创建多个异步请求,并在第一个请求完成时返回结果。 您可以使用 ndb.Future.wait_any() 类方法完成该操作:

def get_first_ready():
    urls = ["http://www.google.com/", "http://www.blogspot.com/"]
    context = ndb.get_context()
    futures = [context.urlfetch(url) for url in urls]
    first_future = ndb.Future.wait_any(futures)
    return first_future.get_result().content

遗憾的是,上面的代码无法方便地转换为 tasklet;并行 yield 会等待所有 Future 完成,包括那些您不想等待的 Future。