Créer des tâches d'envoi

Cette page explique comment créer des tâches et les placer dans des files d'attente d'envoi. Lorsque vous souhaitez traiter une tâche, vous devez créer un objet de tâche et le placer dans une file d'attente. Vous pouvez spécifier explicitement le service et le gestionnaire qui traitent la tâche et éventuellement transmettre au gestionnaire des données spécifiques à la tâche. Vous pouvez également ajuster la configuration de la tâche, par exemple en planifiant une heure future d'exécution ou en limitant le nombre de nouvelles tentatives en cas d'échec de la tâche.

Créer une tâche

Pour créer une tâche et la placer en file d'attente, appelez la fonction taskqueue.add(). Le code suivant crée une tâche qui cible le service nommé worker et appelle son gestionnaire en définissant l'URL /update-counter :

class EnqueueTaskHandler(webapp2.RequestHandler):
    def post(self):
        amount = int(self.request.get('amount'))

        task = taskqueue.add(
            url='/update_counter',
            target='worker',
            params={'amount': amount})

        self.response.write(
            'Task {} enqueued, ETA {}.'.format(task.name, task.eta))

Vous pouvez également créer un objet Task et appeler sa méthode add().

Spécifier le service de nœuds de calcul

Lorsqu'une tâche est extraite de sa file d'attente, le service de file d'attente de tâches l'envoie à un service de nœuds de calcul. Chaque tâche comporte un élément target (cible) et un élément url, qui déterminent le service et le gestionnaire qui exécuteront la tâche.

target

La cible spécifie le service qui recevra la requête HTTP permettant d'exécuter la tâche. Il s'agit d'une chaîne qui spécifie un service, une version ou une instance dans une forme canonique. Voici les formes les plus courantes :

    service
    version.service
    instance.version.service

La chaîne cible est ajoutée au début du nom de domaine de votre application. Il existe trois façons de définir la cible pour une tâche :

  • Déclarez la cible au moment de créer la tâche. Vous pouvez alors la définir explicitement à l'aide du paramètre target dans la fonction taskqueue.add(). Reportez-vous à l'exemple ci-dessus.

  • Incluez une instruction target lorsque vous définissez une file d'attente dans le fichier queue.yaml, comme dans la définition de queue-blue. Toutes les tâches ajoutées à une file d'attente avec une instruction target utiliseront cette cible, même si une autre cible a été attribuée à la tâche au moment de sa création.

  • Si aucune cible n'est spécifiée selon l'une des deux méthodes précédentes, la cible de la tâche correspond alors à la version du service qui la met en file d'attente. Notez que si vous placez ainsi en file d'attente une tâche issue du service et de la version par défaut, et que la version par défaut change avant que la tâche ne soit lancée, cette dernière s'exécute dans la nouvelle version par défaut.

url

L'élément url sélectionne l'un des gestionnaires du service cible, qui effectuera la tâche.

L'élément url doit correspondre à l'un des formats d'URL de gestionnaire du service cible. L'élément url peut comprendre des paramètres de requête si la méthode spécifiée dans la tâche est GET ou PULL. Si aucun élément url n'est spécifié, l'URL par défaut /_ah/queue/[QUEUE_NAME] est utilisée, où [QUEUE_NAME] correspond au nom de la file d'attente de la tâche.

Transmettre des données au gestionnaire

Vous pouvez transmettre des données au gestionnaire en tant que paramètres de requête dans l'URL de la tâche, mais seulement si la méthode spécifiée dans la tâche est GET ou PULL.

Vous pouvez également utiliser l'un des champs suivants pour ajouter des données à une tâche :

  • payload, qui livre les données de tâche dans le corps de la requête HTTP
  • params

Ces trois appels sont équivalents :

taskqueue.add(method=GET, url='/update-counter?key=blue', target='worker')
taskqueue.add(url='/update-counter', params={'key': 'blue'}, target='worker')
taskqueue.add(url='/update-counter', payload="{'key': 'blue'}", target='worker')

Nommer une tâche

Lorsque vous créez une tâche, App Engine lui attribue un nom unique par défaut. Vous pouvez toutefois attribuer le nom de votre choix à une tâche à l'aide du paramètre name. Les tâches auxquelles vous avez attribué un nom personnalisé présentent l'avantage d'être dédupliquées, ce qui signifie que vous pouvez utiliser des noms de tâches pour vous assurer que les tâches ne sont ajoutées qu'une seule fois. La déduplication se poursuit pendant neuf jours après l'achèvement ou la suppression de la tâche.

Notez que la logique de déduplication introduit une surcharge de performances importante, entraînant des latences accrues et des taux d'erreur potentiellement plus élevés associés aux tâches nommées. Ces coûts peuvent être considérablement augmentés si les tâches sont nommées de manière séquentielle, par exemple avec des horodatages. Par conséquent, si vous attribuez vos propres noms, nous vous recommandons d'utiliser un préfixe distribué de manière homogène pour les noms de tâches, tel qu'un hachage du contenu.

Si vous attribuez vos propres noms à des tâches, notez que leur longueur maximale est de 500 caractères et qu'ils peuvent contenir des majuscules, des minuscules, des chiffres, des traits de soulignement et des traits d'union.

taskqueue.add(url='/url/path', name='first-try')

Ajouter des tâches de manière asynchrone

Par défaut, les appels qui ajoutent des tâches aux files d'attente sont synchrones. Dans la plupart des scénarios, les appels synchrones fonctionnent correctement. Généralement, l'ajout d'une tâche à une file d'attente s'effectue rapidement. Un faible pourcentage d'opérations d'ajout de tâches peut prendre beaucoup plus de temps, mais le temps moyen d'ajout d'une tâche est inférieur à 5 ms.

Les opérations d'ajout de tâches à différentes files d'attente ne peuvent pas être regroupées par lots. Par conséquent, l'API Task Queue propose également des appels asynchrones qui vous permettent d'ajouter ces tâches en parallèle, ce qui réduit encore davantage cette latence. Ces appels sont utiles si vous créez une application extrêmement sensible à la latence, qui doit effectuer plusieurs opérations d'ajout de tâches à différentes files d'attente en même temps.

Si vous souhaitez effectuer des appels asynchrones vers une file d'attente de tâches, utilisez les méthodes asynchrones proposées par la classe Queue ainsi qu'un objet RPC. Appelez get_result() sur l'objet RPC renvoyé pour forcer la requête à se terminer. Lorsque vous ajoutez des tâches de manière asynchrone dans une transaction, vous devez appeler get_result() sur l'objet RPC pour vous assurer que la requête est terminée avant d'effectuer un commit de la transaction.

Placer des tâches en file d'attente dans des transactions Cloud Datastore

Dans le cadre d'une transaction Datastore, une tâche n'est placée en file d'attente (et garantie d'être mise en file d'attente) que si la transaction est correctement validée. Les tâches ajoutées à une transaction sont considérées comme faisant partie de celle-ci et possèdent le même niveau d'isolation et de cohérence.

Une application ne peut pas insérer plus de cinq tâches transactionnelles dans des files d'attente de tâches au cours d'une même transaction. Les tâches transactionnelles ne doivent pas porter de noms spécifiés par l'utilisateur.

L'exemple de code suivant montre comment insérer des tâches transactionnelles dans une file d'attente d'envoi dans le cadre d'une transaction Datastore :

from google.appengine.api import taskqueue
from google.appengine.ext import ndb

@ndb.transactional
def do_something_in_transaction():
  taskqueue.add(url='/path/to/my/worker', transactional=True)
  #...

do_something_in_transaction()

Utiliser la bibliothèque de tâches différée au lieu d'un service de nœuds de calcul

La configuration d'un gestionnaire pour chaque tâche distincte (telle que décrite dans les sections précédentes) peut s'avérer fastidieuse, tout comme la sérialisation et la désérialisation d'arguments complexes, notamment si vous souhaitez que la file d'attente exécute de nombreuses tâches diverses, mais de faible taille. Le SDK Python inclut une bibliothèque (google.appengine.ext.deferred) qui expose une simple fonction qui vous dispense de l'ensemble des étapes du processus de configuration des gestionnaires de tâches dédiés ainsi que des processus de sérialisation/désérialisation des paramètres.

Pour utiliser cette bibliothèque, vous devez ajouter l'instruction intégrée deferred à app.yaml. Pour en savoir plus, consultez la section Gestionnaires intégrés de la documentation de référence sur app.yaml.

Pour utiliser la bibliothèque deferred, il vous suffit de transmettre la fonction et ses arguments à deferred.defer() :

import logging

from google.appengine.ext import deferred

def do_something_expensive(a, b, c=None):
    logging.info("Doing something expensive!")
    # Do your work here

# Somewhere else
deferred.defer(do_something_expensive, "Hello, world!", 42, True)

La bibliothèque deferred regroupe l'appel de fonction et ses arguments, puis les ajoute à la file d'attente de tâches. Une fois la tâche exécutée, la bibliothèque deferred exécute la fonction do_something_expensive("Hello, world!", 42, True).

Utiliser des tâches dans une application mutualisée

Par défaut, les files d'attente d'envoi utilisent l'espace de noms actuel tel qu'il est défini dans le gestionnaire d'espaces de noms au moment de la création de la tâche. Si votre application exploite une architecture mutualisée, consultez la section API Namespaces Python 2.

Étape suivante