Pull-Aufgaben leasen

Sobald Aufgaben sich in einer Pull-Warteschlange befinden, können sie von einem Worker freigegeben werden. Nachdem die Aufgaben verarbeitet wurden, müssen sie vom Worker gelöscht werden.

Hinweise

Wichtiger Kontext

  • Diese Methode gilt nur für Worker, die in einem Dienst in der Standardumgebung ausgeführt werden.
  • Wenn Sie Pull-Warteschlangen verwenden, müssen Sie die Worker je nach Verarbeitungsvolumen skalieren.

Aufgaben leasen

Wenn sich die Aufgaben in der Warteschlange befinden, können eine oder mehrere Aufgaben mithilfe der Methode leaseTasks() von einem Worker freigegeben werden. Es kann zu einer kurzen Verzögerung kommen, bis Aufgaben, die erst kürzlich mit add() hinzugefügt wurden, über leaseTasks() verfügbar werden.

Sie geben die Anzahl der freizugebenden Aufgaben (bis zu 1.000 Aufgaben) und die Freigabedauer in Sekunden (bis zu einer Woche) an, um die Freigabe anzufordern. Die Freigabedauer muss so lang sein, dass auch die langsamste Aufgabe vor Ablauf des Freigabezeitraums abgeschlossen werden kann. Sie können eine Aufgabenfreigabe mit modifyTaskLease() bearbeiten.

Sobald eine Aufgabe freigegeben wurde, ist sie nicht mehr für die Verarbeitung durch einen anderen Worker verfügbar und steht erst dann wieder zur Verfügung, wenn die Freigabe abgelaufen ist.

Im folgenden Beispielcode werden Aufgaben aus der Warteschlange pull-queue für eine Stunde freigegeben:

List<TaskHandle> tasks = q.leaseTasks(3600, TimeUnit.SECONDS, numberOfTasksToLease);

Stapelanfragen mit Aufgaben-Tags

Nicht alle Aufgaben sind identisch. Sie können Aufgaben im Code mit Tags kennzeichnen und Aufgaben dann nach Tags freigeben. Das Tag dient dann als Filter.

q.add(
    TaskOptions.Builder.withMethod(TaskOptions.Method.PULL)
        .payload(content.toString())
        .tag("process".getBytes()));

Geben Sie die gefilterten Aufgaben dann frei:

// Lease only tasks tagged with "process"
List<TaskHandle> tasks =
    q.leaseTasksByTag(3600, TimeUnit.SECONDS, numberOfTasksToLease, "process");
// You can also specify a tag to lease via LeaseOptions passed to leaseTasks.

Abfrageraten regeln

Worker, die die Warteschlange nach Aufgaben zur Freigaben abfragen, sollten erkennen, ob sie Aufgaben schneller freizugeben versuchen, als die Warteschlange sie bereitstellen kann. Wenn dieser Fehler auftritt, können die folgenden Ausnahmen von leaseTasks() generiert werden:


Der Code muss diese Ausnahmen abfangen, den Aufruf von leaseTasks() zurückstellen und es dann später noch einmal versuchen. Legen Sie ein höheres RPC-Zeitlimit beim Aufrufen von leaseTasks() fest, um dieses Problem zu vermeiden. Stellen Sie die Aufrufe auch dann zurück, wenn bei einer Freigabeanfrage eine leere Aufgabenliste zurückgegeben wird.

Wenn Sie mehr als 10 LeaseTasks-Anfragen pro Warteschlange und Sekunde generieren, liefern nur die ersten 10 Anfragen Ergebnisse. Wenn Anfragen diesen Grenzwert überschreiten, wird OK mit null Ergebnissen geliefert.

Aufgaben in der Google Cloud Console überwachen

So rufen Sie Informationen zu allen Aufgaben und Warteschlangen in der Anwendung auf:

  1. Öffnen Sie in der Google Cloud Console die Seite Cloud Tasks und suchen Sie in der Spalte Typ nach dem Wert Pull.

    Zu Cloud Tasks

  2. Klicken Sie auf den Namen der gewünschten Warteschlange, um die zugehörige Detailseite zu öffnen. Hier werden alle Aufgaben angezeigt, die sich in der ausgewählten Warteschlange befinden.

Aufgaben löschen

Nachdem ein Worker eine Aufgabe durchgeführt hat, muss er die Aufgabe aus der Warteschlange löschen. Wenn Aufgaben in einer Warteschlange verbleiben, nachdem die Verarbeitung durch einen Worker abgeschlossen ist, ist der Worker-Vorgang wahrscheinlich fehlgeschlagen. In diesem Fall werden die Aufgaben von einem anderen Worker verarbeitet.

Mit deleteTask() können Sie eine einzelne Aufgabe oder eine Liste von Aufgaben löschen. Sie müssen den Namen der Aufgabe kennen, um sie zu löschen. Sie finden die Aufgabennamen im Task object, das von leaseTasks() zurückgegeben wird.

Im folgenden Codebeispiel wird gezeigt, wie eine Aufgabe aus einer Warteschlange gelöscht wird:

q.deleteTask(task);