Pull-Aufgaben leasen

Sobald Aufgaben sich in einer Pull-Warteschlange befinden, können sie von einem Worker freigegeben werden. Nachdem die Aufgaben verarbeitet wurden, müssen sie vom Worker gelöscht werden.

Vorbereitung

Wichtiger Kontext

  • Diese Methode gilt nur für Worker, die in einem Dienst in der Standardumgebung ausgeführt werden.
  • Wenn Sie Pull-Warteschlangen verwenden, müssen Sie die Worker je nach Verarbeitungsvolumen skalieren.

Aufgaben leasen

Wenn sich die Aufgaben in der Warteschlange befinden, können eine oder mehrere Aufgaben mithilfe der Methode taskqueue.Lease von einem Worker freigegeben werden. Es kann zu einer kurzen Verzögerung kommen, bis Aufgaben, die erst kürzlich mit taskqueue.Add hinzugefügt wurden, über taskqueue.Lease verfügbar werden.

Sie geben die Anzahl der freizugebenden Aufgaben (bis zu 1.000 Aufgaben) und die Freigabedauer in Sekunden (bis zu einer Woche) an, um die Freigabe anzufordern. Die Freigabedauer muss so lang sein, dass auch die langsamste Aufgabe vor Ablauf des Freigabezeitraums abgeschlossen werden kann. Sie können eine Aufgabenfreigabe mit taskqueue.ModifyLease bearbeiten.

Sobald eine Aufgabe freigegeben wurde, ist sie nicht mehr für die Verarbeitung durch einen anderen Worker verfügbar und steht erst dann wieder zur Verfügung, wenn die Freigabe abgelaufen ist.

Im folgenden Beispielcode werden 100 Aufgaben aus der Warteschlange pull-queue für eine Stunde freigegeben:

tasks, err := taskqueue.Lease(ctx, 100, "pull-queue", 3600)

Stapelanfragen mit Aufgaben-Tags

Nicht alle Aufgaben sind identisch. Sie können Aufgaben im Code mit Tags kennzeichnen und Aufgaben dann nach Tags freigeben. Das Tag dient dann als Filter. Im folgenden Codebeispiel wird gezeigt, wie Sie Aufgaben kennzeichnen und dann nach Tags freigeben:

_, err = taskqueue.Add(ctx, &taskqueue.Task{
	Payload: []byte("parse"), Method: "PULL", Tag: "parse",
}, "pull-queue")
_, err = taskqueue.Add(ctx, &taskqueue.Task{
	Payload: []byte("render"), Method: "PULL", Tag: "render",
}, "pull-queue")

// leases render tasks, but not parse
tasks, err = taskqueue.LeaseByTag(ctx, 100, "pull-queue", 3600, "render")

// Leases up to 100 tasks that have same tag.
// Tag is that of "oldest" task by ETA.
tasks, err = taskqueue.LeaseByTag(ctx, 100, "pull-queue", 3600, "")

Abfrageraten regulieren

Worker, die die Warteschlange nach Aufgaben zum Leasen abfragen, sollten erkennen, ob sie Aufgaben schneller freizugeben versuchen, als die Warteschlange sie bereitstellen kann. Wenn dieser Fehler auftritt, gibt taskqueue.Lease einen Backoff-Fehler zurück.

Ihr Code muss diese Fehler beheben, den Aufruf von taskqueue.Lease beenden und es dann später noch einmal versuchen. Wenn Sie dieses Problem vermeiden möchten, legen Sie beim Aufrufen von taskqueue.Lease ein höheres RPC-Zeitlimit fest. Stellen Sie die Aufrufe auch dann zurück, wenn bei einer Lease-Anfrage eine leere Aufgabenliste geliefert wird. Wenn Sie mehr als 10 LeaseTasks-Anfragen pro Warteschlange und Sekunde generieren, liefern nur die ersten 10 Anfragen Ergebnisse. Wenn Anfragen diesen Grenzwert überschreiten, wird OK mit null Ergebnissen geliefert.

Aufgaben in der Google Cloud Console überwachen

So rufen Sie Informationen zu allen Aufgaben und Warteschlangen in der Anwendung auf:

  1. Öffnen Sie in der Google Cloud Console die Seite Cloud Tasks und suchen Sie in der Spalte Typ nach dem Wert Pull.

    Zu Cloud Tasks

  2. Klicken Sie auf den Namen der gewünschten Warteschlange, um die zugehörige Detailseite zu öffnen. Hier werden alle Aufgaben angezeigt, die sich in der ausgewählten Warteschlange befinden.

Aufgaben löschen

Nachdem ein Worker eine Aufgabe durchgeführt hat, muss er die Aufgabe aus der Warteschlange löschen. Wenn Aufgaben in einer Warteschlange verbleiben, nachdem die Verarbeitung durch einen Worker abgeschlossen ist, ist der Worker-Vorgang wahrscheinlich fehlgeschlagen. In diesem Fall werden die Aufgaben von einem anderen Worker verarbeitet.

Sie können eine Aufgabenliste wie die von taskqueue.Lease zurückgegebene löschen. Übergeben Sie sie dazu an taskqueue.DeleteMulti:

tasks, err = taskqueue.Lease(ctx, 100, "pull-queue", 3600)
// Perform some work with the tasks here

taskqueue.DeleteMulti(ctx, tasks, "pull-queue")