Operazione asincrona NDB

Quando ottimizzi le prestazioni di un'applicazione, considera il suo utilizzo di NDB. Ad esempio, se un'applicazione legge un valore che non è nella cache, la lettura richiede del tempo. Potresti riuscire a velocizzare la tua applicazione eseguendo azioni di Datastore in parallelo ad altre attività oppure alcune azioni di Datastore in parallelo tra loro.

La libreria client NDB fornisce molte funzioni asincrone ("asinc"). Ognuna di queste funzioni consente a un'applicazione di inviare una richiesta a Datastore. La funzione restituisce immediatamente, restituendo un Future . L'applicazione può eseguire altre operazioni mentre Datastore gestisce la richiesta. Una volta che Datastore gestisce la richiesta, l'applicazione può ottenere i risultati dall'oggetto Future.

Introduzione

Supponiamo che uno dei gestori delle richieste della tua applicazione debba usare NDB per scrivere qualcosa, magari per registrare la richiesta. Inoltre, deve eseguire altre operazioni NDB, magari per recuperare alcuni dati.

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

Sostituendo la chiamata a put() con una chiamata al suo l'equivalente asincrono put_async(), può fare subito altre cose anziché bloccare put().

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        future = acct.put_async()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')
        future.get_result()

Ciò consente alle altre funzioni NDB e al rendering del modello di avvengono mentre Datastore scrive i dati. L'applicazione non o il blocco su Datastore finché non riceve dati da quest'ultimo.

In questo esempio, è un po' sciocco chiamare future.get_result: l'applicazione non utilizza mai il risultato di NDB. Il codice è per fare in modo che il gestore delle richieste non escluda prima finisce l'NDB put; se il gestore di richieste esce troppo presto, il put potrebbe non accadere mai. Se preferisci, puoi decorare la richiesta gestore con @ndb.toplevel. Questo indica al gestore deve uscire finché le relative richieste asincrone non sono terminate. Questo ti consente di inviare la richiesta senza preoccuparsi del risultato.

Puoi specificare un valore intero WSGIApplication come ndb.toplevel. In questo modo ogni I gestori di WSGIApplication attendono per tutti i casi asincroni richieste prima di restituirle. (Non è di "primo livello" tutti i gestori di WSGIApplication).


app = ndb.toplevel(webapp2.WSGIApplication([('/', MyRequestHandler)]))

L'utilizzo di un'applicazione toplevel è più pratico che di tutte le sue funzioni di gestore. Tuttavia, se un metodo di gestore utilizza yield, questo metodo deve essere aggregato da un altro decorator, @ndb.synctasklet; altrimenti l'esecuzione verrà interrotta yield e non terminare.

class MyRequestHandler(webapp2.RequestHandler):
    @ndb.toplevel
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put_async()  # Ignoring the Future this returns

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

Utilizzo di API asincrone e future

Quasi tutte le funzioni NDB sincrone hanno una controparte _async. Per ad esempio, put() ha put_async(). Gli argomenti della funzione asincrona sono sempre gli stessi della funzione sincrona. Il valore restituito di un metodo asincrono è sempre un Future o (per le funzioni "multi") un elenco di Future

Un futuro è un oggetto che mantiene lo stato di un'operazione che è stata avviata, ma potrebbe non essere ancora stata completata; tutti asincroni Le API restituiscono uno o più Futures. Puoi chiamare il numero get_result() di Future per richiedere il risultato della sua operazione; il futuro blocca, se necessario, finché il risultato non è disponibile, e te la fornisce. get_result() restituisce il valore che verrebbe restituito dalla versione sincrona dell'API.

Nota: Se hai usato Futures in altri linguaggi di programmazione, potresti pensare puoi usare direttamente un Futuro. Non funziona qui. Questi linguaggi utilizzano future impliciti; L'NDB usa future espliciti. Chiama get_result() per ricevere un NDB di Future o il risultato finale.

Cosa succede se l'operazione genera un'eccezione? Dipende da quando un'eccezione. Se NDB rileva un problema quando effettua una richiesta (forse un argomento del tipo sbagliato), _async() genera un'eccezione. Ma se l'eccezione viene rilevata, ad esempio, server Datastore, il metodo _async() restituisce un Future e l'eccezione verrà sollevata quando la tua domanda chiama get_result(). Non preoccuparti troppo, finisce per comportarsi in modo piuttosto naturale; forse la differenza principale è che, se viene stampata una traccia, vengono visualizzate alcune parti con macchinari asincroni.

Ad esempio, supponiamo che tu stia scrivendo un'applicazione guestbook. Se l'utente ha eseguito l'accesso, vuoi presentare una pagina che mostra i post più recenti del libro degli ospiti. Questa pagina dovrebbe mostrare anche il nickname dell'utente. L'applicazione richiede due tipi di informazioni: le informazioni sull'account dell'utente che ha effettuato l'accesso e i contenuti post del libro degli ospiti. L'impostazione "sincrona" di questa applicazione potrebbe ha il seguente aspetto:

uid = users.get_current_user().user_id()
acct = Account.get_by_id(uid)  # I/O action 1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries = qry.fetch(10)  # I/O action 2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

Ci sono due azioni di I/O indipendenti: Entità Account e recupero Guestbook recente le entità. Con l'API sincrona, questi eventi si verificano uno dopo l'altro; attenderemo di ricevere le informazioni dell'account prima di recuperare le entità guestbook. Ma l'applicazione non ha bisogno dell'account le informazioni. Possiamo sfruttarlo e utilizzare le API asincrone:

uid = users.get_current_user().user_id()
acct_future = Account.get_by_id_async(uid)  # Start I/O action #1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries_future = qry.fetch_async(10)  # Start I/O action #2
acct = acct_future.get_result()  # Complete #1
recent_entries = recent_entries_future.get_result()  # Complete #2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

Questa versione del codice crea innanzitutto due Futures (acct_future e recent_entries_future), e poi li attende. Il server lavora su entrambe le richieste in parallelo. Ogni chiamata di funzione _async() crea un oggetto Futuro e invia una richiesta al server Datastore. Il server può avviare lavorando immediatamente alla richiesta. Le risposte del server potrebbero essere restituite in qualsiasi ordine arbitrario; le risposte del link oggetto futuro alle loro richieste corrispondenti.

A differenza delle richieste asincrone, le richieste sincrone non si sovrappongono.
Richieste sincrone e asincrone

Il tempo totale (in tempo reale) trascorso nella versione asincrona è all'incirca uguale a il tempo massimo tra le operazioni. Il tempo totale trascorso nel la versione sincrona supera la somma dei tempi dell'operazione. Se puoi eseguire più operazioni in parallelo, le operazioni asincrone sono più utili.

Per vedere quanto tempo richiedono le query della tua applicazione o quante operazioni di I/O per ogni richiesta, considera l'utilizzo Statistiche app. Questo strumento può mostrare grafici simili al disegno sopra riportato sulla strumentazione di un'app live.

Utilizzo dei Tasklet

Una tasklet NDB è una porzione di codice che può essere eseguita in contemporanea con l'altro codice. Se scrivi una tasklet, l'applicazione può utilizzarla in modo molto simile utilizza una funzione NDB asincrona: chiama la tasklet, che restituisce Future; in seguito, chiamando il comando Future get_result() ottiene il risultato.

I tasklet sono un modo per scrivere funzioni simultanee senza thread; le tasklet vengono eseguite da un loop di eventi e possono essere sospese bloccare l'I/O o qualche altra operazione usando un rendimento l'Informativa. La nozione di operazione di blocco è astratta Future ma un tasklet può anche yield RPC per attendere il completamento della RPC. Se la tasklet ha un risultato, raise ndb.Return eccezione; NDB associa quindi il risultato con Future, yieldin precedenza.

Quando scrivi una tasklet NDB, utilizzi yield e raise in un modo insolito. Pertanto, se cerchi esempi come usarle, probabilmente non troverai un codice come un tasklet NDB.

Per trasformare una funzione in un tasklet NDB:

  • decora la funzione con @ndb.tasklet,
  • sostituisci tutte le chiamate sincrone al datastore con yield delle chiamate asincrone al datastore,
  • imposta la funzione "return" il suo valore restituito raise ndb.Return(retval) (non è necessario se la funzione non restituisce nulla).

Un'applicazione può utilizzare i tasklet per avere un controllo più preciso sulle API asincrone. Considera ad esempio il seguente schema:

class Account(ndb.Model):
    email = ndb.StringProperty()
    nickname = ndb.StringProperty()

    def nick(self):
        return self.nickname or self.email  # Whichever is non-empty
...
class Message(ndb.Model):
    text = ndb.StringProperty()
    when = ndb.DateTimeProperty(auto_now_add=True)
    author = ndb.KeyProperty(kind=Account)  # references Account

Quando si visualizza un messaggio, ha senso mostrare il nickname dell'autore. L'impostazione "sincrona" recuperare i dati per mostrare un elenco di messaggi ha questo aspetto:

qry = Message.query().order(-Message.when)
for msg in qry.fetch(20):
    acct = msg.author.get()
    self.response.out.write(
        '<p>On {}, {} wrote:'.format(msg.when, acct.nick()))
    self.response.out.write('<p>{}'.format(msg.text))

Purtroppo, questo approccio non è efficace. Se l'avevi visto Statistiche app, vedresti che il pulsante "Get" sono in serie. Potresti vedere la seguente "scala" pattern.

&quot;Get&quot; sincrono si verificano in serie
"Ottieni" sincrone avvengono in serie.

Questa parte del programma sarebbe più veloce se questi "ricevi" potrebbero sovrapporsi. Puoi riscrivere il codice per utilizzare get_async, ma difficile tenere traccia di quali messaggi e richieste asincrone appartengono.

L'applicazione può definire il proprio "asinc" creando un tasklet. Ciò consente di organizzare il codice in modo meno confuso.

Inoltre, invece di utilizzare acct = key.get() o acct = key.get_async().get_result(), la funzione dovrebbe utilizzare acct = yield key.get_async(). Questo yield indica a NDB che è un posto idoneo per sospendere l'impostazione e consentire l'esecuzione di altre attività.

Decorazione di una funzione del generatore con @ndb.tasklet rende la funzione restituita un Future invece di una generatore. All'interno della tasklet, tutti i yield di Future attende e restituisce il risultato di Future.

Ad esempio:

@ndb.tasklet
def callback(msg):
    acct = yield msg.author.get_async()
    raise ndb.Return('On {}, {} wrote:\n{}'.format(
        msg.when, acct.nick(), msg.text))

qry = Message.query().order(-Message.when)
outputs = qry.map(callback, limit=20)
for output in outputs:
    self.response.out.write('<p>{}</p>'.format(output))

Tieni presente che, anche se get_async() restituisce un Future, il framework del tasklet determina la yield per restituire il risultato di Future alla variabile acct.

map() chiama callback() diverse volte. Ma yield ..._async() in callback() consente allo scheduler di NDB di inviare molte richieste asincrone prima di attendere per completare una qualsiasi.

&quot;Ottieni&quot; asincrone sovrapposte
"Ottieni" asincrone sovrapposte

Se guardi questo esempio in Appstat, potresti sorprenderti nel vedere che tutti questi elementi Get non si sovrappongono: richiesta. NDB implementa un "autobatcher". Autobatcher raggruppa più richieste in una singola RPC batch al server; lo fa in modo che, purché ci sia altro lavoro da fare (potrebbe essere eseguito un altro callback) raccoglie le chiavi. Non appena uno dei dei risultati, l'autobatcher invia l'RPC batch. A differenza della maggior parte richieste, le query non sono "raggruppate".

Quando viene eseguita, una tasklet riceve lo spazio dei nomi predefinito da a prescindere da quella predefinita al momento della generazione della tasklet o da qualsiasi altro modificata in "durante l'esecuzione". In altre parole, lo spazio dei nomi predefinito non viene associato o archiviato in Contesto e la modifica del valore predefinito lo spazio dei nomi predefinito di un'altra attività non influisce sullo spazio dei nomi predefinito i tasklet, ad eccezione di quelli generati da quest'ultimo.

Tasklet, query parallele, rendimento parallelo

Puoi utilizzare i tasklet in modo che più query recuperino i record contemporaneamente. Ad esempio, supponiamo che la tua applicazione abbia una pagina che visualizza i contenuti di un carrello degli acquisti e un elenco di offerte speciali. Lo schema potrebbe essere simile al seguente:

class Account(ndb.Model):
    pass


class InventoryItem(ndb.Model):
    name = ndb.StringProperty()


class CartItem(ndb.Model):
    account = ndb.KeyProperty(kind=Account)
    inventory = ndb.KeyProperty(kind=InventoryItem)
    quantity = ndb.IntegerProperty()


class SpecialOffer(ndb.Model):
    inventory = ndb.KeyProperty(kind=InventoryItem)

Uno "sincrona" funzione che ottiene articoli del carrello e offerte speciali potrebbe avere il seguente aspetto:

def get_cart_plus_offers(acct):
    cart = CartItem.query(CartItem.account == acct.key).fetch()
    offers = SpecialOffer.query().fetch(10)
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

In questo esempio vengono utilizzate query per recuperare elenchi di articoli del carrello e offerte. poi recupera i dettagli degli elementi dell'inventario con get_multi(). (Questa funzione non utilizza direttamente il valore restituito di get_multi(). Chiama get_multi() per recuperare tutti i dettagli dell'inventario nel Cache, in modo da poterli leggere rapidamente in un secondo momento.) get_multi combina più oggetti Get in un'unica richiesta. ma i recuperi della query avvengono uno dopo l'altra. Per fare in modo che i recuperi vengano eseguiti contemporaneamente, sovrapponi le due query:

def get_cart_plus_offers_async(acct):
    cart_future = CartItem.query(CartItem.account == acct.key).fetch_async()
    offers_future = SpecialOffer.query().fetch_async(10)
    cart = cart_future.get_result()
    offers = offers_future.get_result()
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

get_multi() è ancora separata: dipende dai risultati della query, quindi e non possono combinarla con le query.

Supponiamo che questa applicazione a volte abbia bisogno del carrello, a volte delle offerte, e talvolta entrambe le cose. Vuoi organizzare il codice in modo che ci sia per ottenere il carrello e una funzione per ottenere le offerte. Se le tue l'applicazione chiama insieme queste funzioni, idealmente le loro query che si possono "sovrapporre". Per farlo, crea dei tasklet con queste funzioni:

@ndb.tasklet
def get_cart_tasklet(acct):
    cart = yield CartItem.query(CartItem.account == acct.key).fetch_async()
    yield ndb.get_multi_async([item.inventory for item in cart])
    raise ndb.Return(cart)


@ndb.tasklet
def get_offers_tasklet(acct):
    offers = yield SpecialOffer.query().fetch_async(10)
    yield ndb.get_multi_async([offer.inventory for offer in offers])
    raise ndb.Return(offers)


@ndb.tasklet
def get_cart_plus_offers_tasklet(acct):
    cart, offers = yield get_cart_tasklet(acct), get_offers_tasklet(acct)
    raise ndb.Return((cart, offers))

L'importanza di yield xy ma facile da trascurare. Se si trattasse di due yield le dichiarazioni, che accadono in serie. Ma yielduna tupla delle attività è un rendimento parallelo: le attività possono essere eseguite in parallelo e yield attende che vengano completati e ritorni i risultati. (In alcuni linguaggi di programmazione, questo è chiamato ostacolo.)

Se trasformi una porzione di codice in un tasklet, probabilmente vorrai farne di più al più presto. Se noti l'indicazione "sincrona" che può essere eseguito in parallelo con un tasklet, probabilmente è una buona idea trasformarlo anche in un tasklet. Quindi puoi metterlo in parallelo con un valore yield parallelo.

Se scrivi una funzione di richiesta (una richiesta webapp2 funzione, una funzione di visualizzazione di Django e così via) per essere un tasklet, non farà quello che vuoi: cede, ma poi si ferma in esecuzione. In questa situazione, devi decorare la funzione con @ndb.synctasklet. @ndb.synctasklet è simile a @ndb.tasklet ma modificato per chiamare get_result() sulla tasklet. In questo modo la tasklet diventa una funzione che restituisce il risultato nel solito modo.

Iteratori di query nei tasklet

Per eseguire l'iterazione sui risultati della query in un tasklet, utilizza il seguente pattern:

qry = Model.query()
qit = qry.iter()
while (yield qit.has_next_async()):
    entity = qit.next()
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

Si tratta dell'equivalente di quanto segue, facilmente compatibile con le attività:

# DO NOT DO THIS IN A TASKLET
qry = Model.query()
for entity in qry:
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

Le tre righe marcate della prima versione sono il testo ideale per le attività equivalente alla singola riga in grassetto nella seconda versione. I tasklet possono essere sospesi solo per una parola chiave yield. Il ciclo for senza yield non consente l'esecuzione di altri tasklet.

Potresti chiederti perché questo codice utilizza un iteratore di query anziché recuperando tutte le entità utilizzando qry.fetch_async(). L'applicazione potrebbe avere così tante entità da non rientrare nella RAM. Magari stai cercando un'entità e puoi interrompere l'iterazione una volta trovarlo; ma non puoi esprimere i tuoi criteri di ricerca solo con la query lingua. Potresti usare un iteratore per caricare entità da verificare, quindi uscire dal ciclo quando trovi ciò che cerchi.

Recupero URL asincrono con NDB

Un NDB Context ha un motore asincrono Funzione urlfetch() che si parallelizza perfettamente con le attività NDB, ad esempio:

@ndb.tasklet
def get_google():
    context = ndb.get_context()
    result = yield context.urlfetch("http://www.google.com/")
    if result.status_code == 200:
        raise ndb.Return(result.content)

Il servizio di recupero URL ha il suo dell'API di richiesta asincrona. Va bene, ma non sempre è facile da usare con i tasklet NDB.

Utilizzo delle transazioni asincrone

Le transazioni possono anche essere eseguite in modo asincrono. Puoi passare una funzione esistente a ndb.transaction_async() o usa Decoratrice di @ndb.transactional_async. Come per le altre funzioni asincrone, verrà restituito un valore NDB Future:

@ndb.transactional_async
def update_counter(counter_key):
    counter = counter_key.get()
    counter.value += 1
    counter.put()

Le transazioni funzionano anche con i tasklet. Ad esempio, potremmo modificare update_counter codice a yield in attesa del blocco RPC:

@ndb.transactional_tasklet
def update_counter(counter_key):
    counter = yield counter_key.get_async()
    counter.value += 1
    yield counter.put_async()

Utilizzo di Future.wait_any()

A volte potresti voler effettuare più richieste asincrone e ritorna ogni volta che la prima viene completata. Puoi farlo utilizzando il metodo della classe ndb.Future.wait_any():

def get_first_ready():
    urls = ["http://www.google.com/", "http://www.blogspot.com/"]
    context = ndb.get_context()
    futures = [context.urlfetch(url) for url in urls]
    first_future = ndb.Future.wait_any(futures)
    return first_future.get_result().content

Purtroppo, non c'è un modo conveniente per trasformarlo in un tasklet: un yield parallelo attende che tutti i Future completato, incluse quelle che non vuoi aspettare.