Asignación de tiempo para tareas de extracción

Una vez que las tareas están en una lista de extracción, un trabajador puede asignarse tiempo para ellas. Después de procesar las tareas, el trabajador debe borrarlas.

Antes de comenzar

Contexto importante

  • Este método solo se aplica a trabajadores que se ejecutan dentro de un servicio del entorno estándar.
  • Cuando utilizas las listas de extracción, tienes la responsabilidad de escalar los trabajadores según el volumen de procesamiento.

Asignación de tiempo para tareas

Cuando las tareas se encuentran en la lista, se puede asignar tiempo a un trabajador para una o más de ellas con el método lease_tasks(). Puede llevar algo de tiempo que las tareas recién agregadas con add() estén disponibles mediante lease_tasks().

Cuando solicitas una asignación, debes especificar la cantidad de tareas (hasta un máximo de 1,000) y la duración en segundos (hasta un máximo de una semana). La asignación debe durar lo suficiente para garantizar que la tarea más lenta tenga tiempo de completarse antes de que venza el período de la asignación. Puedes modificar la asignación de tiempo de una tarea mediante modify_task_lease().

Cuando se asigna una tarea, esta no está disponible para que la procese otro trabajador, y no lo estará hasta que venza la tarea.

El método lease_tasks() muestra un objeto Task que contiene una enumeración de las tareas de la lista con tiempo asignado.

En el ejemplo de código que sigue, se realiza una asignación de tiempo de una hora para 100 tareas de la lista pull-queue.

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)

Agrupación en lotes con etiquetas de tareas

No todas las tareas son iguales. Tu código puede “etiquetar” tareas y después elegir para qué tareas asignar tiempo según la etiqueta. La etiqueta actúa como filtro.

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))

q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse

q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.

Regulación de tasas de sondeo

Los trabajadores que sondeen la lista en busca de tareas para las que puedan asignarse tiempo deberían detectar si están intentando asignarse tiempo para tareas más rápido de lo que la lista puede proporcionarlas. Si se produce este error, se pueden generar las siguientes excepciones de lease_tasks():

  • google.appengine.api.taskqueue.TransientError
  • google.appengine.runtime.apiproxy_errors.DeadlineExceededError


El código debería detectar estas excepciones, dejar de llamar a lease_tasks() y volver a intentarlo en otro momento. Para evitar este problema, puedes configurar un límite de RPC más elevado cuando se llama a lease_tasks(). También deberías detener los intentos cuando una solicitud de asignación de tiempo muestre una lista de tareas vacía.

Si generas más de 10 solicitudes LeaseTasks por lista por segundo, solo las primeras 10 mostrarán resultados. Si las solicitudes superan este límite, se muestra OK sin resultados.

Supervisa tareas en la consola

Para ver información sobre todas las tareas y las listas en tu aplicación, haz lo siguiente:

  1. Abre la página Cloud Tasks en la consola y busca el valor Extracción en la columna Tipo.

    Ir a Cloud Tasks

  2. Haz clic en el nombre de la lista que te interesa y abre la página de detalles de la lista. Se muestran todas las tareas en la lista seleccionada.

Cómo borrar tareas

Una vez que un trabajador completa una tarea, debe borrarla de la lista. Si ves que las tareas permanecen en una cola después de que el trabajador termina de procesarlas, es probable que haya fallado; en ese caso, otro trabajador procesará las tareas.

Puedes borrar una lista de tareas, como la que muestra lease_task(), con solo pasarla a delete_tasks() de la siguiente forma:

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)

Ejemplo completo de listas de extracción

Para obtener un ejemplo de extremo a extremo simple pero completo sobre cómo usar las listas de extracción en Python, consulta appengine-pullqueue-counter.