Pull-Aufgaben freigeben

Sobald Aufgaben sich in einer Pull-Warteschlange befinden, können sie von einem Worker geleast werden. Nachdem die Aufgaben verarbeitet wurden, müssen sie vom Worker gelöscht werden.

Vorbereitung

Wichtiger Kontext

  • Diese Methode gilt nur für Worker, die in einem Dienst in der Standardumgebung ausgeführt werden.
  • Wenn Sie Pull-Warteschlangen verwenden, müssen Sie die Worker je nach Verarbeitungsvolumen skalieren.

Aufgaben leasen

Sobald sich die Aufgaben in der Warteschlange befinden, hat ein Worker die Möglichkeit, für eine oder mehrere Aufgaben mit der Methode lease_tasks() die Freigabe anzufordern. Es kann zu einer kurzen Verzögerung kommen, bis Aufgaben, die erst kürzlich mit add() hinzugefügt wurden, über lease_tasks() verfügbar werden.

Sie geben die Anzahl der freizugebenden Aufgaben (bis zu 1.000 Aufgaben) und die Freigabedauer in Sekunden (bis zu einer Woche) an, um die Freigabe anzufordern. Die Lease-Dauer muss so lang sein, dass auch die langsamste Aufgabe vor Ablauf des Lease-Zeitraums abgeschlossen werden kann. Sie können eine Aufgabenfreigabe mit modify_task_lease() bearbeiten.

Sobald eine Aufgabe freigegeben wurde, ist sie nicht mehr für die Verarbeitung durch einen anderen Worker verfügbar und steht erst dann wieder zur Verfügung, wenn die Freigabe abgelaufen ist.

Die Methode lease_tasks() gibt ein Task-Objekt mit einer Liste der Aufgaben zurück, die aus der Warteschlange freigegeben wurden.

Im folgenden Codebeispiel werden 100 Aufgaben aus der Warteschlange pull-queue für eine Stunde freigegeben:

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)

Batches mit Aufgaben-Tags erstellen

Nicht alle Aufgaben sind identisch. Sie können Aufgaben im Code mit Tags kennzeichnen und die Freigabe von Aufgaben anhand dieser Tags anfordern. Das Tag dient als Filter.

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))

q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse

q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.

Abfrageraten regeln

Worker, die die Warteschlange nach Aufgaben zum Leasen abfragen, sollten erkennen, ob sie Aufgaben schneller freizugeben versuchen, als die Warteschlange sie bereitstellen kann. Wenn dieser Fehler auftritt, können folgende Ausnahmen von lease_tasks() generiert werden:

  • google.appengine.api.taskqueue.TransientError
  • google.appengine.runtime.apiproxy_errors.DeadlineExceededError


Der Code muss diese Ausnahmen abfangen, den Aufruf von lease_tasks() zurückstellen und es dann später noch einmal versuchen. Sie können ein höheres RPC-Zeitlimit beim Aufrufen von lease_tasks() festlegen, um dieses Problem zu verhindern. Stellen Sie die Aufrufe auch dann zurück, wenn bei einer Lease-Anfrage eine leere Aufgabenliste geliefert wird.

Wenn Sie mehr als 10 LeaseTasks-Anfragen pro Warteschlange und Sekunde generieren, liefern nur die ersten 10 Anfragen Ergebnisse. Wenn Anfragen diesen Grenzwert überschreiten, wird OK mit null Ergebnissen geliefert.

Aufgaben in der GCP Console überwachen

So rufen Sie Informationen zu allen Aufgaben und Warteschlangen in der Anwendung auf:

  1. Öffnen Sie die Seite Aufgabenwarteschlangen in der GCP Console und wählen Sie in der Menüleiste oben auf der Seite den Tab Pull-Warteschlangen aus.

    Tab Pull-Warteschlangen aufrufen

  2. Klicken Sie auf den Namen der gewünschten Warteschlange, um die zugehörige Detailseite zu öffnen. Hier werden alle Aufgaben angezeigt, die sich in der ausgewählten Warteschlange befinden.

Aufgaben löschen

Nachdem ein Worker eine Aufgabe durchgeführt hat, muss er die Aufgabe aus der Warteschlange löschen. Wenn Aufgaben in einer Warteschlange verbleiben, nachdem die Verarbeitung durch einen Worker abgeschlossen ist, ist der Worker-Vorgang wahrscheinlich fehlgeschlagen. In diesem Fall werden die Aufgaben von einem anderen Worker verarbeitet.

Sie können eine Aufgabenliste wie die von lease_task() zurückgegebene Liste löschen. Übergeben Sie sie dazu einfach an delete_tasks():

from google.appengine.api import taskqueue

q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)

End-to-End-Beispiel für Pull-Warteschlangen

Ein einfaches, aber vollständiges Beispiel für die Verwendung von Pull-Warteschlangen in Python finden Sie unter appengine-pullqueue-counter.

Hat Ihnen diese Seite weitergeholfen? Teilen Sie uns Ihr Feedback mit:

Feedback geben zu...

App Engine-Standardumgebung für Python 2