NDB 非同期オペレーション

アプリケーションのパフォーマンスを最適化するときは、NDB の使用を検討してください。たとえば、アプリケーションがキャッシュにない値を読み込む場合、読み込みに時間がかかります。 他の処理と並行して Datastore アクションを実行したり、いくつかの Datastore アクションを並行して行うことでアプリケーションを高速化できる場合があります。

NDB クライアント ライブラリでは、多くの非同期関数を提供しています。 これらの関数を使用して、アプリケーションが Datastore にリクエストを送信します。 この関数からは即座に Future オブジェクトが返されます。Datastore がリクエストを処理している間、アプリケーションは別の処理を実行できます。Datastore がリクエストの処理を終えると、アプリケーションは Future オブジェクトから結果を取得できます。

はじめに

アプリケーションのリクエスト ハンドラの 1 つが NDB を使用して書き込み(リクエストの記録など)を行う必要があるとします。また、他の NDB 操作(データのフェッチなど)も実行するとします。

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

put() に対する呼び出しを非同期の同等の put_async() に対する呼び出しに置き換えると、put() でブロックせずに、アプリケーションでその他の処理をすぐに実行できます。

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        future = acct.put_async()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')
        future.get_result()

これにより、Datastore がデータの書き込みを行っている間に、他の NDB 関数とテンプレートを実行できます。Datastore からデータを取得するまで、アプリケーションは Datastore でブロックされません。

この例では NDB の結果をアプリケーションで使用しないため、future.get_result を呼び出してもあまり意味がありません。このコードは、NDB の put が完了するまでリクエスト ハンドラが終了しないようにするためにここに配置されています。リクエスト ハンドラの終了が早すぎると put が実行されない場合があります。便宜上、リクエスト ハンドラを @ndb.toplevel で修飾できます。これにより、非同期リクエストが完了するまでハンドラが終了しないようにできます。リクエストを送信した後、その結果を心配する必要がなくなります。

WSGIApplication 全体を ndb.toplevel に指定できます。こうすることで、WSGIApplication の各ハンドラは、すべての非同期リクエストが終了するまで待つことになります。WSGIApplication のハンドラすべてを toplevel にするわけではありません。


app = ndb.toplevel(webapp2.WSGIApplication([('/', MyRequestHandler)]))

toplevel アプリケーションを使用するほうが、そのすべてのハンドラ関数を使用するよりも便利です。ただし、ハンドラのメソッドが yield を使用する場合は、そのメソッドを別のデコレータ @ndb.synctasklet でラップする必要があります。そうしないと、yield で実行が停止し、終了できなくなります。

class MyRequestHandler(webapp2.RequestHandler):
    @ndb.toplevel
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put_async()  # Ignoring the Future this returns

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

Async API と Future の使用

ほとんどの同期 NDB 関数には _async バージョンがあります。たとえば、put() には put_async() があります。非同期関数の引数は、その同期バージョンとまったく同じです。 非同期メソッドの戻り値は、常に Future または Future のリスト(複数関数の場合)になります。

Future は、開始されたものの、未完了である可能性のあるオペレーションの状態を維持するためのオブジェクトです。非同期 API はすべて 1 つ以上の Futures を返します。Futureget_result() 関数を呼び出すことで、そのオペレーションの結果を確認できます。Future は、必要に応じて結果を取得するまでブロックし、結果を渡します。get_result() は、API の同期バージョンが返すのと同じ値を返します。

注: 他のプログラミング言語では、Future を結果として直接使用できるかもしれませんが、この言語では違います。他の言語で使用するのは暗黙的 Future であり、NDB が使用するのは明示的な Future です。NDB で Future の結果を得るには、get_result() を呼び出します。

処理で例外が発生した場合はどうなるでしょうか。いつ例外が起きたかによって異なります。リクエストを行うときに NDB が問題を認識した場合(引数の型が間違っているなど)、_async() メソッドで例外が発生します。しかし、例外が Datastore サーバーによって検出された場合は、_async() メソッドが Future を返します。例外はアプリケーションが get_result() を呼び出したときに発生します。これについて、あまり心配する必要はありません。いずれにしても自然な動作です。おそらく一番大きな違いは、トレースバックを出力すると、低レベルの非同期機構が露出する点でしょう。

たとえば、ゲストブック アプリケーションを作成しているとします。 ユーザーがログインすると、ゲストブックの最新の書き込みページが表示されるようにしたいと思います。 このページにはユーザーのニックネームも表示する必要があります。アプリケーションはログインしたユーザーのアカウント情報と、ゲストブックへの投稿の内容という 2 種類の情報が必要です。アプリケーションの同期バージョンは次のようなものになります。

uid = users.get_current_user().user_id()
acct = Account.get_by_id(uid)  # I/O action 1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries = qry.fetch(10)  # I/O action 2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

この場合、Account エンティティの取得と最新の Guestbook のフェッチという 2 つの独立した I/O アクションがあります。同期 API を使用して、これらのアクションを順次発生させます。つまり、アカウント情報を受け取ってから、ゲストブック エンティティをフェッチします。しかしアプリケーションは、すぐにアカウント情報が必要なわけではありません。このことを利用して、非同期 API を使用します。

uid = users.get_current_user().user_id()
acct_future = Account.get_by_id_async(uid)  # Start I/O action #1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries_future = qry.fetch_async(10)  # Start I/O action #2
acct = acct_future.get_result()  # Complete #1
recent_entries = recent_entries_future.get_result()  # Complete #2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

このバージョンのコードでは、まず 2 つの Futuresacct_futurerecent_entries_future)を作成し、それらを待ちます。サーバーでは両方のリクエストが並行して処理されます。 各 _async() 関数呼び出しは Future オブジェクトを作成し、リクエストを Datastore サーバーに送信します。サーバーはすぐにリクエストの処理を開始できます。サーバーのレスポンスは任意の順番で戻ります。Future オブジェクトのリンクは、対応するリクエストに対するレスポンスです。

同期リクエストはオーバーラップしませんが、非同期リクエストはオーバーラップできます。
同期リクエストと非同期リクエスト

非同期バージョンで費やされる合計(実)時間は、その処理中の最大処理時間とほぼ同じです。同期バージョンで費やされる合計時間は処理時間の合計を上回ります。 並行して多くの処理を実行できる場合は、非同期処理が便利です。

アプリケーションのクエリにかかる時間や、リクエストごとの I/O 操作数を確認するには、Appstats を使用します。 このツールは、アプリケーションの実際の動作に基づいて上のようなグラフを表示します。

タスクレットの使用

NDB タスクレットは、他のコードと同時に実行できるコードの断片です。タスクレットを記述すると、アプリケーションは非同期 NDB 関数と同じように使用できます。つまり、アプリケーションはタスクレットを呼び出し、タスクレットは Future を返します。その後、Futureget_result() メソッドを呼び出して結果を取得します。

タスクレットは、スレッドを使わずに同時実行関数を記述する方法の 1 つです。タスクレットはイベントループによって実行されるため、yield ステートメントを使って、I/O や他のオペレーションのために自身を一時停止できます。ブロッキング オペレーションの概念は Future クラスに抽象化されますが、タスクレットは RPC の完了を待つために、RPC の yield を行う場合もあります。タスクレットが結果を取得すると、それによって ndb.Return 例外が raise されます。これを受けて、NDB はそのタスクレットが取得した結果を、前に yield された Future に関連付けます。

NDB タスクレットを記述する際は、yieldraise を通常とは異なる方法で使用します。したがって、これら使用例を探しても、NDB タスクレットのようなコードを見つけられないかもしれません。

関数を NDB タスクレット化するには:

  • 関数に @ndb.tasklet を付けます。
  • 同期のデータストア呼び出しをすべて非同期のデータストア呼び出しの yield に置き換えます。
  • 関数が raise ndb.Return(retval) という戻り値を返すようにします(関数が何も返さない場合は必要ありません)。

タスクレットを使うと、アプリケーションは非同期 API をより細かく制御できます。 例として、次のスキーマを検討してみます。

class Account(ndb.Model):
    email = ndb.StringProperty()
    nickname = ndb.StringProperty()

    def nick(self):
        return self.nickname or self.email  # Whichever is non-empty
...
class Message(ndb.Model):
    text = ndb.StringProperty()
    when = ndb.DateTimeProperty(auto_now_add=True)
    author = ndb.KeyProperty(kind=Account)  # references Account

メッセージを表示するとき、作成者のニックネームを表示しています。メッセージのリストを表示するためにデータをフェッチする「同期的」方法は、次のようなものです。

qry = Message.query().order(-Message.when)
for msg in qry.fetch(20):
    acct = msg.author.get()
    self.response.out.write(
        '<p>On {}, {} wrote:'.format(msg.when, acct.nick()))
    self.response.out.write('<p>{}'.format(msg.text))

残念ながらこの方法は効率的ではありません。Appstats で確認すると、Get リクエストが連続で表示されるはずです。次のような階段状のパターンが見られるかもしれません。

連続して行われる同期 Get
連続して行われる同期 Get

プログラムのこの部分は、Get をオーバーラップできればずっと速くなります。 get_async を使用するようにコードを書き直すことができますが、どの非同期のリクエストとメッセージが対応しているのかを追跡するのが困難です。

タスクレット化することで、アプリケーションは独自の非同期関数を定義できます。 これにより、コードをわかりやすく整理できます。

さらに、関数で acct = key.get() または acct = key.get_async().get_result() ではなく acct = yield key.get_async() を使用する必要があります。この yield は、このタスクレットを停止して他のタスクレットを実行するのに適した場面であることを NDB に伝えます。

ジェネレータ関数を @ndb.tasklet で修飾すると、関数はジェネレータ オブジェクトではなく Future を返します。タスクレット内では、FutureyieldFuture の結果を待って返します。

次に例を示します。

@ndb.tasklet
def callback(msg):
    acct = yield msg.author.get_async()
    raise ndb.Return('On {}, {} wrote:\n{}'.format(
        msg.when, acct.nick(), msg.text))

qry = Message.query().order(-Message.when)
outputs = qry.map(callback, limit=20)
for output in outputs:
    self.response.out.write('<p>{}</p>'.format(output))

get_async()Future を返しますが、タスクレット フレームワークは yieldFuture の結果を変数 acct に返します。

map()callback() を数回呼び出します。 callback()yield ..._async() により、NDB のスケジューラは、それらの呼び出しが完了するのを待たずに多数の非同期リクエストを送信できます。

非同期 Get のオーバーラップ
非同期 Get のオーバーラップ

Appstats でこれを見ると、複数の Get が重複しているだけでなく、同じリクエスト内ですべてが処理されることに驚くかもしれません。NDB は「autobatcher」を実装します。autobatcher は複数のリクエストをサーバーへの単一のバッチ RPC にまとめます。この方法は、他に作業がある限り(別のコールバックが実行されることがあります)、キーを集めるというものです。結果の 1 つが必要になるとすぐに、自動ワーカーはバッチ RPC を送信します。ほとんどのリクエストとは異なり、クエリは「一括処理」されません。

タスクレットを実行すると、タスクレットはデフォルトの名前空間を取得します。これは、タスクレットが生成されたときのデフォルトの名前空間、あるいはタスクレットが実行中に変更した名前空間です。つまり、デフォルトの名前空間は Context に関連付けされたり保存されたりはせず、あるタスクレットでデフォルトの名前空間を変更しても、他のタスクレットのデフォルトの名前空間には影響しません。ただし、それによって生成されたタスクレットを除きます。

タスクレット、並行クエリ、並行 Yield

複数のクエリで同時にレコードをフェッチするために、タスクレットを使用できます。 たとえば、アプリケーションにショッピング カートの内容とクーポンのリストを表示するページがあるとします。 スキーマは次のようなものになります。

class Account(ndb.Model):
    pass


class InventoryItem(ndb.Model):
    name = ndb.StringProperty()


class CartItem(ndb.Model):
    account = ndb.KeyProperty(kind=Account)
    inventory = ndb.KeyProperty(kind=InventoryItem)
    quantity = ndb.IntegerProperty()


class SpecialOffer(ndb.Model):
    inventory = ndb.KeyProperty(kind=InventoryItem)

ショッピング カートのアイテムとクーポンを取得する同期関数は次のようなものになります。

def get_cart_plus_offers(acct):
    cart = CartItem.query(CartItem.account == acct.key).fetch()
    offers = SpecialOffer.query().fetch(10)
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

この例では、クエリを使用してカートのアイテムとクーポンのリストをフェッチし、次に get_multi() で在庫のアイテムに関する詳細をフェッチします(この関数は get_multi() の戻り値を直接使用しません。get_multi() を呼び出して在庫の詳細をすべてキャッシュ内にフェッチしておき、後から、すぐに読み込むことができるようにします)。get_multi は多数の Get を 1 つのリクエストに統合します。しかし、クエリのフェッチは順番に処理されます。これらのフェッチを同時に行うには、2 つのクエリをオーバーラップします。

def get_cart_plus_offers_async(acct):
    cart_future = CartItem.query(CartItem.account == acct.key).fetch_async()
    offers_future = SpecialOffer.query().fetch_async(10)
    cart = cart_future.get_result()
    offers = offers_future.get_result()
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

get_multi() 呼び出しは分離されたままです。クエリ結果に依存するため、クエリと組み合わせることはできません。

このアプリケーションがあるときはカート、あるときはクーポンが必要で、またあるときは両方必要とするとします。コードを整理して、カートを取得する関数とクーポンを取得する関数を持つようにしましょう。これらの関数を同時に呼び出すと、2 つのクエリがオーバーラップするのが理想的です。 これを行うために、これらの関数をタスクレット化します。

@ndb.tasklet
def get_cart_tasklet(acct):
    cart = yield CartItem.query(CartItem.account == acct.key).fetch_async()
    yield ndb.get_multi_async([item.inventory for item in cart])
    raise ndb.Return(cart)


@ndb.tasklet
def get_offers_tasklet(acct):
    offers = yield SpecialOffer.query().fetch_async(10)
    yield ndb.get_multi_async([offer.inventory for offer in offers])
    raise ndb.Return(offers)


@ndb.tasklet
def get_cart_plus_offers_tasklet(acct):
    cart, offers = yield get_cart_tasklet(acct), get_offers_tasklet(acct)
    raise ndb.Return((cart, offers))

この yield xy は重要ですが、見落とされがちです。これが 2 つの個別の yield ステートメントであったとしたら、そのステートメントが順に発生することになります。しかし、タスクレットのタプルの yield 化は並列 yield です。つまり、タスクレットは並列実行が可能で、yield はこのようなタスクレットがすべて完了するまで待機してから結果を返します。(プログラミング言語の中には、これをバリアと呼ぶものがあります)。

コードの一部をタスクレット化すると、もっとタスクレット化したくなるかもしれません。タスクレットを使用して並行して実行できる同期コードがあれば、それもタスクレットにした方がいいかもしれません。 そうすると、それを並行 yield で同時に読み込むことができます。

リクエスト関数(webapp2 リクエスト関数、Django 表示関数など)をタスクレットとして記述すると、予想とは違った違う動作をします。yield は機能しますが、その後、実行を停止します。この場合は、関数を @ndb.synctasklet で修飾します。@ndb.synctasklet@ndb.tasklet に類似していますが、タスクレット上で get_result() を呼び出す形に変更されています。これにより、タスクレットを、通常どおり結果を返す関数にすることができます。

タスクレットのクエリ イテレータ

タスクレットでクエリ結果を反復処理するには、次のパターンを使用します。

qry = Model.query()
qit = qry.iter()
while (yield qit.has_next_async()):
    entity = qit.next()
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

これは次のコードと同等の働きをします。

# DO NOT DO THIS IN A TASKLET
qry = Model.query()
for entity in qry:
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

最初のバージョンで太字になっている 3 行は、2 番目のバージョンで太字になっている 1 行と同等の働きをするタスクレットです。タスクレットは yield キーワードによってしか停止できません。 したがって、yield なしのループでは、他のタスクレットを実行できません。

このコードがなぜ、qry.fetch_async() を使用してすべてのエンティティをフェッチせずに、クエリ イテレータを使用するのか不思議に思うかもしれません。アプリケーションには RAM に収まりきれないほど多くのエンティティが存在する場合があります。 エンティティを探して見つかれば反復を止めることができるかもしれません。しかし、クエリ言語だけでは検索条件を表現することはできません。イテレータを使用してエンティティを読み込み、目的のものを見つけたときにループを抜け出すことができます。

NDB との非同期 Urlfetch

NDB Context には、NDB タスクレットとうまく並列化できる非同期の urlfetch() 関数があります。次に例を示します。

@ndb.tasklet
def get_google():
    context = ndb.get_context()
    result = yield context.urlfetch("http://www.google.com/")
    if result.status_code == 200:
        raise ndb.Return(result.content)

URL 取得サービスには独自の非同期リクエストの API があります。ただし、NDB タスクレットとの使用はいつも簡単とは限りません。

非同期トランザクションの使用

トランザクションは非同期でも実行できます。既存の関数を ndb.transaction_async() に渡せるほか、@ndb.transactional_async デコレータを使用することもできます。他の非同期関数と同様に、NDB Future を返します。

@ndb.transactional_async
def update_counter(counter_key):
    counter = counter_key.get()
    counter.value += 1
    counter.put()

タスクレットでもトランザクションを処理できます。たとえば、RPC のブロックを待機している間に、update_counter コードを yield に変更します。

@ndb.transactional_tasklet
def update_counter(counter_key):
    counter = yield counter_key.get_async()
    counter.value += 1
    yield counter.put_async()

Future.wait_any() の使用

複数の非同期リクエストを行って、1 つのリクエストが完了したら戻りたい場合があるでしょう。 そのような場合は、ndb.Future.wait_any() クラスメソッドを使用します。

def get_first_ready():
    urls = ["http://www.google.com/", "http://www.blogspot.com/"]
    context = ndb.get_context()
    futures = [context.urlfetch(url) for url in urls]
    first_future = ndb.Future.wait_any(futures)
    return first_future.get_result().content

残念ながら、これをタスクレットに簡単に変換する方法はありません。並列の yield は、待機する必要がないものも含め、すべての Future が完了するまで待機することになります。