Una volta che le attività sono in una coda pull, un worker può acquisire il loro leasing. Dopo l'elaborazione, il lavoratore deve eliminare le attività.
Prima di iniziare
- Crea una coda in modalità pull.
- Crea attività e aggiungile alla coda pull.
Contesto importante
- Questo metodo è applicabile solo ai worker in esecuzione in un servizio nell'ambiente standard.
- Quando utilizzi le code pull, è tua responsabilità eseguire il ridimensionamento dei tuoi worker in base al volume di elaborazione.
Attività di leasing
Una volta che le attività sono in coda, un lavoratore può acquisirne una o più utilizzando il metodo
lease_tasks()
. Potrebbe esserci un breve ritardo prima che le attività aggiunte di recente tramite
add()
diventino disponibili tramite
lease_tasks()
.
Quando richiedi un lease, specifichi il numero di attività da acquisire in leasing (fino a un massimo di 1000) e la durata del lease in secondi (fino a un massimo di una settimana). La durata del lease deve essere sufficientemente lunga per garantire che l'attività più lenta abbia il tempo di essere completata prima della scadenza del periodo di lease. Puoi modificare un contratto di assegnazione delle attività utilizzando
modify_task_lease()
.
Se una task viene concessa in leasing, non è disponibile per l'elaborazione da parte di un altro worker e rimane inaccessibile fino alla scadenza del leasing.
Il metodo
lease_tasks()
restituisce un oggetto Task contenente un elenco di attività prese in affitto dalla coda.
pull-queue
per un'ora:
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)
Raggruppamento con i tag delle attività
Non tutte le attività sono uguali; il codice può "taggare" le attività e poi scegliere le attività da concedere in leasing in base al tag. Il tag funge da filtro.
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))
q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse
q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.
Regolare le frequenze di polling
I worker che eseguono il polling della coda per le attività da assegnare in leasing devono rilevare se stanno tentando di assegnare in leasing le attività più velocemente di quanto la coda possa fornirle. Se si verifica questo errore, possono essere generate le seguenti eccezioni da
lease_tasks()
:
google.appengine.api.taskqueue.TransientError
google.appengine.runtime.apiproxy_errors.DeadlineExceededError
Il codice deve intercettare queste eccezioni, non chiamare
lease_tasks()
,
e riprova più tardi. Per evitare questo problema, ti consigliamo di impostare una scadenza RPC più elevata quando chiami
lease_tasks()
. Inoltre, devi eseguire il backoff quando una richiesta di assegnazione
restituisce un elenco vuoto di attività.
Se generi più di 10
richieste LeaseTasks per coda al secondo, solo le prime 10 richieste restituiranno risultati. Se le richieste superano questo limite, viene restituito OK
con zero risultati.
Monitoraggio delle attività nella console Google Cloud
Per visualizzare informazioni su tutte le attività e le code della tua applicazione:
Apri la pagina Cloud Tasks nella console Google Cloud e cerca il valore Pull nella colonna Tipo.
Fai clic sul nome della coda che ti interessa per aprire la pagina dei dettagli della coda. Vengono visualizzate tutte le attività nella coda selezionata.
Eliminazione delle attività
Una volta completata un'attività, il worker deve eliminarla dalla coda. Se noti che alcune attività rimangono in coda dopo che un worker ha completato l'elaborazione, è probabile che il worker non sia riuscito; in questo caso, le attività verranno elaborate da un altro worker.
Puoi eliminare un elenco di attività, ad esempio quello restituito da
lease_tasks()
,
semplicemente passandolo a delete_tasks()
:
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)
Un esempio end-to-end delle code pull
Per un esempio end-to-end semplice, ma completo, dell'utilizzo delle code pull in Python, consulta appengine-pullqueue-counter.