Operazione asincrona NDB

Quando ottimizzi le prestazioni di un'applicazione, considera il suo utilizzo di NDB. Ad esempio, se un'applicazione legge un valore che non è in cache, la lettura richiede un po' di tempo. Potresti riuscire a velocizzare la tua applicazione eseguendo azioni di Datastore in parallelo ad altre attività oppure alcune azioni di Datastore in parallelo tra loro.

La libreria client NDB fornisce molte funzioni asincrone ("async"). Ognuna di queste funzioni consente a un'applicazione di inviare una richiesta a Datastore. La funzione restituisce immediatamente, restituendo un Future . L'applicazione può eseguire altre operazioni mentre Datastore gestisce la richiesta. Dopo che Datastore gestisce la richiesta, l'applicazione può recuperare i risultati dall'oggetto Future.

Introduzione

Supponiamo che uno degli elaboratori delle richieste della tua applicazione debba utilizzare NDB per scrivere qualcosa, ad esempio per registrare la richiesta. Inoltre, deve eseguire alcune altre operazioni NDB, ad esempio per recuperare alcuni dati.

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

Sostituendo la chiamata a put() con una chiamata al suo corrispondente asincrono put_async(), l'applicazione può eseguire immediatamente altre operazioni anziché bloccarsi su put().

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        future = acct.put_async()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')
        future.get_result()

In questo modo, le altre funzioni NDB e il rendering del modello possono avvenire mentre Datastore scrive i dati. L'applicazione non in Cloud Datastore finché non riceve i dati da quest'ultimo.

In questo esempio, è un po' sciocco chiamare future.get_result: l'applicazione non utilizza mai il risultato di NDB. Il codice è per fare in modo che il gestore delle richieste non escano prima finisce l'NDB put; se il gestore di richieste esce troppo presto, il put potrebbe non accadere mai. Se preferisci, puoi decorare la richiesta gestore con @ndb.toplevel. Questo indica all'handler di non uscire finché le sue richieste asincrone non sono state completate. Questo ti consente di inviare la richiesta senza preoccuparsi del risultato.

Puoi specificare un valore intero WSGIApplication come ndb.toplevel. In questo modo, ogni gestore di WSGIApplication attende tutte le richieste asincrone prima di restituire. Non "imposta come primo livello" tutti gli handler di WSGIApplication.


app = ndb.toplevel(webapp2.WSGIApplication([('/', MyRequestHandler)]))

L'utilizzo di un'applicazione toplevel è più pratico che di tutte le sue funzioni di gestore. Tuttavia, se un metodo di gestore utilizza yield, questo metodo deve essere aggregato da un altro decorator, @ndb.synctasklet; altrimenti l'esecuzione verrà interrotta yield e non terminare.

class MyRequestHandler(webapp2.RequestHandler):
    @ndb.toplevel
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put_async()  # Ignoring the Future this returns

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

Utilizzo di API asincrone e Futures

Quasi tutte le funzioni NDB sincrone hanno una controparte _async. Per ad esempio, put() ha put_async(). Gli argomenti della funzione asincrona sono sempre gli stessi della funzione sincrona. Il valore restituito di un metodo asincrono è sempre un Future o (per le funzioni "multi") un elenco di Future

Un Future è un oggetto che gestisce lo stato di un'operazione avviata, ma che potrebbe non essere ancora stata completata. Tutte le API asincrone restituiscono uno o più Futures. Puoi chiamare il numero get_result() di Future per richiedere il risultato della sua operazione; il futuro blocca, se necessario, finché il risultato non è disponibile, e te la fornisce. get_result() restituisce il valore che verrebbe restituito dalla versione sincrona dell'API.

Nota: Se hai usato Futures in altri linguaggi di programmazione, potresti pensare puoi usare direttamente un Futuro. Qui non funziona. Queste lingue utilizzano futuri impliciti; NDB utilizza futuri espliciti. Chiama get_result() per ricevere un NDB di Future o il risultato finale.

Cosa succede se l'operazione genera un'eccezione? Dipende da quando si verifica l'eccezione. Se NDB rileva un problema quando effettua una richiesta (forse un argomento del tipo sbagliato), _async() genera un'eccezione. Ma se l'eccezione viene rilevata, ad esempio, server Datastore, il metodo _async() restituisce un Future e l'eccezione verrà sollevata quando la tua domanda chiama get_result(). Non preoccuparti troppo, alla fine tutto si comporta in modo abbastanza naturale. Forse la differenza più grande è che, se viene stampato un traceback, vedrai alcuni componenti della macchina asincrona di basso livello esposti.

Ad esempio, supponiamo che tu stia scrivendo un'applicazione guestbook. Se l'utente ha eseguito l'accesso, vuoi presentare una pagina che mostri i post del guestbook più recenti. In questa pagina deve essere mostrato anche il nickname dell'utente. L'applicazione richiede due tipi di informazioni: i dati dell'account dell'utente che ha eseguito l'accesso e i contenuti dei post del guestbook. L'impostazione "sincrona" di questa applicazione potrebbe ha il seguente aspetto:

uid = users.get_current_user().user_id()
acct = Account.get_by_id(uid)  # I/O action 1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries = qry.fetch(10)  # I/O action 2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

Esistono due azioni di I/O indipendenti: recupero dell'entità Account e recupero delle entità Guestbook recenti. Con l'API sincrona, questi eventi si verificano uno dopo l'altro; attenderemo di ricevere le informazioni dell'account prima di recuperare le entità guestbook. Tuttavia, l'applicazione non ha bisogno subito delle informazioni sull'account. Possiamo sfruttare questa opportunità e utilizzare le API asincrone:

uid = users.get_current_user().user_id()
acct_future = Account.get_by_id_async(uid)  # Start I/O action #1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries_future = qry.fetch_async(10)  # Start I/O action #2
acct = acct_future.get_result()  # Complete #1
recent_entries = recent_entries_future.get_result()  # Complete #2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

Questa versione del codice crea innanzitutto due Futures (acct_future e recent_entries_future), e poi li attende. Il server lavora su entrambe le richieste in parallelo. Ogni chiamata alla funzione _async() crea un oggetto Future e invia una richiesta al server Datastore. Il server può iniziare subito a elaborare la richiesta. Le risposte del server possono essere inviate in un ordine arbitrario; l'oggetto Future collega le risposte alle richieste corrispondente.

Le richieste sincrone non si sovrappongono, ma quelle asincrone possono.
Richieste sincrone e asincrone

Il tempo totale (in tempo reale) trascorso nella versione asincrona è all'incirca uguale a il tempo massimo tra le operazioni. Il tempo totale trascorso nel la versione sincrona supera la somma dei tempi dell'operazione. Se puoi eseguire più operazioni in parallelo, le operazioni asincrone sono più utili.

Per vedere quanto tempo richiedono le query della tua applicazione o quante operazioni di I/O per ogni richiesta, valuta la possibilità Statistiche app. Questo strumento può mostrare grafici simili al disegno sopra riportato in base alla misurazione di un'app in tempo reale.

Utilizzo dei tasklet

Una tasklet NDB è una porzione di codice che può essere eseguita in contemporanea con l'altro codice. Se scrivi una tasklet, l'applicazione può utilizzarla in modo molto simile utilizza una funzione NDB asincrona: chiama la tasklet, che restituisce Future; in seguito, chiamando il comando Future get_result() ottiene il risultato.

I tasklet sono un modo per scrivere funzioni concorrenti senza thread. I tasklet vengono eseguiti da un loop di eventi e possono sospendersi bloccandosi per l'I/O o per qualche altra operazione utilizzando un'istruzione yield. La nozione di operazione di blocco è astratta Future ma un tasklet può anche yield RPC per attendere il completamento dell'RPC. Quando il tasklet ha un risultato, si tratta di un'raiseeccezione ndb.Return; NDB associa quindi il risultato al Futureyield precedente.

Quando scrivi un tasklet NDB, utilizzi yield e raise in un modo insolito. Pertanto, se cerchi esempi di come utilizzarli, probabilmente non troverai codice come un tasklet NDB.

Per trasformare una funzione in un tasklet NDB:

  • decora la funzione con @ndb.tasklet,
  • sostituisci tutte le chiamate al datastore sincrono con yield chiamate al datastore asincrone,
  • Fai in modo che la funzione "retituri" il suo valore restituito con raise ndb.Return(retval) (non necessario se la funzione non restituisce nulla).

Un'applicazione può utilizzare i tasklet per un controllo più fine delle API asincrone. Ad esempio, considera lo schema seguente:

class Account(ndb.Model):
    email = ndb.StringProperty()
    nickname = ndb.StringProperty()

    def nick(self):
        return self.nickname or self.email  # Whichever is non-empty
...
class Message(ndb.Model):
    text = ndb.StringProperty()
    when = ndb.DateTimeProperty(auto_now_add=True)
    author = ndb.KeyProperty(kind=Account)  # references Account

Quando si visualizza un messaggio, ha senso mostrare il nickname dell'autore. Il modo "sincronico" per recuperare i dati per mostrare un elenco di messaggi potrebbe essere simile al seguente:

qry = Message.query().order(-Message.when)
for msg in qry.fetch(20):
    acct = msg.author.get()
    self.response.out.write(
        '<p>On {}, {} wrote:'.format(msg.when, acct.nick()))
    self.response.out.write('<p>{}'.format(msg.text))

Purtroppo, questo approccio non è efficiente. Se l'avevi visto Statistiche app, vedresti che il pulsante "Get" sono in serie. Potresti vedere la seguente "scala" pattern.

I &quot;Get&quot; sincroni si verificano in serie
"Ottieni" sincrone avvengono in serie.

Questa parte del programma sarebbe più veloce se questi "Get" potessero sovrapporsi. Potresti riscrivere il codice per utilizzare get_async, ma è difficile tenere traccia di quali richieste e messaggi asincroni appartengono insieme.

L'applicazione può definire la propria funzione "async" trasformandola in un tasklet. In questo modo puoi organizzare il codice in modo meno confuso.

Inoltre, anziché utilizzare acct = key.get() o acct = key.get_async().get_result(), la funzione deve utilizzare acct = yield key.get_async(). Questo yield indica a NDB che è un buon punto per sospendere questo tasklet e consentire l'esecuzione di altri tasklet.

Decorazione di una funzione del generatore con @ndb.tasklet rende la funzione restituita un Future invece di una generatore. All'interno della tasklet, tutti i yield di Future attende e restituisce il risultato di Future.

Ad esempio:

@ndb.tasklet
def callback(msg):
    acct = yield msg.author.get_async()
    raise ndb.Return('On {}, {} wrote:\n{}'.format(
        msg.when, acct.nick(), msg.text))

qry = Message.query().order(-Message.when)
outputs = qry.map(callback, limit=20)
for output in outputs:
    self.response.out.write('<p>{}</p>'.format(output))

Tieni presente che, anche se get_async() restituisce un Future, il framework del tasklet determina la yield per restituire il risultato di Future alla variabile acct.

map() chiama callback() più volte. Tuttavia, yield ..._async() in callback() consente allo scheduler di NDB di inviare molte richieste asincrone prima di attendere il completamento di una di esse.

&quot;Get&quot; asincroni sovrapposti
"Ottieni" asincrone sovrapposte

Se esamini questo aspetto in Appstats, potresti sorprenderti di scoprire che questi più get non si sovrappongono, ma vengono tutti eseguiti nella stessa richiesta. NDB implementa un "autobatcher". Autobatcher raggruppa più richieste in una singola RPC batch al server; lo fa in modo che, purché ci sia altro lavoro da fare (potrebbe essere eseguito un altro callback) raccoglie le chiavi. Non appena è necessario uno dei risultati, l'autobatcher invia l'RPC batch. A differenza della maggior parte richieste, le query non sono "raggruppate".

Quando viene eseguito, un tasklet ottiene il proprio spazio dei nomi predefinito da qualunque valore predefinito fosse presente al momento della sua creazione o da quello impostato dal tasklet durante l'esecuzione. In altre parole, lo spazio dei nomi predefinito non viene associato o archiviato in Contesto e la modifica del valore predefinito lo spazio dei nomi predefinito di un'altra attività non influisce sullo spazio dei nomi predefinito i tasklet, ad eccezione di quelli generati da quest'ultimo.

Tasklet, query parallele, rendimento parallelo

Puoi utilizzare i tasklet in modo che più query recuperino i record contemporaneamente. Ad esempio, supponiamo che la tua applicazione abbia una pagina che mostri i contenuti di un carrello degli acquisti e un elenco di offerte speciali. Lo schema potrebbe avere il seguente aspetto:

class Account(ndb.Model):
    pass


class InventoryItem(ndb.Model):
    name = ndb.StringProperty()


class CartItem(ndb.Model):
    account = ndb.KeyProperty(kind=Account)
    inventory = ndb.KeyProperty(kind=InventoryItem)
    quantity = ndb.IntegerProperty()


class SpecialOffer(ndb.Model):
    inventory = ndb.KeyProperty(kind=InventoryItem)

Uno "sincrona" funzione che ottiene articoli del carrello e offerte speciali potrebbe avere il seguente aspetto:

def get_cart_plus_offers(acct):
    cart = CartItem.query(CartItem.account == acct.key).fetch()
    offers = SpecialOffer.query().fetch(10)
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

In questo esempio vengono utilizzate query per recuperare elenchi di articoli del carrello e offerte. poi recupera i dettagli degli elementi dell'inventario con get_multi(). (Questa funzione non utilizza direttamente il valore restituito di get_multi(). Chiama get_multi() per recuperare tutti i dettagli dell'inventario nel Cache, in modo da poterli leggere rapidamente in un secondo momento.) get_multi combina più oggetti Get in un'unica richiesta. Tuttavia, i recuperi della query avvengono uno dopo l'altro. Per fare in modo che i recuperi vengano eseguiti contemporaneamente, sovrapponi le due query:

def get_cart_plus_offers_async(acct):
    cart_future = CartItem.query(CartItem.account == acct.key).fetch_async()
    offers_future = SpecialOffer.query().fetch_async(10)
    cart = cart_future.get_result()
    offers = offers_future.get_result()
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

get_multi() è ancora separata: dipende dai risultati della query, quindi e non possono combinarla con le query.

Supponiamo che questa applicazione a volte abbia bisogno del carrello, a volte delle offerte, e talvolta entrambe le cose. Vuoi organizzare il codice in modo che ci sia per ottenere il carrello e una funzione per ottenere le offerte. Se le tue l'applicazione chiama insieme queste funzioni, idealmente le loro query che si possono "sovrapporre". Per farlo, crea queste funzioni come tasklet:

@ndb.tasklet
def get_cart_tasklet(acct):
    cart = yield CartItem.query(CartItem.account == acct.key).fetch_async()
    yield ndb.get_multi_async([item.inventory for item in cart])
    raise ndb.Return(cart)


@ndb.tasklet
def get_offers_tasklet(acct):
    offers = yield SpecialOffer.query().fetch_async(10)
    yield ndb.get_multi_async([offer.inventory for offer in offers])
    raise ndb.Return(offers)


@ndb.tasklet
def get_cart_plus_offers_tasklet(acct):
    cart, offers = yield get_cart_tasklet(acct), get_offers_tasklet(acct)
    raise ndb.Return((cart, offers))

Questo yield xy è importante, ma facile da trascurare. Se si trattasse di due yield le dichiarazioni, che accadono in serie. Ma yielduna tupla delle attività è un rendimento parallelo: le attività possono essere eseguite in parallelo e yield attende che vengano completati e ritorni i risultati. (In alcuni linguaggi di programmazione, questo è chiamato ostacolo.)

Se trasformi una porzione di codice in un tasklet, probabilmente vorrai farne di più al più presto. Se noti l'indicazione "sincrona" che può essere eseguito in parallelo con un tasklet, probabilmente è una buona idea trasformarlo anche in un tasklet. Poi puoi eseguirla in parallelo con un yield parallelo.

Se scrivi una funzione di richiesta (una funzione di richiesta webapp2, una funzione di visualizzazione Django e così via) da utilizzare come tasklet, non farà ciò che vuoi: restituisce un valore, ma poi smette di funzionare. In questa situazione, devi decorare la funzione con @ndb.synctasklet. @ndb.synctasklet è simile a @ndb.tasklet ma modificato per chiamare get_result() sulla tasklet. In questo modo il tasklet diventa una funzione che restituisce il risultato nel solito modo.

Iteratori di query nei tasklet

Per eseguire l'iterazione sui risultati della query in un tasklet, utilizza il seguente pattern:

qry = Model.query()
qit = qry.iter()
while (yield qit.has_next_async()):
    entity = qit.next()
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

Questo è l'equivalente compatibile con i tasklet di quanto segue:

# DO NOT DO THIS IN A TASKLET
qry = Model.query()
for entity in qry:
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

Le tre righe marcate della prima versione sono il testo ideale per le attività equivalente alla singola riga in grassetto nella seconda versione. I tasklet possono essere sospesi solo per una parola chiave yield. Il ciclo for senza yield non consente l'esecuzione di altri tasklet.

Potresti chiederti perché questo codice utilizza un iteratore di query anziché recuperare tutte le entità utilizzando qry.fetch_async(). L'applicazione potrebbe avere così tante entità da non rientrare nella RAM. Ad esempio, potresti cercare un'entità e interrompere l'iterazione quando la trovi, ma non puoi esprimere i tuoi criteri di ricerca solo con il linguaggio di query. Potresti usare un iteratore per caricare entità da verificare, quindi uscire dal ciclo quando trovi ciò che cerchi.

Recupero URL asincrono con NDB

Un NDB Context ha un obiettivo Funzione urlfetch() che si parallelizza perfettamente con le attività NDB, ad esempio:

@ndb.tasklet
def get_google():
    context = ndb.get_context()
    result = yield context.urlfetch("http://www.google.com/")
    if result.status_code == 200:
        raise ndb.Return(result.content)

Il servizio di recupero URL ha il suo dell'API di richiesta asincrona. Va bene, ma non è sempre facile da usare con i tasklet NDB.

Utilizzo delle transazioni asincrone

Le transazioni possono essere eseguite anche in modo asincrono. Puoi passare una funzione esistente a ndb.transaction_async() o utilizzare il decoratore @ndb.transactional_async. Come le altre funzioni asincrone, restituisce un NDB Future:

@ndb.transactional_async
def update_counter(counter_key):
    counter = counter_key.get()
    counter.value += 1
    counter.put()

Le transazioni funzionano anche con i tasklet. Ad esempio, potremmo modificare update_counter codice a yield in attesa del blocco RPC:

@ndb.transactional_tasklet
def update_counter(counter_key):
    counter = yield counter_key.get_async()
    counter.value += 1
    yield counter.put_async()

Utilizzo di Future.wait_any()

A volte vuoi inviare più richieste asincrone e tornare ogni volta che la prima viene completata. Puoi farlo utilizzando il metodo della classe ndb.Future.wait_any():

def get_first_ready():
    urls = ["http://www.google.com/", "http://www.blogspot.com/"]
    context = ndb.get_context()
    futures = [context.urlfetch(url) for url in urls]
    first_future = ndb.Future.wait_any(futures)
    return first_future.get_result().content

Purtroppo, non c'è un modo conveniente per trasformarlo in un tasklet: un yield parallelo attende che tutti i Future completato, incluse quelle che non vuoi aspettare.