Operazione asincrona NDB

Quando ottimizzi il rendimento di un'applicazione, prendi in considerazione il suo utilizzo di NDB. Ad esempio, se un'applicazione legge un valore che non è in cache, la lettura richiede un po' di tempo. Potresti essere in grado di velocizzare l'applicazione eseguendo azioni di Datastore in parallelo con altre operazioni o alcune azioni di Datastore in parallelo tra loro.

La libreria client NDB fornisce molte funzioni asincrone ("async"). Ognuna di queste funzioni consente a un'applicazione di inviare una richiesta al Datastore. La funzione restituisce immediatamente un oggetto Future. L'applicazione può fare altre cose mentre Datastore gestisce la richiesta. Dopo che Datastore gestisce la richiesta, l'applicazione può recuperare i risultati dall'oggetto Future.

Introduzione

Supponiamo che uno degli elaboratori delle richieste della tua applicazione debba utilizzare NDB per scrivere qualcosa, ad esempio per registrare la richiesta. Inoltre, deve eseguire alcune altre operazioni NDB, ad esempio per recuperare alcuni dati.

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

Sostituendo la chiamata a put() con una chiamata al suo corrispondente asincrono put_async(), l'applicazione può eseguire immediatamente altre operazioni anziché bloccarsi su put().

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        future = acct.put_async()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')
        future.get_result()

In questo modo, le altre funzioni NDB e il rendering del modello possono avvenire mentre Datastore scrive i dati. L'applicazione non si blocca su Datastore finché non riceve dati da Datastore.

In questo esempio, è un po' sciocco chiamare future.get_result: l'applicazione non utilizza mai il risultato di NDB. Questo codice è presente solo per assicurarsi che il gestore delle richieste non esce prima del termine dell'operazione NDB put. Se il gestore delle richieste esce troppo presto, l'operazione put potrebbe non essere mai eseguita. Per praticità, puoi decorare l'handler della richiesta con @ndb.toplevel. Questo indica all'handler di non uscire finché le sue richieste asincrone non sono state completate. In questo modo, puoi inviare la richiesta senza preoccuparti del risultato.

Puoi specificare un intero WSGIApplication come ndb.toplevel. In questo modo, ogni gestore di WSGIApplication attende tutte le richieste asincrone prima di restituire. Non "imposta come primo livello" tutti gli handler di WSGIApplication.


app = ndb.toplevel(webapp2.WSGIApplication([('/', MyRequestHandler)]))

L'utilizzo di un'applicazione toplevel è più pratico rispetto a tutte le sue funzioni di gestore. Tuttavia, se un metodo di gestore utilizza yield, questo metodo deve comunque essere racchiuso in un altro decoratore, @ndb.synctasklet; in caso contrario, l'esecuzione verrà interrotta al yield e non verrà completata.

class MyRequestHandler(webapp2.RequestHandler):
    @ndb.toplevel
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put_async()  # Ignoring the Future this returns

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

Utilizzo di API asincrone e Futures

Quasi tutte le funzioni NDB sincrone hanno una controparte _async. Ad esempio, put() ha put_async(). Gli argomenti della funzione asincrona sono sempre gli stessi della versione sincrona. Il valore restituito di un metodo asincrono è sempre un Future o (per le funzioni "multi") un elenco di Future.

Un Future è un oggetto che gestisce lo stato di un'operazione avviata, ma che potrebbe non essere ancora stata completata. Tutte le API asincrone restituiscono uno o più Futures. Puoi chiamare la funzione get_result() di Future per chiedergli il risultato della sua operazione. Il Future si blocca, se necessario, finché il risultato non è disponibile, per poi fornirtelo. get_result() restituisce il valore che verrebbe restituito dalla versione sincrona dell'API.

Nota: se hai utilizzato i Future in altri linguaggi di programmazione, potresti pensare di poter utilizzare un Future direttamente come risultato. Qui non funziona. Queste lingue utilizzano futuri impliciti; NDB utilizza i futuri espliciti. Chiama get_result() per ricevere il risultato di un NDB Future.

Che cosa succede se l'operazione genera un'eccezione? Dipende da quando si verifica l'eccezione. Se NDB rileva un problema durante l'invio di una richiesta (ad esempio un argomento del tipo sbagliato), il metodo _async() solleva un'eccezione. Tuttavia, se l'eccezione viene rilevata, ad esempio, dal server Datastore, il metodo _async() restituisce un Future e l'eccezione verrà sollevata quando la tua applicazione chiamerà il suo get_result(). Non preoccuparti troppo, alla fine tutto si comporta in modo abbastanza naturale. Forse la differenza più grande è che, se viene stampato un traceback, vedrai alcuni componenti della macchina asincrona di basso livello esposti.

Ad esempio, supponiamo che tu stia scrivendo un'applicazione guestbook. Se l'utente ha eseguito l'accesso, vuoi mostrare una pagina con i post del guestbook più recenti. In questa pagina deve essere mostrato anche il nickname dell'utente. L'applicazione richiede due tipi di informazioni: i dati dell'account dell'utente che ha eseguito l'accesso e i contenuti dei post del guestbook. La versione "sincrona" di questa applicazione potrebbe avere il seguente aspetto:

uid = users.get_current_user().user_id()
acct = Account.get_by_id(uid)  # I/O action 1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries = qry.fetch(10)  # I/O action 2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

Esistono due azioni I/O indipendenti: recupero dell'entità Account e recupero delle entità Guestbook recenti. Se utilizzi l'API sincrona, queste operazioni vengono eseguite una dopo l'altra. aspettiamo di ricevere i dati dell'account prima di recuperare le entità del guestbook. Tuttavia, l'applicazione non ha bisogno subito delle informazioni sull'account. Possiamo sfruttare questa opportunità e utilizzare le API asincrone:

uid = users.get_current_user().user_id()
acct_future = Account.get_by_id_async(uid)  # Start I/O action #1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries_future = qry.fetch_async(10)  # Start I/O action #2
acct = acct_future.get_result()  # Complete #1
recent_entries = recent_entries_future.get_result()  # Complete #2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

Questa versione del codice crea prima due Futures (acct_future e recent_entries_future), quindi li attende. Il server lavora su entrambe le richieste in parallelo. Ogni chiamata alla funzione _async() crea un oggetto Future e invia una richiesta al server Datastore. Il server può iniziare subito a elaborare la richiesta. Le risposte del server possono essere inviate in un ordine arbitrario; l'oggetto Future collega le risposte alle richieste corrispondente.

Le richieste sincrone non si sovrappongono, ma quelle asincrone possono.
Richieste sincrone e asincrone

Il tempo totale (reale) trascorso nella versione asincrona è approssimativamente uguale al tempo massimo di tutte le operazioni. Il tempo totale trascorso nella versione sincrona supera la somma dei tempi di operazione. Se puoi eseguire più operazioni in parallelo, le operazioni asincrone sono più utili.

Per sapere quanto tempo richiedono le query della tua applicazione o quante operazioni di I/O vengono eseguite per richiesta, ti consigliamo di utilizzare Appstats. Questo strumento può mostrare grafici simili al disegno sopra riportato in base alla misurazione di un'app in tempo reale.

Utilizzo dei tasklet

Un tasklet NDB è un frammento di codice che potrebbe essere eseguito contemporaneamente ad altro codice. Se scrivi un tasklet, la tua applicazione può utilizzarlo in modo molto simile a come utilizza una funzione NDB asincrona: chiama il tasklet, che restituisce un Future; in seguito, chiamando il metodo get_result() di Future si ottiene il risultato.

I tasklet sono un modo per scrivere funzioni concorrenti senza thread. I tasklet vengono eseguiti da un loop di eventi e possono sospendersi bloccandosi per l'I/O o per qualche altra operazione utilizzando un'istruzione yield. Il concetto di operazione di blocco è astratto nella classe Future, ma un tasklet può anche yield un'RPC per attendere il completamento dell'RPC. Quando il tasklet ha un risultato, si tratta di un'raiseeccezione ndb.Return. NDB associa quindi il risultato al Futureyield precedente.

Quando scrivi un tasklet NDB, utilizzi yield e raise in un modo insolito. Pertanto, se cerchi esempi di come utilizzarli, probabilmente non troverai codice come un tasklet NDB.

Per trasformare una funzione in un tasklet NDB:

  • decora la funzione con @ndb.tasklet,
  • sostituisci tutte le chiamate al datastore sincrono con yield chiamate al datastore asincrone,
  • Fai in modo che la funzione "retituri" il suo valore restituito con raise ndb.Return(retval) (non necessario se la funzione non restituisce nulla).

Un'applicazione può utilizzare i tasklet per un controllo più fine delle API asincrone. Ad esempio, considera lo schema seguente:

class Account(ndb.Model):
    email = ndb.StringProperty()
    nickname = ndb.StringProperty()

    def nick(self):
        return self.nickname or self.email  # Whichever is non-empty
...
class Message(ndb.Model):
    text = ndb.StringProperty()
    when = ndb.DateTimeProperty(auto_now_add=True)
    author = ndb.KeyProperty(kind=Account)  # references Account

Quando viene visualizzato un messaggio, ha senso mostrare il nickname dell'autore. Il modo "sincronico" per recuperare i dati per mostrare un elenco di messaggi potrebbe essere simile al seguente:

qry = Message.query().order(-Message.when)
for msg in qry.fetch(20):
    acct = msg.author.get()
    self.response.out.write(
        '<p>On {}, {} wrote:'.format(msg.when, acct.nick()))
    self.response.out.write('<p>{}'.format(msg.text))

Purtroppo, questo approccio non è efficiente. Se la guardi in Appstats, vedrai che le richieste "Get" sono in serie. Potresti vedere il seguente pattern a "scala".

I &quot;Get&quot; sincroni si verificano in serie
I "get" sincroni si verificano in serie.

Questa parte del programma sarebbe più veloce se questi "Get" potessero sovrapporsi. Potresti riscrivere il codice per utilizzare get_async, ma è difficile tenere traccia di quali richieste e messaggi asincroni appartengono insieme.

L'applicazione può definire la propria funzione "async" trasformandola in un tasklet. In questo modo puoi organizzare il codice in modo meno confuso.

Inoltre, anziché utilizzare acct = key.get() o acct = key.get_async().get_result(), la funzione deve utilizzare acct = yield key.get_async(). Questo yield indica a NDB che è un buon punto per sospendere questo tasklet e consentire l'esecuzione di altri tasklet.

La decorazione di una funzione generatore con @ndb.tasklet fa sì che la funzione restituisca un Future anziché un oggetto generatore. All'interno del tasklet, qualsiasi yield di un Future attende e restituisce il risultato di Future.

Ad esempio:

@ndb.tasklet
def callback(msg):
    acct = yield msg.author.get_async()
    raise ndb.Return('On {}, {} wrote:\n{}'.format(
        msg.when, acct.nick(), msg.text))

qry = Message.query().order(-Message.when)
outputs = qry.map(callback, limit=20)
for output in outputs:
    self.response.out.write('<p>{}</p>'.format(output))

Tieni presente che, anche se get_async() restituisce un Future, il framework dei tasklet fa in modo che l'espressione yield restituisca il risultato di Future alla variabile acct.

map() chiama callback() più volte. Tuttavia, yield ..._async() in callback() consente allo scheduler di NDB di inviare molte richieste asincrone prima di attendere il completamento di una di esse.

&quot;Get&quot; asincroni sovrapposti
"Get" asincroni sovrapposti

Se esamini questo aspetto in Appstats, potresti sorprenderti di scoprire che questi più get non si sovrappongono, ma vengono tutti eseguiti nella stessa richiesta. NDB implementa un "autobatcher". L'autobatcher agrupa più richieste in un'unica richiesta RPC batch al server. Lo fa in modo che, finché c'è altro lavoro da fare (potrebbe essere eseguito un altro callback), raccolga le chiavi. Non appena è necessario uno dei risultati, l'autobatcher invia l'RPC batch. A differenza della maggior parte delle richieste, le query non vengono raggruppate.

Quando viene eseguito, un tasklet ottiene il proprio spazio dei nomi predefinito da qualunque valore predefinito fosse presente al momento della sua creazione o da quello impostato dal tasklet durante l'esecuzione. In altre parole, lo spazio dei nomi predefinito non è associato o archiviato in Context e la modifica dello spazio dei nomi predefinito in un tasklet non influisce sullo spazio dei nomi predefinito in altri tasklet, tranne quelli generati da esso.

Tasklet, query parallele, rendimento parallelo

Puoi utilizzare i tasklet in modo che più query estraggano i record contemporaneamente. Ad esempio, supponiamo che la tua applicazione abbia una pagina che mostri i contenuti di un carrello degli acquisti e un elenco di offerte speciali. Lo schema potrebbe avere il seguente aspetto:

class Account(ndb.Model):
    pass


class InventoryItem(ndb.Model):
    name = ndb.StringProperty()


class CartItem(ndb.Model):
    account = ndb.KeyProperty(kind=Account)
    inventory = ndb.KeyProperty(kind=InventoryItem)
    quantity = ndb.IntegerProperty()


class SpecialOffer(ndb.Model):
    inventory = ndb.KeyProperty(kind=InventoryItem)

Una funzione "sincrona" che recupera gli articoli del carrello degli acquisti e le offerte speciali potrebbe avere il seguente aspetto:

def get_cart_plus_offers(acct):
    cart = CartItem.query(CartItem.account == acct.key).fetch()
    offers = SpecialOffer.query().fetch(10)
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

Questo esempio utilizza query per recuperare gli elenchi di articoli e offerte del carrello, quindi recupera i dettagli degli articoli dell'inventario con get_multi(). Questa funzione non utilizza direttamente il valore restituito di get_multi(). Chiama get_multi() per recuperare tutti i dettagli dell'inventario nella cache in modo che possano essere letti rapidamente in un secondo momento. get_multi combina molti Get in un'unica richiesta. Tuttavia, i recuperi della query avvengono uno dopo l'altro. Per eseguire questi recuperi contemporaneamente, sovrapponi le due query:

def get_cart_plus_offers_async(acct):
    cart_future = CartItem.query(CartItem.account == acct.key).fetch_async()
    offers_future = SpecialOffer.query().fetch_async(10)
    cart = cart_future.get_result()
    offers = offers_future.get_result()
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

La chiamata get_multi() è ancora separata: dipende dai risultati della query, quindi non puoi combinarla con le query.

Supponiamo che a volte questa applicazione abbia bisogno del carrello, a volte delle offerte e a volte di entrambe. Devi organizzare il codice in modo che esista una funzione per recuperare il carrello e una per recuperare le offerte. Se la tua applicazione chiama queste funzioni insieme, idealmente le relative query potrebbero "sovrapporsi". Per farlo, crea queste funzioni come tasklet:

@ndb.tasklet
def get_cart_tasklet(acct):
    cart = yield CartItem.query(CartItem.account == acct.key).fetch_async()
    yield ndb.get_multi_async([item.inventory for item in cart])
    raise ndb.Return(cart)


@ndb.tasklet
def get_offers_tasklet(acct):
    offers = yield SpecialOffer.query().fetch_async(10)
    yield ndb.get_multi_async([offer.inventory for offer in offers])
    raise ndb.Return(offers)


@ndb.tasklet
def get_cart_plus_offers_tasklet(acct):
    cart, offers = yield get_cart_tasklet(acct), get_offers_tasklet(acct)
    raise ndb.Return((cart, offers))

Questo yield xy è importante, ma facile da trascurare. Se si trattasse di due istruzioni yield separate, si verificherebbero in serie. Tuttavia, yield una tupla di tasklet è un yield parallelo: i tasklet possono essere eseguiti in parallelo e yield attende che tutti terminino e restituisce i risultati. In alcuni linguaggi di programmazione, questo è noto come barriera.

Se trasformi un frammento di codice in un tasklet, probabilmente vorrai crearne altri a breve. Se noti codice "sincronico" che potrebbe essere eseguito in parallelo con un tasklet, è probabilmente una buona idea trasformarlo anche in un tasklet. Poi puoi eseguirla in parallelo con un yield parallelo.

Se scrivi una funzione di richiesta (una funzione di richiesta webapp2, una funzione di visualizzazione Django e così via) da utilizzare come tasklet, non farà ciò che vuoi: restituisce un valore, ma poi smette di funzionare. In questa situazione, devi decorare la funzione con @ndb.synctasklet. @ndb.synctasklet è simile a @ndb.tasklet, ma è stato modificato per chiamare get_result() nel tasklet. In questo modo il tasklet diventa una funzione che restituisce il risultato nel solito modo.

Iteratori di query nei tasklet

Per eseguire l'iterazione sui risultati della query in un tasklet, utilizza il seguente pattern:

qry = Model.query()
qit = qry.iter()
while (yield qit.has_next_async()):
    entity = qit.next()
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

Questo è l'equivalente compatibile con i tasklet di quanto segue:

# DO NOT DO THIS IN A TASKLET
qry = Model.query()
for entity in qry:
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

Le tre righe in grassetto nella prima versione sono l'equivalente adatto ai tasklet della singola riga in grassetto nella seconda versione. I tasklet possono essere sospesi solo in corrispondenza di una parola chiave yield. Il ciclo for senza yield non consente l'esecuzione di altri tasklet.

Potresti chiederti perché questo codice utilizza un iteratore di query anziché recuperare tutte le entità utilizzando qry.fetch_async(). L'applicazione potrebbe avere un numero di entità così elevato da non rientrare nella RAM. Ad esempio, potresti cercare un'entità e interrompere l'iterazione quando la trovi, ma non puoi esprimere i tuoi criteri di ricerca solo con il linguaggio di query. Puoi utilizzare un iteratore per caricare le entità da controllare, quindi uscire dal ciclo quando trovi ciò che ti serve.

Urlfetch asincrono con NDB

Un Context NDB ha una funzione urlfetch() asincrona che si esegue in parallelo con i tasklet NDB, ad esempio:

@ndb.tasklet
def get_google():
    context = ndb.get_context()
    result = yield context.urlfetch("http://www.google.com/")
    if result.status_code == 200:
        raise ndb.Return(result.content)

Il servizio URL Fetch ha la propria API di richiesta asincrona. Va bene, ma non è sempre facile da usare con i tasklet NDB.

Utilizzo delle transazioni asincrone

Le transazioni possono essere eseguite anche in modo asincrono. Puoi passare una funzione esistente a ndb.transaction_async() o utilizzare il decoratore @ndb.transactional_async. Come le altre funzioni asincrone, restituisce un NDB Future:

@ndb.transactional_async
def update_counter(counter_key):
    counter = counter_key.get()
    counter.value += 1
    counter.put()

Le transazioni funzionano anche con i tasklet. Ad esempio, potremmo modificare il codice update_counter in yield in attesa di RPC bloccanti:

@ndb.transactional_tasklet
def update_counter(counter_key):
    counter = yield counter_key.get_async()
    counter.value += 1
    yield counter.put_async()

Utilizzo di Future.wait_any()

A volte vuoi inviare più richieste asincrone e tornare ogni volta che la prima viene completata. Puoi farlo utilizzando il metodo della classe ndb.Future.wait_any():

def get_first_ready():
    urls = ["http://www.google.com/", "http://www.blogspot.com/"]
    context = ndb.get_context()
    futures = [context.urlfetch(url) for url in urls]
    first_future = ndb.Future.wait_any(futures)
    return first_future.get_result().content

Purtroppo, non esiste un modo pratico per trasformarlo in un tasklet. Un yield parallelo attende il completamento di tutti i Future, inclusi quelli per i quali non vuoi attendere.