Operazione asincrona NDB

Quando ottimizzi le prestazioni di un'applicazione, considera il suo utilizzo di NDB. Ad esempio, se un'applicazione legge un valore che non si trova nella cache, la lettura richiede un po' di tempo. Potresti riuscire ad accelerare la tua applicazione eseguendo azioni Datastore in parallelo con altre operazioni o eseguendo alcune azioni Datastore in parallelo tra loro.

La libreria client NDB fornisce molte funzioni asincrone ("asincrone"). Ognuna di queste funzioni consente a un'applicazione di inviare una richiesta a Datastore. La funzione restituisce immediatamente, restituendo un oggetto Future. L'applicazione può fare altre cose mentre Datastore gestisce la richiesta. Dopo che Datastore gestisce la richiesta, l'applicazione può ottenere i risultati dall'oggetto Future.

Introduzione

Supponiamo che uno dei gestori di richieste della tua applicazione debba utilizzare l'NDB per scrivere qualcosa, magari per registrare la richiesta. Deve anche eseguire altre operazioni NDB, magari per recuperare alcuni dati.

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

Se sostituisci la chiamata a put() con una chiamata all'equivalente asincrono put_async(), l'applicazione può svolgere subito altre attività anziché bloccare l'attività su put().

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        future = acct.put_async()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')
        future.get_result()

Ciò consente di eseguire le altre funzioni NDB e il rendering del modello mentre Datastore scrive i dati. L'applicazione non blocca il datastore finché non recupera i dati da quest'ultimo.

In questo esempio, è un po' sciocco chiamare future.get_result: l'applicazione non utilizza mai il risultato di NDB. Quel codice è proprio lì per assicurarsi che il gestore delle richieste non esca prima del termine dell'put NDB. Se il gestore delle richieste esce troppo presto, il put potrebbe non avvenire mai. Per comodità, puoi decorare il gestore delle richieste con @ndb.toplevel. Questo indica al gestore di non uscire fino al termine delle sue richieste asincrone. In questo modo puoi inviare la richiesta senza preoccuparti del risultato.

Puoi specificare un intero WSGIApplication come ndb.toplevel. Ciò garantisce che ciascuno dei gestori di WSGIApplication attende tutte le richieste asincrone prima di tornare. (non esegue il "primo livello" di tutti i gestori di WSGIApplication).


app = ndb.toplevel(webapp2.WSGIApplication([('/', MyRequestHandler)]))

L'utilizzo di un'applicazione toplevel è più pratico di tutte le sue funzioni di gestore. Tuttavia, se un metodo di gestore utilizza yield, tale metodo deve comunque essere aggregato in un altro decorator, @ndb.synctasklet, altrimenti l'esecuzione verrà interrotta al yield e non verrà completata.

class MyRequestHandler(webapp2.RequestHandler):
    @ndb.toplevel
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put_async()  # Ignoring the Future this returns

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

Utilizzo di API asincrone e future

Quasi tutte le funzioni NDB sincrona hanno una controparte _async. Ad esempio, put() ha put_async(). Gli argomenti della funzione asincrona sono sempre gli stessi di quelli della versione sincrona. Il valore restituito di un metodo asincrono è sempre un Future o (per le funzioni "multi") un elenco di Future.

Un futuro è un oggetto che mantiene lo stato di un'operazione avviata ma che potrebbe non essere ancora completata; tutte le API asincrone restituiscono uno o più Futures. Puoi chiamare la funzione get_result() di Future per richiedere il risultato del suo funzionamento; il futuro quindi blocca, se necessario, finché il risultato non è disponibile, e poi te lo fornisce. get_result() restituisce il valore restituito dalla versione sincrona dell'API.

Nota: se hai utilizzato i futures in alcuni altri linguaggi di programmazione, potresti pensare di poter usare direttamente il modello Future. Non funziona qui. Queste lingue usano futuri impliciti, mentre NDB usa dei futuri espliciti. Chiama get_result() per ottenere un risultato di Future NDB.

Cosa succede se l'operazione solleva un'eccezione? Dipende da quando si verifica l'eccezione. Se NDB rileva un problema durante l'invio di una richiesta (forse un argomento di tipo sbagliato), il metodo _async() genera un'eccezione. Tuttavia, se l'eccezione viene rilevata, ad esempio, dal server Datastore, il metodo _async() restituisce un Future e l'eccezione verrà generata quando l'applicazione chiama get_result(). Non preoccuparti troppo di questo, tutto finisce per avere un comportamento abbastanza naturale; forse la più grande differenza è che se viene stampato un trackback, alcune parti del macchinario asincrono di basso livello verranno esposte.

Ad esempio, supponi di scrivere un'applicazione guestbook. Se l'utente ha eseguito l'accesso, vuoi presentare una pagina che mostra i post del libro degli ospiti più recenti. In questa pagina deve essere visualizzato anche il nickname dell'utente. L'applicazione richiede due tipi di informazioni: i dati dell'account dell'utente che ha eseguito l'accesso e i contenuti dei post del guestbook. La versione "sincrona" di questa applicazione potrebbe avere il seguente aspetto:

uid = users.get_current_user().user_id()
acct = Account.get_by_id(uid)  # I/O action 1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries = qry.fetch(10)  # I/O action 2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

Qui sono presenti due azioni di I/O indipendenti: recupero dell'entità Account e recupero delle entità Guestbook recenti. Utilizzando l'API sincrona, queste operazioni si verificano uno dopo l'altro. Attendiamo di ricevere i dati dell'account prima di recuperare le entità del guestbook. Ma l'applicazione non ha bisogno subito dei dati dell'account. Possiamo trarre vantaggio da questa situazione e utilizzare le API asincrone:

uid = users.get_current_user().user_id()
acct_future = Account.get_by_id_async(uid)  # Start I/O action #1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries_future = qry.fetch_async(10)  # Start I/O action #2
acct = acct_future.get_result()  # Complete #1
recent_entries = recent_entries_future.get_result()  # Complete #2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

Questa versione del codice crea prima due Futures (acct_future e recent_entries_future) e poi le attende. Il server lavora su entrambe le richieste in parallelo. Ogni chiamata di funzione _async() crea un oggetto Future e invia una richiesta al server Datastore. Il server può iniziare subito a lavorare alla richiesta. Le risposte del server possono essere restituite in qualsiasi ordine arbitrario; l'oggetto futuro collega le risposte alle richieste corrispondenti.

Le richieste sincrone non si sovrappongono, a differenza di quelle asincrone.
Richieste sincrone e asincrone

Il tempo totale (reale) trascorso nella versione asincrona è all'incirca uguale al tempo massimo per tutte le operazioni. Il tempo totale trascorso nella versione sincrona supera la somma dei tempi di operazione. Se puoi eseguire più operazioni in parallelo, le operazioni asincrone sono più utili.

Per conoscere il tempo necessario per le query della tua applicazione o il numero di operazioni di I/O eseguite per richiesta, valuta la possibilità di utilizzare Appstats. Questo strumento può mostrare grafici simili al disegno sopra basati sulla strumentazione di un'app live.

Utilizzo delle Tasklet

Un'attività NDB è una porzione di codice che può essere eseguita contemporaneamente ad altro codice. Se scrivi una tasklet, l'applicazione può utilizzarla in modo molto simile a una funzione NDB asincrona: chiama la tasklet, che restituisce un valore Future. In seguito, la chiamata al metodo get_result() di Future ottiene il risultato.

Le tasklet consentono di scrivere funzioni simultanee senza thread; le tasklet vengono eseguite da un loop di eventi e possono sospendersi per il blocco di I/O o per altre operazioni utilizzando un'istruzione di rendimento. La nozione di un'operazione di blocco è astratta nella classe Future, ma un tasklet può anche yield una RPC per attendere il completamento di questa RPC. Quando la tasklet ha un risultato, raisecostituisce un'eccezione ndb.Return; NDB associa quindi il risultato all'elemento Future precedentemente yield.

Quando scrivi una tasklet NDB, usi yield e raise in modo insolito. Pertanto, se cerchi esempi di come utilizzarli, probabilmente non troverai codice simile a un tasklet NDB.

Per trasformare una funzione in un tasklet NDB:

  • decora la funzione con @ndb.tasklet,
  • sostituire tutte le chiamate sincrone al datastore con yield di chiamate asincrone al datastore,
  • imposta alla funzione "return" il suo valore restituito con raise ndb.Return(retval) (non è necessario se la funzione non restituisce nulla).

Un'applicazione può utilizzare le tasklet per un controllo più preciso sulle API asincrone. Ad esempio, considera lo schema riportato di seguito:

class Account(ndb.Model):
    email = ndb.StringProperty()
    nickname = ndb.StringProperty()

    def nick(self):
        return self.nickname or self.email  # Whichever is non-empty
...
class Message(ndb.Model):
    text = ndb.StringProperty()
    when = ndb.DateTimeProperty(auto_now_add=True)
    author = ndb.KeyProperty(kind=Account)  # references Account

Quando si visualizza un messaggio, ha senso mostrare il nickname dell'autore. Il modo "sincrono" per recuperare i dati e mostrare un elenco di messaggi potrebbe avere il seguente aspetto:

qry = Message.query().order(-Message.when)
for msg in qry.fetch(20):
    acct = msg.author.get()
    self.response.out.write(
        '<p>On {}, {} wrote:'.format(msg.when, acct.nick()))
    self.response.out.write('<p>{}'.format(msg.text))

Sfortunatamente, questo approccio non è efficace. Se lo esamini in Appstats, vedresti che le richieste "Get" sono in serie. Potresti vedere il seguente pattern a "scala".

I "Get" sincroni si verificano in serie
I "Get" sincroni si verificano in serie.

Questa parte del programma sarebbe più rapida se i "recuperi" si sovrapponessero. Potresti riscrivere il codice per utilizzare get_async, ma è difficile tenere traccia di quali richieste asincrone e messaggi appartengono insieme.

L'applicazione può definire la propria funzione "asincrona" rendendola un tasklet. Questo ti consente di organizzare il codice in modo meno ambiguo.

Inoltre, anziché utilizzare acct = key.get() o acct = key.get_async().get_result(), la funzione deve usare acct = yield key.get_async(). Questo yield indica a NDB che è un buon posto per sospendere questa attività e consentire l'esecuzione di altri tasklet.

Se decori una funzione generatore con @ndb.tasklet, la funzione restituisce Future anziché un oggetto generatore. All'interno della tasklet, qualsiasi elemento yield di Future attende e restituisce il risultato di Future.

Ad esempio:

@ndb.tasklet
def callback(msg):
    acct = yield msg.author.get_async()
    raise ndb.Return('On {}, {} wrote:\n{}'.format(
        msg.when, acct.nick(), msg.text))

qry = Message.query().order(-Message.when)
outputs = qry.map(callback, limit=20)
for output in outputs:
    self.response.out.write('<p>{}</p>'.format(output))

Tieni presente che, anche se get_async() restituisce un valore Future, il framework delle attività fa sì che l'espressione yield restituisca il risultato di Future alla variabile acct.

map() chiama callback() diverse volte. Tuttavia, yield ..._async() in callback() consente allo scheduler di NDB di inviare molte richieste asincrone prima di attendere il completamento di ognuna.

"Get" asincroni sovrapposti
"Get" asincroni sovrapposti

Se esamini questo aspetto in Appstats, potresti sorprenderti nel vedere che questi vari Get non si sovrappongono, ma passano tutti nella stessa richiesta. NDB implementa un "autobatcher". L'autobatcher raggruppa più richieste in un singolo RPC al server in un unico batch; lo fa in modo che, fintanto che ci sia altro lavoro da fare (è possibile eseguire un altro callback), raccolga le chiavi. Non appena è necessario uno dei risultati, l'autobatcher invia l'RPC batch. A differenza della maggior parte delle richieste, le query non sono "in batch".

Quando viene eseguita, una tasklet ne riceve lo spazio dei nomi predefinito da quello predefinito (quando è stato generato o da qualsiasi altro elemento modificato dal tasklet durante l'esecuzione). In altre parole, lo spazio dei nomi predefinito non viene associato né archiviato nel Contesto e la modifica dello spazio dei nomi predefinito in una tasklet non influisce sullo spazio dei nomi predefinito negli altri tasklet, ad eccezione di quelli che ha generato.

Tasklet, query parallele, rendimento parallelo

Puoi utilizzare le attività in modo che più query recuperino i record contemporaneamente. Ad esempio, supponiamo che la tua applicazione abbia una pagina che mostra i contenuti di un carrello degli acquisti e un elenco di offerte speciali. Lo schema potrebbe avere il seguente aspetto:

class Account(ndb.Model):
    pass

class InventoryItem(ndb.Model):
    name = ndb.StringProperty()

class CartItem(ndb.Model):
    account = ndb.KeyProperty(kind=Account)
    inventory = ndb.KeyProperty(kind=InventoryItem)
    quantity = ndb.IntegerProperty()

class SpecialOffer(ndb.Model):
    inventory = ndb.KeyProperty(kind=InventoryItem)

Una funzione "sincrona" che recupera gli articoli del carrello e le offerte speciali potrebbe avere il seguente aspetto:

def get_cart_plus_offers(acct):
    cart = CartItem.query(CartItem.account == acct.key).fetch()
    offers = SpecialOffer.query().fetch(10)
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

Questo esempio utilizza le query per recuperare elenchi di articoli del carrello e offerte; poi recupera i dettagli sugli articoli dell'inventario con get_multi(). Questa funzione non utilizza direttamente il valore restituito di get_multi(). Chiama get_multi() per recuperare tutti i dettagli dell'inventario nella cache, in modo che possano essere letti rapidamente in un secondo momento. get_multi combina molti Get in un'unica richiesta. ma i recuperi delle query avvengono uno dopo l'altro. Per fare in modo che i recuperi vengano eseguiti contemporaneamente, sovrapponi le due query:

def get_cart_plus_offers_async(acct):
    cart_future = CartItem.query(CartItem.account == acct.key).fetch_async()
    offers_future = SpecialOffer.query().fetch_async(10)
    cart = cart_future.get_result()
    offers = offers_future.get_result()
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

La chiamata get_multi() è ancora separata: dipende dai risultati della query, quindi non puoi combinarla con le query.

Supponiamo che a volte questa applicazione richieda il carrello, a volte le offerte e a volte entrambi. Vuoi organizzare il tuo codice in modo che ci sia una funzione per recuperare il carrello e una funzione per ricevere le offerte. Se l'applicazione chiama queste funzioni insieme, idealmente le loro query potrebbero "sovrapporsi". A questo scopo, rendi queste tasklet di funzioni:

@ndb.tasklet
def get_cart_tasklet(acct):
    cart = yield CartItem.query(CartItem.account == acct.key).fetch_async()
    yield ndb.get_multi_async([item.inventory for item in cart])
    raise ndb.Return(cart)

@ndb.tasklet
def get_offers_tasklet(acct):
    offers = yield SpecialOffer.query().fetch_async(10)
    yield ndb.get_multi_async([offer.inventory for offer in offers])
    raise ndb.Return(offers)

@ndb.tasklet
def get_cart_plus_offers_tasklet(acct):
    cart, offers = yield get_cart_tasklet(acct), get_offers_tasklet(acct)
    raise ndb.Return((cart, offers))

Questo valore yield xy è importante ma facile da trascurare. Se si trattasse di due dichiarazioni yield separate, l'operazione verrebbe eseguita in serie. Tuttavia, yieldl'azione di una tupla di tasklet è una rendimento parallelo: le attività possono essere eseguite in parallelo e l'elemento yield attende che tutte le attività vengano completate e restituisce i risultati. (In alcuni linguaggi di programmazione, questa operazione è nota come barriera).

Se trasformi una porzione di codice in un tasklet, probabilmente vorrai fare di più presto. Se noti codice "sincrono" che può essere eseguito in parallelo a un tasklet, probabilmente è una buona idea renderlo anche un tasklet. Quindi puoi metterlo in contemporanea con un yield parallelo.

Se scrivi una funzione di richiesta (una funzione di richiesta webapp2, una funzione di visualizzazione di Django e così via) come tasklet, non farà ciò che vuoi: produce ma poi si interrompe. In questo caso, vuoi decorare la funzione con @ndb.synctasklet. @ndb.synctasklet è come @ndb.tasklet, ma è stato modificato per chiamare get_result() nella tasklet. In questo modo la tasklet viene trasformata in una funzione che restituisce il risultato come di consueto.

Esegui query sugli iteratori nelle Tasklet

Per eseguire l'iterazione dei risultati di una query in un tasklet, utilizza il seguente pattern:

qry = Model.query()
qit = qry.iter()
while (yield qit.has_next_async()):
    entity = qit.next()
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

Si tratta dell'equivalente compatibile con le attività per le seguenti attività:

# DO NOT DO THIS IN A TASKLET
qry = Model.query()
for entity in qry:
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

Le tre righe in grassetto nella prima versione sono l'equivalente per le attività di programmazione della singola riga in grassetto nella seconda versione. Le tasklet possono essere sospese solo in base a una parola chiave yield. Il ciclo for senza yield non consente l'esecuzione di altri tasklet.

Potresti chiederti perché questo codice utilizza un iteratore delle query invece di recuperare tutte le entità utilizzando qry.fetch_async(). L'applicazione potrebbe avere così tante entità che non possono essere inserite nella RAM. Magari stai cercando un'entità e puoi interrompere l'iterazione una volta trovata, ma non puoi esprimere i criteri di ricerca solo con il linguaggio della query. Potresti utilizzare un iteratore per caricare le entità da controllare, quindi uscire dal loop quando trovi ciò che vuoi.

Recupero URL asincrono con NDB

Un elemento Context NDB ha una funzione urlfetch() asincrona che si integra perfettamente con le tasklet NDB, ad esempio:

@ndb.tasklet
def get_google():
    context = ndb.get_context()
    result = yield context.urlfetch("http://www.google.com/")
    if result.status_code == 200:
        raise ndb.Return(result.content)

Il servizio di recupero URL dispone della propria API di richiesta asincrona. va bene, ma non sempre è facile da usare con le tasklet NDB.

Utilizzo delle transazioni asincrone

Le transazioni possono anche essere eseguite in modo asincrono. Puoi passare una funzione esistente a ndb.transaction_async() o utilizzare il decorator @ndb.transactional_async. Come le altre funzioni asincrone, viene restituito un Future NDB:

@ndb.transactional_async
def update_counter(counter_key):
    counter = counter_key.get()
    counter.value += 1
    counter.put()

Le transazioni funzionano anche con le tasklet. Ad esempio, potremmo cambiare il nostro codice update_counter in yield nell'attesa del blocco degli RPC:

@ndb.transactional_tasklet
def update_counter(counter_key):
    counter = yield counter_key.get_async()
    counter.value += 1
    yield counter.put_async()

Utilizzo di Future.wait_any()

A volte potresti voler effettuare più richieste asincrone e tornare ogni volta che viene completata la prima. Puoi farlo utilizzando il metodo della classe ndb.Future.wait_any():

def get_first_ready():
    urls = ["http://www.google.com/", "http://www.blogspot.com/"]
    context = ndb.get_context()
    futures = [context.urlfetch(url) for url in urls]
    first_future = ndb.Future.wait_any(futures)
    return first_future.get_result().content

Sfortunatamente, non esiste un modo conveniente per trasformare questa operazione in una tasklet; un yield parallelo attende il completamento di tutti i Future, compresi quelli che non vuoi attendere.