Fehlerbehebung bei DAGs

Cloud Composer 1 Cloud Composer 2

Diese Seite enthält Schritte zur Fehlerbehebung und Informationen zu häufigen Workflowproblemen.

Viele Probleme bei der DAG-Ausführung werden durch eine nicht optimale Umgebungsleistung verursacht. Weitere Informationen zum Optimieren Ihrer Cloud Composer 2-Umgebung finden Sie im Leitfaden Umgebungsleistung und -kosten optimieren.

Einige Probleme bei der DAG-Ausführung werden möglicherweise dadurch verursacht, dass der Airflow-Planer nicht richtig oder nicht optimal funktioniert. Folgen Sie der Anleitung zur Fehlerbehebung in Scheduler, um diese Probleme zu beheben.

Fehlerbehebung beim Workflow

So beginnen Sie mit der Fehlerbehebung:

  1. Prüfen Sie die Airflow-Logs.

    Sie können die Logging-Ebene von Airflow erhöhen, indem Sie die folgende Airflow-Konfigurationsoption überschreiben.

    Airflow 2

    Bereich Schlüssel Wert
    logging logging_level Der Standardwert ist INFO. Legen Sie diese Option auf DEBUG fest, um eine höhere Ausführlichkeit in Logeinträgen zu erhalten.

    Airflow 1

    Bereich Schlüssel Wert
    core logging_level Der Standardwert ist INFO. Legen Sie diese Option auf DEBUG fest, um eine höhere Ausführlichkeit in Logeinträgen zu erhalten.
  2. Sehen Sie sich das Monitoring-Dashboard an.

  3. Prüfen Sie Cloud Monitoring.

  4. Suchen Sie in der Google Cloud Console auf den Seiten für die Komponenten Ihrer Umgebung nach Fehlern.

  5. Suchen Sie auf der Airflow-Weboberfläche in der Grafikansicht des DAG nach fehlgeschlagenen Aufgabeninstanzen.

    Bereich Schlüssel Wert
    webserver dag_orientation LR, TB, RL oder BT

Fehlerbehebung bei Operatorfehlern

So beheben Sie Operatorfehler:

  1. Suchen Sie nach aufgabenspezifischen Fehlern.
  2. Prüfen Sie die Airflow-Logs.
  3. Prüfen Sie Cloud Monitoring.
  4. Prüfen Sie Operatorspezifische Logs.
  5. Beheben Sie die Fehler.
  6. Laden Sie den DAG in den Ordner dags/ hoch.
  7. Löschen Sie in der Airflow-Weboberfläche die vergangenen Status für den DAG.
  8. Setzen Sie den DAG fort oder führen Sie ihn aus.

Fehlerbehebung bei der Aufgabenausführung

Airflow ist ein verteiltes System mit vielen Entitäten wie Planer, Executor und Workern, die über eine Aufgabenwarteschlange und die Airflow-Datenbank miteinander kommunizieren und Signale (wie SIGTERM) senden. Das folgende Diagramm bietet einen Überblick über die Verbindungen zwischen Airflow-Komponenten.

Interaktion zwischen Airflow-Komponenten
Abbildung 1. Interaktion zwischen Airflow-Komponenten (zum Vergrößern klicken)

In einem verteilten System wie Airflow können Probleme mit der Netzwerkverbindung vorliegen oder in der zugrunde liegenden Infrastruktur zeitweise Probleme auftreten. Dies kann zu Situationen führen, in denen Aufgaben fehlschlagen und zur Ausführung neu geplant werden oder Aufgaben nicht erfolgreich abgeschlossen werden (z. B. Zombieaufgaben oder Aufgaben, die bei der Ausführung hängen geblieben sind). Airflow verfügt über Mechanismen, um mit solchen Situationen umzugehen und die normale Funktion automatisch wiederherzustellen. In den folgenden Abschnitten werden häufige Probleme bei der Ausführung von Aufgaben mit Airflow erläutert: Zombieaufgaben, Poison Pills und SIGTERM-Signale.

Fehlerbehebung bei Zombieaufgaben

Airflow erkennt zwei Arten von Abweichungen zwischen einer Aufgabe und einem Prozess, der die Aufgabe ausführt:

  • Zombie-Tasks sind Aufgaben, die ausgeführt werden sollten, aber nicht ausgeführt werden. Dies kann vorkommen, wenn der Prozess der Aufgabe beendet wurde oder nicht reagiert, wenn der Airflow-Worker einen Aufgabenstatus nicht rechtzeitig gemeldet hat, weil er überlastet ist, oder wenn die VM, auf der die Aufgabe ausgeführt wird, heruntergefahren wurde. Airflow sucht diese Aufgaben regelmäßig und schlägt je nach den Einstellungen der Aufgabe entweder fehl oder wiederholt sie.

  • Nicht ausgefallene Tasks sind Aufgaben, die nicht ausgeführt werden sollten. Airflow findet diese Aufgaben regelmäßig und beendet sie.

Der häufigste Grund für Zombieaufgaben ist ein Mangel an CPU- und Arbeitsspeicherressourcen im Cluster Ihrer Umgebung. Dies kann dazu führen, dass ein Airflow-Worker den Status einer Aufgabe nicht melden kann oder ein Sensor abrupt unterbrochen wird. In diesem Fall markiert der Planer eine bestimmte Aufgabe als Zombieaufgabe. Weisen Sie Ihrer Umgebung mehr Ressourcen zu, um Zombieaufgaben zu vermeiden.

Weitere Informationen zum Skalieren der Cloud Composer-Umgebung finden Sie im Leitfaden zur Skalierungsumgebung. Wenn Zombieaufgaben auftreten, ist eine mögliche Lösung, das Zeitlimit für Zombieaufgaben zu erhöhen. Daher wartet der Planer länger, bevor er eine Aufgabe als Zombie behandelt. Auf diese Weise hat ein Airflow-Worker mehr Zeit, den Status der Aufgabe zu melden.

Um das Zeitlimit für Zombie-Aufgaben zu erhöhen, überschreiben Sie den Wert der Airflow-Konfigurationsoption [scheduler]scheduler_zombie_task_threshold:

Bereich Schlüssel Wert Notes
scheduler scheduler_zombie_task_threshold Neues Zeitlimit (in Sekunden) Der Standardwert ist 300.

Fehlerbehebung bei Giftpille

Poison Pill ist ein Mechanismus, mit dem Airflow Airflow-Aufgaben herunterfährt.

Airflow verwendet Poison Pill in folgenden Situationen:

  • Wenn ein Planer eine Aufgabe beendet, die nicht rechtzeitig abgeschlossen wurde.
  • Wenn eine Aufgabe das Zeitlimit überschreitet oder zu lange ausgeführt wird.

Wenn Airflow Poison Pill verwendet, werden in den Logs eines Airflow-Workers, der die Task ausgeführt hat, die folgenden Logeinträge angezeigt:

  INFO - Subtask ... WARNING - State of this instance has been externally set
  to success. Taking the poison pill.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

Mögliche Lösungen:

  • Prüfen Sie den Taskcode auf Fehler, die dazu führen könnten, dass er zu lange ausgeführt wird.
  • (Cloud Composer 2) Erhöhen Sie die CPU und den Arbeitsspeicher für Airflow-Worker, damit Aufgaben schneller ausgeführt werden.
  • Erhöhen Sie den Wert der Airflow-Konfigurationsoption [celery_broker_transport_options]visibility-timeout.

    Daher wartet der Planer länger auf die Fertigstellung einer Aufgabe, bevor er die Aufgabe als Zombieaufgabe betrachtet. Diese Option ist besonders nützlich für zeitaufwändige Aufgaben, die viele Stunden dauern. Wenn der Wert zu niedrig ist (z. B. 3 Stunden), betrachtet der Planer Aufgaben, die 5 oder 6 Stunden lang ausgeführt werden, als „aufgehängt“ (Zombie-Aufgaben).

  • Erhöhen Sie den Wert der Airflow-Konfigurationsoption [core]killed_task_cleanup_time.

    Ein längerer Wert bietet Airflow-Workern mehr Zeit, ihre Aufgaben ordnungsgemäß abzuschließen. Wenn der Wert zu niedrig ist, werden Airflow-Aufgaben möglicherweise abrupt unterbrochen, ohne genügend Zeit, um ihre Arbeit ordnungsgemäß abzuschließen.

Fehlerbehebung bei SIGTERM-Signalen

SIGTERM-Signale werden von Linux, Kubernetes, Airflow-Planer und Celery verwendet, um Prozesse zu beenden, die für die Ausführung von Airflow-Workern oder Airflow-Aufgaben verantwortlich sind.

Es kann mehrere Gründe dafür geben, dass SIGTERM-Signale in einer Umgebung gesendet werden:

  • Eine Aufgabe wurde zu einer Zombieaufgabe und muss beendet werden.

  • Der Planer hat ein Duplikat einer Aufgabe erkannt und sendet Poison Pill- und SIGTERM-Signale an die Aufgabe, um sie zu stoppen.

  • Beim horizontalen Pod-Autoscaling sendet die GKE-Steuerungsebene SIGTERM-Signale, um nicht mehr benötigte Pods zu entfernen.

  • Der Planer kann SIGTERM-Signale an den DagFileProcessorManager-Prozess senden. Solche SIGTERM-Signale werden vom Planer zur Verwaltung des DagFileProcessorManager-Prozesslebenszyklus verwendet und können ignoriert werden.

    Beispiel:

    Launched DagFileProcessorManager with pid: 353002
    Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
    Sending the signal Signals.SIGTERM to group 353002
    Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
    
  • Race-Bedingung zwischen dem Heartbeat-Callback und den Exit-Callbacks im „local_task_job“, der die Ausführung der Aufgabe überwacht. Wenn der Heartbeat erkennt, dass eine Aufgabe als erfolgreich markiert wurde, kann nicht unterschieden werden, ob die Aufgabe selbst erfolgreich war oder dass Airflow angewiesen wurde, die Aufgabe als erfolgreich zu betrachten. Trotzdem wird ein Aufgaben-Runner beendet, ohne auf das Beenden zu warten.

    Solche SIGTERM-Signale können ignoriert werden. Die Aufgabe ist bereits erfolgreich und die Ausführung der DAG-Ausführung als Ganzes ist nicht betroffen.

    Der Logeintrag Received SIGTERM. ist der einzige Unterschied zwischen dem regulären Exit und dem Beenden der Aufgabe im erfolgreichen Status.

    Race-Bedingung zwischen Heartbeat und Exit-Callbacks
    Abbildung 2. Race-Bedingung zwischen Heartbeat und Exit-Callbacks (zum Vergrößern klicken)
  • Eine Airflow-Komponente verwendet mehr Ressourcen (CPU, Arbeitsspeicher) als vom Clusterknoten zulässig.

  • Der GKE-Dienst führt Wartungsvorgänge aus und sendet SIGTERM-Signale an Pods, die auf einem Knoten ausgeführt werden, der demnächst aktualisiert wird. Wenn eine Taskinstanz mit SIGTERM beendet wird, werden in den Logs eines Airflow-Workers, der die Task ausgeführt hat, die folgenden Logeinträge angezeigt:

{local_task_job.py:211} WARNING - State of this instance has been externally
set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
with exception

Mögliche Lösungen:

Dieses Problem tritt auf, wenn eine VM, auf der die Aufgabe ausgeführt wird, nicht mehr genügend Arbeitsspeicher hat. Dies hat nichts mit den Airflow-Konfigurationen zu tun, sondern auf die Menge des für die VM verfügbaren Arbeitsspeichers.

Das Vergrößern des Arbeitsspeichers hängt von der verwendeten Cloud Composer-Version ab. Beispiel:

  • In Cloud Composer 2 können Sie Airflow-Workern mehr CPU- und Arbeitsspeicherressourcen zuweisen.

  • Bei Cloud Composer 1 können Sie Ihre Umgebung mit einem Maschinentyp mit höherer Leistung neu erstellen.

  • In beiden Versionen von Cloud Composer können Sie den Wert der Airflow-Konfigurationsoption [celery]worker_concurrency für die Nebenläufigkeit senken. Diese Option bestimmt, wie viele Tasks von einem bestimmten Airflow-Worker gleichzeitig ausgeführt werden.

Weitere Informationen zur Optimierung Ihrer Cloud Composer 2-Umgebung finden Sie unter Umgebungsleistung und -kosten optimieren.

Cloud Logging-Abfragen zum Ermitteln von Gründen für Pod-Neustarts oder -Entfernungen

In den Cloud Composer-Umgebungen werden GKE-Cluster als Computing-Infrastrukturebene verwendet. In diesem Abschnitt finden Sie nützliche Abfragen, mit denen Sie Gründe für Neustarts oder Bereinigungen von Airflow-Worker- oder Airflow-Planern ermitteln können.

Die unten dargestellten Abfragen können so abgestimmt werden:

  • Sie können in Cloud Logging einen für Sie interessanten Zeitraum angeben, z. B. die letzten 6 Stunden oder 3 Tage, oder einen benutzerdefinierten Zeitraum definieren.

  • sollten Sie die CLUSTER_NAME von Cloud Composer angeben

  • Sie können die Suche auch auf einen bestimmten Pod beschränken. Fügen Sie dazu POD_NAME hinzu.

Neu gestartete Container ermitteln

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
  

Alternative Abfrage, um die Ergebnisse auf einen bestimmten Pod zu beschränken:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
  

Erkennen, dass Container aufgrund eines Ereignisses mit unzureichendem Arbeitsspeicher heruntergefahren werden

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    

Alternative Abfrage, um die Ergebnisse auf einen bestimmten Pod zu beschränken:

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Container ermitteln, die nicht mehr ausgeführt wurden

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    

Alternative Abfrage, um die Ergebnisse auf einen bestimmten Pod zu beschränken:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

Auswirkungen von Aktualisierungs- oder Upgradevorgängen auf die Ausführung von Airflow-Aufgaben

Aktualisierungs- oder Upgradevorgänge unterbrechen aktuell ausgeführte Airflow-Aufgaben, es sei denn, eine Aufgabe wird im Modus „Aussetzen“ ausgeführt.

Wir empfehlen, diese Vorgänge auszuführen, wenn eine minimale Auswirkung auf die Ausführung von Airflow-Aufgaben zu erwarten ist, und geeignete Wiederholungsmechanismen in Ihren DAGs und Aufgaben einzurichten.

Allgemeine Probleme

In den folgenden Abschnitten werden Symptome und mögliche Lösungen für einige häufig auftretende DAG-Probleme beschrieben.

Airflow-Aufgabe wurde durch Negsignal.SIGKILL unterbrochen

Manchmal belegt Ihre Aufgabe mehr Arbeitsspeicher, als dem Airflow-Worker zugewiesen ist. In diesem Fall kann er durch Negsignal.SIGKILL unterbrochen werden. Das System sendet dieses Signal, um eine weitere Speichernutzung zu vermeiden, die sich auf die Ausführung anderer Airflow-Aufgaben auswirken könnte. Im Log des Airflow-Workers wird möglicherweise der folgende Logeintrag angezeigt:

{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL

Negsignal.SIGKILL kann auch als Code -9 angezeigt werden.

Mögliche Lösungen:

  • Unterer worker_concurrency der Airflow-Worker.

  • Erhöhen Sie bei Cloud Composer 2 den Arbeitsspeicher der Airflow-Worker.

  • Führen Sie bei Cloud Composer 1 ein Upgrade auf einen größeren Maschinentyp durch, der im Cloud Composer-Cluster verwendet wird.

  • Optimieren Sie Ihre Aufgaben, um weniger Arbeitsspeicher zu verbrauchen.

  • Verwalten Sie ressourcenintensive Aufgaben in Cloud Composer mit KubernetesPodOperator oder GKEStartPodOperator für die Aufgabenisolierung und die benutzerdefinierte Ressourcenzuweisung.

Aufgabe schlägt aufgrund von DAG-Parsing-Fehlern fehl, ohne Protokolle auszugeben

Manchmal können subtile DAG-Fehler auftreten, die zu einer Situation führen, in der ein Airflow-Planer und ein DAG-Prozessor Aufgaben zur Ausführung planen und jeweils eine DAG-Datei parsen können. Der Airflow-Worker kann jedoch Aufgaben aus einem solchen DAG nicht ausführen, da Programmierfehler in der Python-DAG-Datei vorliegen. Dies kann zu einer Situation führen, in der eine Airflow-Aufgabe als Failed gekennzeichnet ist und für die Ausführung kein Log vorhanden ist.

Lösungen:

  • Prüfen Sie in den Airflow-Worker-Logs, ob vom Airflow-Worker Fehler aufgrund von Fehlern beim fehlenden DAG oder DAG-Parsing ausgegeben wurden.

  • Anzahl der Parameter für das DAG-Parsing erhöhen:

    • Erhöhen Sie den Wert für dagbag-import-timeout auf mindestens 120 Sekunden (oder mehr, falls erforderlich).

    • Erhöhen Sie den Wert für dag-file-processor-timeout auf mindestens 180 Sekunden (oder mehr, falls erforderlich). Dieser Wert muss größer als dagbag-import-timeout sein.

  • Weitere Informationen finden Sie unter DAG-Prozessorlogs prüfen.

Aufgabe schlägt aufgrund von Ressourcenauslastung fehl, ohne Logs auszugeben

Symptom: Während der Ausführung einer Aufgabe wird der Unterprozess des Airflow-Workers, der für die Ausführung von Airflow-Aufgaben zuständig ist, abrupt unterbrochen. Der im Log des Airflow-Workers sichtbare Fehler kann wie folgt aussehen:

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

Lösung:

Aufgabe schlägt aufgrund der Pod-Bereinigung fehl, ohne Logs auszugeben

Google Kubernetes Engine-Pods unterliegen dem Kubernetes-Pod-Lebenszyklus und der Pod-Entfernung. Aufgabenspitzen und die gemeinsame Planung von Workern sind zwei der häufigsten Ursachen für die Pod-Entfernung in Cloud Composer.

Eine Pod-Bereinigung kann auftreten, wenn ein bestimmter Pod Ressourcen eines Knotens relativ zu den konfigurierten Erwartungen in Sachen Ressourcenverbrauch für den Knoten übernutzt. Eine Bereinigung kann beispielsweise erfolgen, wenn mehrere speicherintensive Aufgaben in einem Pod ausgeführt werden und deren kombinierte Last dazu führt, dass der Knoten, auf dem dieser Pod ausgeführt wird, das Limit für die Arbeitsspeichernutzung überschreitet.

Wenn ein Airflow-Worker-Pod bereinigt wird, werden alle auf diesem Pod ausgeführten Aufgabeninstanzen unterbrochen und später von Airflow als fehlgeschlagen markiert.

Logs werden zwischengespeichert. Wenn ein Worker-Pod entfernt wird, bevor der Zwischenspeicher geleert wurde, werden keine Logs ausgegeben. Ein Aufgabenfehler ohne Logs ist ein Hinweis darauf, dass die Airflow-Worker aufgrund von zu wenig Arbeitsspeicher neu gestartet werden. In Cloud Logging sind möglicherweise einige Logs vorhanden, obwohl die Airflow-Logs nicht ausgegeben wurden.

So können die Logs angezeigt werden:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Rufen Sie den Tab Logs auf.

  4. Sie können Logs einzelner Worker unter Alle Logs -> Airflow-Logs -> Worker -> (einzelner Worker) aufrufen.

Die DAG-Ausführung ist arbeitsspeicherbegrenzt. Jede Ausführung einer Aufgabe beginnt mit zwei Airflow-Prozessen: Ausführung und Monitoring. Jeder Knoten kann bis zu 6 Aufgaben gleichzeitig ausführen (ungefähr 12 Prozesse mit Airflow-Modulen). Je nach Art des DAG wird möglicherweise mehr Arbeitsspeicher belegt.

Symptom:

  1. Rufen Sie in der Google Cloud Console die Seite Arbeitslasten auf.

    Zu Arbeitslasten

  2. Wenn airflow-worker-Pods vorhanden sind, die Evicted anzeigen, klicken Sie auf die einzelnen bereinigten Pods und suchen Sie oben im Fenster nach folgender Meldung: The node was low on resource: memory.

Lösung:

  • Erstellen Sie in Cloud Composer 1 eine neue Cloud Composer-Umgebung mit einem größeren Maschinentyp als dem aktuellen Maschinentyp.
  • Erhöhen Sie in Cloud Composer 2 die Arbeitsspeicherlimits für Airflow-Worker.
  • Prüfen Sie die Logs von airflow-worker-Pods auf mögliche Ursachen für die Bereinigung. Weitere Informationen zum Abrufen von Logs aus einzelnen Pods finden Sie unter Probleme mit bereitgestellten Arbeitslasten beheben.
  • Achten Sie darauf, dass die Aufgaben im DAG idempotent und wiederholbar sind.
  • Vermeiden Sie das Herunterladen unnötiger Dateien in das lokale Dateisystem von Airflow-Workern.

    Airflow-Worker haben eine begrenzte lokale Dateisystemkapazität. In Cloud Composer 2 kann ein Worker beispielsweise zwischen 1 GB und 10 GB Speicherplatz haben. Wenn der Speicherplatz aufgebraucht ist, wird der Airflow-Worker-Pod von der GKE-Steuerungsebene entfernt. Alle Aufgaben, die der entfernte Worker ausgeführt hat, schlägt fehl.

    Beispiele für problematische Vorgänge:

    • Dateien oder Objekte herunterladen und lokal in einem Airflow-Worker speichern Speichern Sie diese Objekte stattdessen direkt in einem geeigneten Dienst wie einem Cloud Storage-Bucket.
    • Über einen Airflow-Worker auf große Objekte im Ordner /data zugreifen Der Airflow-Worker lädt das Objekt in sein lokales Dateisystem herunter. Implementieren Sie stattdessen Ihre DAGs, damit große Dateien außerhalb des Airflow-Worker-Pods verarbeitet werden.

Zeitüberschreitung beim Laden des DAG

Symptom:

  • In der Airflow-Weboberfläche wird oben auf der Seite mit der DAGs-Liste ein rotes Benachrichtigungsfeld mit dem Wert Broken DAG: [/path/to/dagfile] Timeout angezeigt.
  • In Cloud Monitoring: Die airflow-scheduler-Logs enthalten Einträge ähnlich den folgenden:

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

Lösung:

Überschreiben Sie die Airflow-Konfigurationsoption dag_file_processor_timeout und gewähren Sie mehr Zeit für das DAG-Parsing:

Bereich Schlüssel Wert
core dag_file_processor_timeout Neuer Wert für die Zeitüberschreitung

DAG-Ausführung endet nicht innerhalb der erwarteten Zeit

Symptom:

Manchmal endet eine DAG-Ausführung nicht, weil Airflow-Aufgaben hängen bleiben und die DAG-Ausführung länger als erwartet dauert. Unter normalen Bedingungen bleiben Airflow-Aufgaben nicht unbegrenzt in der Warteschlange oder im Status "Wird ausgeführt", da in Airflow Zeitüberschreitungs- und Bereinigungsverfahren eingesetzt werden, mit denen diese Situation vermieden werden kann.

Lösung:

  • Verwenden Sie für die DAGs den Parameter dagrun_timeout. Beispiel: dagrun_timeout=timedelta(minutes=120). Daher muss jede DAG-Ausführung innerhalb des Zeitlimits für die DAG-Ausführung abgeschlossen sein. Abgeschlossene Aufgaben werden als Failed oder Upstream Failed gekennzeichnet. Weitere Informationen zu den Aufgabenstatus von Airflow finden Sie in der Apache Airflow-Dokumentation.

  • Verwenden Sie den Parameter Zeitlimit für die Ausführung von Aufgaben, um ein Standardzeitlimit für Aufgaben zu definieren, die basierend auf Apache Airflow-Operatoren ausgeführt werden.

DAG-Ausführungen nicht ausgeführt

Symptom:

Wenn ein Zeitplandatum für einen DAG dynamisch festgelegt wird, kann dies zu verschiedenen unerwarteten Nebenwirkungen führen. Beispiel:

  • Eine DAG-Ausführung liegt immer in der Zukunft und der DAG wird nie ausgeführt.

  • Frühere DAG-Ausführungen werden als ausgeführt markiert und sind erfolgreich, obwohl sie nicht ausgeführt wurden.

Weitere Informationen finden Sie in der Apache Airflow-Dokumentation.

Lösung:

  • Folgen Sie den Empfehlungen in der Apache Airflow-Dokumentation.

  • Legen Sie eine statische start_date für DAGs fest. Optional können Sie catchup=False verwenden, um die Ausführung des DAG für vergangene Zeiträume zu deaktivieren.

  • Vermeiden Sie die Verwendung von datetime.now() oder days_ago(<number of days>), es sei denn, Sie kennen die Nebenwirkungen dieses Ansatzes.

Erhöhter Netzwerktraffic zur und von der Airflow-Datenbank

Die Menge des Netzwerk-Traffics zwischen dem GKE-Cluster Ihrer Umgebung und der Airflow-Datenbank hängt von der Anzahl der DAGs, der Anzahl der Aufgaben in DAGs und der Art des Zugriffs der DAGs auf Daten in der Airflow-Datenbank ab. Folgende Faktoren können sich auf die Netzwerknutzung auswirken:

  • Abfragen an die Airflow-Datenbank Wenn Ihre DAGs viele Abfragen ausführen, generieren sie große Mengen an Traffic. Beispiele: Status prüfen, bevor andere Aufgaben ausgeführt werden, XCom-Tabelle abfragen und Inhalt der Airflow-Datenbank auslesen.

  • Große Anzahl von Aufgaben Je mehr Aufgaben geplant sind, desto mehr Netzwerktraffic wird generiert. Dieser Aspekt gilt sowohl für die Gesamtzahl der Aufgaben in Ihren DAGs als auch für die Planungshäufigkeit. Wenn der Airflow-Planer DAG-Ausführungen plant, sendet er Abfragen an die Airflow-Datenbank und generiert Traffic.

  • Die Airflow-Weboberfläche generiert Netzwerkverkehr, da sie Abfragen an die Airflow-Datenbank sendet. Die intensive Verwendung von Seiten mit Grafiken, Aufgaben und Diagrammen kann große Mengen an Netzwerkverkehr generieren.

DAG führt zum Absturz des Airflow-Webservers oder verursacht einen 502 gateway timeout-Fehler

Webserverausfälle können aus verschiedenen Gründen auftreten. Prüfen Sie die airflow-webserver-Logs in Cloud Logging, um die Ursache des 502 gateway timeout-Fehlers zu ermitteln.

Komplexe Berechnungen

Dieser Abschnitt gilt nur für Cloud Composer 1.

Führen Sie keine komplexen Berechnungen während der DAG-Analyse durch.

Im Gegensatz zu den Worker- und Planerknoten, deren Maschinentypen für eine höhere CPU- und Arbeitsspeicherkapazität angepasst werden können, verwendet der Webserver einen festen Maschinentyp. Dies kann zu DAG-Parsing-Fehlern führen, wenn die Parserzeit-Berechnung zu aufwendig ist.

Beachten Sie, dass der Webserver 2 vCPUs und 2 GB Arbeitsspeicher hat. Der Standardwert für core-dagbag_import_timeout beträgt 30 Sekunden. Dieser Zeitlimitwert ist die Obergrenze für den Zeitraum, den Airflow für das Laden eines Python-Moduls in den Ordner dags/ benötigt.

Falsche Berechtigungen

Dieser Abschnitt gilt nur für Cloud Composer 1.

Der Webserver wird nicht unter demselben Dienstkonto ausgeführt wie die Worker und der Planer. Worker und der Planer können somit gegebenenfalls auf vom Nutzer verwaltete Ressourcen zugreifen, auf die der Webserver keinen Zugriff hat.

Vermeiden Sie den Zugriff auf nicht öffentliche Ressourcen während der DAG-Analyse. Sollte dies nicht möglich sein, müssen Sie dem Dienstkonto des Webservers Berechtigungen erteilen. Der Name des Dienstkontos wird von der Webserverdomain abgeleitet. Wenn die Domain beispielsweise example-tp.appspot.com lautet, ist das Dienstkonto example-tp@appspot.gserviceaccount.com.

DAG-Fehler

Dieser Abschnitt gilt nur für Cloud Composer 1.

Der Webserver wird in App Engine ausgeführt und ist vom GKE-Cluster Ihrer Umgebung getrennt. Der Webserver parst die DAG-Definitionsdateien. Wenn dabei Fehler im DAG auftreten, kann es zum 502 gateway timeout kommen. Airflow funktioniert problemlos ohne funktionierenden Webserver, wenn der problematische DAG keine in GKE ausgeführten Prozesse zum Absturz bringt. In diesem Fall können Sie mit gcloud composer environments run Details aus Ihrer Umgebung abrufen und das Problem so umgehen, dass der Webserver ausfällt.

In anderen Fällen können Sie die DAG-Analyse in GKE ausführen und nach DAGs suchen, die schwerwiegende Python-Ausnahmen oder eine Zeitüberschreitung auslösen (Standardeinstellung: 30 Sekunden). Stellen Sie zur Fehlerbehebung eine Verbindung zu einer Remote-Shell in einem Airflow-Worker-Container her und testen Sie, ob Syntaxfehler vorliegen. Weitere Informationen finden Sie unter DAGs testen.

Verarbeitung einer großen Anzahl von DAGs und Plug-ins in DAG- und Plug-in-Ordnern

Der Inhalt der Ordner /dags und /plugins wird vom Bucket Ihrer Umgebung mit lokalen Dateisystemen von Airflow-Workern und Planern synchronisiert.

Je mehr Daten in diesen Ordnern gespeichert sind, desto länger dauert die Synchronisierung. Gehen Sie in solchen Situationen wie folgt vor:

  • Beschränken Sie die Anzahl der Dateien in den Ordnern /dags und /plugins. Speichern Sie nur die mindestens erforderlichen Dateien.

  • Erhöhen Sie nach Möglichkeit den für Airflow-Planer und -Worker verfügbaren Speicherplatz.

  • Erhöhen Sie nach Möglichkeit die CPU und den Arbeitsspeicher von Airflow-Planern und -Workern, damit der Synchronisierungsvorgang schneller ausgeführt wird.

  • Unterteilen Sie bei einer sehr großen Anzahl von DAGs die DAGs in Batches, komprimieren Sie sie in ZIP-Archiven und stellen Sie sie im Ordner /dags bereit. Dieser Ansatz beschleunigt die DAG-Synchronisierung. Airflow-Komponenten dekomprimieren ZIP-Archive vor der Verarbeitung von DAGs.

  • Das Generieren von DAGs in einem programmatischen Ansatz kann auch eine Methode zur Begrenzung der Anzahl der im Ordner /dags gespeicherten DAG-Dateien sein. Im Abschnitt zu programmatischen DAGs erfahren Sie, wie Sie Probleme mit der Planung und Ausführung programmatisch generierter DAGs vermeiden.

Programmatisch generierte DAGs nicht gleichzeitig planen

Das programmatische Generieren von DAG-Objekten aus einer DAG-Datei ist eine effiziente Methode, um viele ähnliche DAGs mit nur geringen Unterschieden zu erstellen.

Solche DAGs sollten nicht sofort zur Ausführung geplant werden. Es besteht eine hohe Wahrscheinlichkeit, dass Airflow-Worker nicht genügend CPU- und Arbeitsspeicherressourcen haben, um alle geplanten Aufgaben gleichzeitig auszuführen.

So vermeiden Sie Probleme mit der Planung programmatischer DAGs:

  • Erhöhen Sie die Nebenläufigkeit von Workern und skalieren Sie Ihre Umgebung, damit mehr Aufgaben gleichzeitig ausgeführt werden können.
  • Generieren Sie DAGs so, dass ihre Zeitpläne gleichmäßig über einen bestimmten Zeitraum verteilt sind. So vermeiden Sie, dass Hunderte von Aufgaben gleichzeitig geplant werden, sodass die Airflow-Worker Zeit haben, alle geplanten Aufgaben auszuführen.

Fehler 504 beim Zugriff auf den Airflow-Webserver

Siehe Fehler 504 beim Zugriff auf die Airflow-UI.

Die Lost connection to Postgres server during query-Ausnahme wird während der Ausführung der Aufgabe oder unmittelbar danach ausgelöst.

Lost connection to Postgres server during query-Ausnahmen treten häufig auf, wenn die folgenden Bedingungen erfüllt sind:

  • Ihr DAG verwendet PythonOperator oder einen benutzerdefinierten Operator.
  • Ihr DAG stellt Abfragen an die Airflow-Datenbank.

Wenn mehrere Abfragen von einer aufrufbaren Funktion gesendet werden, verweisen Tracebacks möglicherweise fälschlicherweise auf die Zeile self.refresh_from_db(lock_for_update=True) im Airflow-Code. Sie ist die erste Datenbankabfrage nach der Aufgabenausführung. Die tatsächliche Ursache der Ausnahme tritt davor auf, wenn eine SQLAlchemy-Sitzung nicht ordnungsgemäß geschlossen wird.

SQLAlchemy-Sitzungen sind auf einen Thread beschränkt und werden in einer aufrufbaren Funktionssitzung erstellt, die später innerhalb des Airflow-Codes fortgesetzt werden kann. Wenn es innerhalb einer Sitzung zu erheblichen Verzögerungen zwischen Abfragen kommt, wurde die Verbindung möglicherweise bereits vom Postgres-Server getrennt. Das Zeitlimit für die Verbindung in Cloud Composer-Umgebungen ist auf etwa zehn Minuten festgelegt.

Lösung:

  • Verwenden Sie den Decorator airflow.utils.db.provide_session. Dieser Decorator stellt eine gültige Sitzung für die Airflow-Datenbank im session-Parameter bereit und schließt die Sitzung am Ende der Funktion ordnungsgemäß.
  • Verwenden Sie keine einzelne Funktion mit langer Ausführungszeit. Verschieben Sie stattdessen alle Datenbankabfragen in separate Funktionen, sodass es mehrere Funktionen mit dem Decorator airflow.utils.db.provide_session gibt. In diesem Fall werden Sitzungen nach dem Abrufen von Abfrageergebnissen automatisch geschlossen.

Ausführungszeit von DAGs, Aufgaben und parallelen Ausführungen desselben DAG steuern

Wenn Sie steuern möchten, wie lange eine einzelne DAG-Ausführung für einen bestimmten DAG dauert, können Sie dazu den DAG-Parameter dagrun_timeout verwenden. Wenn Sie beispielsweise erwarten, dass eine einzelne DAG-Ausführung unabhängig davon, ob sie erfolgreich oder fehlgeschlagen ist, nicht länger als eine Stunde dauern darf, legen Sie für diesen Parameter 3.600 Sekunden fest.

Sie können auch festlegen, wie lange eine einzelne Airflow-Aufgabe dauern soll. Dazu können Sie execution_timeout verwenden.

Wenn Sie festlegen möchten, wie viele aktive DAG-Ausführungen für einen bestimmten DAG möglich sein sollen, können Sie dies mit der Airflow-Konfigurationsoption [core]max-active-runs-per-dag tun.

Wenn nur eine Instanz eines DAG zu einem bestimmten Zeitpunkt ausgeführt werden soll, setzen Sie den Parameter max-active-runs-per-dag auf 1.

Probleme, die sich auf die Synchronisierung von DAGs und Plug-ins mit Planern, Workern und Webservern auswirken

Cloud Composer synchronisiert den Inhalt der Ordner /dags und /plugins mit Planern und Workern. Bestimmte Objekte in den Ordnern /dags und /plugins könnten verhindern, dass diese Synchronisierung richtig funktioniert, oder sie verlangsamt zumindest.

  • /dags Ordner wird mit Planern und Workern synchronisiert. Dieser Ordner wird nicht mit Webservern in Cloud Composer 2 synchronisiert oder wenn Sie DAG Serialization in Cloud Composer 1 aktiviert haben.

  • /plugins Ordner wird mit Planern, Workern und Webservern synchronisiert.

Folgende Probleme können auftreten:

  • Sie haben mit GZIP komprimierte Dateien, die Komprimierungstranscodierung verwenden, in die Ordner /dags und /plugins hochgeladen. Dies geschieht normalerweise, wenn Sie Daten mit dem Befehl gsutil cp -Z in den Bucket hochladen.

    Lösung: Löschen Sie das Objekt, das die Komprimierungstranscodierung verwendet hat, und laden Sie es noch einmal in den Bucket hoch.

  • Eines der Objekte hat den Namen „.“. Ein solches Objekt wird nicht mit Planern und Workern synchronisiert und wird möglicherweise überhaupt nicht mehr synchronisiert.

    Lösung: Benennen Sie das problematische Objekt um.

  • Ein Ordner und eine DAG-Python-Datei haben dieselben Namen, z. B. a.py. In diesem Fall wird die DAG-Datei nicht korrekt mit den Airflow-Komponenten synchronisiert.

    Lösung: Entfernen Sie den Ordner, der denselben Namen wie eine DAG-Python-Datei hat.

  • Eines der Objekte in den Ordnern /dags oder /plugins enthält am Ende des Objektnamens ein /-Symbol. Solche Objekte können den Synchronisierungsprozess täuschen, da das /-Symbol bedeutet, dass ein Objekt ein Ordner und keine Datei ist.

    Lösung: Entfernen Sie das /-Symbol aus dem Namen des problematischen Objekts.

  • Speichern Sie keine unnötigen Dateien in den Ordnern /dags und /plugins.

    Manchmal enthalten DAGs und Plug-ins, die Sie implementieren, zusätzliche Dateien, z. B. Dateien, in denen Tests für diese Komponenten gespeichert werden. Diese Dateien werden mit Workern und Planern synchronisiert und wirken sich auf die Zeit aus, die für das Kopieren dieser Dateien in Planer, Worker und Webserver benötigt wird.

    Lösung: Speichern Sie keine zusätzlichen und unnötigen Dateien in den Ordnern /dags und /plugins.

Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...' Fehler wird von Planern und Workern generiert

Dieses Problem tritt auf, weil Objekte einen überlappenden Namespace in Cloud Storage haben können, während Planer und Worker gleichzeitig traditionelle Dateisysteme verwenden. Es ist beispielsweise möglich, dem Bucket einer Umgebung sowohl einen Ordner als auch ein Objekt mit demselben Namen hinzuzufügen. Wenn der Bucket mit den Planern und Workern der Umgebung synchronisiert wird, wird dieser Fehler generiert, der zu Aufgabenfehlern führen kann.

Achten Sie zum Beheben dieses Problems darauf, dass sich im Bucket der Umgebung keine Namespaces überschneiden. Befinden sich beispielsweise sowohl /dags/misc (eine Datei) als auch /dags/misc/example_file.txt (eine andere Datei) in einem Bucket, generiert der Planer einen Fehler.

Vorübergehende Unterbrechungen beim Herstellen einer Verbindung zur Airflow-Metadatendatenbank

Cloud Composer wird auf einer verteilten Cloud-Infrastruktur ausgeführt. Dies bedeutet, dass gelegentlich einige vorübergehende Probleme auftreten können, die die Ausführung Ihrer Airflow-Aufgaben stören können.

In solchen Situationen können folgende Fehlermeldungen in den Logs der Airflow-Worker angezeigt werden:

"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

oder

"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

Diese zeitweise auftretenden Probleme können auch durch Wartungsvorgänge verursacht werden, die für Ihre Cloud Composer-Umgebungen ausgeführt werden.

In der Regel treten solche Fehler zeitweise auf. Wenn Ihre Airflow-Aufgaben idempotent sind und Sie Wiederholungsversuche konfiguriert haben, sollten Sie dagegen immun sein. Sie können auch Wartungsfenster definieren.

Ein weiterer Grund für solche Fehler könnte ein Mangel an Ressourcen im Cluster Ihrer Umgebung sein. In solchen Fällen können Sie die Umgebung entsprechend den Anweisungen Umgebungen skalieren oder Umgebung optimieren vertikal skalieren oder optimieren.

Eine DAG-Ausführung ist als erfolgreich markiert, enthält aber keine ausgeführten Aufgaben

Wenn die execution_date einer DAG-Ausführung vor dem start_date des DAG liegt, werden möglicherweise DAG-Ausführungen angezeigt, für die keine Aufgaben ausgeführt wurden, die aber trotzdem als erfolgreich markiert sind.

Erfolgreiche DAG-Ausführung ohne ausgeführte Tasks
Abbildung 3: Erfolgreiche DAG-Ausführung ohne ausgeführte Tasks (zum Vergrößern klicken)

Ursache

Diese Situation kann in einem der folgenden Fälle auftreten:

  • Eine Abweichung wird durch den Zeitzonenunterschied zwischen execution_date und start_date des DAG verursacht. Dies kann beispielsweise passieren, wenn start_date mit pendulum.parse(...) festgelegt wird.

  • Der start_date des DAG ist auf einen dynamischen Wert festgelegt, z. B. airflow.utils.dates.days_ago(1).

Lösung

  • execution_date und start_date müssen dieselbe Zeitzone verwenden.

  • Geben Sie einen statischen start_date an und kombinieren Sie ihn mit catchup=False, um die Ausführung von DAGs mit vergangenen Startdaten zu vermeiden.

Ein DAG ist in der Airflow- oder DAG-UI nicht sichtbar und wird vom Planer nicht geplant

Der DAG-Prozessor parst jeden DAG, bevor er vom Planer geplant werden kann und bevor ein DAG in der Airflow-UI oder der DAG-UI angezeigt wird.

Die folgenden Airflow-Konfigurationsoptionen definieren Zeitlimits für das Parsen von DAGs:

Wenn ein DAG nicht in der Airflow- oder DAG-UI sichtbar ist:

  • Prüfen Sie die Logs des DAG-Prozessors, ob der DAG-Prozessor Ihren DAG korrekt verarbeiten kann. Bei Problemen werden in den Logs des DAG-Prozessors oder des Planers folgende Logeinträge angezeigt:
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
/usr/local/airflow/dags/example_dag.py with PID 21903 started at
2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
  • Prüfen Sie in den Planerlogs, ob der Planer ordnungsgemäß funktioniert. Falls Probleme auftreten, sehen Sie in den Planerlogs möglicherweise die folgenden Logeinträge:
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
Process timed out, PID: 68496

Lösungen:

  • Alle DAG-Parsing-Fehler beheben. Der DAG-Prozessor parst mehrere DAGs. In seltenen Fällen kann sich das Parsen von Fehlern eines DAG negativ auf das Parsen anderer DAGs auswirken.

  • Wenn das Parsen Ihres DAG länger als die in [core]dagrun_import_timeout definierte Anzahl von Sekunden dauert, erhöhen Sie dieses Zeitlimit.

  • Wenn das Parsen aller DAGs mehr als die in [core]dag_file_processor_timeout definierte Anzahl von Sekunden in Anspruch nimmt, erhöhen Sie das Zeitlimit.

  • Wenn das Parsen des DAG sehr lange dauert, kann dies auch bedeuten, dass er nicht optimal implementiert wurde. Dies ist beispielsweise der Fall, wenn viele Umgebungsvariablen gelesen oder externe Dienste oder Airflow-Datenbanken aufgerufen werden. Vermeiden Sie solche Vorgänge soweit möglich in globalen Abschnitten von DAGs.

  • Erhöhen Sie die CPU- und Arbeitsspeicherressourcen für Planer, damit er schneller arbeiten kann.

  • Passen Sie die Anzahl der Planer an.

  • Erhöhen Sie die Anzahl der DAG-Prozessorprozesse, damit das Parsen schneller erfolgen kann. Erhöhen Sie dazu den Wert von [scheduler]parsing_process.

  • Verringern Sie die Häufigkeit des DAG-Parsings.

  • Verringern Sie die Belastung der Airflow-Datenbank.

Symptome einer hohen Auslastung der Airflow-Datenbank

Weitere Informationen finden Sie unter Symptome von Airflow-Datenbanken unter Lastdruck.

Nächste Schritte