Operação assíncrona do NDB

Ao otimizar o desempenho de um aplicativo, pense no uso que ele faz do NDB. Por exemplo, se um aplicativo ler um valor que não esteja no cache, essa leitura demorará um pouco. Você pode acelerar seu aplicativo executando ações do Datastore em paralelo com outras coisas ou realizando algumas ações do Datastore em paralelo umas com as outras.

A biblioteca de cliente do NDB fornece muitas funções assíncronas ("async"). Cada uma delas permite que um aplicativo envie uma solicitação ao Datastore. A função retorna imediatamente com um objeto Future. O aplicativo pode fazer outras coisas enquanto o Datastore gerencia a solicitação. Depois que o Datastore gerenciar a solicitação, o aplicativo poderá receber os resultados do objeto Future.

Introdução

Suponha que um dos manipuladores de solicitação de seu aplicativo precise usar o NDB para escrever algo, talvez para registrar a solicitação. Ele também precisa executar algumas outras operações do NDB, talvez para buscar alguns dados.

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

Substituindo a chamada para put() por uma chamada para o put_async() assíncrono equivalente, o aplicativo pode fazer outras coisas imediatamente, em vez de ficar ocupado com put().

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        future = acct.put_async()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')
        future.get_result()

Isso permite que as outras funções do NDB e a renderização de modelos ocorram enquanto o Datastore grava os dados. O Datastore não fica bloqueado pelo aplicativo até que este receba os dados dele.

Neste exemplo, não é conveniente chamar future.get_result: o aplicativo nunca usa o resultado do NDB. Esse código serve apenas para garantir que o gerenciador de solicitações não seja encerrado antes que o put do NDB seja concluído. Se o gerenciador de solicitações for encerrado muito cedo, o "put" talvez não aconteça. Se for conveniente, decore o gerenciador de solicitações com @ndb.toplevel. Isso faz com que o gerenciador não seja encerrado até que as solicitações assíncronas tenham sido concluídas. Por outro lado, permite que você envie a solicitação e não se preocupe com o resultado.

Você pode especificar um WSGIApplication inteiro como ndb.toplevel. Isso garante que cada um dos gerenciadores do WSGIApplication aguarde todas as solicitações assíncronas antes de retornar. Não aplica a categoria "toplevel" a todos os gerenciadores do WSGIApplication.


app = ndb.toplevel(webapp2.WSGIApplication([('/', MyRequestHandler)]))

O uso de um aplicativo toplevel é mais conveniente do que todas as funções de gerenciador dele. Mas, se um método do gerenciador usa yield, esse método ainda precisa ser unido a outro decorador, @ndb.synctasklet. Caso contrário, ele deixará de ser executado em yield e não será concluído.

class MyRequestHandler(webapp2.RequestHandler):
    @ndb.toplevel
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put_async()  # Ignoring the Future this returns

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

Como usar as APIs assíncronas e os Futures

Quase toda função síncrona do NDB tem uma contraparte _async. Por exemplo, put() tem put_async(). Os argumentos da função assíncrona são sempre os mesmos da versão síncrona. O valor de retorno de um método assíncrono é sempre um Future ou (para funções "multi") uma lista de Futures.

Um Future é um objeto que mantém o estado de uma operação que foi iniciada, mas que ainda não foi concluída. Todas as APIs assíncronas retornam um ou mais Futures. Você pode chamar a função get_result() do Future para solicitar o resultado da operação dele. Em seguida, o Future fica bloqueado, se necessário, até que o resultado esteja disponível. Depois disso, entrega o resultado a você. get_result() retorna o valor que seria retornado pela versão síncrona da API.

Observação: se você já usou Futures em outras linguagens de programação, pode pensar que é possível usar um Future como resultado diretamente. Isso não funciona neste caso. Essas linguagens usam futures implícitos. O NDB usa futures explícitos. Chame get_result() para ter o resultado de um Future do NDB.

E se a operação gerar uma exceção? Isso depende de quando a exceção ocorre. Se o NDB perceber um problema ao fazer uma solicitação (talvez um argumento do tipo incorreto), o método _async() gerará uma exceção. Mas, se a exceção for detectada, digamos, pelo servidor do Datastore, o método _async() retornará um Future e a exceção será gerada quando o aplicativo chamar o get_result() dele. Não se preocupe muito com isso, tudo acaba se comportando de maneira bastante natural. Talvez a maior diferença seja que, se um rastreio for impresso, você verá algumas partes expostas do maquinário assíncrono de baixo nível.

Por exemplo, digamos que você esteja desenvolvendo um aplicativo de livro de visitas. Se o usuário tiver feito login, você quer apresentar uma página mostrando as postagens mais recentes do livro de visitas. Essa página também deve mostrar ao usuário o apelido dele. O aplicativo precisa de dois tipos de informações: as informações da conta do usuário que fez login e o conteúdo das postagens do livro de visitas. A versão "síncrona" desse aplicativo pode ser semelhante a:

uid = users.get_current_user().user_id()
acct = Account.get_by_id(uid)  # I/O action 1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries = qry.fetch(10)  # I/O action 2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

Há duas ações independentes de E/S aqui: receber a entidade Account e buscar as entidades recentes do Guestbook. Usando a API síncrona, isso acontece sequencialmente. Esperamos receber as informações da conta antes de buscar as entidades do livro de visitas. Mas o aplicativo não precisa das informações da conta imediatamente. Podemos aproveitar isso e usar APIs assíncronas:

uid = users.get_current_user().user_id()
acct_future = Account.get_by_id_async(uid)  # Start I/O action #1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries_future = qry.fetch_async(10)  # Start I/O action #2
acct = acct_future.get_result()  # Complete #1
recent_entries = recent_entries_future.get_result()  # Complete #2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

Essa versão do código cria dois Futures (acct_future e recent_entries_future) primeiro e, em seguida, aguarda por eles. O servidor funciona nas duas solicitações em paralelo. Cada chamada da função _async() cria um objeto Future e envia uma solicitação ao servidor do Datastore. O servidor pode começar a trabalhar na solicitação imediatamente. As respostas do servidor podem voltar em qualquer ordem arbitrária. O link do objeto Future responde às próprias solicitações correspondentes.

As solicitações síncronas não se sobrepõem, mas as assíncronas podem se sobrepor.
Solicitações síncronas versus assíncronas

O tempo total (real) gasto na versão assíncrona é aproximadamente igual ao tempo máximo entre as operações. O tempo total gasto na versão síncrona excede à soma dos tempos de operação. Se você puder executar mais operações em paralelo, as operações assíncronas ajudarão mais.

Para ver quanto tempo as consultas do aplicativo demoram ou quantas operações de E/S ele realiza por solicitação, avalie a possibilidade de usar o Appstats. Essa ferramenta pode mostrar gráficos semelhantes ao desenho acima, com base na instrumentação de um aplicativo ativo.

Como usar os tasklets

Um tasklet do NDB é um trecho de código que pode ser executado simultaneamente com outro código. Se você escrever um tasklet, seu aplicativo poderá usá-lo da mesma maneira que usa uma função assíncrona do NDB: ele chama o tasklet, que retorna um Future. Posteriormente, chamar o método get_result() do Future obtém o resultado.

Tasklets são uma maneira de gravar funções simultâneas sem linhas de execução. Eles são executados por um loop de eventos e podem se suspender bloqueando para E/S ou alguma outra operação usando uma instrução "yield". A noção de uma operação de bloqueio é abstraída na classe Future, mas um tasklet também pode produzir (yield) uma RPC para aguardar a conclusão dela. Quando o tasklet tem um resultado, ele gera (raise) uma exceção ndb.Return. Em seguida, o NDB associa o resultado ao yield produzido (Future) anteriormente.

Quando você escreve um tasklet do NDB, usa yield e raise de uma maneira incomum. Portanto, se você procurar exemplos de como usá-los, provavelmente não encontrará um código como um tasklet do NDB.

Para transformar uma função em um tasklet do NDB:

  • decore a função com @ndb.tasklet;
  • substitua todas as chamadas de armazenamento de dados síncronos por yields de chamadas de armazenamento de dados assíncrono;
  • faça a função "retornar" o valor de retorno com raise ndb.Return(retval), o que não será necessário se ela não retornar nada.

Um aplicativo pode usar tasklets para controlar melhor as APIs assíncronas. Por exemplo, pense no seguinte esquema:

class Account(ndb.Model):
    email = ndb.StringProperty()
    nickname = ndb.StringProperty()

    def nick(self):
        return self.nickname or self.email  # Whichever is non-empty
...
class Message(ndb.Model):
    text = ndb.StringProperty()
    when = ndb.DateTimeProperty(auto_now_add=True)
    author = ndb.KeyProperty(kind=Account)  # references Account

Ao exibir uma mensagem, faz sentido mostrar o apelido do autor. A maneira "síncrona" de buscar os dados para mostrar uma lista de mensagens pode ter esta aparência:

qry = Message.query().order(-Message.when)
for msg in qry.fetch(20):
    acct = msg.author.get()
    self.response.out.write(
        '<p>On {}, {} wrote:'.format(msg.when, acct.nick()))
    self.response.out.write('<p>{}'.format(msg.text))

Infelizmente, essa abordagem é ineficiente. Se você olhar para ela no Appstats, verá que as solicitações "Get" estão em série. Você pode ver o seguinte padrão "escada".

"Gets" síncronos ocorrem em série
"Gets" síncronos ocorrem em série.

Essa parte do programa será mais rápida se for possível sobrepor esses "Gets". Você pode reescrever o código para usar get_async, mas é difícil controlar quais solicitações assíncronas e mensagens são correspondentes.

O aplicativo pode definir sua própria função "assíncrona", tornando-a um tasklet. Isso permite organizar o código de maneira menos confusa.

Além disso, em vez de usar acct = key.get() ou acct = key.get_async().get_result(), a função precisa usar acct = yield key.get_async(). Esse yield informa ao NDB que esse é um bom momento para suspender esse tasklet e permitir que outros sejam executados.

Decorar uma função de gerador com @ndb.tasklet faz com que ela retorne um Future em vez de um objeto gerador. No tasklet, qualquer yield de um Future aguarda e retorna o resultado do Future.

Por exemplo:

@ndb.tasklet
def callback(msg):
    acct = yield msg.author.get_async()
    raise ndb.Return('On {}, {} wrote:\n{}'.format(
        msg.when, acct.nick(), msg.text))

qry = Message.query().order(-Message.when)
outputs = qry.map(callback, limit=20)
for output in outputs:
    self.response.out.write('<p>{}</p>'.format(output))

Observe que, embora get_async() retorne um Future, a estrutura do tasklet faz com que a expressão yield retorne o resultado do Future para a variável acct.

O map() chama callback() várias vezes. Mas o yield ..._async() em callback() permite que o programador do NDB envie muitas solicitações assíncronas antes de esperar que alguma delas seja concluída.

Sobreposição de "Gets" assíncronos
Sobreposição de "Gets" assíncronos

Se você observar isso no Appstats, ficará surpreso ao ver que esses vários "Gets" não se sobrepõem: todos eles passam pela mesma solicitação. O NDB implementa um "autobatcher". O autobatcher agrupa várias solicitações em um único RPC em lote para o servidor. Ele faz isso de tal maneira que, desde que haja mais trabalho a fazer (outro retorno de chamada pode ser executado), ele coleta chaves. Assim que um dos resultados for necessário, o autobatcher enviará o RPC em lote. Ao contrário da maioria das solicitações, as consultas não são "em lote".

Quando um tasklet é executado, ele extrai o namespace padrão do que era o padrão no momento da geração dele, ou de qualquer alteração feita nele durante a execução. Em outras palavras, o namespace padrão não está associado ou armazenado no Contexto, e a alteração do namespace padrão em um tasklet não afeta o namespace padrão em outros tasklets, exceto aqueles gerados por ele.

Tasklets, consultas paralelas, rendimento paralelo

Você pode usar tasklets para que várias consultas busquem registros ao mesmo tempo. Por exemplo, suponha que seu aplicativo tenha uma página que exiba o conteúdo de um carrinho de compras e uma lista de ofertas especiais. O esquema pode ser assim:

class Account(ndb.Model):
    pass

class InventoryItem(ndb.Model):
    name = ndb.StringProperty()

class CartItem(ndb.Model):
    account = ndb.KeyProperty(kind=Account)
    inventory = ndb.KeyProperty(kind=InventoryItem)
    quantity = ndb.IntegerProperty()

class SpecialOffer(ndb.Model):
    inventory = ndb.KeyProperty(kind=InventoryItem)

Uma função "síncrona" que obtém itens e ofertas especiais do carrinho de compras pode se parecer com o seguinte:

def get_cart_plus_offers(acct):
    cart = CartItem.query(CartItem.account == acct.key).fetch()
    offers = SpecialOffer.query().fetch(10)
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

Esse exemplo usa consultas para buscar listas de itens e ofertas de carrinho de compras. Em seguida, busca detalhes sobre os itens de inventário com get_multi(). Essa função não usa o valor de retorno de get_multi() diretamente. Ela chama get_multi() para buscar no cache todos os detalhes do inventário para que possam ser lidos rapidamente mais tarde. get_multi combina muitos "Gets" em uma solicitação. Mas as buscas de consulta acontecem uma após a outra. Para fazer essas buscas acontecerem ao mesmo tempo, sobreponha as duas:

def get_cart_plus_offers_async(acct):
    cart_future = CartItem.query(CartItem.account == acct.key).fetch_async()
    offers_future = SpecialOffer.query().fetch_async(10)
    cart = cart_future.get_result()
    offers = offers_future.get_result()
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

A chamada get_multi() ainda é separada: depende dos resultados da consulta, portanto, não é possível combiná-la com as consultas.

Suponha que esse aplicativo às vezes precise do carrinho, às vezes das ofertas e, às vezes, de ambos. Você quer organizar seu código para que haja uma função para apresentar o carrinho e uma função para apresentar as ofertas. Se o aplicativo chamar essas funções ao mesmo tempo, o ideal será que as consultas se "sobreponham". Para fazer isso, crie estes tasklets:

@ndb.tasklet
def get_cart_tasklet(acct):
    cart = yield CartItem.query(CartItem.account == acct.key).fetch_async()
    yield ndb.get_multi_async([item.inventory for item in cart])
    raise ndb.Return(cart)

@ndb.tasklet
def get_offers_tasklet(acct):
    offers = yield SpecialOffer.query().fetch_async(10)
    yield ndb.get_multi_async([offer.inventory for offer in offers])
    raise ndb.Return(offers)

@ndb.tasklet
def get_cart_plus_offers_tasklet(acct):
    cart, offers = yield get_cart_tasklet(acct), get_offers_tasklet(acct)
    raise ndb.Return((cart, offers))

Esse yield xy é importante, mas fácil de ignorar. Se fossem duas declarações yield separadas, elas aconteceriam em série. No entanto, declarar (yield) uma tupla de tasklets é um yield paralelo, os tasklets podem ser executados em paralelo, e o yield aguarda o término de todos e retorna os resultados. Em algumas linguagens de programação, isso é conhecido como uma barreira.

Se você transformar um fragmento de código em um tasklet, provavelmente vai querer fazer mais em breve. Se você observar um código "síncrono" que possa ser executado em paralelo com um tasklet, provavelmente é uma boa ideia torná-lo um tasklet também. Então você pode paralelizá-lo com um yield paralelo.

Se você escrever uma função de solicitação, como uma função de solicitação webapp2 ou uma função de visualização do Django, para ser um tasklet, ela não fará o que você quer: ela é gerada, mas depois sua execução é interrompida. Nesta situação, você quer decorar a função com @ndb.synctasklet. @ndb.synctasklet é como @ndb.tasklet, mas alterado para chamar get_result() no tasklet. Isso transforma seu tasklet em uma função que retorna seu resultado da maneira usual.

Iteradores de consulta em tasklets

Para iterar resultados de consulta em um tasklet, use o seguinte padrão:

qry = Model.query()
qit = qry.iter()
while (yield qit.has_next_async()):
    entity = qit.next()
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

Esse é o equivalente compatível com tasklet do seguinte código:

# DO NOT DO THIS IN A TASKLET
qry = Model.query()
for entity in qry:
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

As três linhas em negrito na primeira versão são o equivalente a uma tarefa simples da linha em negrito na segunda versão. Os tasklets só podem ser suspensos em uma palavra-chave yield. O loop sem yield não permite que outros tasklets sejam executados.

Você pode se perguntar por que esse código usa um iterador de consulta, em vez de buscar todas as entidades usando qry.fetch_async(). O aplicativo pode ter tantas entidades que elas não cabem na RAM. Talvez você esteja procurando uma entidade e possa parar de iterar quando a encontrar, mas não é possível expressar seus critérios de pesquisa apenas com a linguagem de consulta. Você pode usar um iterador a fim de carregar entidades para verificar e, em seguida, sair do loop quando encontrar o que quer.

Urlfetch assíncrono com NDB

Um Context do NDB tem uma função urlfetch() assíncrona que paraleliza bem com os tasklets do NDB, por exemplo:

@ndb.tasklet
def get_google():
    context = ndb.get_context()
    result = yield context.urlfetch("http://www.google.com/")
    if result.status_code == 200:
        raise ndb.Return(result.content)

O serviço de busca de URLs tem sua própria API de solicitação assíncrona. Tudo bem, mas nem sempre é fácil usá-la com os tasklets do NDB.

Como usar transações assíncronas

As transações também podem ser feitas de maneira assíncrona. Você pode passar uma função existente para ndb.transaction_async() ou usar o decorador @ndb.transactional_async. Como as outras funções assíncronas, isso retornará um Future do NDB:

@ndb.transactional_async
def update_counter(counter_key):
    counter = counter_key.get()
    counter.value += 1
    counter.put()

As transações também funcionam com tasklets. Por exemplo, podemos alterar nosso código de update_counter para yield enquanto aguardamos o bloqueio de RPCs:

@ndb.transactional_tasklet
def update_counter(counter_key):
    counter = yield counter_key.get_async()
    counter.value += 1
    yield counter.put_async()

Como usar o Future.wait_any()

Às vezes, você quer fazer várias solicitações assíncronas e retornar sempre que a primeira for concluída. É possível fazer isso usando o método de classe ndb.Future.wait_any():

def get_first_ready():
    urls = ["http://www.google.com/", "http://www.blogspot.com/"]
    context = ndb.get_context()
    futures = [context.urlfetch(url) for url in urls]
    first_future = ndb.Future.wait_any(futures)
    return first_future.get_result().content

Infelizmente, não há uma maneira conveniente de transformar isso em um tasklet. Um yield paralelo aguarda até todos os Futures serem concluídos, incluindo aqueles que você não quer esperar.

Esta página foi útil? Conte sua opinião sobre:

Enviar comentários sobre…

Ambiente padrão do App Engine para Python 2