Operación asíncrona de NDB

Cuando optimices el rendimiento de una aplicación, considera usar NDB. Por ejemplo, si una aplicación lee un valor que no está en caché, esa lectura toma un tiempo. Para acelerar tu aplicación, realiza las acciones de Datastore en paralelo con otras actividades, o realiza algunas acciones de Datastore de manera simultánea.

La biblioteca cliente de NDB proporciona muchas funciones asíncronas. Cada una de estas funciones permite que una aplicación envíe una solicitud a Datastore. La función entrega el resultado de inmediato y muestra un objeto Future. La aplicación puede realizar otras acciones mientras Datastore administra la solicitud. Una vez que Datastore administra la solicitud, la aplicación puede obtener los resultados del objeto Future.

Introducción

Supongamos que uno de los controladores de solicitudes de la aplicación necesita usar NDB para escribir algo, como registrar la solicitud. También necesita realizar otras operaciones de NDB, como obtener algunos datos.

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

Si reemplazas la llamada a put() por una llamada a su equivalente asíncrono put_async(), la aplicación puede realizar otras acciones de inmediato, en lugar de bloquearse en put().

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        future = acct.put_async()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')
        future.get_result()

Esto permite que se realicen el procesamiento de plantillas y las demás funciones de NDB mientras Datastore escribe los datos. La aplicación no se bloquea en Datastore hasta que obtiene los datos de Datastore.

En este ejemplo, no tiene mucho sentido llamar a future.get_result, ya que la aplicación nunca usa el resultado de NDB. El único propósito de ese código es garantizar que el controlador de solicitudes no se cierre antes de que finalice la función put de NDB. Si el controlador de solicitudes se cierra antes de tiempo, es posible que la función put nunca se ejecute. Para tu comodidad, puedes decorar el controlador de solicitudes con @ndb.toplevel. Esto le indica al controlador que no debe cerrarse hasta que sus solicitudes asíncronas hayan finalizado. A su vez, esto te permite enviar la solicitud sin preocuparte por el resultado.

Puedes especificar una WSGIApplication completa como ndb.toplevel. Esto garantiza que cada uno de los controladores de WSGIApplication esperará todas las solicitudes asíncronas antes de regresar (No usa una función “toplevel” para todos los controladores de WSGIApplication).


app = ndb.toplevel(webapp2.WSGIApplication([('/', MyRequestHandler)]))

Es más conveniente usar una aplicación de toplevel que todas sus funciones de controlador. Sin embargo, si un método de controlador usa yield, ese método aún debe unirse en otro decorador, @ndb.synctasklet. De lo contrario, dejará de ejecutarse en el yield y no finalizará.

class MyRequestHandler(webapp2.RequestHandler):
    @ndb.toplevel
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put_async()  # Ignoring the Future this returns

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

Usa APIs asíncronas y objetos Future

Casi todas las funciones de NDB síncronas tienen una contraparte _async. Por ejemplo, put() tiene put_async(). Los argumentos de la función asíncrona son siempre los mismos que los de la versión síncrona. El valor resultante de un método asíncrono siempre es un objeto Future o una lista de Future (para las funciones “múltiples”).

Un Future es un objeto que mantiene el estado de una operación que se inició, pero que quizás todavía no se completó. Todas las API asíncronas muestran uno o más Futures. Puedes llamar a la función get_result() de Future para pedirle el resultado de su operación. A continuación, el objeto Future se bloquea, si es necesario, hasta que el resultado esté disponible, y, luego, se entrega. get_result() muestra el valor que mostraría la versión síncrona de la API.

Nota: Si usaste objetos futuros en otros lenguajes de programación, podrías pensar que puedes usar directamente un objeto futuro. Eso no funciona aquí. Esos lenguajes usan objetos Future implícitos, pero NDB emplea Future explícitos. Llama a get_result() para obtener el resultado de un Future de NDB.

¿Qué pasa si la operación plantea una excepción? Eso depende de cuándo se produce la excepción. Si NDB detecta un problema mientras realiza una solicitud (por ejemplo, un argumento del tipo incorrecto), el método _async() genera una excepción. Pero si el servidor de Datastore detecta la excepción, el método _async() muestra un Future y la excepción se producirá cuando la aplicación llame a su get_result(). No te preocupes demasiado por esto, ya que al final el comportamiento termina siendo bastante natural. Quizás la mayor diferencia es que si se imprime un objeto traceback, verás expuestas algunas piezas de la maquinaria asíncrona de bajo nivel.

Por ejemplo, supón que estás escribiendo una aplicación de libro de visitas. Si el usuario ya accedió, debes presentar una página que muestre las publicaciones más recientes del libro de visitas. Esta página también debería mostrar al usuario su sobrenombre. La aplicación necesita dos tipos de información: la información de la cuenta del usuario que accedió y el contenido de las publicaciones del libro de visitas. La versión “síncrona” de esta aplicación puede tener el siguiente aspecto:

uid = users.get_current_user().user_id()
acct = Account.get_by_id(uid)  # I/O action 1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries = qry.fetch(10)  # I/O action 2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

A continuación, se muestran dos acciones de E/S independientes: obtener la entidad Account y recuperar las entidades Guestbook recientes. Mediante el uso de la API síncrona, estas acciones suceden una a continuación de la otra. Esperamos hasta recibir la información de la cuenta antes de buscar las entidades del libro de visitas. No obstante, la aplicación no necesita la información de la cuenta de inmediato. Podemos aprovechar esto y usar APIs asíncronas:

uid = users.get_current_user().user_id()
acct_future = Account.get_by_id_async(uid)  # Start I/O action #1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries_future = qry.fetch_async(10)  # Start I/O action #2
acct = acct_future.get_result()  # Complete #1
recent_entries = recent_entries_future.get_result()  # Complete #2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

Esta versión del código primero crea dos Futures (acct_future y recent_entries_future) y, luego, los espera. El servidor trabaja en ambas solicitudes en paralelo. Cada llamada a la función _async() crea un objeto Future y envía una solicitud al servidor de Datastore. El servidor puede comenzar a trabajar en la solicitud de inmediato. Las respuestas del servidor pueden volver en cualquier orden arbitrario. El objeto Future vincula las respuestas con sus solicitudes correspondientes.

Las solicitudes sincrónas no se superponen, pero las asincrónas sí pueden hacerlo.
Solicitudes síncronas frente a asíncronas

El tiempo total (real) empleado en la versión asíncrona es aproximadamente igual al tiempo máximo de todas las operaciones. El tiempo total empleado en la versión síncrona excede la suma de los tiempos de operación. Si puedes ejecutar más operaciones en paralelo, las operaciones asíncronas son más convenientes.

Para ver cuánto tiempo toman las consultas de tu aplicación o cuántas operaciones de E/S realiza por solicitud, considera usar Appstats. Esta herramienta puede mostrar gráficos similares al dibujo anterior basados en la instrumentación de una aplicación en vivo.

Usa tasklets

Un tasklet de NDB es un fragmento de código que puede ejecutarse de manera simultánea con otro código. Si escribes un tasklet, tu aplicación puede usarlo de la misma manera que usa una función asíncrona de NDB: llama al tasklet, que muestra un Future y, luego, llama al método get_result() de Future para obtener el resultado.

Los tasklets son una forma de ejecutar operaciones de escritura simultáneas sin subprocesos. Se ejecutan en un bucle de eventos y pueden suspenderse a sí mismos con un bloqueo para E/S o alguna otra operación mediante una instrucción yield. La noción de una operación de bloqueo se abstrae en la clase Future, pero un tasklet también puede yield una RPC para esperar a que se complete esa RPC. Cuando el tasklet tiene un resultado, genera (raise) una excepción ndb.Return; luego, NDB asocia el resultado con el Future obtenido antes con la instrucción yield.

Cuando escribes un tasklet de NDB, usas yield y raise de una manera inusual. Por lo tanto, si buscas ejemplos de cómo usarlas, es probable que no encuentres código como un tasklet de NDB.

Para convertir una función en un tasklet de NDB, haz lo siguiente:

  • Decora la función con @ndb.tasklet.
  • Reemplaza todas las llamadas del almacén de datos síncrono por yield de las llamadas del almacén de datos asíncrono.
  • Haz que la función “muestre” su valor resultante con raise ndb.Return(retval) (no es necesario si la función no muestra nada).

Una aplicación puede usar tasklets para ejercer un control más preciso de las APIs asíncronas. Como ejemplo, considera el siguiente esquema:

class Account(ndb.Model):
    email = ndb.StringProperty()
    nickname = ndb.StringProperty()

    def nick(self):
        return self.nickname or self.email  # Whichever is non-empty
...
class Message(ndb.Model):
    text = ndb.StringProperty()
    when = ndb.DateTimeProperty(auto_now_add=True)
    author = ndb.KeyProperty(kind=Account)  # references Account

Cuando se muestra un mensaje, tiene sentido mostrar el sobrenombre del autor. La forma “síncrona” de obtener los datos para mostrar una lista de mensajes podría tener este aspecto:

qry = Message.query().order(-Message.when)
for msg in qry.fetch(20):
    acct = msg.author.get()
    self.response.out.write(
        '<p>On {}, {} wrote:'.format(msg.when, acct.nick()))
    self.response.out.write('<p>{}'.format(msg.text))

Desafortunadamente, este enfoque es ineficiente. Si lo observaras en Appstats, verías que las solicitudes "Get" están en serie. Es posible que veas el siguiente patrón de “escalera”.

Las solicitudes “Get” síncronas ocurren en serie
Las solicitudes “Get” síncronas ocurren en serie

Esta parte del programa sería más rápida si las solicitudes “Get” pudieran superponerse. Puedes volver a escribir el código para usar get_async, pero es difícil hacer un seguimiento de qué solicitudes y mensajes asíncronos deben estar juntos.

Para definir su propia función “asíncrona” , la aplicación puede convertirla en un tasklet. Esto te permite organizar el código de manera menos confusa.

Además, en lugar de usar acct = key.get() o acct = key.get_async().get_result(), la función debe usar acct = yield key.get_async(). Esta instrucción yield le indica a NDB que este es un buen punto para suspender este tasklet y permitir que se ejecuten otros tasklets.

Decorar una función de generador con @ndb.tasklet hace que la función muestre un Future en lugar de un objeto de generador. Dentro del tasklet, cualquier yield de un Future espera y muestra el resultado de Future.

Por ejemplo:

@ndb.tasklet
def callback(msg):
    acct = yield msg.author.get_async()
    raise ndb.Return('On {}, {} wrote:\n{}'.format(
        msg.when, acct.nick(), msg.text))

qry = Message.query().order(-Message.when)
outputs = qry.map(callback, limit=20)
for output in outputs:
    self.response.out.write('<p>{}</p>'.format(output))

Ten en cuenta que, aunque get_async() muestra un Future, el framework del tasklet hace que la expresión yield muestre el resultado del Future a la variable acct.

El map() llama a callback() varias veces. Sin embargo, yield ..._async() en callback() permite que el programador de NDB envíe muchas solicitudes asíncronas antes de esperar a que alguna de ellas finalice.

Solicitudes “Get” asíncronas superpuestas
Solicitudes “Get” asíncronas superpuestas

Si observas esto en las estadísticas de la aplicación, es posible que te sorprendas cuando veas que estas solicitudes “Get” no solo se superponen, sino que todas se procesan en la misma solicitud. NDB implementa un “procesador por lotes automático”. El procesador por lotes automático agrupa varias solicitudes en una sola RPC por lotes al servidor. Lo hace de tal manera que, si hay más trabajo por hacer (podría ejecutarse otra devolución de llamada), recopila las claves. En cuanto se necesite uno de los resultados, el procesador por lotes automático envía la RPC por lotes. A diferencia de la mayoría de las solicitudes, las consultas no se “procesan por lotes”.

Cuando se ejecuta un tasklet, obtiene su espacio de nombres predeterminado de lo que era el valor predeterminado cuando se generó el tasklet, o a lo que lo cambió el tasklet durante su ejecución. En otras palabras, el espacio de nombres predeterminado no está asociado o almacenado en el contexto, y el cambio del espacio de nombres predeterminado en un tasklet no afecta el espacio de nombres predeterminado en otros tasklets, excepto aquellos generados por él.

Tasklets, consultas paralelas y rendimiento paralelo

Puedes usar tasklets para que varias consultas obtengan registros al mismo tiempo. Por ejemplo, supongamos que tu aplicación tiene una página que muestra el contenido de un carrito de compras y una lista de ofertas especiales. El esquema podría verse así:

class Account(ndb.Model):
    pass

class InventoryItem(ndb.Model):
    name = ndb.StringProperty()

class CartItem(ndb.Model):
    account = ndb.KeyProperty(kind=Account)
    inventory = ndb.KeyProperty(kind=InventoryItem)
    quantity = ndb.IntegerProperty()

class SpecialOffer(ndb.Model):
    inventory = ndb.KeyProperty(kind=InventoryItem)

Una función “síncrona” que obtiene elementos del carrito de compras y ofertas especiales podría verse de la siguiente manera:

def get_cart_plus_offers(acct):
    cart = CartItem.query(CartItem.account == acct.key).fetch()
    offers = SpecialOffer.query().fetch(10)
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

En este ejemplo, se usan consultas para recuperar listas de ofertas y elementos del carrito; luego, se recuperan detalles sobre los elementos del inventario con get_multi(). Esta función no usa el valor de muestra de get_multi() directamente, sino que llama a get_multi() para recuperar todos los detalles del inventario en la caché a fin de que se puedan leer con rapidez más adelante. get_multi combina muchas solicitudes Get en una sola. Sin embargo, las recuperaciones de consultas ocurren una tras otra. Para que esas búsquedas se realicen al mismo tiempo, superpón las dos consultas:

def get_cart_plus_offers_async(acct):
    cart_future = CartItem.query(CartItem.account == acct.key).fetch_async()
    offers_future = SpecialOffer.query().fetch_async(10)
    cart = cart_future.get_result()
    offers = offers_future.get_result()
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

La llamada a get_multi() aún es independiente: depende de los resultados de la consulta, por lo que no se puede combinar con las consultas.

Supongamos que esta aplicación a veces necesita el carrito; a veces, las ofertas; y, a veces, ambos. Deseas organizar tu código de modo que haya una función para obtener el carrito y una a fin de obtener las ofertas. Si la aplicación llama a estas funciones juntas, lo ideal sería que sus consultas se pudieran “superponer”. Para ello, crea los siguientes tasklets de funciones:

@ndb.tasklet
def get_cart_tasklet(acct):
    cart = yield CartItem.query(CartItem.account == acct.key).fetch_async()
    yield ndb.get_multi_async([item.inventory for item in cart])
    raise ndb.Return(cart)

@ndb.tasklet
def get_offers_tasklet(acct):
    offers = yield SpecialOffer.query().fetch_async(10)
    yield ndb.get_multi_async([offer.inventory for offer in offers])
    raise ndb.Return(offers)

@ndb.tasklet
def get_cart_plus_offers_tasklet(acct):
    cart, offers = yield get_cart_tasklet(acct), get_offers_tasklet(acct)
    raise ndb.Return((cart, offers))

Esa instrucción yield xy es importante, pero fácil de pasar por alto. Si se tratara de dos declaraciones yield independientes, ocurrirían en serie. Sin embargo, cuando se usa yield en una tupla de tasklets, la operación es un yield en paralelo: los tasklets pueden ejecutarse en paralelo y yield esperará a que todos terminen y muestren los resultados. En algunos lenguajes de programación, esto se conoce como una barrera.

Si conviertes un fragmento de código en un tasklet, probablemente querrás volver a hacerlo pronto. Si observas un código “síncrono” que podría ejecutarse en paralelo con un tasklet, es probable sea una buena idea convertirlo en un tasklet también. Luego, podrás paralelizarlo con una función yield en paralelo.

Si escribes una función de solicitud (una función de solicitud webapp2, una función de vista de Django, etc.) para que sea un tasklet, no hará lo que deseas: producirá un yield, pero, luego, dejará de ejecutarse. En esta situación, debes decorar la función con @ndb.synctasklet. @ndb.synctasklet es como @ndb.tasklet, pero se modificó para llamar a get_result() en el tasklet. Esto convierte tu tasklet en una función que muestra su resultado de la manera habitual.

Iteradores de consultas en tasklets

Para iterar los resultados de las consultas en un tasklet, usa el siguiente patrón:

qry = Model.query()
qit = qry.iter()
while (yield qit.has_next_async()):
    entity = qit.next()
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

Este es el equivalente fácil de usar del tasklet para el siguiente código:

# DO NOT DO THIS IN A TASKLET
qry = Model.query()
for entity in qry:
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

Las tres líneas en negrita de la primera versión son el equivalente fácil de usar del tasklet de la línea en negrita de la segunda versión. Los tasklets solo se pueden suspender con la palabra clave yield. El bucle sin yield no permite que se ejecuten otros tasklets.

Quizás te preguntes por qué este código usa un iterador de consultas en lugar de recuperar todas las entidades mediante qry.fetch_async(). Es posible que la aplicación tenga tantas entidades que no quepan en la RAM. Quizás buscas una entidad y puedes detener la iteración una vez que la encuentres, pero no puedes expresar tus criterios de búsqueda solo con el lenguaje de la consulta. Puedes usar un iterador para cargar las entidades que deseas verificar y, luego, salir del bucle cuando encuentres lo que deseas.

Recuperación de URL asíncrona con NDB

Un Context de NDB tiene una función urlfetch() asíncrona que se paraleliza bien con los tasklets de NDB, por ejemplo:

@ndb.tasklet
def get_google():
    context = ndb.get_context()
    result = yield context.urlfetch("http://www.google.com/")
    if result.status_code == 200:
        raise ndb.Return(result.content)

El servicio de recuperación de URL tiene su propia API de solicitud asíncrona. Está bien, pero no siempre es fácil de usar con los tasklets de NDB.

Usa transacciones asíncronas

Las transacciones también se pueden realizar de forma asíncrona. Puedes pasar una función existente a ndb.transaction_async() o usar el decorador @ndb.transactional_async. Al igual que las otras funciones asíncronas, mostrará un Future de NDB:

@ndb.transactional_async
def update_counter(counter_key):
    counter = counter_key.get()
    counter.value += 1
    counter.put()

Las transacciones también funcionan con los tasklets. Por ejemplo, podríamos cambiar el código update_counter a yield mientras esperamos el bloqueo de las RPCs:

@ndb.transactional_tasklet
def update_counter(counter_key):
    counter = yield counter_key.get_async()
    counter.value += 1
    yield counter.put_async()

Usa Future.wait_any()

A veces, deseas realizar varias solicitudes asíncronas y mostrarlas cuando se complete la primera. Puedes hacerlo mediante el método de la clase ndb.Future.wait_any().

def get_first_ready():
    urls = ["http://www.google.com/", "http://www.blogspot.com/"]
    context = ndb.get_context()
    futures = [context.urlfetch(url) for url in urls]
    first_future = ndb.Future.wait_any(futures)
    return first_future.get_result().content

Por desgracia, no existe una manera conveniente de convertir esto en un tasklet; un yield en paralelo espera a que se completen todos los Future, incluidos aquellos que no quieres esperar.