Sobald Aufgaben sich in einer Pull-Warteschlange befinden, können sie von einem Worker freigegeben werden. Nachdem die Aufgaben verarbeitet wurden, müssen sie vom Worker gelöscht werden.
Hinweise
- Eine Pull-Warteschlange erstellen
- Aufgaben erstellen und der Pull-Warteschlange hinzufügen
Wichtiger Kontext
- Diese Methode gilt nur für Worker, die in einem Dienst in der Standardumgebung ausgeführt werden.
- Wenn Sie Pull-Warteschlangen verwenden, müssen Sie die Worker je nach Verarbeitungsvolumen skalieren.
Aufgaben leasen
Wenn sich die Aufgaben in der Warteschlange befinden, können eine oder mehrere Aufgaben mithilfe der Methode lease_tasks()
von einem Worker freigegeben werden. Es kann zu einer kurzen Verzögerung kommen, bis Aufgaben, die erst kürzlich mit add()
hinzugefügt wurden, über lease_tasks()
verfügbar werden.
Sie geben die Anzahl der freizugebenden Aufgaben (bis zu 1.000 Aufgaben) und die Freigabedauer in Sekunden (bis zu einer Woche) an, um die Freigabe anzufordern. Die Freigabedauer muss so lang sein, dass auch die langsamste Aufgabe vor Ablauf des Freigabezeitraums abgeschlossen werden kann. Sie können eine Aufgabenfreigabe mit modify_task_lease()
bearbeiten.
Sobald eine Aufgabe freigegeben wurde, ist sie nicht mehr für die Verarbeitung durch einen anderen Worker verfügbar und steht erst dann wieder zur Verfügung, wenn die Freigabe abgelaufen ist.
Die Methode lease_tasks()
gibt ein Task-Objekt mit einer Liste der Aufgaben zurück, die aus der Warteschlange freigegeben wurden.
pull-queue
für eine Stunde freigegeben:
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.lease_tasks(3600, 100)
Stapelanfragen mit Aufgaben-Tags
Nicht alle Aufgaben sind identisch. Sie können Aufgaben im Code mit Tags kennzeichnen und Aufgaben dann nach Tags freigeben. Das Tag dient als Filter.
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
q.add(taskqueue.Task(payload='parse1', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='parse2', method='PULL', tag='parse'))
q.add(taskqueue.Task(payload='render1', method='PULL', tag='render'))
q.add(taskqueue.Task(payload='render2', method='PULL', tag='render'))
q.lease_tasks_by_tag(3600, 100, 'render') # leases render tasks, but not parse
q.lease_tasks_by_tag(3600, 100) # Leases up to 100 tasks that have same tag.
Abfrageraten regeln
Worker, die die Warteschlange nach Aufgaben zur Freigaben abfragen, sollten erkennen, ob sie Aufgaben schneller freizugeben versuchen, als die Warteschlange sie bereitstellen kann. Wenn dieser Fehler auftritt, können die folgenden Ausnahmen von lease_tasks()
generiert werden:
google.appengine.api.taskqueue.TransientError
google.appengine.runtime.apiproxy_errors.DeadlineExceededError
Der Code muss diese Ausnahmen abfangen, den Aufruf von lease_tasks()
zurückstellen und es dann später noch einmal versuchen. Legen Sie ein höheres RPC-Zeitlimit beim Aufrufen von lease_tasks() fest, um dieses Problem zu vermeiden. Stellen Sie die Aufrufe auch dann zurück, wenn bei einer Freigabeanfrage eine leere Aufgabenliste zurückgegeben wird.
Wenn Sie mehr als 10 LeaseTasks-Anfragen pro Sekunde generieren, geben nur die ersten 10 Anfragen Ergebnisse zurück. Wenn Anfragen diesen Grenzwert überschreiten, wird OK
mit null Ergebnissen geliefert.
Aufgaben in der Google Cloud Console überwachen
So rufen Sie Informationen zu allen Aufgaben und Warteschlangen in der Anwendung auf:
Öffnen Sie in der Google Cloud Console die Seite Cloud Tasks und suchen Sie in der Spalte Typ nach dem Wert Pull.
Klicken Sie auf den Namen der gewünschten Warteschlange, um die zugehörige Detailseite zu öffnen. Hier werden alle Aufgaben angezeigt, die sich in der ausgewählten Warteschlange befinden.
Aufgaben löschen
Nachdem ein Worker eine Aufgabe durchgeführt hat, muss er die Aufgabe aus der Warteschlange löschen. Wenn Aufgaben in einer Warteschlange verbleiben, nachdem die Verarbeitung durch einen Worker abgeschlossen ist, ist der Worker-Vorgang wahrscheinlich fehlgeschlagen. In diesem Fall werden die Aufgaben von einem anderen Worker verarbeitet.
Sie können eine Aufgabenliste wie die von lease_task()
zurückgegebene Liste löschen. Übergeben Sie sie dazu einfach an delete_tasks()
:
from google.appengine.api import taskqueue
q = taskqueue.Queue('pull-queue')
tasks = q.lease_tasks(3600, 100)
# Perform some work with the tasks here
q.delete_tasks(tasks)
End-to-End-Beispiel für Pull-Warteschlangen
Ein einfaches, aber vollständiges Beispiel für die Verwendung von Pull-Warteschlangen in Python finden Sie unter appengine-pullqueue-counter.