Quando as tarefas estão em uma fila pull, o worker pode alocá-las. Após o processamento das tarefas, ele precisa excluí-las.
Antes de começar
- Crie uma fila pull.
- Crie tarefas e adicione-as à fila pull.
Contexto importante
- Esse método é aplicável apenas a workers que estejam sendo executados em um serviço no ambiente padrão.
- Ao usar filas pull, você é responsável pelo escalonamento dos workers com base no volume de processamento.
Como colocar tarefas em lease
Depois que as tarefas estiverem na fila, um worker poderá colocar uma ou mais delas em lease usando o método lease_tasks()
. Pode haver um pequeno atraso até que as tarefas adicionadas recentemente usando add()
sejam disponibilizadas por lease_tasks()
.
Ao solicitar um lease, você especifica o número de tarefas a serem alocadas (máximo de 1.000) e a duração do lease em segundos (no máximo uma semana). A duração do lease deve ser longa o suficiente para garantir que haja tempo suficiente para a conclusão da tarefa mais lenta antes do fim do período de lease. É possível modificar um lease de tarefa usando modify_task_lease()
.
Uma tarefa em lease fica indisponível para processamento por outro worker até que o lease expire.
O método lease_tasks()
retorna um objeto Task que contém uma lista de tarefas da fila que estão em lease.
pull-queue
por uma hora:
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)
Processamento em lote com tags de tarefa
Nem todas as tarefas são iguais. Seu código pode marcar tarefas com tags e depois escolher as que serão colocadas em lease por tag. A tag funciona como um filtro.
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))
q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse
q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.
Como regular taxas de pesquisa
Os workers que pesquisam a fila quanto a tarefas para colocar em lease precisam detectar se estão tentando o lease de tarefas mais rapidamente do que a fila pode fornecer. Se essa falha ocorrer, poderão ser geradas as exceções de lease_tasks()
a seguir:
google.appengine.api.taskqueue.TransientError
google.appengine.runtime.apiproxy_errors.DeadlineExceededError
Seu código precisa capturar essas exceções, desistir de chamar lease_tasks()
e tentar novamente mais tarde. Para evitar esse problema, defina um prazo de RPC mais alto ao chamar lease_tasks(). Você também precisa desistir quando uma solicitação de lease retornar uma lista vazia de tarefas.
Se você gerar mais de 10 solicitações LeaseTasks por fila por segundo, apenas as 10 primeiras retornarão resultados. Se as solicitações ultrapassarem esse limite, OK
será retornado sem nenhum resultado.
Como monitorar tarefas no Console do Google Cloud
Para ver informações sobre todas as tarefas e filas no aplicativo:
Abra a página do Cloud Tasks no Console do Google Cloud e procure o valor Pull na coluna Tipo.
Clique no nome da fila em que você está interessado e abra a página de detalhes dela. Serão exibidas todas as tarefas na fila selecionada.
Como excluir tarefas
Depois que o worker conclui uma tarefa, ele precisa excluí-la da fila. Se continuarem sendo exibidas tarefas restantes em uma fila depois que o worker terminar de processá-las, é provável que tenha ocorrido falha no worker. Nesse caso, as tarefas serão processadas por outro worker.
É possível excluir uma lista de tarefas, como a retornada por lease_task()
, simplesmente passando-a para delete_tasks()
:
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)
Um exemplo completo de filas pull
Para um exemplo simples e completo do uso de filas pull no Python, consulte appengine-pullqueue-counter.