Una vez que las tareas están en una lista de extracción, un trabajador puede asignarse tiempo para ellas. Después de procesar las tareas, el trabajador debe borrarlas.
Antes de comenzar
- Crea una lista de extracción.
- Crea tareas y agrégalas a la lista de extracción.
Contexto importante
- Este método solo se aplica a trabajadores que se ejecutan dentro de un servicio del entorno estándar.
- Cuando utilizas listas de extracción, tienes la responsabilidad de escalar los trabajadores según el volumen de procesamiento.
Asignación de tiempo para tareas
Cuando las tareas se encuentran en la lista, se puede asignar tiempo a un trabajador para una o más de ellas con el método
lease_tasks()
. Puede llevar algo de tiempo que las tareas recién agregadas con
add()
estén disponibles a través de
lease_tasks()
.
Cuando solicitas una asignación, debes especificar la cantidad de tareas (hasta un máximo
de 1,000) y la duración en segundos (hasta un máximo de una
semana). La asignación debe durar lo suficiente para garantizar que la tarea más lenta
tenga tiempo de completarse antes de que venza el período de la asignación. Puede modificar la asignación de tiempo
de una tarea con
modify_task_lease()
.
Cuando se asigna tiempo para una tarea, esta deja de estar disponible para que la procese otro trabajador hasta que venza la asignación.
El método
lease_tasks()
muestra un objeto Task que contiene una enumeración de las tareas de la lista con tiempo asignado.
pull-queue
:
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)
Agrupación en lotes con etiquetas de tareas
No todas las tareas son iguales. Tu código puede “etiquetar” tareas y después elegir para qué tareas asignar tiempo según la etiqueta. La etiqueta actúa como filtro.
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))
q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse
q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.
Regula tasas de sondeo
Los trabajadores que sondeen la lista en busca de tareas a las que se le puede asignar tiempo deberían detectar si están
intentando asignar tiempo con mayor rapidez de la que puede proporcionar la lista. Si se produce este
error, las siguientes excepciones de
lease_tasks()
pueden generarse:
google.appengine.api.taskqueue.TransientError
google.appengine.runtime.apiproxy_errors.DeadlineExceededError
El código debe detectar estas excepciones, dejar de llamar a
lease_tasks()
y volver a intentarlo en otro momento. Para evitar este problema, considera establecer un plazo mayor de RPC cuando llames a
lease_tasks()
. También debes retirarte cuando una solicitud
de asignación de tiempo muestra una lista vacía de tareas.
Si generas más de 10
solicitudes LeaseTasks por lista por segundo, solo las primeras 10 solicitudes mostrarán
resultados. Si las solicitudes superan este límite, se muestra OK
sin resultados.
Supervisa tareas en la consola de Google Cloud
Para ver información sobre todas las tareas y las listas en tu aplicación, haz lo siguiente:
Abre la página Cloud Tasks en la consola de Google Cloud y busca el valor Extraer en la columna Tipo.
Haz clic en el nombre de la lista que te interesa y abre la página de detalles de la lista. Se muestran todas las tareas en la lista seleccionada.
Cómo borrar tareas
Una vez que un trabajador completa una tarea, debe borrarla de la lista. Si ves que las tareas permanecen en una cola después de que el trabajador termina de procesarlas, es probable que haya fallado; en ese caso, otro trabajador procesará las tareas.
Puedes borrar una lista de tareas, como la que muestra
lease_tasks()
,
con solo pasarla a delete_tasks()
de la siguiente forma:
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)
Ejemplo completo de listas de extracción
Para obtener un ejemplo simple, pero integral completo sobre cómo usar las listas de extracción en Python, consulta appengine-pullqueue-counter.