Lease delle attività di pull

Una volta che le attività si trovano in una coda in modalità pull, un worker può prenderle in leasing. Una volta elaborate le attività, il lavoratore deve eliminarle.

Prima di iniziare

Contesto importante

  • Questo metodo è applicabile solo ai worker in esecuzione all'interno di un servizio nell'ambiente standard.
  • Quando utilizzi le code pull, sei responsabile del dimensionamento dei worker in base al volume di elaborazione.

Leasing delle attività

Una volta che le attività sono in coda, un lavoratore può prenderne in affitto una o più utilizzando il metodo taskqueue.Lease. Potrebbe verificarsi un breve ritardo prima che le attività aggiunte di recente utilizzando taskqueue.Add diventino disponibili tramite taskqueue.Lease.

Quando richiedi un lease, specifichi il numero di attività da acquisire (fino a un massimo di 1000) e la durata del lease in secondi (fino a un massimo di una settimana). La durata del lease deve essere sufficiente a garantire che l'attività più lenta abbia il tempo di terminare prima della scadenza del periodo di lease. Puoi modificare una concessione di attività utilizzando taskqueue.ModifyLease.

Il leasing di un'attività la rende non disponibile per l'elaborazione da parte di un altro worker e rimane non disponibile fino alla scadenza del leasing.

Il seguente esempio di codice acquisisce 100 attività dalla coda pull-queue per un'ora:

tasks, err := taskqueue.Lease(ctx, 100, "pull-queue", 3600)

Raggruppamento con i tag delle attività

Non tutte le attività sono uguali: il codice può "taggare" le attività e poi scegliere quelle da affittare in base al tag. Il tag funge da filtro. Il seguente esempio di codice mostra come taggare le attività e poi eseguire il lease in base ai tag:

_, err = taskqueue.Add(ctx, &taskqueue.Task{
	Payload: []byte("parse"), Method: "PULL", Tag: "parse",
}, "pull-queue")
_, err = taskqueue.Add(ctx, &taskqueue.Task{
	Payload: []byte("render"), Method: "PULL", Tag: "render",
}, "pull-queue")

// leases render tasks, but not parse
tasks, err = taskqueue.LeaseByTag(ctx, 100, "pull-queue", 3600, "render")

// Leases up to 100 tasks that have same tag.
// Tag is that of "oldest" task by ETA.
tasks, err = taskqueue.LeaseByTag(ctx, 100, "pull-queue", 3600, "")

Regolazione dei tassi di polling

I worker che eseguono il polling della coda per le attività da acquisire devono rilevare se stanno tentando di acquisire attività più velocemente di quanto la coda possa fornirle. Se si verifica questo errore, taskqueue.Lease restituirà un errore di backoff.

Il tuo codice deve gestire questi errori, interrompere la chiamata a taskqueue.Lease e riprovare in un secondo momento. Per evitare questo problema, ti consigliamo di impostare una scadenza RPC più elevata quando chiami taskqueue.Lease. Devi anche eseguire il backoff quando una richiesta di lease restituisce un elenco vuoto di attività. Se generi più di 10 richieste LeaseTasks per coda al secondo, solo le prime 10 richieste restituiranno risultati. Se le richieste superano questo limite, viene restituito OK con zero risultati.

Monitoraggio delle attività nella console Google Cloud

Per visualizzare informazioni su tutte le attività e le code nella tua applicazione:

  1. Apri la pagina Cloud Tasks nella console Google Cloud e cerca il valore Pull nella colonna Tipo.

    Vai a Cloud Tasks

  2. Fai clic sul nome della coda che ti interessa per aprire la pagina dei dettagli. Mostra tutte le attività nella coda selezionata.

Eliminazione delle attività

Una volta completata un'attività, il worker deve eliminarla dalla coda. Se vedi attività rimanenti in una coda dopo che un worker ha terminato di elaborarle, è probabile che il worker non sia riuscito a completarle. In questo caso, le attività verranno elaborate da un altro worker.

Puoi eliminare un elenco di attività, ad esempio quello restituito da taskqueue.Lease, passandolo a taskqueue.DeleteMulti:

tasks, err = taskqueue.Lease(ctx, 100, "pull-queue", 3600)
// Perform some work with the tasks here

taskqueue.DeleteMulti(ctx, tasks, "pull-queue")