Airflow-Triggerprobleme beheben

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Diese Seite enthält Schritte zur Fehlerbehebung und Informationen zu häufigen Problemen mit dem Airflow-Trigger.

Vorgänge im Trigger blockieren

Asynchrone Aufgaben werden gelegentlich in Triggern blockiert. In den meisten Fällen sind die Probleme auf unzureichende Trigger-Ressourcen zurückzuführen. oder Probleme mit benutzerdefiniertem Code für asynchrone Operatoren.

Triggerprotokolle enthalten Warnmeldungen, anhand derer Sie die Ursachen für eine geringere Triggerleistung ermitteln können. Es gibt zwei nach wichtigen Warnungen suchen.

  1. Async-Thread blockiert

    Triggerer's async thread was blocked for 1.2 seconds, likely due to the highly utilized environment.
    

    Diese Warnung weist auf Leistungsprobleme hin, die auf eine große Anzahl asynchroner Aufgaben zurückzuführen sind.

    Lösung: So beheben Sie das Problem: den Triggern mehr Ressourcen zuweisen, die Anzahl der ausgesetzten Aufgaben zu reduzieren, die gleichzeitig ausgeführt werden, oder erhöhen Sie die Anzahl der Trigger in Ihrer Umgebung. Beachten Sie, dass die Auslöser zwar für verschiebbare Aufgaben zuständig sind, die einzelnen Aufgaben aber von den Workern gestartet und schließlich abgeschlossen werden. Wenn Sie die Anzahl der Trigger anpassen, sollten Sie auch die Anzahl der Worker-Instanzen skalieren.

  2. Eine bestimmte Aufgabe hat den asynchronen Thread blockiert.

    WARNING - Executing <Task finished coro=<TriggerRunner.run_trigger() done, defined at /opt/***/***/jobs/my-custom-code.py:609> result=None> took 0.401 second
    

    Diese Warnung bezieht sich auf einen bestimmten Operatorcode, der von Cloud Composer ausgeführt wird. Aus Gründen der Zuverlässigkeit sollten Trigger die asyncio-Bibliothek für die Ausführung von Vorgängen im Hintergrund verwenden. Eine benutzerdefinierte Implementierung eines Triggers nicht ordnungsgemäße Einhaltung von asyncio-Verträgen (z. B. aufgrund falscher Verwendung der Schlüsselwörter await und async im Python-Code).

    Lösung: Prüfen Sie den Code, der in der Warnung gemeldet wurde, und prüfen Sie, ob der asynchrone Vorgang richtig implementiert ist.

Zu viele Trigger

Die Anzahl der verschobenen Aufgaben ist im Messwert task_count sichtbar: wird auch im Monitoring-Dashboard Ihrer Umgebung angezeigt. Jeder Trigger erzeugt Ressourcen wie Verbindungen zu externen Ressourcen, die Speicherplatz verbrauchen.

Ausgesetzte Aufgaben, die im Monitoring-Dashboard angezeigt werden
Abbildung 1. Auf dem Monitoring-Dashboard angezeigte verzögerte Aufgaben (zum Vergrößern klicken)

Die Diagramme für Arbeitsspeicher- und CPU-Verbrauch zeigen, dass unzureichende Ressourcen zu Neustarts führen, weil der Liveness-Test aufgrund fehlender Heartbeats fehlschlägt:

Trigger wird aufgrund unzureichender Ressourcen neu gestartet
Abbildung 2. Trigger wird aufgrund unzureichender Ressourcen neu gestartet (zum Vergrößern klicken)

Lösung: So beheben Sie das Problem: den Triggern mehr Ressourcen zuweisen, die Anzahl der ausgesetzten Aufgaben zu reduzieren, die gleichzeitig ausgeführt werden, oder erhöhen Sie die Anzahl der Trigger in Ihrer Umgebung.

Absturz eines Airflow-Workers während der Callback-Ausführung

Nachdem der Trigger die Ausführung beendet hat, kehrt die Steuerung zu einem Airflow-Worker zurück, der eine Rückrufmethode mit einem Ausführungsslot ausführt. Diese Phase umfasst die von Celery Executor gesteuert werden, und daher die entsprechende Konfiguration und Es gelten Ressourcenlimits (z. B. parallelism oder worker_concurrency).

Wenn die Callback-Methode im Airflow-Worker fehlschlägt, der Worker fehlschlägt oder der Worker, der die Methode ausführt, neu gestartet wird, wird die Aufgabe als FAILED gekennzeichnet. In diesem Fall wird bei der Wiederholung der gesamte Vorgang, nicht nur die Rückrufmethode, noch einmal ausgeführt.

Endlosschleife in einem Trigger

Es ist möglich, einen benutzerdefinierten Triggeroperator so zu implementieren, dass er die Hauptschleife des Triggerers vollständig blockiert, sodass jeweils nur der fehlerhafte Trigger ausgeführt wird. In diesem Fall wird eine Warnung wird in den Trigger-Logs generiert, Der problematische Trigger wurde beendet.

Triggerklasse nicht gefunden

Da der DAGs-Ordner nicht mit dem Airflow-Triggerer synchronisiert wird, Der Inline-Triggercode fehlt, wenn der Trigger ausgeführt wird. Der Fehler lautet in den Logs der fehlgeschlagenen Aufgabe generiert:

ImportError: Module "PACKAGE_NAME" does not define a "CLASS_NAME" attribute/
class

Lösung: Importieren Sie den fehlenden Code aus PyPI.

Warnmeldung zum Trigger in der Airflow-Benutzeroberfläche

In einigen Fällen wird nach dem Deaktivieren des Auslösers möglicherweise die folgende Warnmeldung in der Airflow-Benutzeroberfläche angezeigt:

The triggerer does not appear to be running. Last heartbeat was received
4 hours ago. Triggers will not run, and any deferred operator will remain
deferred until it times out or fails.

Airflow kann diese Meldung anzeigen, weil unvollständige Trigger in der Airflow-Datenbank verbleiben. Diese Meldung bedeutet in der Regel, dass der Auslöser deaktiviert wurde, bevor alle Trigger in Ihrer Umgebung abgeschlossen wurden.

Sie können sich alle Trigger ansehen, die in der Umgebung ausgeführt werden. Rufen Sie dazu in der Airflow-Benutzeroberfläche die Seite Durchsuchen > Trigger auf. Sie benötigen dazu die Rolle Admin.

Lösungen:

Aufgaben bleiben im Status „Verschoben“, nachdem der Auslöser deaktiviert wurde

Wenn der Trigger deaktiviert ist, werden Aufgaben, die sich bereits im Status „verzögert“ befinden, bleiben in diesem Status, bis das Zeitlimit erreicht ist. Dieses Zeitlimit kann je nach Airflow- und DAG-Konfiguration unendlich sein.

Verwenden Sie eine der folgenden Lösungen:

  • Markieren Sie die Aufgaben manuell als fehlgeschlagen.
  • Aktivieren Sie den Auslöser, damit die Aufgaben ausgeführt werden.

Wir empfehlen, den Trigger nur zu deaktivieren, wenn in Ihrer Umgebung keine zurückgestellte Operatoren oder Aufgaben und alle ausgesetzten Aufgaben sind abgeschlossen.

Nächste Schritte