Asynchroner NDB-Vorgang

Berücksichtigen Sie bei der Optimierung der Leistung einer Anwendung die Verwendung von NDB. Wenn eine Anwendung beispielsweise einen Wert liest, der sich nicht im Cache befindet, dauert der Lesevorgang länger. Sie können Ihre Anwendung möglicherweise beschleunigen, indem Sie Datenspeicheraktionen parallel zu anderen Aufgaben oder anderen Datenspeicheraktionen ausführen.

Die NDB-Clientbibliothek bietet viele asynchrone ("async") Funktionen. Mit jeder dieser Funktionen kann eine Anwendung eine Anfrage an den Datenspeicher senden. Die Funktion gibt sofort ein Future-Objekt zurück. Während der Datenspeicher die Anfrage verarbeitet, kann die Anwendung andere Aufgaben ausführen. Im Anschluss an die Verarbeitung der Anfrage durch den Datenspeicher können die Ergebnisse durch die Anwendung vom Future-Objekt abgerufen werden.

Einführung

Angenommen, einer der Anfrage-Handler Ihrer Anwendung benötigt NDB für einen Schreibvorgang, beispielsweise zum Aufzeichnen der Anfrage. Darüber hinaus hat er weitere NDB-Vorgänge auszuführen, wie etwa das Abrufen von Daten.

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

Wenn der Aufruf von put() durch einen Aufruf des asynchronen Äquivalents put_async() ersetzt wird, kann die Anwendung andere Aufgaben sofort ausführen und wird nicht durch put() blockiert.

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        future = acct.put_async()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')
        future.get_result()

Auf diese Weise lassen sich die anderen NDB-Funktionen und das Vorlagen-Rendering ausführen, während der Datenspeicher die Daten schreibt. Die Anwendung blockiert den Datenspeicher erst, wenn sie Daten daraus abruft.

In diesem Beispiel ist es nicht sinnvoll, future.get_result aufzurufen, da die Anwendung nie das NDB-Ergebnis verwendet. Dieser Code sorgt lediglich dafür, dass der Anfrage-Handler erst beendet wird, nachdem der NDB-Vorgang put beendet wurde. Andernfalls kann "put" unter Umständen nicht ausgeführt werden. Sie können den Anfrage-Handler der Einfachheit halber @ndb.toplevel hinzufügen. Dadurch wird der Handler angewiesen, die Aktion erst nach Abschluss der asynchronen Anfragen zu beenden. Sie können somit die Anfrage senden, ohne sich Gedanken um das Ergebnis machen zu müssen.

Sie können eine vollständige WSGIApplication als ndb.toplevel angeben. Dadurch können alle Handler der WSGIApplication vor der Rückgabe auf den Abschluss aller asynchronen Anfragen warten. (Die Handler der WSGIApplication werden dadurch nicht als "toplevel" behandelt.)


app = ndb.toplevel(webapp2.WSGIApplication([('/', MyRequestHandler)]))

Die Verwendung einer toplevel-Anwendung ist unkomplizierter als all ihre Handler-Funktionen. Wenn eine Handler-Methode jedoch yield verwendet, muss diese Methode trotzdem in einen anderen Decorator @ndb.synctasklet eingebunden werden. Andernfalls wird die Ausführung bei yield gestoppt und nicht beendet.

class MyRequestHandler(webapp2.RequestHandler):
    @ndb.toplevel
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put_async()  # Ignoring the Future this returns

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

Asynchrone APIs und Future-Objekte verwenden

Fast jede synchrone NDB-Funktion hat ein _async-Gegenstück. Bei put() ist dies beispielsweise put_async(). Die Argumente der asynchronen Funktion sind immer mit denen der synchronen Version identisch. Der Rückgabewert einer asynchronen Methode ist immer entweder ein Future-Objekt oder (für "Multi"-Funktionen) eine Liste von Future-Objekten.

Ein Future ist ein Objekt, das den Status während eines gestarteten, jedoch noch nicht abgeschlossenen Vorgangs beibehält. Alle asynchronen APIs geben mindestens ein Objekt vom Typ Futures zurück. Sie können die Funktion get_result() des Future-Objekts aufrufen, um das Ergebnis des Vorgangs abzufragen. Das Future-Objekt blockiert daraufhin bei Bedarf den Vorgang, bis das Ergebnis verfügbar ist, und gibt dieses anschließend an Sie zurück. get_result() gibt den Wert zurück, der von der synchronen Version der API zurückgegeben würde.

Hinweis: Wenn Sie Future-Objekte bereits aus bestimmten anderen Programmiersprachen kennen, sind Sie es möglicherweise gewohnt, ein Future-Objekt direkt als Ergebnis verwenden zu können. Dies funktioniert hier nicht. Diese Sprachen verwenden implizite Future-Objekte. NDB nutzt hingegen explizite Future-Objekte. Rufen Sie get_result() auf, um das Ergebnis eines NDB-Future-Objekts abzurufen.

Was passiert, wenn der Vorgang eine Ausnahme auslöst? Das hängt davon ab, wann die Ausnahme auftritt. Wenn NDB während der Erstellung einer Anfrage ein Problem feststellt (vielleicht ein Argument vom falschen Typ), löst die Methode _async() eine Ausnahme aus. Wenn die Ausnahme jedoch vom Datenspeicherserver erkannt wird, gibt die Methode _async() ein Future-Objekt zurück und die Ausnahme wird ausgelöst, wenn Ihre Anwendung get_result() aufruft. Sie müssen sich damit nicht näher befassen, da letztendlich alles recht selbsterklärend ist. Der vielleicht größte Unterschied besteht darin, dass beim Drucken von Rückverfolgungsinformationen Teile der asynchronen Low-Level-Maschinen angezeigt werden.

Angenommen, Sie schreiben eine Gästebuchanwendung. Wenn Nutzer angemeldet sind, soll eine Seite mit den neuesten Gästebuchbeiträgen angezeigt werden. Auch der Alias des Nutzers soll auf der Seite angegeben sein. Die Anwendung benötigt zwei Arten von Informationen: die Kontoinformationen des angemeldeten Nutzers sowie die Inhalte der Gästebuchbeiträge. Die "synchrone" Version dieser Anwendung könnte folgendermaßen aussehen:

uid = users.get_current_user().user_id()
acct = Account.get_by_id(uid)  # I/O action 1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries = qry.fetch(10)  # I/O action 2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

Hier gibt es zwei unabhängige E/A-Aktionen: das Abrufen der Account-Entität und das Abrufen der letzten Guestbook-Entitäten. Mit der synchronen API werden diese nacheinander ausgeführt. Wir warten zuerst auf den Empfang der Kontoinformationen und rufen anschließend die Gästebuchentitäten ab. Die Anwendung benötigt die Kontoinformationen jedoch nicht sofort. Daher können wir asynchrone APIs nutzen:

uid = users.get_current_user().user_id()
acct_future = Account.get_by_id_async(uid)  # Start I/O action #1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries_future = qry.fetch_async(10)  # Start I/O action #2
acct = acct_future.get_result()  # Complete #1
recent_entries = recent_entries_future.get_result()  # Complete #2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

Diese Version des Codes erstellt zuerst zwei Futures (acct_future und recent_entries_future) und wartet dann auf diese. Der Server verarbeitet beide Anfragen parallel. Jeder Aufruf einer _async()-Funktion erstellt ein Future-Objekt und sendet eine Anfrage an den Datenspeicherserver. Der Server kann sofort mit der Verarbeitung der Anfrage beginnen. Die Serverantworten können in beliebiger Reihenfolge zurückgegeben werden. Die Future-Objektverknüpfung antwortet auf die entsprechenden Anfragen.

Synchrone Anfragen überlappen sich nicht, asynchrone Anfragen können dies hingegen schon.
Synchrone und asynchrone Anfragen

Die gesamte (reale) Zeit, die in der asynchronen Version aufgewendet wird, entspricht in etwa der maximalen Zeit für die Vorgänge. Die Gesamtzeit in der synchronen Version übersteigt die Summe der Betriebszeiten. Wenn Sie mehrere Vorgänge parallel ausführen können, sind asynchrone Vorgänge hilfreicher.

Durch die Verwendung von Appstats können Sie ermitteln, wie lange die Abfragen Ihrer Anwendung dauern oder wie viele E/A-Vorgänge pro Anfrage ausgeführt werden. Dieses Tool kann Diagramme ähnlich der obigen Darstellung basierend auf der Instrumentierung einer Live-App anzeigen.

Tasklets verwenden

Ein NDB-tasklet ist ein Code, der gleichzeitig mit anderem Code ausgeführt werden kann. Wenn Sie ein Tasklet schreiben, kann Ihre Anwendung es ähnlich wie eine asynchrone NDB-Funktion verwenden: Sie ruft das Tasklet auf, das ein Future-Objekt zurückgibt. Später wird durch Aufrufen der Methode get_result() des Future-Objekts das Ergebnis ausgegeben.

Tasklets ermöglichen die gleichzeitige Ausführung von Funktionen, ohne Threads schreiben zu müssen. Tasklets werden von einer Ereignisschleife ausgeführt und können sich selbst mithilfe einer Yield-Anweisung sperren und damit E/A- oder verschiedene andere Vorgänge blockieren. Das Konzept eines blockierenden Vorgangs wird mit der Klasse Future realisiert. Allerdings kann ein Tasklet auch den yield eines RPC erzeugen und dann auf dessen Abschluss warten. Wenn das Tasklet ein Ergebnis aufweist, wird raise für eine Ausnahme vom Typ ndb.Return ausgeführt. NDB verknüpft dann das Ergebnis mit dem zuvor für das Future-Objekt ausgeführten yield.

Wenn Sie ein NDB-Tasklet schreiben, verwenden Sie yield und raise auf ungewöhnliche Weise. Wenn Sie nach Beispielen suchen, wie diese verwendet werden, finden Sie somit möglicherweise keinen Code wie ein NDB-Tasklet.

So wandeln Sie eine Funktion in ein NDB-Tasklet um:

  • Dekorieren Sie die Funktion mit @ndb.tasklet.
  • Ersetzen Sie alle synchronen Datenspeicheraufrufe durch yield-Objekte asynchroner Datenspeicheranrufe.
  • Führen Sie raise ndb.Return(retval) aus, damit die Funktion ihren Rückgabewert zurückgibt. Dies ist nicht notwendig, wenn die Funktion nichts zurückgibt.

Eine Anwendung kann Tasklets für eine genauere Kontrolle über asynchrone APIs verwenden. Ein Beispiel ist folgendes Schema:

class Account(ndb.Model):
    email = ndb.StringProperty()
    nickname = ndb.StringProperty()

    def nick(self):
        return self.nickname or self.email  # Whichever is non-empty
...
class Message(ndb.Model):
    text = ndb.StringProperty()
    when = ndb.DateTimeProperty(auto_now_add=True)
    author = ndb.KeyProperty(kind=Account)  # references Account

Beim Anzeigen einer Nachricht ist es sinnvoll, den Alias des Autors aufzurufen. Die "synchrone" Abrufmethode für die Anzeige einer Liste von Nachrichten könnte folgendermaßen aussehen:

qry = Message.query().order(-Message.when)
for msg in qry.fetch(20):
    acct = msg.author.get()
    self.response.out.write(
        '<p>On {}, {} wrote:'.format(msg.when, acct.nick()))
    self.response.out.write('<p>{}'.format(msg.text))

Diese Herangehensweise ist leider ineffizient. Bei der Betrachtung in Appstats sehen Sie, dass die "Get"-Anfragen als Serie vorliegen. Möglicherweise wird das folgende "Treppen"-Muster angezeigt.

Synchrone &quot;Gets&quot; treten in Serie auf
Synchrone "Gets" treten in Serie auf.

Dieser Teil des Programms wäre schneller, wenn diese "Gets" sich überlappen würden. Sie können den Code neu schreiben, um get_async zu verwenden. Es ist jedoch schwierig zu verfolgen, welche asynchronen Anfragen und Nachrichten zusammengehören.

Die Anwendung kann ihre eigene "async"-Funktion definieren, indem sie sie in ein Tasklet umwandelt. Dadurch können Sie den Code auf eine weniger verwirrende Weise organisieren.

Außerdem sollte die Funktion acct = yield key.get_async() anstelle von acct = key.get() oder acct = key.get_async().get_result() verwenden. yield teilt NDB mit, dass an dieser Stelle dieses Tasklet am besten gesperrt werden soll und dass andere Tasklets ausgeführt werden sollen.

Durch das Dekorieren einer Generatorfunktion mit @ndb.tasklet wird anstelle eines Generatorobjekts ein Future-Objekt zurückgegeben. Innerhalb des Tasklets wird bei jedem yield eines Future-Objekts auf das Ergebnis des Future-Objekts gewartet und dieses dann zurückgegeben.

Beispiel:

@ndb.tasklet
def callback(msg):
    acct = yield msg.author.get_async()
    raise ndb.Return('On {}, {} wrote:\n{}'.format(
        msg.when, acct.nick(), msg.text))

qry = Message.query().order(-Message.when)
outputs = qry.map(callback, limit=20)
for output in outputs:
    self.response.out.write('<p>{}</p>'.format(output))

Beachten Sie, dass das Tasklet-Framework bewirkt, dass der yield-Ausdruck das Future-Ergebnis in der Variablen acct zurückgibt, obwohl get_async() ein Future-Objekt zurückgibt.

Die Funktion map() ruft callback() mehrmals auf. Aber yield ..._async() in callback() veranlasst den NDB-Planer, viele asynchrone Anfragen zu senden, ohne darauf zu warten, dass eine von ihnen beendet wird.

Überlappende asynchrone &quot;Gets&quot;
Grafik: Überlappende asynchrone "Gets"

Bei der Betrachtung in Appstats werden Sie vielleicht überrascht sein, dass sich diese verschiedenen "Gets" nicht nur überlappen, sondern auch alle in derselben Anfrage verarbeitet werden. NDB implementiert einen Autobatcher. Der Autobatcher bündelt mehrere Requests in einem einzelnen Batch-RPC an den Server. Dabei wird so vorgegangen, dass Schlüssel gesammelt werden, solange weitere Aufgaben (möglicherweise ein anderer Callback) ausgeführt werden müssen. Sobald eines der Ergebnisse benötigt wird, sendet der Autobatcher den Batch-RPC. Im Gegensatz zu den meisten Anfragen werden Abfragen nicht "zu Batches zusammengefasst".

Wenn ein Tasklet ausgeführt wird, wird ihm der bei der Erstellung des Tasklets verwendete Standard-Namespace oder der bei der Ausführung des Tasklets geänderte Namespace zugewiesen. Der Standard-Namespace ist somit weder mit dem Kontext verknüpft noch wird er darin gespeichert. Außerdem wirkt sich eine Änderung des Standard-Namespace in einem Tasklet nicht auf den Standard-Namespace in anderen Tasklets aus, ausgenommen diejenigen, die von ihm erzeugt wurden.

Tasklets, parallele Abfragen, paralleles Ergebnis

Sie können Tasklets verwenden, sodass mehrere Abfragen Datensätze gleichzeitig abrufen. Angenommen, Ihre Anwendung verfügt über eine Seite, die den Inhalt eines Einkaufswagens und eine Liste mit Sonderangeboten anzeigt. Das Schema könnte folgendermaßen aussehen:

class Account(ndb.Model):
    pass


class InventoryItem(ndb.Model):
    name = ndb.StringProperty()


class CartItem(ndb.Model):
    account = ndb.KeyProperty(kind=Account)
    inventory = ndb.KeyProperty(kind=InventoryItem)
    quantity = ndb.IntegerProperty()


class SpecialOffer(ndb.Model):
    inventory = ndb.KeyProperty(kind=InventoryItem)

Eine "synchrone" Funktion, die Einkaufswagenartikel und Sonderangebote abruft, könnte in etwa folgendermaßen aussehen:

def get_cart_plus_offers(acct):
    cart = CartItem.query(CartItem.account == acct.key).fetch()
    offers = SpecialOffer.query().fetch(10)
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

In diesem Beispiel werden Abfragen zum Abrufen von Listen mit Einkaufswagenartikeln und Angeboten verwendet. Anschließend werden Details zu Inventarelementen mit get_multi() abgerufen. (Diese Funktion verwendet den Rückgabewert von get_multi() nicht direkt. Sie ruft get_multi() auf, um alle Inventardetails in den Cache abzurufen, damit sie später schnell gelesen werden können.) get_multi kombiniert viele "Gets" zu einer Anfrage. Die Abrufvorgänge für Abfragen werden jedoch nacheinander ausgeführt. Überlappen Sie die beiden Abfragen, damit diese Abrufvorgänge gleichzeitig ausgeführt werden:

def get_cart_plus_offers_async(acct):
    cart_future = CartItem.query(CartItem.account == acct.key).fetch_async()
    offers_future = SpecialOffer.query().fetch_async(10)
    cart = cart_future.get_result()
    offers = offers_future.get_result()
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

Der get_multi()-Aufruf erfolgt weiterhin getrennt: Er hängt von den Abfrageergebnissen ab, sodass Sie ihn nicht mit den Abfragen kombinieren können.

Angenommen, diese Anwendung benötigt manchmal den Einkaufswagen, manchmal die Angebote und manchmal beides. Sie möchten Ihren Code so organisieren, dass eine Funktion zum Abrufen des Einkaufswagens und eine Funktion zum Abrufen der Angebote verfügbar ist. Wenn Ihre Anwendung diese Funktionen zusammen aufruft, könnten sich ihre Abfragen idealerweise "überlappen". Wandeln Sie dazu Ihre Funktionen in Tasklets um:

@ndb.tasklet
def get_cart_tasklet(acct):
    cart = yield CartItem.query(CartItem.account == acct.key).fetch_async()
    yield ndb.get_multi_async([item.inventory for item in cart])
    raise ndb.Return(cart)


@ndb.tasklet
def get_offers_tasklet(acct):
    offers = yield SpecialOffer.query().fetch_async(10)
    yield ndb.get_multi_async([offer.inventory for offer in offers])
    raise ndb.Return(offers)


@ndb.tasklet
def get_cart_plus_offers_tasklet(acct):
    cart, offers = yield get_cart_tasklet(acct), get_offers_tasklet(acct)
    raise ndb.Return((cart, offers))

yield xy ist wichtig, aber leicht zu übersehen. Wenn es sich um zwei getrennte yield-Anweisungen gehandelt hätte, würden sie in Serie ausgeführt. Aber die yield-Anweisung eines Tupels von Tasklets ist eine parallele Ausgabe: Die Tasklets können parallel ausgeführt werden. yield wartet, bis alle von ihnen beendet wurden und gibt die Ergebnisse zurück. (In einigen Programmiersprachen wird dies als Barriere bezeichnet.)

Wenn Sie einen Code in ein Tasklet umwandeln, ist es wahrscheinlich, dass Sie diesen Vorgang bald häufiger ausführen möchten. Wenn Sie "synchronen" Code bemerken, der parallel zu einem Tasklet ausgeführt werden könnte, ist es wahrscheinlich eine gute Idee, diesen ebenfalls in ein Tasklet umzuwandeln. Dann können Sie es mit einem parallelen yield parallelisieren.

Wenn Sie eine Anfragefunktion (eine Webapp2-Anfragefunktion, eine Django-Ansichtsfunktion usw.) als Tasklet schreiben, verhält es sich nicht entsprechend Ihren Erwartungen: Es liefert Ergebnisse, beendet dann jedoch die Ausführung. In diesem Fall dekorieren Sie die Funktion mit @ndb.synctasklet. @ndb.synctasklet ähnelt @ndb.tasklet, ruft aber get_result() für das Tasklet auf. Dadurch wird Ihr Tasklet zu einer Funktion, die das Ergebnis wie gewohnt zurückgibt.

Abfrage-Iteratoren in Tasklets

Verwenden Sie das folgende Muster, um Abfrageergebnisse in einem Tasklet zu durchlaufen:

qry = Model.query()
qit = qry.iter()
while (yield qit.has_next_async()):
    entity = qit.next()
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

Dies ist das Tasklet-freundliche Äquivalent zu Folgendem:

# DO NOT DO THIS IN A TASKLET
qry = Model.query()
for entity in qry:
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

Die drei fett gedruckten Zeilen in der ersten Version sind das Tasklet-freundliche Äquivalent der einzelnen fett gedruckten Zeile in der zweiten Version. Tasklets können nur an einem yield-Schlüsselwort gesperrt werden. Die for-Schleife ohne yield verhindert die Ausführung anderer Tasklets.

Sie werden sich vielleicht fragen, warum dieser Code überhaupt einen Abfrage-Iterator verwendet, anstatt alle Entitäten mit qry.fetch_async() abzurufen. Die Anwendung enthält möglicherweise so viele Entitäten, dass sie nicht in den Arbeitsspeicher passen. Vielleicht suchen Sie nach einer Entität und können die Iteration beenden, sobald Sie sie gefunden haben. Sie können Ihre Suchkriterien aber nicht nur mit der Abfragesprache ausdrücken. Sie könnten einen Iterator verwenden, um zu überprüfende Entitäten zu laden, und dann aus der Schleife ausbrechen, wenn Sie gefunden haben, wonach Sie suchen.

Asynchrones Urlfetch mit NDB

Ein NDB-Context hat eine asynchrone urlfetch()-Funktion, die gut mit NDB-Tasklets parallelisiert werden kann, z. B.:

@ndb.tasklet
def get_google():
    context = ndb.get_context()
    result = yield context.urlfetch("http://www.google.com/")
    if result.status_code == 200:
        raise ndb.Return(result.content)

Der URL-Abrufdienst verfügt über eine eigene asynchrone Anfrage-API. Er ist hilfreich, kann jedoch nicht immer einfach mit NDB-Tasklets verwendet werden.

Asynchrone Transaktionen verwenden

Transaktionen können auch asynchron durchgeführt werden. Sie können eine vorhandene Funktion an ndb.transaction_async() übergeben oder den Dekorator @ndb.transactional_async verwenden. Wie bei den anderen asynchronen Funktionen wird hierbei ein NDB-Future zurückgegeben:

@ndb.transactional_async
def update_counter(counter_key):
    counter = counter_key.get()
    counter.value += 1
    counter.put()

Transaktionen funktionieren auch mit Tasklets. Beispielsweise könnten Sie den update_counter-Code in yield ändern, während Sie auf das Sperren von RPCs warten:

@ndb.transactional_tasklet
def update_counter(counter_key):
    counter = yield counter_key.get_async()
    counter.value += 1
    yield counter.put_async()

Future.wait_any() verwenden

Manchmal möchten Sie vielleicht mehrere asynchrone Anfragen erstellen und immer dann zurückgeben, wenn die erste abgeschlossen ist. Dazu können Sie die Klassenmethode ndb.Future.wait_any() verwenden:

def get_first_ready():
    urls = ["http://www.google.com/", "http://www.blogspot.com/"]
    context = ndb.get_context()
    futures = [context.urlfetch(url) for url in urls]
    first_future = ndb.Future.wait_any(futures)
    return first_future.get_result().content

Leider gibt es keine einfache Möglichkeit, dies in ein Tasklet umzuwandeln. Eine parallele yield-Anweisung wartet, bis alle Future-Objekte abgeschlossen sind – darunter auch jene, auf die Sie nicht warten möchten.