Una volta che le attività si trovano in una coda in modalità pull, un worker può prenderle in leasing. Una volta elaborate le attività, il lavoratore deve eliminarle.
Prima di iniziare
- Crea una coda in modalità pull.
- Crea attività e aggiungile alla coda in modalità pull.
Contesto importante
- Questo metodo è applicabile solo ai worker in esecuzione all'interno di un servizio nell'ambiente standard.
- Quando utilizzi le code pull, sei responsabile del dimensionamento dei worker in base al volume di elaborazione.
Leasing delle attività
Una volta che le attività sono in coda, un lavoratore può prenderne in affitto una o più utilizzando il metodo lease_tasks()
. Potrebbe verificarsi un breve ritardo prima che le attività aggiunte di recente utilizzando add()
diventino disponibili tramite lease_tasks()
.
Quando richiedi un lease, specifichi il numero di attività da acquisire (fino a un massimo di 1000) e la durata del lease in secondi (fino a un massimo di una settimana). La durata del lease deve essere sufficiente a garantire che l'attività più lenta abbia il tempo di terminare prima della scadenza del periodo di lease. Puoi modificare una concessione di attività utilizzando modify_task_lease()
.
Il leasing di un'attività la rende non disponibile per l'elaborazione da parte di un altro worker e rimane non disponibile fino alla scadenza del leasing.
Il metodo lease_tasks()
restituisce un oggetto Task contenente un elenco di attività prese in leasing dalla coda.
pull-queue
per un'ora:
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)
Raggruppamento con i tag delle attività
Non tutte le attività sono uguali: il codice può "taggare" le attività e poi scegliere quelle da affittare in base al tag. Il tag funge da filtro.
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))
q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse
q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.
Regolazione dei tassi di polling
I worker che eseguono il polling della coda per le attività da acquisire devono rilevare se stanno tentando di acquisire attività più velocemente di quanto la coda possa fornirle. Se si verifica questo errore, possono essere generate le seguenti eccezioni da lease_tasks()
:
google.appengine.api.taskqueue.TransientError
google.appengine.runtime.apiproxy_errors.DeadlineExceededError
Il codice deve rilevare queste eccezioni, interrompere la chiamata a lease_tasks()
e riprovare più tardi. Per evitare questo problema, valuta la possibilità di impostare un limite RPC più elevato quando chiami lease_tasks(). Devi anche eseguire il backoff quando una richiesta di lease restituisce un elenco vuoto di attività.
Se generi più di 10 richieste LeaseTasks al secondo per coda, solo le prime 10 richieste restituiranno risultati. Se le richieste superano questo limite, viene restituito OK
con zero risultati.
Monitoraggio delle attività nella console Google Cloud
Per visualizzare informazioni su tutte le attività e le code nella tua applicazione:
Apri la pagina Cloud Tasks nella console Google Cloud e cerca il valore Pull nella colonna Tipo.
Fai clic sul nome della coda che ti interessa per aprire la pagina dei dettagli. Mostra tutte le attività nella coda selezionata.
Eliminazione delle attività
Una volta completata un'attività, il worker deve eliminarla dalla coda. Se vedi attività rimanenti in una coda dopo che un worker ha terminato di elaborarle, è probabile che il worker non sia riuscito a completarle. In questo caso, le attività verranno elaborate da un altro worker.
Puoi eliminare un elenco di attività, ad esempio quello restituito da lease_task()
, semplicemente passandolo a delete_tasks()
:
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)
Un esempio end-to-end di code pull
Per un esempio end-to-end semplice ma completo di utilizzo delle code pull in Python, vedi appengine-pullqueue-counter.