Async Datastore API (Python)

注意强烈建议构建新应用的开发者使用 NDB 客户端库,它与 DB 客户端库相比具有多项优势,例如可通过 Memcache API 进行自动实体缓存。如果您当前使用的是较早的 DB 客户端库,请参阅 DB 到 NDB 的迁移指南

通过 Async Datastore API,您可以对数据存储区执行并行非阻塞调用,并在稍后处理请求时检索这些调用的结果。本文档介绍了 Async Datastore API 的以下几个方面:

使用异步数据存储区服务

通过 Async Datastore API,您可以使用 *_async 形式的方法(例如 google.appengine.ext.db 软件包中的 get_async())进行数据存储区调用。以下代码示例演示了一些使用异步方法进行的简单数据存储区操作:

from google.appengine.ext import db

get_future = db.get_async(key)
put_future = db.put_async(model)
delete_future = db.delete_async(key)
allocate_ids_future = db.allocate_ids_async(key, 10)

这些函数执行的操作和同步函数相同,只是这些函数会立即返回异步对象,供您稍后用于获取真实结果。以下代码示例演示了如何使用 get_result() 获取异步结果:

entity = get_future.get_result()
key = put_future.get_result()
range = allocate_ids_future.get_result()

# Wait for the operation to complete without returning a value.
# Exceptions that occurred in the call are thrown here. Calling
# get_result() allows you to verify that the deletion succeeded.
delete_future.get_result()

注意:在调用 get_result() 之前,系统不会抛出异常。通过调用此方法,您可以验证异步操作是否成功。

使用异步事务

与同步调用一样,Async Datastore API 调用也可以参与事务。下面是一个函数,该函数调整 Employee 的薪金,并在 Employee 所在的实体组中写入一个额外的 SalaryAdjustment 实体,所有这些都在单个事务中完成。

def adjust_salary(employee_key, raise_ammount):
   def runner():
        # Async call to lookup the Employee entity
        employee_entity_future = db.get_async(employee_key)

        # Create and put a SalaryAdjustment entity in parallel with the lookup
        adjustment_entity = SalaryAdjustment(parent=employeeKey)
        adjustment_entity.adjustment = raise_amount
        db.put_async(adjustmentEntity)

        # Fetch the result of our lookup to make the salary adjustment
        employee_entity = employee_entity_future.get_result()
        employee_entity.salary += raise_amount

        # Re-put the Employee entity with the adjusted salary.
        db.put_async(employee_entity)
    db.run_in_transaction(runner)

此示例说明了不使用事务的异步调用与使用事务的异步调用之间的重要区别。如果您不使用事务,则确保单个异步调用已完成的唯一方法就是使用事务来提取异步对象的结果。提交该事务将阻塞在事务内执行的所有异步调用的结果。

因此,在上述示例中,即使在 runner() 完成时插入 SalaryAdjustment 实体的异步调用仍可能未完成,也无法执行提交操作,除非插入完成。

异步查询

目前,我们没有提供用于查询的显式异步 API。但是,当您调用 Query.run() 时,查询会立即返回并以异步方式预取结果。这样,应用可以在提取查询结果的同时并行执行任务。

# ...

q1 = Salesperson.all().filter('date_of_hire <', one_month_ago)

# Returns instantly, query is executing in the background.
recent_hires = q1.run()

q2 = Customer.all().filter('last_contact >', one_year_ago)

# Also returns instantly, query is executing in the background.
needs_followup = q2.run()

schedule_phone_call(recent_hires, needs_followUp)

遗憾的是,Query.fetch() 不具有相同的行为。

何时使用异步数据存储区调用

如果您调用同步 google.ext.db 方法(例如 db.get()),则代码会阻塞,直到对数据存储区的调用完成。如果应用只需要以 HTML 形式呈现 get() 的结果,则在调用完成之前,执行阻塞操作是一个合理选择。然而,如果您的应用需要 get() 的结果以及 Query 的结果来呈现响应,而且 get() 和 Query 又没有任何数据依赖关系,则等待 get() 完成后再发起 Query 只是浪费时间。下面举例说明了可以通过使用异步 API 进行改进的一些代码:

# Read employee data from the Datastore
employee = Employee.get_by_key_name('Max')  # Blocking for no good reason!

# Fetch payment history
query = PaymentHistory.all().ancestor(employee.key())
history = query.fetch(10)
render_html(employee, history)

使用 db.get_async() 来异步执行调用,而不是等待 get() 完成:

employee_key = db.Key.from_path(Employee.kind(), 'Max')

# Read employee data from the Datastore
employee_future = db.get_async(employee_key)  # Returns immediately!

# Fetch payment history
query = PaymentHistory.all().ancestor(employee_key)

# Implicitly performs query asynchronously
history_itr = query.run(config=datastore_query.QueryOptions(limit=10))
employee = employee_future.get_result()
render_html(employee, history_itr)

此代码的同步和异步版本都使用类似数量的 CPU 资源(毕竟它们都执行相同的工作量),但由于异步版本允许两项数据存储区操作并行执行,因此异步版本的延迟时间较短。一般来说,如果需要执行多项没有任何数据依赖关系的数据存储区操作,则异步 API 可以显著缩短延迟时间。