Async Datastore API

Async Datastore API を使用すると、データストアに対して並列の非ブロック化呼び出しを行い、後でリクエストの処理中にこれらの呼び出しに対する結果を取得できます。このドキュメントでは、Async Datastore API の次の点について説明します。

非同期データストア サービスの操作

Async Datastore API を使用する場合、AsyncDatastoreService インターフェースのメソッドによりデータストア呼び出しを行います。このオブジェクトを取得するには、DatastoreServiceFactory クラスの getAsyncDatastoreService() クラスメソッドを呼び出します。

import com.google.appengine.api.datastore.AsyncDatastoreService;
import com.google.appengine.api.datastore.DatastoreServiceFactory;

// ...
AsyncDatastoreService datastore = DatastoreServiceFactory.getAsyncDatastoreService();

AsyncDatastoreService では、DatastoreService と同じオペレーションがサポートされますが、ほとんどのメソッドで、後で結果をブロックできる Future が直ちに返されます。たとえば、DatastoreService.get()Entity を返しますが、AsyncDatastoreService.get()Future<Entity> を返します。

// ...

Key key = KeyFactory.createKey("Employee", "Max");
// Async call returns immediately
Future<Entity> entityFuture = datastore.get(key);

// Do other stuff while the get operation runs in the background...

// Blocks if the get operation has not finished, otherwise returns instantly
Entity entity = entityFuture.get();

注: get() メソッドを呼び出すまで例外はスローされません。このメソッドを呼び出すと、非同期処理が正常に完了したことを確認できます。

AsyncDatastoreService があるときに、なんらかの操作を同期して実行する必要がある場合、該当する AsyncDatastoreService メソッドを呼び出して、結果を即時にブロックします。

// ...

Entity entity = new Employee("Employee", "Alfred");
// ... populate entity properties

// Make a sync call via the async interface
Key key = datastore.put(key).get();

非同期トランザクションの操作

同期呼び出しと同様、Async Datastore API 呼び出しをトランザクションに含めることができます。次の関数では、Employee の給与を調整し、追加する SalaryAdjustment エンティティを Employee と同じエンティティ グループに書き込みます。これらをすべて、1 つのトランザクションで行います。

void giveRaise(AsyncDatastoreService datastore, Key employeeKey, long raiseAmount)
        throws Exception {
    Future<Transaction> txn = datastore.beginTransaction();

    // Async call to lookup the Employee entity
    Future<Entity> employeeEntityFuture = datastore.get(employeeKey);

    // Create and put a SalaryAdjustment entity in parallel with the lookup
    Entity adjustmentEntity = new Entity("SalaryAdjustment", employeeKey);
    adjustmentEntity.setProperty("adjustment", raiseAmount);
    adjustmentEntity.setProperty("adjustmentDate", new Date());
    datastore.put(adjustmentEntity);

    // Fetch the result of our lookup to make the salary adjustment
    Entity employeeEntity = employeeEntityFuture.get();
    long salary = (Long) employeeEntity.getProperty("salary");
    employeeEntity.setProperty("salary", salary + raiseAmount);

    // Re-put the Employee entity with the adjusted salary.
    datastore.put(employeeEntity);
    txn.get().commit(); // could also call txn.get().commitAsync() here
}

このサンプルは、トランザクションを使用しない非同期呼び出しと、トランザクションを使用した非同期呼び出しとの重要な違いを示しています。トランザクションを使用しない場合、個々の非同期呼び出しが完了したことを確認するには、呼び出しが行われたときに返された Future の戻り値をフェッチするしかありません。トランザクションを使用する場合、Transaction.commit() を呼び出して、トランザクションの開始から commit までに行われたすべての非同期呼び出しの結果をブロックします。

したがって前述の例では、SalaryAdjustment エンティティを挿入する非同期呼び出しの処理が、commit() の呼び出し時点で未完了である場合、commit は挿入が完了するまで行われません。同様に、commit() ではなく commitAsync() の呼び出しを選択した場合、commitAsync() によって返される Future に対する get() の呼び出しは、未処理の非同期呼び出しがすべて完了するまでブロックされます。

注: トランザクションは、DatastoreService または AsyncDatastoreService の特定のインスタンスではなく、特定のスレッドに関連付けられます。つまり、DatastoreService でトランザクションを開始し、AsyncDatastoreService で非同期呼び出しを実行すると、非同期呼び出しはそのトランザクションに含まれることになります。より簡単に言うと、DatastoreService.getCurrentTransaction()AsyncDatastoreService.getCurrentTransaction() は常に同じ Transaction を返します。

Future の操作

Async Datastore API で返される Future を適切に処理するために必要な情報の多くは Future に関する Java ドキュメントに記載されていますが、他にも知っておくべき App Engine 固有の情報がいくつかあります。

非同期クエリ

現在のところ、Google ではクエリの非同期 API を明示的には公開していません。ただし、PreparedQuery.asIterable()PreparedQuery.asIterator()PreparedQuery.asList(FetchOptions fetchOptions) のいずれかを呼び出すと、DatastoreService と AsyncDatastoreService の両方がすぐに結果を返して、非同期で結果をプリフェッチします。このため、アプリケーションは、クエリ結果を取得しながら、並行して処理を行うことができます。

// ...

Query q1 = new Query("Salesperson");
q1.setFilter(new FilterPredicate("dateOfHire", FilterOperator.LESS_THAN, oneMonthAgo));

// Returns instantly, query is executing in the background.
Iterable<Entity> recentHires = datastore.prepare(q1).asIterable();

Query q2 = new Query("Customer");
q2.setFilter(new FilterPredicate("lastContact", FilterOperator.GREATER_THAN, oneYearAgo));

// Also returns instantly, query is executing in the background.
Iterable<Entity> needsFollowup = datastore.prepare(q2).asIterable();

schedulePhoneCall(recentHires, needsFollowUp);

非同期データストア呼び出しを使用するタイミング

DatastoreService インターフェースで公開される処理は同期的です。たとえば、DatastoreService.get() を呼び出すと、データストアの呼び出しが完了するまでコードはブロックされます。アプリケーションで、get() の結果を HTML で表示するだけでよい場合、呼び出しが完了するまでブロックするという処理は完全に合理的です。しかし、レスポンスの表示に get() の結果といずれかの Query の結果が必要であり、get() とその Query の間にデータの依存関係がない場合は、get() が完了するまで Query を開始しないのは時間の無駄になります。非同期 API を使用すると改善できるコードの例を次に示します。

DatastoreService datastore = DatastoreServiceFactory.getDatastoreService();
Key empKey = KeyFactory.createKey("Employee", "Max");

// Read employee data from the Datastore
Entity employee = datastore.get(empKey); // Blocking for no good reason!

// Fetch payment history
Query query = new Query("PaymentHistory");
PreparedQuery pq = datastore.prepare(query);
List<Entity> result = pq.asList(FetchOptions.Builder.withLimit(10));
renderHtml(employee, result);

get() が完了するのを待つのではなく、AsyncDatastoreService のインスタンスを使用して、呼び出しを非同期で実行します。

AsyncDatastoreService datastore = DatastoreServiceFactory.getAsyncDatastoreService();
Key empKey = KeyFactory.createKey("Employee", "Max");

// Read employee data from the Datastore
Future<Entity> employeeFuture = datastore.get(empKey); // Returns immediately!

// Fetch payment history for the employee
Query query = new Query("PaymentHistory", empKey);
PreparedQuery pq = datastore.prepare(query);

// Run the query while the employee is being fetched
List<Entity> result = pq.asList(FetchOptions.Builder.withLimit(10));
// Implicitly performs query asynchronously
Entity employee = employeeFuture.get(); // Blocking!
renderHtml(employee, result); 

このコードの同期バージョンと非同期バージョンの CPU の使用量はそれほど変わりませんが(結局、作業量は両方とも同じです)、非同期バージョンでは 2 つのデータストア オペレーションを並列実行できるので、レイテンシが短くなります。一般に、複数のデータストア オペレーションを実行する必要があり、データに依存関係がない場合は、AsyncDatastoreService を使用すると、レイテンシを大幅に短縮できます。