NDB 非同期オペレーション

アプリケーションのパフォーマンスを最適化するときは、NDB の使用を検討してください。たとえば、アプリケーションがキャッシュにない値を読み込む場合、読み込みに時間がかかります。他の処理と並行して Datastore アクションを実行したり、いくつかの Datastore アクションを並行して行うことでアプリケーションを高速化できる場合があります。

NDB クライアント ライブラリでは、多くの非同期関数を提供しています。これらの関数を使用して、アプリケーションが Datastore にリクエストを送信します。この関数からは即座に Future オブジェクトが返されます。Datastore がリクエストを処理している間、アプリケーションは別の処理を実行できます。Datastore がリクエストの処理を終えると、アプリケーションは Future オブジェクトから結果を取得できます。

はじめに

アプリケーションのリクエスト ハンドラの 1 つが NDB を使用して書き込み(リクエストの記録など)を行う必要があるとします。また、他の NDB 操作(データのフェッチなど)も実行するとします。

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

put() の呼び出しを非同期の put_async() に置き換えると、put() でブロックせずに、他の処理をすぐに実行することができます。

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        future = acct.put_async()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')
        future.get_result()

これにより、Datastore がデータの書き込みを行っている間に、他の NDB 関数とテンプレートを実行できます。Datastore からデータを取得するまで、アプリケーションは Datastore でブロックされません。

この例では NDB の結果をアプリケーションで使用しないため、future.get_result を呼び出してもあまり意味がありません。このコードは、NDB の put が完了するまでリクエスト ハンドラが終了しないようにするためにここに配置されています。リクエスト ハンドラの終了が早すぎると put が実行されない場合があります。便宜上、リクエスト ハンドラを @ndb.toplevel で宣言できます。これにより、非同期リクエストが完了するまでハンドラが終了しないようにできます。リクエストを送信した後、その結果を心配する必要がなくなります。

WSGIApplication 全体を ndb.toplevel として指定できます。こうすることで、WSGIApplication の各ハンドラは、すべての非同期リクエストが終了するまで待つことになります。WSGIApplication のハンドラすべてを toplevel にするわけではありません。


app = ndb.toplevel(webapp2.WSGIApplication([('/', MyRequestHandler)]))

toplevel アプリケーションを使用することは、そのすべてのハンドラ関数を使用するよりも便利です。ただし、ハンドラのメソッドが yield を使用する場合は、そのメソッドを別のデコレータ @ndb.synctasklet でラップする必要があります。そうしないと、yield で実行が停止し、終了できなくなります。

class MyRequestHandler(webapp2.RequestHandler):
    @ndb.toplevel
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put_async()  # Ignoring the Future this returns

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

Async API と Future の使用

ほとんどの同期 NDB 関数には _async バージョンがあります。たとえば、put() には put_async() があります。非同期関数の引数は、その同期バージョンとまったく同じです。非同期メソッドの戻り値は、Future または Future のリスト(複数関数の場合)になります。

Future は、開始されたものの、まだ完了していない可能性のあるオペレーションの状態を維持するためのオブジェクトです。すべての非同期 API は 1 つ以上の Futures を返します。Futureget_result() 関数を呼び出すことで、そのオペレーションの結果を確認できます。Future は、必要に応じて結果を取得するまでブロックし、結果を渡します。 get_result() は、API の同期バージョンが返すのと同じ値を返します。

注: 他のプログラミング言語では、Future を結果として直接使用できるかもしれませんが、この言語では違います。これらの言語は暗黙的 Future を使用しますが、NDB は明示的な Future を使用します。NDB Future の結果を得るには、get_result() を呼び出します。

処理で例外が発生した場合はどうなるでしょうか。いつ例外が起きたかによって異なります。リクエストを行うときに NDB が問題を認識した場合(引数の型が間違っているなど)、_async() メソッドで例外が発生します。しかし、例外が Datastore サーバーによって検出された場合は、_async() メソッドが Future を返します。例外はアプリケーションが get_result() を呼び出したときに発生します。これについて、あまり心配する必要はありません。いずれにしても自然な動作です。おそらく一番大きな違いは、トレースバックを出力すると、低レベルの非同期機構が露出する点でしょう。

たとえば、ゲストブック アプリケーションを作成しているとします。ユーザーがログインすると、ゲストブックの最新の書き込みページが表示されるようにしたいと思います。このページにはユーザーのニックネームも表示する必要があります。アプリケーションはログインしたユーザーのアカウント情報と、ゲストブックへの投稿の内容という 2 種類の情報が必要です。アプリケーションの同期バージョンは次のようなものになります。

uid = users.get_current_user().user_id()
acct = Account.get_by_id(uid)  # I/O action 1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries = qry.fetch(10)  # I/O action 2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

ここには、Account エンティティの取得と最新の Guestbook のフェッチという 2 つの独立した I/O アクションがあります。同期 API を使用して、これらのアクションを順次発生させます。つまり、アカウント情報を受け取ってから、ゲストブック エンティティをフェッチします。しかしアプリケーションは、すぐにアカウント情報が必要なわけではありません。このことを利用して、非同期 API を使用します。

uid = users.get_current_user().user_id()
acct_future = Account.get_by_id_async(uid)  # Start I/O action #1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries_future = qry.fetch_async(10)  # Start I/O action #2
acct = acct_future.get_result()  # Complete #1
recent_entries = recent_entries_future.get_result()  # Complete #2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

このバージョンでは、まず 2 つの Futuresacct_futurerecent_entries_future)を作成し、それらを待ちます。サーバーは両方のリクエストを並行して処理します。各 _async() 関数呼び出しは Future オブジェクトを作成し、リクエストを Datastore サーバーに送信します。サーバーはすぐにリクエストの処理を開始できます。サーバーのレスポンスは任意の順番で戻ります。Future オブジェクトのリンクは、対応するリクエストに対するレスポンスです。

同期リクエストはオーバーラップしませんが、非同期リクエストはオーバーラップできます。
同期リクエストと非同期リクエスト

非同期バージョンで費やされる合計(実)時間は、その処理中の最大処理時間とほぼ同じです。同期バージョンで費やされる合計時間は処理時間の合計を上回ります。並行して多くの処理を実行できる場合は、非同期処理が便利です。

アプリケーションのクエリにかかる時間や、リクエストごとの I/O 操作数を確認するには、Appstats を使用します。このツールは、アプリケーションの実際の動作に基づいて上のようなグラフを表示します。

タスクレットの使用

NDB タスクレットは、他のコードと同時に実行できるコードの断片です。タスクレットを記述すると、アプリケーションは非同期 NDB 関数と同じように使用できます。つまり、アプリケーションはタスクレットを呼び出し、タスクレットは Future を返します。その後、Futureget_result() メソッドを呼び出して結果を取得します。

タスクレットは、スレッドを使わずに同期関数を記述する方法の 1 つです。タスクレットはイベントループによって実行されるため、yield ステートメントを使って、I/O や他のオペレーションのために自身を一時停止できます。ブロッキング オペレーションの概念は Future クラスに抽象化されますが、タスクレットは RPC が完了するのを待つために、RPC を yield する場合もあります。タスクレットは結果を取得すると、raise によって ndb.Return 例外を発生させます。これを受けて、NDB はそのタスクレットが取得した結果を前に yield された Future に関連付けます。

NDB タスクレットを記述すると、yieldraise を普段とは違った方法で使います。したがって、これら使用例を探しても、NDB タスクレットのようなコードを見つけられないかもしれません。

関数を NDB タスクレット化するには:

  • @ndb.tasklet で関数を装飾します。
  • 同期のデータストア呼び出しをすべて非同期のデータストア呼び出しの yield に置き換えます。
  • 関数が raise ndb.Return(retval) という戻り値を返すようにします(関数が何も返さない場合は必要ありません)。

タスクレットを使うと、アプリケーションは非同期 API をより細かく制御できます。例として、次のスキーマを検討してみます。

class Account(ndb.Model):
    email = ndb.StringProperty()
    nickname = ndb.StringProperty()

    def nick(self):
        return self.nickname or self.email  # Whichever is non-empty
...
class Message(ndb.Model):
    text = ndb.StringProperty()
    when = ndb.DateTimeProperty(auto_now_add=True)
    author = ndb.KeyProperty(kind=Account)  # references Account

メッセージを表示するとき、作成者のニックネームを表示しています。メッセージのリストを表示するためにデータをフェッチする「同期的」方法は、次のようなものです。

qry = Message.query().order(-Message.when)
for msg in qry.fetch(20):
    acct = msg.author.get()
    self.response.out.write(
        '<p>On {}, {} wrote:'.format(msg.when, acct.nick()))
    self.response.out.write('<p>{}'.format(msg.text))

残念ながらこの方法は効率的ではありません。Appstats で確認すると、Get リクエストが連続で表示されるはずです。次のような階段状のパターンが見られるかもしれません。

連続して行われる同期 Get
同期の Get が連続して発生します。

プログラムのこの部分は、Get をオーバーラップできればずっと速くなります。この部分を get_async を使って書き直すことができますが、どの非同期リクエストにどのメッセージが対応しているのか追跡するのが困難です。

タスクレット化することで、アプリケーションは独自の非同期関数を定義できます。これにより、コードをわかりやすく整理できます。

さらに、関数で acct = key.get()acct = key.get_async().get_result() ではなく acct = yield key.get_async() を使用する必要があります。この yield は、このタスクレットを停止して他のタスクレットを実行するのに適した場面であることを NDB に伝えます。

ジェネレータ関数を @ndb.tasklet で装飾すると、関数はジェネレータ オブジェクトではなく Future を返します。タスクレット内では、FutureyieldFuture の結果を待って返します。

次に例を示します。

@ndb.tasklet
def callback(msg):
    acct = yield msg.author.get_async()
    raise ndb.Return('On {}, {} wrote:\n{}'.format(
        msg.when, acct.nick(), msg.text))

qry = Message.query().order(-Message.when)
outputs = qry.map(callback, limit=20)
for output in outputs:
    self.response.out.write('<p>{}</p>'.format(output))

get_async()Future を返しますが、タスクレット フレームワークは yield 式で Future の結果を変数 acct に返します。

map()callback() を何度も呼び出します。callback()yield ..._async() により、NDB のスケジューラは、それらの呼び出しが完了するのを待たずに多数の非同期リクエストを送信できます。

非同期 Get のオーバーラップ
非同期 Get のオーバーラップ

これを Appstats で見ると、このような複数の Get が単にオーバーラップしているのではなく、すべて同じリクエストを通過していることに驚くかもしれません。NDB は「autobatcher」を実装します。autobatcher は複数のリクエストを、サーバーに対する 1 つのバッチ RPC にバンドルします。つまり、他の作業がある(別のコールバックが行われる可能性がある)限り、鍵を収集するという方法です。1 つでも結果が必要になると、autobatcher はバッチ RPC を送信します。他のほとんどのリクエストと異なり、クエリはバッチ処理されません。

タスクレットを実行すると、タスクレットはデフォルトの名前空間を取得します。これは、タスクレットが生成されたときのデフォルトの名前空間、あるいはタスクレットが実行中に変更した名前空間です。つまり、デフォルトの名前空間は Context に関連付けされたり保存されたりはせず、あるタスクレットでデフォルトの名前空間を変更しても、他のタスクレットのデフォルトの名前空間には影響しません。ただし、それによって生成されたタスクレットを除きます。

タスクレット、並行クエリ、並行 Yield

複数のクエリで同時にレコードをフェッチするために、タスクレットを使用できます。たとえば、アプリケーションにショッピング カートの内容とクーポンのリストを表示するページがあるとします。スキーマは次のようなものになります。

class Account(ndb.Model):
    pass

class InventoryItem(ndb.Model):
    name = ndb.StringProperty()

class CartItem(ndb.Model):
    account = ndb.KeyProperty(kind=Account)
    inventory = ndb.KeyProperty(kind=InventoryItem)
    quantity = ndb.IntegerProperty()

class SpecialOffer(ndb.Model):
    inventory = ndb.KeyProperty(kind=InventoryItem)

ショッピング カートのアイテムとクーポンを取得する同期関数は次のようなものになります。

def get_cart_plus_offers(acct):
    cart = CartItem.query(CartItem.account == acct.key).fetch()
    offers = SpecialOffer.query().fetch(10)
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

この例では、クエリを使用してカートのアイテムとクーポンのリストをフェッチし、次に get_multi() で在庫のアイテムに関する詳細をフェッチします。(この関数は get_multi() の戻り値を直接使用しません。get_multi() を呼び出して在庫の詳細をすべてキャッシュ内にフェッチしておき、後から、すぐに読み込むことができるようにします)。get_multi は複数の Get を 1 つのリクエストに統合します。しかし、クエリのフェッチは順番に処理されます。これらのフェッチを同時に行うには、2 つのクエリをオーバーラップします。

def get_cart_plus_offers_async(acct):
    cart_future = CartItem.query(CartItem.account == acct.key).fetch_async()
    offers_future = SpecialOffer.query().fetch_async(10)
    cart = cart_future.get_result()
    offers = offers_future.get_result()
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

get_multi() 呼び出しはまだ分離されています。クエリの結果に依存するので、クエリに統合できないのです。

このアプリケーションがあるときはカート、あるときはクーポンが必要で、またあるときは両方必要とするとします。コードを整理して、カートを取得する関数とクーポンを取得する関数を持つようにしましょう。アプリケーションでこうした関数を同時に呼び出す場合、そのクエリが「オーバーラップ」するのが理想的です。そのためには、このような関数をタスクレット化します。

@ndb.tasklet
def get_cart_tasklet(acct):
    cart = yield CartItem.query(CartItem.account == acct.key).fetch_async()
    yield ndb.get_multi_async([item.inventory for item in cart])
    raise ndb.Return(cart)

@ndb.tasklet
def get_offers_tasklet(acct):
    offers = yield SpecialOffer.query().fetch_async(10)
    yield ndb.get_multi_async([offer.inventory for offer in offers])
    raise ndb.Return(offers)

@ndb.tasklet
def get_cart_plus_offers_tasklet(acct):
    cart, offers = yield get_cart_tasklet(acct), get_offers_tasklet(acct)
    raise ndb.Return((cart, offers))

この yield xy は重要ですが、見落とされがちです。これが 2 つの個別の yield ステートメントであったとしたら、そのステートメントが順に発生することになります。しかし、タスクレットのタプルの yield 化は並列 yield です。つまり、タスクレットは並列実行が可能で、yield はこのようなタスクレットがすべて完了するまで待機してから結果を返します。(プログラミング言語の中には、これをバリアと呼ぶものがあります)。

コードの一部をタスクレット化すると、もっとタスクレット化したくなるかもしれません。タスクレットを使用して並行して実行できる同期コードがあれば、それもタスクレットにした方がいいかもしれません。そうすると、それを並行 yield で同時に読み込むことができます。

リクエスト関数(webapp2 リクエスト関数、Django 表示関数など)をタスクレットとして記述すると、予想とは違った違う動作をします。yield は機能しますが、その後、実行を停止します。この場合は、関数を @ndb.synctasklet で修飾します。 @ndb.synctasklet@ndb.tasklet に似ていますが、タスクレットでは get_result() を呼び出すように変更されています。これにより、タスクレットを、通常どおり結果を返す関数にすることができます。

タスクレットのクエリ イテレータ

タスクレットでクエリ結果を反復処理するには、次のパターンを使用します。

qry = Model.query()
qit = qry.iter()
while (yield qit.has_next_async()):
    entity = qit.next()
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

これは次のコードと同等の働きをします。

# DO NOT DO THIS IN A TASKLET
qry = Model.query()
for entity in qry:
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

最初のバージョンで太字になっている 3 行は、2 番目のバージョンで太字になっている 1 行と同等の働きをするタスクレットです。タスクレットは yield キーワードでしか停止できません。したがって、yield のないループでは他のタスクレットを実行できません。

このコードが qry.fetch_async() を使ってすべてのエンティティをフェッチせずに、クエリ インテレータを使用するのか不思議に思うかもしれません。アプリケーションには RAM に収まりきれないほど多くのエンティティが存在する場合があります。エンティティを探して見つかったら反復を停止させるという方法もあるかもしれませんが、クエリ言語だけで検索条件を表現することはできません。イテレータを使用してエンティティを読み込み、目的のものを見つけたときにループを抜け出すことができます。

NDB との非同期 Urlfetch

NDB Context には、NDB タスクレットとうまく並列化できる、非同期の urlfetch() 関数があります。例:

@ndb.tasklet
def get_google():
    context = ndb.get_context()
    result = yield context.urlfetch("http://www.google.com/")
    if result.status_code == 200:
        raise ndb.Return(result.content)

URL フェッチ サービスには独自の非同期リクエストの API があります。ただし、NDB タスクレットとの使用はいつも簡単とは限りません。

非同期トランザクションの使用

トランザクションは非同期でも実行できます。既存の関数を ndb.transaction_async() に渡すことも、@ndb.transactional_async デコレータを使用することもできます。他の非同期関数と同様に、NDB Future を返します。

@ndb.transactional_async
def update_counter(counter_key):
    counter = counter_key.get()
    counter.value += 1
    counter.put()

タスクレットでもトランザクションを処理できます。たとえば、RPC のブロックを待機している間に、update_counter コードを yield に変更します。

@ndb.transactional_tasklet
def update_counter(counter_key):
    counter = yield counter_key.get_async()
    counter.value += 1
    yield counter.put_async()

Future.wait_any() の使用

複数の非同期リクエストを行って、1 つのリクエストが完了したら戻りたい場合があるでしょう。そのような操作を行うには、ndb.Future.wait_any() クラスメソッドを使用します。

def get_first_ready():
    urls = ["http://www.google.com/", "http://www.blogspot.com/"]
    context = ndb.get_context()
    futures = [context.urlfetch(url) for url in urls]
    first_future = ndb.Future.wait_any(futures)
    return first_future.get_result().content

これをタスクレットに簡単に変換する方法はありません。並列の yield はすべての Future が完了するまで待機します。残念ながら、待機の必要のないものも待つことになります。

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

Python 2 の App Engine スタンダード環境