Louer des tâches de retrait

Lorsque des tâches sont dans une file d'attente, un nœud de calcul peut les louer. Une fois les tâches traitées, le nœud de calcul doit les supprimer.

Avant de commencer

Contexte important

  • Cette méthode est uniquement applicable aux nœuds de calcul qui s'exécutent au sein d'un service dans l'environnement standard.
  • Lorsque vous utilisez des files d'attente de retrait, vous êtes responsable du scaling de vos nœuds de calcul en fonction de votre volume de traitement.

Louer des tâches

Lorsque les tâches sont dans une file d'attente, un nœud de calcul peut en louer une ou plusieurs avec la méthode lease_tasks(). Il peut y avoir un léger délai avant que les tâches récemment ajoutées à l'aide de add() ne soient disponibles via lease_tasks().

Pour effectuer une demande de bail, vous devez spécifier le nombre de tâches à louer (1 000 tâches maximum) et la durée du bail en secondes (maximum une semaine). La durée du bail doit être suffisamment longue pour que la tâche la plus lente puisse se terminer. Vous pouvez modifier la durée de bail d'une tâche à l'aide de modify_task_lease().

Louer une tâche empêche qu'elle soit traitée par un autre processus, et elle reste indisponible jusqu'à la fin de la location.

La méthode lease_tasks() renvoie un objet Task contenant une liste des tâches louées à partir de la file d'attente.

L'exemple de code suivant loue 100 tâches de la file d'attente pull-queue pendant une heure :

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)

Traitement par lots avec des tags de tâches

Toutes les tâches ne se ressemblent pas ; votre code peut ajouter des "tags" aux tâches, puis choisir des tâches à louer par tag. Le tag agit comme un filtre.

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))

q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse

q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.

Réguler les interrogations

Les nœuds de calcul qui interrogent la file d'attente pour les tâches à louer doivent détecter s'ils essaient de louer des tâches plus rapidement que la file ne peut les fournir. En cas d'échec, les exceptions suivantes de lease_tasks() peuvent être générées :

  • google.appengine.api.taskqueue.TransientError
  • google.appengine.runtime.apiproxy_errors.DeadlineExceededError


Votre code doit relever ces exceptions, arrêter d'appeler lease_tasks() et réessayer ultérieurement. Pour éviter ce problème, pensez à définir un délai RPC plus élevé lors de l'appel de lease_tasks(). Vous devez également arrêter temporairement les appels lorsqu'une demande de bail renvoie une liste de tâches vide.

Si vous générez plus de 10 requêtes LeaseTasks par file et par seconde, seules les 10 premières requêtes renverront des résultats. Si le nombre de requêtes dépasse cette limite, OK est renvoyé sans résultat.

Surveiller des tâches dans la console Google Cloud

Pour afficher des informations sur toutes les tâches et les files d’attente de votre application :

  1. Ouvrez la page Cloud Tasks dans la console Google Cloud et recherchez la valeur Pull dans la colonne Type.

    Accéder à Cloud Tasks

  2. Cliquez sur le nom de la file qui vous intéresse et ouvrez la page des détails de la file. Toutes les tâches de la file d'attente sélectionnée sont affichées.

Supprimer des tâches

Une fois qu'un nœud de calcul a terminé une tâche, il doit la supprimer de la file d'attente. Si vous voyez des tâches restant dans une file d'attente après qu'un nœud de calcul a fini de les traiter, il est probable que ce dernier a échoué. Dans ce cas, les tâches seront traitées par un autre nœud de calcul.

Vous pouvez supprimer une liste de tâches, telle que celle renvoyée par lease_task(), en la transmettant simplement à delete_tasks() :

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)

Exemple complet de files d'attente de retrait

Pour obtenir un exemple simple et complet d'utilisation de files d'attente de retrait en Python, consultez la page appengine-pullqueue-counter.