Async Datastore API

通过 Async Datastore API,您可以对数据存储区执行并行非阻塞调用,并在稍后处理请求时检索这些调用的结果。本文档介绍了 Async Datastore API 的以下几个方面:

使用异步数据存储区服务

借助 Async Datastore API,您可以使用 AsyncDatastoreService 接口的方法调用数据存储区。您可以通过调用 DatastoreServiceFactory 类的 getAsyncDatastoreService() 类方法来获取此对象。

import com.google.appengine.api.datastore.AsyncDatastoreService;
import com.google.appengine.api.datastore.DatastoreServiceFactory;

// ...
AsyncDatastoreService datastore = DatastoreServiceFactory.getAsyncDatastoreService();

AsyncDatastoreService 支持的操作与 DatastoreService 相同,但大多数方法会立即返回一个 Future,您可以在后面某个时候在其结果上加以阻塞。例如,DatastoreService.get() 会返回一个实体,但 AsyncDatastoreService.get() 会返回一个 Future<Entity>

// ...

Key key = KeyFactory.createKey("Employee", "Max");
// Async call returns immediately
Future<Entity> entityFuture = datastore.get(key);

// Do other stuff while the get operation runs in the background...

// Blocks if the get operation has not finished, otherwise returns instantly
Entity entity = entityFuture.get();

注意:如果您尚未调用 get() 方法,则不会抛出异常。调用此方法可以验证异步操作是否已成功执行。

如果您有一个 AsyncDatastoreService,但需要同步执行一个操作,则可以调用相应的 AsyncDatastoreService 方法,然后立即阻塞结果:

// ...

Entity entity = new Employee("Employee", "Alfred");
// ... populate entity properties

// Make a sync call via the async interface
Key key = datastore.put(key).get();

使用异步事务

与同步调用一样,Async Datastore API 调用也可以参与事务。下面是一个函数,该函数调整 Employee 的薪金,并在 Employee 所在的实体组中写入一个额外的 SalaryAdjustment 实体,所有这些都在单个事务中完成。

void giveRaise(AsyncDatastoreService datastore, Key employeeKey, long raiseAmount)
        throws Exception {
    Future<Transaction> txn = datastore.beginTransaction();

    // Async call to lookup the Employee entity
    Future<Entity> employeeEntityFuture = datastore.get(employeeKey);

    // Create and put a SalaryAdjustment entity in parallel with the lookup
    Entity adjustmentEntity = new Entity("SalaryAdjustment", employeeKey);
    adjustmentEntity.setProperty("adjustment", raiseAmount);
    adjustmentEntity.setProperty("adjustmentDate", new Date());
    datastore.put(adjustmentEntity);

    // Fetch the result of our lookup to make the salary adjustment
    Entity employeeEntity = employeeEntityFuture.get();
    long salary = (Long) employeeEntity.getProperty("salary");
    employeeEntity.setProperty("salary", salary + raiseAmount);

    // Re-put the Employee entity with the adjusted salary.
    datastore.put(employeeEntity);
    txn.get().commit(); // could also call txn.get().commitAsync() here
}

此示例显示了不使用事务的异步调用与使用事务的异步调用之间的重大区别。在不使用事务时,确保单个异步调用完成的唯一方法是提取进行该调用时返回的 Future 值。使用事务时,调用 Transaction.commit() 会在提交事务之前,在自事务启动以来所有异步调用的结果上加以阻塞。

因此,在上面的示例中,调用 commit() 时插入 SalaryAdjustment 实体的异步调用可能仍然未完成,但如果插入未完成,提交将不会执行。同样地,如果您选择调用 commitAsync() 而不是 commit(),则在所有未完成的异步调用完成之前,对 commitAsync() 返回的 Future 调用 get() 会发生阻塞。

注意:事务与特定的线程相关联,而不是与 DatastoreServiceAsyncDatastoreService 的特定实例相关联。也就是说,如果您使用 DatastoreService 启动事务,并使用 AsyncDatastoreService 执行异步调用,则该异步调用将参与该事务。或者,简言之,DatastoreService.getCurrentTransaction()AsyncDatastoreService.getCurrentTransaction() 总是返回相同的 Transaction

使用 Future

Future Javadoc 介绍了要成功使用 Async Datastore API 返回的 Future 需要了解的大部分内容,但您需要注意以下 App Engine 专有事项:

异步查询

目前,我们没有提供用于查询的显式异步 API。但是,当您调用 PreparedQuery.asIterable()PreparedQuery.asIterator()PreparedQuery.asList(FetchOptions fetchOptions) 时,DatastoreService 和 AsyncDatastoreService 都会立即返回并异步预取结果。这样,应用可以在提取查询结果的同时并行执行任务。

// ...

Query q1 = new Query("Salesperson");
q1.setFilter(new FilterPredicate("dateOfHire", FilterOperator.LESS_THAN, oneMonthAgo));

// Returns instantly, query is executing in the background.
Iterable<Entity> recentHires = datastore.prepare(q1).asIterable();

Query q2 = new Query("Customer");
q2.setFilter(new FilterPredicate("lastContact", FilterOperator.GREATER_THAN, oneYearAgo));

// Also returns instantly, query is executing in the background.
Iterable<Entity> needsFollowup = datastore.prepare(q2).asIterable();

schedulePhoneCall(recentHires, needsFollowUp);

何时使用异步数据存储区调用

DatastoreService 接口公开的操作是同步的。例如,在调用 DatastoreService.get() 时,代码将阻塞,直到对数据存储区的调用完成。如果应用只需要以 HTML 形式呈现 get() 的结果,则在调用完成之前,执行阻塞操作是一个合理选择。但是,如果您的应用需要 get() 的结果以及 Query 的结果来呈现响应,并且 get()Query 没有任何数据依赖项,那么等待 get() 完成后再启动 Query 会浪费时间。下面是可以通过使用异步 API 获得改进一些代码的示例:

DatastoreService datastore = DatastoreServiceFactory.getDatastoreService();
Key empKey = KeyFactory.createKey("Employee", "Max");

// Read employee data from the Datastore
Entity employee = datastore.get(empKey); // Blocking for no good reason!

// Fetch payment history
Query query = new Query("PaymentHistory");
PreparedQuery pq = datastore.prepare(query);
List<Entity> result = pq.asList(FetchOptions.Builder.withLimit(10));
renderHtml(employee, result);

您可以使用 AsyncDatastoreService 的实例异步执行调用,而无需等待 get() 完成:

AsyncDatastoreService datastore = DatastoreServiceFactory.getAsyncDatastoreService();
Key empKey = KeyFactory.createKey("Employee", "Max");

// Read employee data from the Datastore
Future<Entity> employeeFuture = datastore.get(empKey); // Returns immediately!

// Fetch payment history for the employee
Query query = new Query("PaymentHistory", empKey);
PreparedQuery pq = datastore.prepare(query);

// Run the query while the employee is being fetched
List<Entity> result = pq.asList(FetchOptions.Builder.withLimit(10));
// Implicitly performs query asynchronously
Entity employee = employeeFuture.get(); // Blocking!
renderHtml(employee, result); 

此代码的同步和异步版本都使用类似数量的 CPU 资源(毕竟它们都执行相同的工作量),但由于异步版本允许两项数据存储区操作并行执行,因此异步版本的延迟时间较短。一般来说,如果需要执行多项没有任何数据依赖关系的数据存储区操作,则 AsyncDatastoreService 可以显著缩短延迟时间。