Lease delle attività di pull

Una volta che le attività sono in una coda in modalità pull, un worker può acquisire il loro leasing. Dopo l'elaborazione, il lavoratore deve eliminare le attività.

Prima di iniziare

Contesto importante

  • Questo metodo è applicabile solo ai worker in esecuzione in un servizio nell'ambiente standard.
  • Quando utilizzi le code pull, sei responsabile della scalabilità dei worker in base al volume di elaborazione.

Attività di leasing

Una volta che le attività sono in coda, un lavoratore può acquisirne una o più in leasing utilizzando il metodo taskqueue.Lease. Potrebbe esserci un breve ritardo prima che le attività aggiunte di recente utilizzando taskqueue.Add diventino disponibili tramite taskqueue.Lease.

Quando richiedi un lease, specifichi il numero di attività da acquisire in leasing (fino a un massimo di 1000) e la durata del lease in secondi (fino a un massimo di una settimana). La durata del leasing deve essere sufficientemente lunga per garantire che l'attività più lenta abbia il tempo di essere completata prima della scadenza del periodo di leasing. Puoi modificare un leasing di attività utilizzando taskqueue.ModifyLease.

Se una task viene concessa in leasing, non è disponibile per l'elaborazione da parte di un altro worker e rimane inaccessibile fino alla scadenza del leasing.

Il seguente esempio di codice acquisisce in leasing 100 attività dalla coda pull-queue per un'ora:

tasks, err := taskqueue.Lease(ctx, 100, "pull-queue", 3600)

Raggruppamento con i tag delle attività

Non tutte le attività sono uguali; il codice può "taggare" le attività e poi scegliere le attività da concedere in leasing in base al tag. Il tag funge da filtro. Il seguente esempio di codice mostra come taggare le attività e poi affittarle in base ai tag:

_, err = taskqueue.Add(ctx, &taskqueue.Task{
	Payload: []byte("parse"), Method: "PULL", Tag: "parse",
}, "pull-queue")
_, err = taskqueue.Add(ctx, &taskqueue.Task{
	Payload: []byte("render"), Method: "PULL", Tag: "render",
}, "pull-queue")

// leases render tasks, but not parse
tasks, err = taskqueue.LeaseByTag(ctx, 100, "pull-queue", 3600, "render")

// Leases up to 100 tasks that have same tag.
// Tag is that of "oldest" task by ETA.
tasks, err = taskqueue.LeaseByTag(ctx, 100, "pull-queue", 3600, "")

Regolare le frequenze di polling

I worker che eseguono il polling della coda per le attività da concedere in leasing devono rilevare se stanno tentando di acquisire attività più velocemente di quanto la coda possa fornirle. Se si verifica questo errore, taskqueue.Lease restituirà un errore di ritiro.

Il codice deve gestire questi errori, ritirarsi dalla chiamata a taskqueue.Lease e riprovare più tardi. Per evitare questo problema, ti consigliamo di impostare una scadenza RPC più alta quando chiami taskqueue.Lease. Inoltre, devi eseguire il riavvolgimento quando una richiesta di leasing restituisce un elenco vuoto di attività. Se generi più di 10 richieste LeaseTasks per coda al secondo, solo le prime 10 restituiranno risultati. Se le richieste superano questo limite, viene restituito OK con zero risultati.

Monitoraggio delle attività nella console Google Cloud

Per visualizzare informazioni su tutte le attività e le code della tua applicazione:

  1. Apri la pagina Cloud Tasks nella console Google Cloud e cerca il valore Pull nella colonna Tipo.

    Vai a Cloud Tasks

  2. Fai clic sul nome della coda che ti interessa per aprire la pagina dei dettagli della coda. Vengono visualizzate tutte le attività nella coda selezionata.

Eliminazione delle attività

Una volta completata un'attività, il worker deve eliminarla dalla coda. Se noti che alcune attività rimangono in coda dopo che un worker ha completato l'elaborazione, è probabile che il worker non sia riuscito. In questo caso, le attività verranno elaborate da un altro worker.

Puoi eliminare un elenco di attività, ad esempio quello restituito da taskqueue.Lease, passandolo a taskqueue.DeleteMulti:

tasks, err = taskqueue.Lease(ctx, 100, "pull-queue", 3600)
// Perform some work with the tasks here

taskqueue.DeleteMulti(ctx, tasks, "pull-queue")