Lease delle attività pull

Una volta che le attività si trovano in una coda in modalità pull, un worker può prestarle in leasing. Una volta elaborate le attività, il worker deve eliminarle.

Prima di iniziare

Contesto importante

  • Questo metodo è applicabile solo ai worker in esecuzione all'interno di un servizio nell'ambiente standard.
  • Quando utilizzi le code in modalità pull, sei responsabile della scalabilità dei worker in base al volume di elaborazione.

Attività di leasing

Quando le attività sono in coda, un worker può acquisire una o più attività in leasing utilizzando il metodo lease_tasks(). Potrebbe verificarsi un leggero ritardo prima che le attività aggiunte di recente utilizzando add() diventino disponibili tramite lease_tasks().

Quando richiedi un lease, specifichi il numero di attività da prendere in lease (fino a un massimo di 1000 attività) e la durata del lease in secondi (fino a un massimo di una settimana). La durata del lease deve essere abbastanza lunga da garantire che l'attività più lenta abbia il tempo di essere completata prima della scadenza del periodo di lease. Puoi modificare il lease di un'attività utilizzando modify_task_lease().

Il leasing di un'attività la rende non disponibile per l'elaborazione da parte di un altro worker e rimane non disponibile fino alla scadenza del lease.

Il metodo lease_tasks() restituisce un oggetto Task contenente un elenco di attività noleggiate dalla coda.

Il seguente esempio di codice prende in prestito 100 attività dalla coda pull-queue per un'ora:

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)

Batch con tag attività

Non tutte le attività sono uguali; il codice può "taggare" le attività e quindi scegliere le attività da acquisire in base ai tag. Il tag agisce come filtro.

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))

q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse

q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.

Regolazione dei tassi di polling

I worker che eseguono il polling della coda per le attività di cui eseguire il lease devono rilevare se stanno tentando di eseguire il lease delle attività più velocemente rispetto a quanto consentito dalla coda. Se si verifica questo errore, è possibile generare le seguenti eccezioni da lease_tasks():

  • google.appengine.api.taskqueue.TransientError
  • google.appengine.runtime.apiproxy_errors.DeadlineExceededError


Il tuo codice deve rilevare queste eccezioni, annullare la chiamata al numero lease_tasks() e riprovare più tardi. Per evitare questo problema, valuta la possibilità di impostare una scadenza RPC più alta quando chiami lease_tasks(). Ti conviene anche tornare indietro quando una richiesta di lease restituisce un elenco vuoto di attività.

Se generi più di 10 richieste LeaseTasks per coda al secondo, verranno restituiti risultati solo nelle prime 10 richieste. Se le richieste superano questo limite, viene restituito OK con zero risultati.

Monitoraggio delle attività nella console Google Cloud

Per visualizzare le informazioni su tutte le attività e le code nella tua applicazione:

  1. Apri la pagina Cloud Tasks nella console Google Cloud e cerca il valore Pull nella colonna Tipo.

    Vai a Cloud Tasks

  2. Fai clic sul nome della coda che ti interessa per aprire la pagina dei dettagli della coda. Vengono visualizzate tutte le attività nella coda selezionata.

Eliminazione delle attività in corso...

Una volta che un worker ha completato un'attività, deve eliminarla dalla coda. Se noti delle attività che rimangono in coda dopo che un worker ha terminato di elaborarle, è probabile che il worker non sia riuscito; in questo caso, le attività verranno elaborate da un altro worker.

Puoi eliminare un elenco di attività, come quello restituito da lease_task(), semplicemente passandolo a delete_tasks():

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)

Esempio end-to-end di code in modalità pull

Per un esempio end-to-end semplice ma completo dell'utilizzo delle code in modalità pull in Python, consulta appengine-pullqueue-counter.