Airflow-Triggerprobleme beheben

Cloud Composer 1 Cloud Composer 2

Diese Seite enthält Schritte zur Fehlerbehebung und Informationen für häufige Probleme mit dem Airflow-Trigger.

Vorgänge im Trigger blockieren

Asynchrone Aufgaben können gelegentlich in Triggern blockiert werden. In den meisten Fällen sind die Probleme auf unzureichende Trigger-Ressourcen oder Probleme mit benutzerdefiniertem asynchronem Operatorcode zurückzuführen.

Triggerlogs zeigen alle Warnmeldungen an, mit denen Sie die Ursachen für eine verminderte Triggerleistung ermitteln können. Es gibt zwei wichtige Warnungen.

  1. Asynchroner Thread blockiert

    Triggerer's async thread was blocked for 1.2 seconds, likely due to the highly utilized environment.
    

    Diese Warnung weist auf Leistungsprobleme aufgrund einer großen Anzahl asynchroner Aufgaben hin.

    Lösung: Zur Behebung dieses Problems weisen Sie den Triggern mehr Ressourcen zu, reduzieren die Anzahl der zurückgestellten Aufgaben, die gleichzeitig ausgeführt werden, oder erhöhen die Anzahl der Trigger in Ihrer Umgebung. Beachten Sie, dass die Trigger zwar Trigger zurückstellende Aufgaben ausführen, aber dafür verantwortlich sind, die einzelnen Aufgaben zu starten und schließlich abzuschließen. Wenn Sie die Anzahl der Trigger anpassen, sollten Sie auch die Anzahl der Worker-Instanzen skalieren.

  2. Der asynchrone Thread wurde durch eine bestimmte Aufgabe blockiert.

    WARNING - Executing <Task finished coro=<TriggerRunner.run_trigger() done, defined at /opt/***/***/jobs/my-custom-code.py:609> result=None> took 0.401 second
    

    Diese Warnung verweist auf einen bestimmten Operatorcode, der von Cloud Composer ausgeführt wird. Trigger sollten grundsätzlich auf die Bibliothek asyncio zurückgreifen, um Vorgänge im Hintergrund auszuführen. Eine benutzerdefinierte Implementierung eines Triggers kann asyncio-Verträge nicht richtig einhalten (z. B. aufgrund einer falschen Verwendung der Schlüsselwörter await und async im Python-Code).

    Lösung: Prüfen Sie den in der Warnung gemeldeten Code und prüfen Sie, ob der asynchrone Vorgang ordnungsgemäß implementiert wurde.

Zu viele Trigger

Die Anzahl der zurückgestellten Aufgaben ist im Messwert task_count sichtbar, der auch im Monitoring-Dashboard Ihrer Umgebung angezeigt wird. Jeder Trigger erstellt einige Ressourcen, z. B. Verbindungen zu externen Ressourcen, die Arbeitsspeicher belegen.

Ausgesetzte Aufgaben, die auf dem Monitoring-Dashboard angezeigt werden
Abbildung 1. Zurückgestellte Aufgaben, die auf dem Monitoring-Dashboard angezeigt werden (zum Vergrößern klicken)

Grafiken zum Arbeitsspeicher- und CPU-Verbrauch zeigen, dass unzureichende Ressourcen Neustarts verursachen, da die Aktivitätsprüfung aufgrund fehlender Heartbeats fehlschlägt:

Trigger wird aufgrund unzureichender Ressourcen neu gestartet
Abbildung 2. Trigger wird aufgrund unzureichender Ressourcen neu gestartet (zum Vergrößern klicken)

Lösung: Zur Behebung dieses Problems weisen Sie den Triggern mehr Ressourcen zu, reduzieren die Anzahl der zurückgestellten Aufgaben, die gleichzeitig ausgeführt werden, oder erhöhen die Anzahl der Trigger in Ihrer Umgebung.

Absturz eines Airflow-Workers während der Callback-Ausführung

Nachdem der Trigger die Ausführung beendet hat, kehrt die Steuerung an einen Airflow-Worker zurück, der eine Callback-Methode in einem Ausführungsslot ausführt. Diese Phase wird von Celery Executor gesteuert. Daher gelten die entsprechenden Konfigurations- und Ressourcenlimits (z. B. parallelism oder worker_concurrency).

Wenn die Callback-Methode im Airflow-Worker fehlschlägt, der Worker fehlschlägt oder der Worker, der die Methode ausführt, neu gestartet wird, wird die Aufgabe als FAILED gekennzeichnet. In diesem Fall wird beim Wiederholungsvorgang die gesamte Aufgabe noch einmal ausgeführt, nicht nur die Callback-Methode.

Endlosschleife in einem Trigger

Es ist möglich, einen benutzerdefinierten Trigger-Operator so zu implementieren, dass er die Haupt-Triggerer-Schleife vollständig blockiert, sodass jeweils nur der eine fehlerhafte Trigger ausgeführt wird. In diesem Fall wird in den Triggerlogs eine Warnung generiert, nachdem der problematische Trigger abgeschlossen ist.

Triggerklasse nicht gefunden

Da der DAGs-Ordner nicht mit dem Airflow-Triggerer synchronisiert wird, fehlt der Inline-Triggercode, wenn der Trigger ausgeführt wird. Der Fehler wird in den Logs der fehlgeschlagenen Aufgabe generiert:

ImportError: Module "PACKAGE_NAME" does not define a "CLASS_NAME" attribute/
class

Lösung: Importieren Sie den fehlenden Code aus PyPI.

Warnmeldung zum Trigger in der Airflow-UI

In einigen Fällen wird nach dem Deaktivieren des Triggers möglicherweise die folgende Warnmeldung in der Airflow-UI angezeigt:

The triggerer does not appear to be running. Last heartbeat was received
4 hours ago. Triggers will not run, and any deferred operator will remain
deferred until it times out or fails.

Airflow kann diese Meldung anzeigen, da unvollständige Trigger in der Airflow-Datenbank verbleiben. Diese Meldung bedeutet in der Regel, dass der Trigger deaktiviert wurde, bevor alle Trigger in Ihrer Umgebung abgeschlossen wurden.

Sie können alle in der Umgebung ausgeführten Trigger auf der Seite Durchsuchen > Trigger der Airflow-UI anzeigen lassen (die Rolle Admin ist erforderlich).

Lösungen:

Aufgaben verbleiben im Status „Ausgesetzt“, nachdem der Trigger deaktiviert wurde

Wenn der Trigger deaktiviert ist, verbleiben Aufgaben, die sich bereits im Status „Ausgesetzt“ befinden, in diesem Status, bis das Zeitlimit erreicht ist. Dieses Zeitlimit kann je nach Airflow- und DAG-Konfiguration unendlich sein.

Verwenden Sie eine der folgenden Lösungen:

  • Aufgaben manuell als fehlgeschlagen markieren.
  • Aktivieren Sie den Trigger, um die Aufgaben abzuschließen.

Wir empfehlen, den Trigger nur zu deaktivieren, wenn in Ihrer Umgebung keine verzögerten Operatoren oder Aufgaben ausgeführt werden und alle verzögerten Aufgaben abgeschlossen sind.

Nächste Schritte