Quando le attività si trovano in una coda in modalità pull, un worker può prenderle in leasing. Dopo l'elaborazione delle attività deve essere eliminata dal worker.
Prima di iniziare
- Crea una coda pull.
- Crea attività e aggiungile alla coda pull.
Contesto importante
- Questo metodo è applicabile solo ai worker in esecuzione in un servizio nell'ambiente standard.
- Quando utilizzi le code in modalità pull, sei responsabile della scalabilità dei worker in base al volume di elaborazione.
Attività di leasing
Una volta che le attività sono in coda, un lavoratore può acquisirne una o più in leasing utilizzando il metodo lease_tasks()
. Potrebbe trascorrere un po' di tempo prima che le attività aggiunte di recente utilizzando add()
diventino disponibili tramite lease_tasks()
.
Quando richiedi un lease, specifichi il numero di attività di cui eseguire il lease (fino a un massimo di 1000) e la durata in secondi (fino a un massimo di una settimana). La durata del lease deve essere sufficientemente lunga da garantire che l'attività più lenta abbia il tempo di terminare prima della scadenza del periodo di lease. Puoi modificare il lease di un'attività utilizzando modify_task_lease()
.
L'attività in leasing rende non disponibile per l'elaborazione da parte di un altro worker e rimane non disponibile fino alla scadenza del lease.
Il metodo lease_tasks()
restituisce un oggetto Task contenente un elenco di attività prese in affitto dalla coda.
pull-queue
per un'ora:
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)
Raggruppamento con i tag delle attività
Non tutte le attività sono uguali; il codice può "taggare" le attività e poi scegliere le attività da concedere in leasing in base al tag. Il tag funge da filtro.
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))
q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse
q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.
Regolazione dei seggi elettorali
I worker che eseguono il polling della coda per il lease delle attività devono rilevare se stanno tentando di eseguire il lease delle attività più velocemente di quanto la coda possa fornirle. Se si verifica questo errore, possono essere generate le seguenti eccezioni da lease_tasks()
:
google.appengine.api.taskqueue.TransientError
google.appengine.runtime.apiproxy_errors.DeadlineExceededError
Il codice deve intercettare queste eccezioni, annullare la chiamata a lease_tasks()
e riprovare più tardi. Per evitare questo problema, valuta la possibilità di impostare una scadenza RPC più alta quando chiami lease_tasks(). Dovresti anche tornare indietro quando una richiesta di lease restituisce un elenco vuoto di attività.
Se generi più di 10 richieste LeaseTasks per coda al secondo, solo le prime 10 richieste restituiranno risultati. Se le richieste superano questo limite, viene restituito OK
con zero risultati.
Monitoraggio delle attività nella console Google Cloud
Per visualizzare le informazioni su tutte le attività e le code nella tua applicazione:
Apri la pagina Cloud Tasks nella console Google Cloud e cerca il valore Pull nella colonna Tipo.
Fai clic sul nome della coda che ti interessa per aprire la pagina dei dettagli della coda. Vengono visualizzate tutte le attività nella coda selezionata.
Eliminazione attività in corso...
Una volta che un worker completa un'attività, deve eliminarla dalla coda. Se noti che alcune attività rimangono in coda dopo che un worker ha completato l'elaborazione, è probabile che il worker non sia riuscito. In questo caso, le attività verranno elaborate da un altro worker.
Puoi eliminare un elenco di attività, ad esempio quello restituito da lease_task()
, semplicemente passandolo a delete_tasks()
:
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)
Un esempio end-to-end delle code pull
Per un esempio end-to-end semplice, ma completo, dell'utilizzo delle code pull in Python, consulta appengine-pullqueue-counter.