NDB 异步操作

在优化应用性能时,请考虑 NDB 的使用。 例如,如果应用读取的值不在缓存中,则读取需要一段时间。 您可以通过并行执行 Datastore 操作与其他操作,或者并行执行一些 Datastore 操作来加快应用的速度。

NDB 客户端库提供多种异步函数。 各种函数允许应用向 Datastore 发送请求。 函数会立即返回,并返回 Future 对象。当 Datastore 处理请求时,应用可以执行其他操作。Datastore 处理请求后,应用可以从 Future 对象获取结果。

使用异步 API 和 Future

几乎每个同步 NDB 函数都有一个对应的 _async 函数。例如,put()put_async() 对应。异步函数的参数始终与其对应的同步函数的参数相同。 异步方法的返回值始终是一个 FutureFuture 列表(对于“多个”函数)。

Future 是一个对象,用于保持已启动但可能尚未完成的操作的状态;所有异步 API 都会返回一个或多个 Futures。您可以调用 Futureget_result() 函数,来请求其操作的结果;然后如有必要,Future 将阻止处理,直到获得结果,再将结果提供给您。get_result() 返回的值与同步 API 返回的值相同。

注意:如果您在某些其他编程语言中使用过 Future,可能以为 Future 可以直接作为结果。但这在此并不适用。 其他语言使用隐式 Future;而 NDB 使用显式 Future。调用 get_result() 即可获取 NDB Future 的结果。

如果此操作引发异常会怎样?这要看出现异常的时间。如果 NDB 在发出请求时发现问题(例如,参数类型错误),则 _async() 方法会引发异常。但如果是由 Datastore 服务器等检测到的异常,_async() 方法会返回 Future,并且在应用调用其 get_result() 时引发异常。您对此不必过于担心,因为这些最终都会顺利执行;最大的区别在于,如果打印了回溯,您会看到一些低级别的异步机器暴露出来。

例如,假设您正在编写留言板应用。 如果用户已登录,您希望显示展示最新留言板帖子的页面。 此页面还应显示用户的昵称。该应用需要两种信息:已登录用户的账户信息和留言板帖子内容。此应用的“同步”版本可能如下所示:

uid = users.get_current_user().user_id()
acct = Account.get_by_id(uid)  # I/O action 1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries = qry.fetch(10)  # I/O action 2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

此处有两个独立的 I/O 操作:获取 Account 实体和获取最近的 Guestbook 实体。如果使用同步 API,这些操作会依次执行;我们先等待检索账户信息,然后再获取留言板实体。但是应用不需要立即使用账户信息。我们可以利用这一点并使用异步 API:

uid = users.get_current_user().user_id()
acct_future = Account.get_by_id_async(uid)  # Start I/O action #1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries_future = qry.fetch_async(10)  # Start I/O action #2
acct = acct_future.get_result()  # Complete #1
recent_entries = recent_entries_future.get_result()  # Complete #2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

此版本的代码首先创建两个 Futuresacct_futurerecent_entries_future),然后等待它们运行。服务器会并行处理这两个请求。 每次调用 _async() 函数都会创建一个 Future 对象,并向 Datastore 服务器发送一个请求。服务器可以立即开始处理请求。服务器响应可能以任意顺序返回;Future 对象会将响应与对应的请求关联起来。

同步请求不可重叠,但异步请求可以重叠。
同步与异步请求

异步版本中花费的总(实际)时间约等于所有操作中最耗时操作花费的时间。同步版本中花费的总时间超过操作时间的总和。 如果您可以并行执行更多操作,那么异步操作更实用。

要了解您的应用的查询需要多长时间或每次请求执行多少次 I/O 操作,请考虑使用 AppStats。 此工具可以根据实时应用的仪表检测显示与上图类似的图表。

使用 Tasklet

NDB tasklet tasklet代码可与其他代码并发运行。如果您编写一个 tasklet,您的应用可以像使用异步 NDB 函数一样使用它:应用调用该 tasklet,该 tasklet 返回一个 Future;然后调用 Futureget_result() 方法获取结果。

Tasklet 用于在没有线程的情况下编写并发函数;tasklet 由事件循环执行,可以使用 yield 语句暂停自身,以阻止 I/O 或一些其他操作。阻止操作的概念抽象为 Future 类,但是 tasklet 也可能 yield 远程过程调用 (RPC),以等待该 RPC 完成。当该 tasklet 有结果时,它会 raise ndb.Return 异常;然后,NDB 会将结果与先前 yieldFuture 关联。

当您编写 NDB tasklet 时,您会以非常规方式使用 yieldraise。因此,如果您查找如何使用这些的示例,可能找不到 NDB tasklet 这样的代码。

要将函数转换为 NDB tasklet:

  • @ndb.tasklet 修饰函数,
  • 用异步数据存储区调用的 yield 替换所有同步数据存储区调用,
  • 让函数用 raise ndb.Return(retval)“返回”其返回值(如果函数不返回任何值则不需要)。

应用可以使用 tasklet 更好地控制异步 API。例如,请考虑以下架构:

class Account(ndb.Model):
    email = ndb.StringProperty()
    nickname = ndb.StringProperty()

    def nick(self):
        return self.nickname or self.email  # Whichever is non-empty
...
class Message(ndb.Model):
    text = ndb.StringProperty()
    when = ndb.DateTimeProperty(auto_now_add=True)
    author = ndb.KeyProperty(kind=Account)  # references Account

显示消息时,显示作者的昵称意义重大。 获取数据以显示消息列表的“同步”方式可能如下所示:

qry = Message.query().order(-Message.when)
for msg in qry.fetch(20):
    acct = msg.author.get()
    self.response.out.write(
        '<p>On {}, {} wrote:'.format(msg.when, acct.nick()))
    self.response.out.write('<p>{}'.format(msg.text))

遗憾的是,这种方法效率低下。如果您在 Appstats 中查看上面的代码,会看到“Get”请求是按逐个执行的。您可能会看到以下“阶梯”模式。

同步“Get”逐个执行
同步“Get”逐个执行。

如果上述“Get”可以重叠,那么本部分程序的执行速度会更快。 您可能会重写上面的代码以使用 get_async,但很难跟踪哪些异步请求与哪些消息相关联。

应用可以通过将“异步”函数作为 tasklet 来定义该“异步”函数。 这让您能够以不易混淆的方式组织代码。

此外,该函数应使用 acct = yield key.get_async(),而非 acct = key.get()acct = key.get_async().get_result()。此 yield 指示 NDB 此处适合暂停此 tasklet 并让其他 tasklet 运行。

@ndb.tasklet 修饰生成器函数使函数返回 Future 而非生成器对象。在 tasklet 中,Future 的任何 yield 都会等待并返回 Future 的结果。

例如:

@ndb.tasklet
def callback(msg):
    acct = yield msg.author.get_async()
    raise ndb.Return('On {}, {} wrote:\n{}'.format(
        msg.when, acct.nick(), msg.text))

qry = Message.query().order(-Message.when)
outputs = qry.map(callback, limit=20)
for output in outputs:
    self.response.out.write('<p>{}</p>'.format(output))

请注意,尽管 get_async() 返回 Future,但 tasklet 框架会使 yield 表达式将 Future 的结果返回给变量 acct

map() 多次调用 callback()。 但是 callback() 中的 yield ..._async() 允许 NDB 的调度器在等待任何异步请求完成之前发送多个异步请求。

重叠异步“Get”
重叠异步“Get”

如果您在 AppStats 中查看上面的代码,可能会惊讶地发现多个“Get”不只是重叠—它们均在同一请求中完成。NDB 实施了“自动批处理程序”autobatcher。自动批处理程序将多个请求捆绑到针对服务器的单个批处理 RPC;其执行该操作的方式为:只要还有更多工作要做(可能运行另一个 callback),它就会收集键。只要需要其中一个结果,自动批处理程序就会发送批处理 RPC。与大多数请求不同,查询不是“批量处理”。

当 tasklet 运行时,它会从生成 tasklet 时的默认命名空间或运行时 tasklet 为其更改的命名空间获取默认命名空间。换言之,默认命名空间与上下文不关联,也不存储在上下文中,并且更改一个 tasklet 中的默认命名空间不会影响其他 tasklet 中的默认命名空间,该 tasklet 生成的除外。

Tasklet、并行查询、并行 Yield

您可以使用 tasklet,方便多个查询同时获取记录。 例如,假设您的应用有一个页面显示购物车内容和特别优惠列表。 架构可能如下所示:

class Account(ndb.Model):
    pass


class InventoryItem(ndb.Model):
    name = ndb.StringProperty()


class CartItem(ndb.Model):
    account = ndb.KeyProperty(kind=Account)
    inventory = ndb.KeyProperty(kind=InventoryItem)
    quantity = ndb.IntegerProperty()


class SpecialOffer(ndb.Model):
    inventory = ndb.KeyProperty(kind=InventoryItem)

获取购物车商品和特别优惠的“同步”函数可能如下所示:

def get_cart_plus_offers(acct):
    cart = CartItem.query(CartItem.account == acct.key).fetch()
    offers = SpecialOffer.query().fetch(10)
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

本示例使用查询来获取购物车商品和优惠列表;然后使用 get_multi() 获取有关库存商品的详细信息。(此函数不直接使用 get_multi() 的返回值。它会调用 get_multi() 将所有库存详细信息提取到缓存中,以便稍后快速读取。)get_multi 将多个“Get”合并到一个请求中。但查询会逐个获取结果。要同时获取查询结果,可将两个查询重叠:

def get_cart_plus_offers_async(acct):
    cart_future = CartItem.query(CartItem.account == acct.key).fetch_async()
    offers_future = SpecialOffer.query().fetch_async(10)
    cart = cart_future.get_result()
    offers = offers_future.get_result()
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

get_multi() 调用仍然是独立进行的:它取决于查询结果,因此您无法将其与查询合并。

假设此应用有时需要购物车,有时需要优惠,有时两者都需要。您希望组织代码,编写一个获取购车的函数和一个获取优惠的函数。如果您的应用同时调用这些函数,理想情况下其查询可以“重叠”。 要执行此操作,请将这些函数设置为 tasklet:

@ndb.tasklet
def get_cart_tasklet(acct):
    cart = yield CartItem.query(CartItem.account == acct.key).fetch_async()
    yield ndb.get_multi_async([item.inventory for item in cart])
    raise ndb.Return(cart)


@ndb.tasklet
def get_offers_tasklet(acct):
    offers = yield SpecialOffer.query().fetch_async(10)
    yield ndb.get_multi_async([offer.inventory for offer in offers])
    raise ndb.Return(offers)


@ndb.tasklet
def get_cart_plus_offers_tasklet(acct):
    cart, offers = yield get_cart_tasklet(acct), get_offers_tasklet(acct)
    raise ndb.Return((cart, offers))

yield xy 很重要,但很容易被忽视。如果这是两个单独的 yield 语句,则将逐个执行。但是 yield tasklet 元组为并行的 yield:tasklet 可以并行运行,yield 会等待所有 tasklet 完成,然后返回结果。(在某些编程语言中,这称为栅栏。)

当您将一段代码转换为 tasklet 后,您可能会迫不及待地进行更多转换。如果您发现可以与 tasklet 并行运行的“同步”代码,那么最好也将该代码转换为 tasklet。 然后,您可以用并行 yield 并行执行该代码。

如果您编写请求函数(webapp2 请求函数、Django view 函数等)作为 tasklet,函数无法按你的期望运行:该函数会生成结果,但之后会停止运行。在这种情况下,您需要使用 @ndb.synctasklet 修饰该函数。@ndb.synctasklet@ndb.tasklet 类似,但更改为在 tasklet 上调用 get_result()。这会将您的 tasklet 转换为以常规方式返回结果的函数。

Tasklet 中的查询迭代器

要在 tasklet 中循环访问查询结果,请使用以下模式:

qry = Model.query()
qit = qry.iter()
while (yield qit.has_next_async()):
    entity = qit.next()
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

这是 tasklet 友好型代码,等效于以下代码:

# DO NOT DO THIS IN A TASKLET
qry = Model.query()
for entity in qry:
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

第一个版本中的三行粗体是 tasklet 友好型代码,等效于第二个版本中的单行粗体代码。 Tasklet 只能在 yield 关键字处暂停。 无 yield 的 for 循环不允许其他 tasklet 运行。

您可能想知道为什么此代码使用查询迭代器而不用 qry.fetch_async() 获取所有实体。该应用可能有很多实体,而这些实体不适合 RAM。也许您正在寻找一个实体,一旦找到就可以停止循环访问;但是仅使用查询语言无法表达搜索条件。您可以使用迭代器加载要检查的实体,然后在找到所需内容时终止循环。

使用 NDB 执行异步 Urlfetch

NDB Context 有一个异步 urlfetch() 函数,该函数可以很好地与 NDB tasklet 并行执行,例如:

@ndb.tasklet
def get_google():
    context = ndb.get_context()
    result = yield context.urlfetch("http://www.google.com/")
    if result.status_code == 200:
        raise ndb.Return(result.content)

网址提取服务有自己的异步请求 API。虽然也很实用,但有时无法很好地与 NDB tasklet 一起使用。

使用异步事务

事务也可以采用异步方式完成。您可以将现有函数传递给 ndb.transaction_async(),或使用 @ndb.transactional_async 修饰器。与其他异步函数一样,这将返回 NDB Future

@ndb.transactional_async
def update_counter(counter_key):
    counter = counter_key.get()
    counter.value += 1
    counter.put()

事务也可以用 tasklet 完成。例如,我们可以在等待阻止 RPC 时将 update_counter 代码更改为 yield

@ndb.transactional_tasklet
def update_counter(counter_key):
    counter = yield counter_key.get_async()
    counter.value += 1
    yield counter.put_async()

使用 Future.wait_any()

有时您希望创建多个异步请求,并在第一个请求完成时返回结果。 您可以使用 ndb.Future.wait_any() 类方法完成该操作:

def get_first_ready():
    urls = ["http://www.google.com/", "http://www.blogspot.com/"]
    context = ndb.get_context()
    futures = [context.urlfetch(url) for url in urls]
    first_future = ndb.Future.wait_any(futures)
    return first_future.get_result().content

遗憾的是,上面的代码无法方便地转换为 tasklet;并行 yield 会等待所有 Future 完成,包括那些您不想等待的 Future。