Una vez que las tareas están en una cola de extracción, un trabajador puede alquilarlas. Una vez que se hayan procesado las tareas, el trabajador debe eliminarlas.
Antes de empezar
- Crea una cola de extracción.
- Crea tareas y añádelas a la cola de extracción.
Contexto importante
- Este método solo se aplica a los workers que se ejecutan en un servicio del entorno estándar.
- Cuando usas colas de extracción, eres responsable de escalar tus trabajadores en función del volumen de procesamiento.
Asignar tareas
Una vez que las tareas están en la cola, un trabajador puede alquilar una o varias de ellas mediante el método lease_tasks()
. Puede que las tareas que hayas añadido recientemente con add()
tarden un poco en estar disponibles en lease_tasks()
.
Cuando solicitas una asignación, especificas el número de tareas que quieres asignar (hasta un máximo de 1000) y la duración de la asignación en segundos (hasta un máximo de una semana). La duración de la asignación debe ser suficiente para que la tarea más lenta tenga tiempo de ejecutarse antes de que caduque el periodo de asignación. Puedes modificar un alquiler de tarea con modify_task_lease()
.
Durante la asignación de una tarea, otro trabajador no puede procesarla. La tarea deja de estar disponible hasta que caduca la asignación.
El método lease_tasks()
devuelve un objeto Task que contiene una lista de tareas tomadas en préstamo de la cola.
pull-queue
durante una hora:
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)
Agrupación por lotes con etiquetas de tarea
No todas las tareas son iguales. Tu código puede "etiquetar" tareas y, a continuación, elegir las tareas que se van a alquilar por etiqueta. La etiqueta actúa como filtro.
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))
q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse
q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.
Regular las frecuencias de sondeo
Los trabajadores que sondean la cola para arrendar tareas deben detectar si están intentando arrendar tareas más rápido de lo que la cola puede proporcionárselas. Si se produce este error, se pueden generar las siguientes excepciones de lease_tasks()
:
google.appengine.api.taskqueue.TransientError
google.appengine.runtime.apiproxy_errors.DeadlineExceededError
Tu código debe detectar estas excepciones, dejar de llamar a lease_tasks()
y volver a intentarlo más tarde. Para evitar este problema, te recomendamos que definas un plazo de RPC más largo al llamar a lease_tasks(). También deberías reducir la frecuencia de las llamadas cuando una solicitud de LeaseTasks devuelva una lista de tareas vacía.
Si generas más de 10 solicitudes de LeaseTasks por cola y por segundo, solo las 10 primeras solicitudes devolverán resultados. Si las solicitudes superan este límite, se devuelve OK
con cero resultados.
Monitorizar tareas en la consola de Google Cloud
Para ver información sobre todas las tareas y colas de su aplicación, siga estos pasos:
Abre la página Cloud Tasks en la consola de Google Cloud y busca el valor Pull en la columna Tipo.
Haz clic en el nombre de la cola que te interese para abrir la página de detalles de la cola. Se muestran todas las tareas de la cola seleccionada.
Eliminar tareas
Una vez que un trabajador completa una tarea, debe eliminarla de la cola. Si ves que quedan tareas en una cola después de que un trabajador las haya procesado, es probable que el trabajador haya fallado. En ese caso, las tareas las procesará otro trabajador.
Puedes eliminar una lista de tareas, como la que devuelve lease_task()
, simplemente pasándola a delete_tasks()
:
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)
Ejemplo completo de colas de extracción
Para ver un ejemplo sencillo pero completo de principio a fin de cómo usar colas para tareas extraídas en Python, consulta appengine-pullqueue-counter.