Pull-Aufgaben leasen

Sobald Aufgaben sich in einer Pull-Warteschlange befinden, können sie von einem Worker freigegeben werden. Nachdem die Aufgaben verarbeitet wurden, müssen sie vom Worker gelöscht werden.

Vorbereitung

Wichtiger Kontext

  • Diese Methode gilt nur für Worker, die in einem Dienst in der Standardumgebung ausgeführt werden.
  • Wenn Sie Pull-Warteschlangen verwenden, müssen Sie die Worker je nach Verarbeitungsvolumen skalieren.

Aufgaben leasen

Wenn sich die Aufgaben in der Warteschlange befinden, können eine oder mehrere Aufgaben mithilfe der Methode lease_tasks() von einem Worker freigegeben werden. Es kann zu einer kurzen Verzögerung kommen, bis Aufgaben, die erst kürzlich mit add() hinzugefügt wurden, über lease_tasks() verfügbar werden.

Sie geben die Anzahl der freizugebenden Aufgaben (bis zu 1.000 Aufgaben) und die Freigabedauer in Sekunden (bis zu einer Woche) an, um die Freigabe anzufordern. Die Freigabedauer muss so lang sein, dass auch die langsamste Aufgabe vor Ablauf des Freigabezeitraums abgeschlossen werden kann. Sie können eine Aufgabenfreigabe mit modify_task_lease() bearbeiten.

Sobald eine Aufgabe freigegeben wurde, ist sie nicht mehr für die Verarbeitung durch einen anderen Worker verfügbar und steht erst dann wieder zur Verfügung, wenn die Freigabe abgelaufen ist.

Die Methode lease_tasks() gibt ein Task-Objekt mit einer Liste der Aufgaben zurück, die aus der Warteschlange freigegeben wurden.

Im folgenden Beispielcode werden 100 Aufgaben aus der Warteschlange pull-queue für eine Stunde freigegeben:

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)

Stapelanfragen mit Aufgaben-Tags

Nicht alle Aufgaben sind identisch. Sie können Aufgaben im Code mit Tags kennzeichnen und Aufgaben dann nach Tags freigeben. Das Tag dient als Filter.

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))

q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse

q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.

Abfrageraten regeln

Worker, die die Warteschlange nach Aufgaben zur Freigaben abfragen, sollten erkennen, ob sie Aufgaben schneller freizugeben versuchen, als die Warteschlange sie bereitstellen kann. Wenn dieser Fehler auftritt, können die folgenden Ausnahmen von lease_tasks() generiert werden:

  • google.appengine.api.taskqueue.TransientError
  • google.appengine.runtime.apiproxy_errors.DeadlineExceededError


Der Code muss diese Ausnahmen abfangen, den Aufruf von lease_tasks() zurückstellen und es dann später noch einmal versuchen. Legen Sie ein höheres RPC-Zeitlimit beim Aufrufen von leaseTasks() fest, um dieses Problem zu vermeiden. Stellen Sie die Aufrufe auch dann zurück, wenn bei einer Freigabeanfrage eine leere Aufgabenliste zurückgegeben wird.

Wenn Sie mehr als 10 LeaseTasks-Anfragen pro Sekunde generieren, geben nur die ersten 10 Anfragen Ergebnisse zurück. Wenn Anfragen diesen Grenzwert überschreiten, wird OK mit null Ergebnissen geliefert.

Aufgaben in der Google Cloud Console überwachen

So rufen Sie Informationen zu allen Aufgaben und Warteschlangen in der Anwendung auf:

  1. Öffnen Sie in der Google Cloud Console die Seite Cloud Tasks und suchen Sie in der Spalte Typ nach dem Wert Pull.

    Zu Cloud Tasks

  2. Klicken Sie auf den Namen der gewünschten Warteschlange, um die zugehörige Detailseite zu öffnen. Hier werden alle Aufgaben angezeigt, die sich in der ausgewählten Warteschlange befinden.

Aufgaben löschen

Nachdem ein Worker eine Aufgabe durchgeführt hat, muss er die Aufgabe aus der Warteschlange löschen. Wenn Aufgaben in einer Warteschlange verbleiben, nachdem die Verarbeitung durch einen Worker abgeschlossen ist, ist der Worker-Vorgang wahrscheinlich fehlgeschlagen. In diesem Fall werden die Aufgaben von einem anderen Worker verarbeitet.

Sie können eine Aufgabenliste wie die von lease_task() zurückgegebene Liste löschen. Übergeben Sie sie dazu einfach an delete_tasks():

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)

End-to-End-Beispiel für Pull-Warteschlangen

Ein einfaches, aber vollständiges Beispiel für die Verwendung von Pull-Warteschlangen in Python finden Sie unter appengine-pullqueue-counter.