Lease delle attività pull

Quando le attività si trovano in una coda in modalità pull, un worker può prenderle in leasing. Dopo l'elaborazione delle attività, il worker deve eliminarle.

Prima di iniziare

Contesto importante

  • Questo metodo è applicabile solo ai worker in esecuzione all'interno di un servizio nell'ambiente standard.
  • Quando utilizzi le code in modalità pull, sei responsabile della scalabilità dei worker in base al volume di elaborazione.

Leasing attività

Quando le attività sono nella coda, un worker può affittare una o più attività utilizzando il metodo lease_tasks(). Potrebbe trascorrere un po' di tempo prima che le attività aggiunte di recente utilizzando add() diventino disponibili tramite lease_tasks().

Quando richiedi un lease, specifichi il numero di attività di cui eseguire il lease (fino a un massimo di 1000) e la durata in secondi (fino a un massimo di una settimana). La durata del lease deve essere sufficientemente lunga da garantire che l'attività più lenta abbia il tempo di terminare prima della scadenza del periodo di lease. Puoi modificare il lease di un'attività utilizzando modify_task_lease().

L'attività in leasing rende non disponibile per l'elaborazione da parte di un altro worker e rimane non disponibile fino alla scadenza del lease.

Il metodo lease_tasks() restituisce un oggetto Task contenente un elenco di attività noleggiate dalla coda.

Il seguente esempio di codice prende in leasing 100 attività dalla coda pull-queue per un'ora:

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)

Raggruppamento in batch con i tag attività

Non tutte le attività sono uguali. Il codice può "taggare" le attività e quindi scegliere le attività da affittare in base al tag. Il tag funge da filtro.

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))

q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse

q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.

Regolazione dei seggi elettorali

I worker che eseguono il polling della coda per il lease delle attività devono rilevare se stanno tentando di eseguire il lease delle attività più velocemente di quanto la coda possa fornirle. Se si verifica questo errore, è possibile generare le seguenti eccezioni da lease_tasks():

  • google.appengine.api.taskqueue.TransientError
  • google.appengine.runtime.apiproxy_errors.DeadlineExceededError


Il codice deve rilevare queste eccezioni, annullare la chiamata al numero lease_tasks() e riprovare più tardi. Per evitare questo problema, valuta la possibilità di impostare una scadenza RPC più alta quando chiami lease_tasks(). Dovresti anche annullare il servizio quando una richiesta di lease restituisce un elenco vuoto di attività.

Se generi più di 10 richieste LeaseTasks per coda al secondo, solo le prime 10 richieste restituiranno risultati. Se le richieste superano questo limite, viene restituito OK con zero risultati.

Monitoraggio delle attività nella console Google Cloud

Per visualizzare le informazioni su tutte le attività e le code nella tua applicazione:

  1. Apri la pagina Cloud Tasks nella console Google Cloud e cerca il valore Pull nella colonna Type (Tipo).

    Vai a Cloud Tasks

  2. Fai clic sul nome della coda di tuo interesse per aprire la pagina dei dettagli della coda. Vengono visualizzate tutte le attività nella coda selezionata.

Eliminazione attività in corso...

Una volta che un worker completa un'attività, deve eliminarla dalla coda. Se noti delle attività rimanenti in una coda al termine dell'elaborazione da parte di un worker, è probabile che il worker non sia riuscito; in questo caso, le attività verranno elaborate da un altro worker.

Puoi eliminare un elenco di attività, come quella restituita da lease_task(), semplicemente trasmettendolo a delete_tasks():

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)

Un esempio end-to-end di code in modalità pull

Per un esempio end-to-end semplice ma completo dell'utilizzo delle code in modalità pull in Python, vedi appengine-pullqueue-counter.