Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Diese Seite enthält Schritte zur Fehlerbehebung und Informationen zu häufigen Workflow-Problemen.
Viele Probleme bei der DAG-Ausführung sind auf eine nicht optimale Umgebungsleistung zurückzuführen. Sie können Ihre Umgebung mithilfe des Leitfadens Umgebungsleistung und -kosten optimieren optimieren.
Einige Probleme bei der DAG-Ausführung können durch eine fehlerhafte oder nicht optimale Funktion des Airflow-Schedulers verursacht werden. Folgen Sie der Anleitung zur Fehlerbehebung für den Zeitplaner, um diese Probleme zu beheben.
Fehlerbehebung beim Workflow
So beginnen Sie mit der Fehlerbehebung:
Prüfen Sie die Airflow-Logs.
Sie können die Logging-Ebene von Airflow erhöhen, indem Sie die folgende Airflow-Konfigurationsoption überschreiben.
Bereich Schlüssel Wert logging
logging_level
Der Standardwert ist INFO
. Legen SieDEBUG
fest, um mehr Details in den Protokollmeldungen zu erhalten.Prüfen Sie das Monitoring-Dashboard.
Suchen Sie in der Google Cloud Console nach Fehlern auf den Seiten für die Komponenten Ihrer Umgebung.
Prüfen Sie in der Airflow-Weboberfläche in der Diagrammansicht des DAG, ob Aufgabeninstanzen fehlgeschlagen sind.
Bereich Schlüssel Wert webserver
dag_orientation
LR
,TB
,RL
oderBT
Fehlerbehebung bei Operatorfehlern
So beheben Sie Operatorfehler:
- Suchen Sie nach aufgabenspezifischen Fehlern.
- Prüfen Sie die Airflow-Logs.
- Cloud Monitoring
- Prüfen Sie die anbieterspezifischen Protokolle.
- Beheben Sie die Fehler.
- Laden Sie den DAG in den Ordner
/dags
hoch. - In der Airflow-Weboberfläche löschen Sie die vergangenen Status für den DAG.
- Setzen Sie den DAG fort oder führen Sie ihn aus.
Fehlerbehebung bei der Aufgabenausführung
Airflow ist ein verteiltes System mit vielen Entitäten wie Scheduler, Executor und Workern, die über eine Task-Warteschlange und die Airflow-Datenbank miteinander kommunizieren und Signale senden (z. B. SIGTERM). Das folgende Diagramm zeigt eine Übersicht über die Verbindungen zwischen Airflow-Komponenten.
In einem verteilten System wie Airflow kann es zu Netzwerkverbindungsproblemen oder zu zeitweiligen Problemen mit der zugrunde liegenden Infrastruktur kommen. Dies kann dazu führen, dass Aufgaben fehlschlagen und neu geplant werden oder nicht erfolgreich abgeschlossen werden (z. B. Zombie-Aufgaben oder Aufgaben, die bei der Ausführung hängen geblieben sind). Airflow bietet Mechanismen, um mit solchen Situationen umzugehen und den normalen Betrieb automatisch wieder aufzunehmen. In den folgenden Abschnitten werden häufige Probleme beschrieben, die bei der Ausführung von Aufgaben durch Airflow auftreten: Zombie-Aufgaben, Beendigung der Instanz und SIGTERM-Signale.
Fehlerbehebung bei Zombie-Aufgaben
Airflow erkennt zwei Arten von Abweichungen zwischen einer Aufgabe und einem Prozess, der die Aufgabe ausführt:
Zombie-Aufgaben sind Aufgaben, die ausgeführt werden sollten, aber nicht ausgeführt werden. Dies kann passieren, wenn der Prozess der Aufgabe beendet wurde oder nicht reagiert, wenn der Airflow-Worker aufgrund einer Überlastung nicht rechtzeitig einen Aufgabenstatus meldet oder wenn die VM, auf der die Aufgabe ausgeführt wird, heruntergefahren wurde. Airflow findet solche Aufgaben regelmäßig und schlägt sie je nach den Einstellungen der Aufgabe fehl oder versucht, sie noch einmal auszuführen.
Zombie-Aufgaben erkennen
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-scheduler") textPayload:"Detected zombie job"
Undead-Tasks sind Aufgaben, die nicht ausgeführt werden sollten. Airflow findet solche Aufgaben regelmäßig und beendet sie.
In den folgenden Abschnitten werden die häufigsten Gründe und Lösungen für Zombie-Aufgaben beschrieben.
Airflow-Worker hat nicht genügend Arbeitsspeicher
Jeder Airflow-Worker kann bis zu [celery]worker_concurrency
Aufgabeninstanzen gleichzeitig ausführen. Wenn die kumulative Arbeitsspeichernutzung dieser Aufgabeninstanzen das Arbeitsspeicherlimit für einen Airflow-Worker überschreitet, wird ein zufälliger Prozess darauf beendet, um Ressourcen freizugeben.
Manchmal kann ein Mangel an Arbeitsspeicher auf einem Airflow-Worker dazu führen, dass während einer SQL Alchemy-Sitzung fehlerhafte Pakete an die Datenbank, an einen DNS-Server oder an einen anderen Dienst gesendet werden, der von einem DAG aufgerufen wird. In diesem Fall werden Verbindungen vom Airflow-Worker möglicherweise vom anderen Ende der Verbindung abgelehnt oder getrennt. Beispiel:
"UNKNOWN:Error received from peer
{created_time:"2024-11-31T10:09:52.217738071+00:00", grpc_status:14,
grpc_message:"failed to connect to all addresses; last error: UNKNOWN:
ipv4:<ip address>:443: handshaker shutdown"}"
Lösungen:
Optimieren Sie Aufgaben, um weniger Arbeitsspeicher zu verbrauchen, indem Sie beispielsweise Code auf oberster Ebene vermeiden.
Verringern Sie
[celery]worker_concurrency
.Erhöhen Sie den Arbeitsspeicher für Airflow-Worker, um
[celery]worker_concurrency
-Änderungen zu berücksichtigen.
Airflow-Worker wurde entfernt
Pod-Bereinigungen sind ein normaler Teil der Ausführung von Arbeitslasten in Kubernetes. GKE entfernt Pods, wenn der Speicherplatz aufgebraucht ist oder um Ressourcen für Arbeitslasten mit höherer Priorität freizugeben.
Lösungen:
- Wenn eine Auslagerung auf einen Mangel an Speicherplatz zurückzuführen ist, können Sie die Speichernutzung reduzieren oder temporäre Dateien entfernen, sobald sie nicht mehr benötigt werden.
Alternativ können Sie den verfügbaren Speicherplatz erhöhen oder Arbeitslasten in einem speziellen Pod mit
KubernetesPodOperator
ausführen.
Airflow-Arbeiter wurde beendet
Airflow-Worker können extern entfernt werden. Wenn derzeit ausgeführte Aufgaben während der ordnungsgemäßen Beendigungszeit nicht abgeschlossen werden, werden sie unterbrochen und möglicherweise als Zombies erkannt.
Mögliche Szenarien und Lösungen:
Airflow-Worker werden bei Umgebungsänderungen wie Upgrades oder Paketinstallationen neu gestartet:
Änderungen an Composer-Umgebungen erkennen
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("cloudaudit.googleapis.com%2Factivity")
Sie können solche Vorgänge ausführen, wenn keine kritischen Aufgaben ausgeführt werden, oder die Wiederaufführbarkeit von Aufgaben aktivieren.
Verschiedene Komponenten sind während der Wartungsarbeiten möglicherweise vorübergehend nicht verfügbar.
Sie können Wartungsfenster angeben, ummit der Ausführung der kritischen Aufgaben überschneidet.
Airflow-Worker war stark ausgelastet
Die Menge der CPU- und Arbeitsspeicherressourcen, die für einen Airflow-Worker verfügbar sind, wird durch die Konfiguration der Umgebung begrenzt. Wenn die Ressourcennutzung näher an den Limits liegt, kann es zu Ressourcenkonflikten und unnötigen Verzögerungen bei der Aufgabenausführung kommen. In extremen Fällen, wenn über einen längeren Zeitraum hinweg Ressourcen fehlen, kann dies zu Zombie-Aufgaben führen.
Lösungen:
- Überwachen Sie die CPU- und Arbeitsspeichernutzung der Worker und passen Sie sie an, damit sie 80 % nicht überschreitet.
Die Airflow-Datenbank war stark ausgelastet
Eine Datenbank wird von verschiedenen Airflow-Komponenten verwendet, um miteinander zu kommunizieren und insbesondere Heartbeats von Aufgabeninstanzen zu speichern. Ein Ressourcenmangel in der Datenbank führt zu längeren Abfragezeiten und kann sich auf die Aufgabenausführung auswirken.
Manchmal sind in den Logs eines Airflow-Workers die folgenden Fehler enthalten:
(psycopg2.OperationalError) connection to server at <IP address>,
port 3306 failed: server closed the connection unexpectedly
This probably means the server terminated abnormally before or while
processing the request.
Lösungen:
- Vermeiden Sie viele
Variables.get
-Anweisungen im DAG-Code der obersten Ebene. Verwenden Sie stattdessen Jinja-Vorlagen, um Werte von Airflow-Variablen abzurufen. - Optimieren (reduzieren) Sie die Verwendung von xcom_push- und xcom_pull-Anweisungen in Jinja-Vorlagen im DAG-Code der obersten Ebene.
- Erwägen Sie ein Upgrade auf eine größere Umgebungsgröße (mittel oder groß).
- Reduzieren Sie die Anzahl der Planer.
- Verringern Sie die Häufigkeit des DAG-Parsings.
- CPU- und Arbeitsspeichernutzung der Datenbank überwachen
Airflow-Datenbank vorübergehend nicht verfügbar
Es kann einige Zeit dauern, bis ein Airflow-Worker sporadische Fehler wie vorübergehende Verbindungsprobleme erkennt und ordnungsgemäß verarbeitet. Möglicherweise wird der Standardgrenzwert für die Erkennung von Zombies überschritten.
Airflow-Zeitüberschreitungen für Heartbeats ermitteln
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-worker") textPayload:"Heartbeat time limit exceeded"
Lösungen:
Erhöhen Sie das Zeitlimit für Zombie-Aufgaben und überschreiben Sie den Wert der Airflow-Konfigurationsoption
[scheduler]scheduler_zombie_task_threshold
:Bereich Schlüssel Wert Hinweise scheduler
scheduler_zombie_task_threshold
Neues Zeitlimit (in Sekunden) Der Standardwert ist 300
.
Fehlerbehebung bei der Beendigung einer Instanz
Airflow verwendet den Mechanismus terminating instance, um Airflow-Aufgaben herunterzufahren. Dieser Mechanismus wird in den folgenden Fällen verwendet:
- Wenn ein Scheduler eine Aufgabe beendet, die nicht rechtzeitig abgeschlossen wurde.
- Wenn bei einer Aufgabe ein Zeitlimit überschritten wird oder die Ausführung zu lange dauert.
Wenn Airflow Aufgabeninstanzen beendet, sehen Sie in den Logs eines Airflow-Workers, der die Aufgabe ausgeführt hat, die folgenden Logeinträge:
INFO - Subtask ... WARNING - State of this instance has been externally set
to success. Terminating instance.
INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.
Mögliche Lösungen:
Prüfen Sie den Aufgabencode auf Fehler, die dazu führen könnten, dass die Ausführung zu lange dauert.
Erhöhen Sie die CPU und den Arbeitsspeicher für Airflow-Worker, damit Aufgaben schneller ausgeführt werden.
Erhöhen Sie den Wert der Airflow-Konfigurationsoption
[celery_broker_transport_options]visibility-timeout
.Daher wartet der Scheduler länger, bis eine Aufgabe abgeschlossen ist, bevor er sie als Zombie-Aufgabe betrachtet. Diese Option ist besonders für zeitaufwendige Aufgaben geeignet, die viele Stunden dauern. Wenn der Wert zu niedrig ist (z. B. 3 Stunden), betrachtet der Scheduler Aufgaben, die 5 oder 6 Stunden laufen, als „hängend“ (Zombie-Aufgaben).
Erhöhen Sie den Wert der Airflow-Konfigurationsoption
[core]killed_task_cleanup_time
.Ein längerer Wert gibt Airflow-Workern mehr Zeit, ihre Aufgaben ordnungsgemäß abzuschließen. Ist der Wert zu niedrig, werden Airflow-Aufgaben möglicherweise abrupt unterbrochen, ohne dass sie ordnungsgemäß abgeschlossen werden können.
Fehlerbehebung bei SIGTERM-Signalen
SIGTERM-Signale werden von Linux, Kubernetes, Airflow Scheduler und Celery verwendet, um Prozesse zu beenden, die für die Ausführung von Airflow-Workern oder Airflow-Aufgaben verantwortlich sind.
Es kann mehrere Gründe dafür geben, dass SIGTERM-Signale in einer Umgebung gesendet werden:
Eine Aufgabe ist zu einer Zombie-Aufgabe geworden und muss beendet werden.
Der Scheduler hat ein Duplikat einer Aufgabe gefunden und sendet der Aufgabe die Signale „Terminating instance“ und „SIGTERM“, um sie zu beenden.
Beim horizontalen Pod-Autoscaling sendet die GKE-Steuerungsebene SIGTERM-Signale, um Pods zu entfernen, die nicht mehr benötigt werden.
Der Planer kann SIGTERM-Signale an den DagFileProcessorManager-Prozess senden. Solche SIGTERM-Signale werden vom Scheduler verwendet, um den Lebenszyklus des DagFileProcessorManager-Prozesses zu verwalten, und können ignoriert werden.
Beispiel:
Launched DagFileProcessorManager with pid: 353002 Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: [] Sending the signal Signals.SIGTERM to group 353002 Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
Race Condition zwischen dem Heartbeat-Callback und den Exit-Callbacks in local_task_job, der die Ausführung der Aufgabe überwacht. Wenn der Heartbeat erkennt, dass eine Aufgabe als erfolgreich markiert wurde, kann nicht unterschieden werden, ob die Aufgabe selbst erfolgreich war oder ob Airflow angewiesen wurde, die Aufgabe als erfolgreich zu betrachten. Trotzdem wird ein Task-Runner beendet, ohne auf das Ende zu warten.
Solche SIGTERM-Signale können ignoriert werden. Die Aufgabe hat bereits den Status „Erfolgreich“ und die Ausführung der DAG-Ausführung insgesamt wird nicht beeinträchtigt.
Der Logeintrag
Received SIGTERM.
ist der einzige Unterschied zwischen dem regulären Beenden und dem Beenden der Aufgabe im Status „Erfolgreich“.Abbildung 2. Race Condition zwischen dem Heartbeat und dem Exit-Callback (zum Vergrößern anklicken) Eine Airflow-Komponente belegt mehr Ressourcen (CPU, Arbeitsspeicher) als vom Clusterknoten zulässig.
Der GKE-Dienst führt Wartungsvorgänge aus und sendet SIGTERM-Signale an Pods, die auf einem Knoten ausgeführt werden, der gerade aktualisiert wird.
Wenn eine Aufgabeninstanz mit SIGTERM beendet wird, sehen Sie in den Logs eines Airflow-Workers, der die Aufgabe ausgeführt hat, die folgenden Logeinträge:
{local_task_job.py:211} WARNING - State of this instance has been externally set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed with exception
Mögliche Lösungen:
Dieses Problem tritt auf, wenn einer VM, auf der die Aufgabe ausgeführt wird, nicht genügend Arbeitsspeicher zur Verfügung steht. Dies hängt nicht mit Airflow-Konfigurationen zusammen, sondern mit der Menge an Arbeitsspeicher, die der VM zur Verfügung steht.
In Cloud Composer 3 können Sie Airflow-Workern mehr CPU- und Arbeitsspeicherressourcen zuweisen.
Sie können den Wert der Airflow-Konfigurationsoption „
[celery]worker_concurrency
concurrency“ senken. Mit dieser Option wird festgelegt, wie viele Aufgaben von einem bestimmten Airflow-Worker gleichzeitig ausgeführt werden.
Weitere Informationen zum Optimieren Ihrer Umgebung finden Sie unter Umgebungsleistung und -kosten optimieren.
Auswirkungen von Aktualisierungs- oder Upgradevorgängen auf die Ausführung von Airflow-Aufgaben
Aktualisierungs- oder Upgradevorgänge unterbrechen derzeit ausgeführte Airflow-Aufgaben, es sei denn, eine Aufgabe wird im Modus mit verzögerbarer Ausführung ausgeführt.
Wir empfehlen, diese Vorgänge auszuführen, wenn Sie mit minimalen Auswirkungen auf die Ausführung von Airflow-Aufgaben rechnen, und entsprechende Wiederholungsmechanismen in Ihren DAGs und Aufgaben einzurichten.
Fehlerbehebung bei KubernetesExecutor-Aufgaben
CeleryKubernetesExecutor ist ein Executor-Typ in Cloud Composer 3, der CeleryExecutor und KubernetesExecutor gleichzeitig verwenden kann.
Weitere Informationen zur Fehlerbehebung bei Aufgaben, die mit KubernetesExecutor ausgeführt werden, finden Sie auf der Seite CeleryKubernetesExecutor verwenden.
Allgemeine Probleme
In den folgenden Abschnitten werden Symptome und mögliche Lösungen für einige häufig auftretende DAG-Probleme beschrieben.
Airflow-Aufgabe wurde von Negsignal.SIGKILL
unterbrochen
Manchmal benötigt Ihre Aufgabe mehr Arbeitsspeicher als dem Airflow-Worker zugewiesen ist.
In diesem Fall wird sie möglicherweise von Negsignal.SIGKILL
unterbrochen. Das System sendet dieses Signal, um eine weitere Arbeitsspeicherbelastung zu vermeiden, die sich auf die Ausführung anderer Airflow-Aufgaben auswirken könnte. Im Log des Airflow-Workers wird möglicherweise der folgende Logeintrag angezeigt:
{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL
Negsignal.SIGKILL
kann auch als Code -9
angezeigt werden.
Mögliche Lösungen:
Verringern Sie
worker_concurrency
der Airflow-Worker.Erhöhen Sie den für Airflow-Worker verfügbaren Arbeitsspeicher.
Verwalten Sie ressourcenintensive Aufgaben in Cloud Composer mit KubernetesPodOperator oder GKEStartPodOperator für die Aufgabenisolierung und benutzerdefinierte Ressourcenzuweisung.
Optimieren Sie Ihre Aufgaben, damit sie weniger Arbeitsspeicher verbrauchen.
Aufgabe schlägt aufgrund von Fehlern beim DAG-Parsen fehl, ohne Protokolle auszugeben
Manchmal gibt es subtile DAG-Fehler, die dazu führen, dass der Airflow-Planer Aufgaben zur Ausführung planen und der DAG-Prozessor die DAG-Datei parsen kann, der Airflow-Worker aber keine Aufgaben aus dem DAG ausführen kann, weil es Programmierfehler in der DAG-Datei gibt. Dies kann dazu führen, dass eine Airflow-Aufgabe als Failed
markiert ist und es kein Protokoll zur Ausführung gibt.
Lösungen:
Prüfen Sie in den Airflow-Worker-Logs, ob vom Airflow-Worker keine Fehler aufgrund eines fehlenden DAG oder von DAG-Parsing-Fehlern gemeldet werden.
Erhöhen Sie die Parameter für das DAG-Parsing:
Erhöhen Sie dagbag-import-timeout auf mindestens 120 Sekunden (bei Bedarf auch mehr).
Erhöhen Sie dag-file-processor-timeout auf mindestens 180 Sekunden (bei Bedarf auch mehr). Dieser Wert muss größer als
dagbag-import-timeout
sein.
Siehe auch DAG-Prozessor-Logs prüfen.
Aufgabe schlägt aufgrund von Ressourcenmangel fehl, ohne Logs auszugeben
Symptom: Während der Ausführung einer Aufgabe wird der für die Ausführung der Airflow-Aufgabe zuständige Unterprozess des Airflow-Workers abrupt unterbrochen. Der im Airflow-Worker-Log sichtbare Fehler sieht möglicherweise so aus:
...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task R = retval = fun(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__ return self.run(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command _execute_in_fork(command_to_exec) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...
Lösung:
In Cloud Composer 3 können Sie die Arbeitsspeicherlimits für Airflow-Worker erhöhen.
Wenn in Ihrer Umgebung auch Zombie-Aufgaben generiert werden, lesen Sie den Hilfeartikel Fehlerbehebung bei Zombie-Aufgaben.
Eine Anleitung zum Beheben von Problemen mit unzureichendem Arbeitsspeicher finden Sie unter Fehlerbehebung bei DAG-Problemen aufgrund von unzureichendem Arbeitsspeicher oder Speicherplatz.
Aufgabe schlägt fehl, ohne Logs auszugeben, weil Pod entfernt wurde
Google Kubernetes Engine-Pods unterliegen dem Kubernetes-Pod-Lebenszyklus und der Pod-Bereinigung. Aufgabenspitzen sind die häufigste Ursache für die Pod-Bereinigung in Cloud Composer.
Eine Pod-Bereinigung kann auftreten, wenn ein bestimmter Pod Ressourcen eines Knotens relativ zu den konfigurierten Erwartungen in Sachen Ressourcenverbrauch für den Knoten übernutzt. Beispiel: Eine Bereinigung kann auftreten, wenn mehrere speicherintensive Aufgaben in einem Pod ausgeführt werden und ihre kombinierte Last dazu führt, dass der Knoten, auf dem dieser Pod ausgeführt wird, das Limit für den Arbeitsspeicherverbrauch überschreitet.
Wenn ein Airflow-Worker-Pod entfernt wird, werden alle auf diesem Pod ausgeführten Aufgabeninstanzen unterbrochen und später von Airflow als fehlgeschlagen markiert.
Logs werden zwischengespeichert. Wenn ein Worker-Pod entfernt wird, bevor der Zwischenspeicher geleert wurde, werden keine Logs ausgegeben. Ein Aufgabenfehler ohne Logs ist ein Hinweis darauf, dass die Airflow-Worker aufgrund von zu wenig Arbeitsspeicher neu gestartet werden. In Cloud Logging sind möglicherweise einige Logs vorhanden, obwohl die Airflow-Logs nicht ausgegeben wurden.
So können die Logs angezeigt werden:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Protokolle auf.
Die Logs einzelner Airflow-Worker finden Sie unter Alle Logs > Airflow-Logs > Worker.
Lösung:
Erhöhen Sie die Arbeitsspeicherlimits für Airflow-Worker.
Achten Sie darauf, dass die Aufgaben im DAG idempotent und abrufbar sind.
Laden Sie keine unnötigen Dateien in das lokale Dateisystem von Airflow-Workern herunter.
Airflow-Worker haben eine begrenzte Kapazität des lokalen Dateisystems. Ein Airflow-Worker kann zwischen 1 GB und 10 GB Speicherplatz haben. Wenn der Speicherplatz aufgebraucht ist, wird der Airflow-Worker-Pod von der GKE-Steuerungsebene entfernt. Dadurch schlagen alle Aufgaben fehl, die der entfernte Worker ausgeführt hat.
Beispiele für problematische Vorgänge:
- Dateien oder Objekte herunterladen und lokal in einem Airflow-Worker speichern Speichern Sie diese Objekte stattdessen direkt in einem geeigneten Dienst wie einem Cloud Storage-Bucket.
- Zugriff auf große Objekte im Ordner
/data
über einen Airflow-Worker Der Airflow-Worker lädt das Objekt in sein lokales Dateisystem herunter. Implementieren Sie Ihre DAGs stattdessen so, dass große Dateien außerhalb des Airflow-Worker-Pods verarbeitet werden.
Zeitüberschreitung beim Laden des DAG
Symptom:
- In der Airflow-Weboberfläche wird oben auf der Seite mit der DAG-Liste in einem roten Warnfeld
Broken DAG: [/path/to/dagfile] Timeout
angezeigt. In Cloud Monitoring: Die
airflow-scheduler
-Logs enthalten Einträge wie beispielsweise die folgenden:ERROR - Process timed out
ERROR - Failed to import: /path/to/dagfile
AirflowTaskTimeout: Timeout
Lösung:
Überschreiben Sie die Airflow-Konfiguration dag_file_processor_timeout
und legen Sie mehr Zeit für das DAG-Parsen fest:
Bereich | Schlüssel | Wert |
---|---|---|
core |
dag_file_processor_timeout |
Neuer Wert für die Zeitüberschreitung |
DAG-Ausführung endet nicht innerhalb der erwarteten Zeit
Symptom:
Manchmal endet eine DAG-Ausführung nicht, weil Airflow-Aufgaben hängen bleiben und die DAG-Ausführung länger als erwartet dauert. Unter normalen Bedingungen bleiben Airflow-Aufgaben nicht unbegrenzt im Status „In der Warteschlange“ oder „Ausgeführt“, da Airflow Zeitüberschreitungen und Bereinigungsverfahren hat, die diese Situation vermeiden helfen.
Lösung:
Verwenden Sie den Parameter
dagrun_timeout
für die DAGs. Beispiel:dagrun_timeout=timedelta(minutes=120)
. Daher muss jede DAG-Ausführung innerhalb des Zeitlimits für die DAG-Ausführung abgeschlossen sein. Nicht abgeschlossene Aufgaben sind mitFailed
oderUpstream Failed
gekennzeichnet. Weitere Informationen zu Airflow-Aufgabenstatus finden Sie in der Apache Airflow-Dokumentation.Mit dem Parameter task execution timeout (Zeitüberschreitung für die Aufgabenausführung) können Sie ein Standardzeitlimit für Aufgaben definieren, die auf Apache Airflow-Operatoren basieren.
DAG-Ausführungen werden nicht ausgeführt
Symptom:
Wenn ein Planungsdatum für einen DAG dynamisch festgelegt wird, kann dies zu verschiedenen unerwarteten Nebenwirkungen führen. Beispiel:
Eine DAG-Ausführung liegt immer in der Zukunft und die DAG wird nie ausgeführt.
Vergangene DAG-Ausführungen werden als ausgeführt und erfolgreich markiert, obwohl sie nicht ausgeführt wurden.
Weitere Informationen finden Sie in der Apache Airflow-Dokumentation.
Mögliche Lösungen:
Folgen Sie den Empfehlungen in der Apache Airflow-Dokumentation.
Legen Sie eine statische
start_date
für DAGs fest. Alternativ können Sie mitcatchup=False
die Ausführung des DAG für vergangene Zeiträume deaktivieren.Verwenden Sie
datetime.now()
oderdays_ago(<number of days>)
nur, wenn Sie sich der Nebenwirkungen dieses Ansatzes bewusst sind.
Erhöhter Netzwerktraffic zur und von der Airflow-Datenbank
Die Menge des Netzwerk-Traffics zwischen dem GKE-Cluster Ihrer Umgebung und der Airflow-Datenbank hängt von der Anzahl der DAGs, der Anzahl der Aufgaben in DAGs und der Art des Zugriffs der DAGs auf Daten in der Airflow-Datenbank ab. Folgende Faktoren können sich auf die Netzwerknutzung auswirken:
Abfragen an die Airflow-Datenbank Wenn Ihre DAGs viele Abfragen ausführen, generieren sie große Mengen an Traffic. Beispiele: Status prüfen, bevor andere Aufgaben ausgeführt werden, XCom-Tabelle abfragen und Inhalt der Airflow-Datenbank auslesen.
Große Anzahl von Aufgaben Je mehr Aufgaben geplant sind, desto mehr Netzwerktraffic wird generiert. Dieser Aspekt gilt sowohl für die Gesamtzahl der Aufgaben in Ihren DAGs als auch für die Planungshäufigkeit. Wenn der Airflow-Planer DAG-Ausführungen plant, sendet er Abfragen an die Airflow-Datenbank und generiert Traffic.
Die Airflow-Weboberfläche generiert Netzwerkverkehr, da sie Abfragen an die Airflow-Datenbank sendet. Die intensive Verwendung von Seiten mit Grafiken, Aufgaben und Diagrammen kann große Mengen an Netzwerkverkehr generieren.
DAG verursacht den Absturz des Airflow-Webservers oder einen „502 gateway timeout“-Fehler
Webserverausfälle können aus verschiedenen Gründen auftreten. Prüfen Sie die airflow-webserver-Logs in Cloud Logging, um die Ursache des Fehlers 502 gateway timeout
zu ermitteln.
Umgang mit einer großen Anzahl von DAGs und Plug-ins in den Ordnern „DAGs“ und „Plug-ins“
Der Inhalt der Ordner /dags
und /plugins
wird aus dem Bucket Ihrer Umgebung mit den lokalen Dateisystemen von Airflow-Workern und -Planern synchronisiert.
Je mehr Daten in diesen Ordnern gespeichert sind, desto länger dauert die Synchronisierung. So gehen Sie in solchen Fällen vor:
Begrenzen Sie die Anzahl der Dateien in den Ordnern
/dags
und/plugins
. Speichern Sie nur die minimal erforderlichen Dateien.Erhöhen Sie den für Airflow-Planer und -Worker verfügbaren Speicherplatz.
Erhöhen Sie die CPU und den Arbeitsspeicher der Airflow-Planer und ‑Worker, damit die Synchronisierung schneller ausgeführt wird.
Bei einer sehr großen Anzahl von DAGs sollten Sie die DAGs in Batches unterteilen, in ZIP-Archive komprimieren und diese Archive im Ordner
/dags
bereitstellen. Dadurch wird die Synchronisierung der DAGs beschleunigt. Airflow-Komponenten entpacken ZIP-Archive, bevor DAGs verarbeitet werden.Das programmatische Generieren von DAGs kann auch eine Methode sein, um die Anzahl der im Ordner
/dags
gespeicherten DAG-Dateien zu begrenzen. Im Abschnitt zu programmatischen DAGs finden Sie Informationen dazu, wie Sie Probleme beim Planen und Ausführen programmatisch generierter DAGs vermeiden.
Programmisch generierte DAGs nicht zur selben Zeit planen
Das programmgesteuerte Erzeugen von DAG-Objekten aus einer DAG-Datei ist eine effiziente Methode, um viele ähnliche DAGs mit nur kleinen Unterschieden zu erstellen.
Es ist wichtig, nicht alle diese DAGs sofort zur Ausführung zu planen. Es besteht eine hohe Wahrscheinlichkeit, dass Airflow-Worker nicht genügend CPU- und Arbeitsspeicherressourcen haben, um alle gleichzeitig geplanten Aufgaben auszuführen.
So vermeiden Sie Probleme beim Planen programmatischer DAGs:
- Erhöhen Sie die Nebenläufigkeit der Worker und skalieren Sie Ihre Umgebung, damit mehr Aufgaben gleichzeitig ausgeführt werden können.
- Erstellen Sie DAGs so, dass ihre Zeitpläne gleichmäßig über die Zeit verteilt sind, um zu vermeiden, dass Hunderte von Aufgaben gleichzeitig geplant werden. So haben Airflow-Worker Zeit, alle geplanten Aufgaben auszuführen.
Fehler 504 beim Zugriff auf den Airflow-Webserver
Weitere Informationen finden Sie unter Fehler 504 beim Zugriff auf die Airflow-Benutzeroberfläche.
Die Verbindung zu Postgres-Server wird während der Abfrage unterbrochen. Ausnahme wird während der Aufgabenausführung oder unmittelbar danach ausgelöst.
Lost connection to Postgres server during query
-Ausnahmen treten häufig auf, wenn folgende Bedingungen erfüllt sind:
- Ihr DAG verwendet
PythonOperator
oder einen benutzerdefinierten Operator. - Ihr DAG stellt Abfragen an die Airflow-Datenbank.
Wenn mehrere Abfragen von einer aufrufbaren Funktion gesendet werden, verweisen Tracebacks möglicherweise fälschlicherweise auf die Zeile self.refresh_from_db(lock_for_update=True)
im Airflow-Code. Sie ist die erste Datenbankabfrage nach der Aufgabenausführung. Die tatsächliche Ursache der Ausnahme tritt davor auf, wenn eine SQLAlchemy-Sitzung nicht ordnungsgemäß geschlossen wird.
SQLAlchemy-Sitzungen sind auf einen Thread beschränkt und werden in einer aufrufbaren Funktionssitzung erstellt, die später innerhalb des Airflow-Codes fortgesetzt werden kann. Wenn es innerhalb einer Sitzung zu erheblichen Verzögerungen zwischen Abfragen kommt, wurde die Verbindung möglicherweise bereits vom Postgres-Server geschlossen. Das Zeitlimit für die Verbindung in Cloud Composer-Umgebungen ist auf etwa zehn Minuten festgelegt.
Lösung:
- Verwenden Sie den Decorator
airflow.utils.db.provide_session
. Dieser Decorator stellt eine gültige Sitzung für die Airflow-Datenbank imsession
-Parameter bereit und schließt die Sitzung am Ende der Funktion ordnungsgemäß. - Verwenden Sie keine einzelne Funktion mit langer Ausführungszeit. Verschieben Sie stattdessen alle Datenbankabfragen in separate Funktionen, sodass es mehrere Funktionen mit dem Decorator
airflow.utils.db.provide_session
gibt. In diesem Fall werden Sitzungen nach dem Abrufen von Abfrageergebnissen automatisch geschlossen.
Ausführungszeit von DAGs, Aufgaben und parallelen Ausführungen desselben DAG steuern
Wenn Sie steuern möchten, wie lange eine einzelne DAG-Ausführung für einen bestimmten DAG dauert, können Sie dazu den DAG-Parameter dagrun_timeout
verwenden. Wenn Sie beispielsweise davon ausgehen, dass eine einzelne DAG-Ausführung (unabhängig davon, ob die Ausführung erfolgreich oder fehlgeschlagen ist) nicht länger als eine Stunde dauern darf, legen Sie diesen Parameter auf 3.600 Sekunden fest.
Sie können auch festlegen, wie lange eine einzelne Airflow-Aufgabe dauern darf. Dazu können Sie execution_timeout
verwenden.
Wenn Sie festlegen möchten, wie viele aktive DAG-Ausführungen für einen bestimmten DAG ausgeführt werden sollen, können Sie dazu die [core]max-active-runs-per-dag
-Airflow-Konfigurationsoption verwenden.
Wenn zu einem bestimmten Zeitpunkt nur eine einzige Instanz eines DAG ausgeführt werden soll, legen Sie den Parameter max-active-runs-per-dag
auf 1
fest.
Probleme bei der Synchronisierung von DAGs und Plug-ins mit Planern, Workern und Webservern
Cloud Composer synchronisiert den Inhalt der Ordner /dags
und /plugins
mit Planern und Workern. Bestimmte Objekte in den Ordnern /dags
und /plugins
können diese Synchronisierung verhindern oder verlangsamen.
Der Ordner
/dags
wird mit den Planern und Workern synchronisiert.Dieser Ordner wird nicht mit dem Webserver synchronisiert.
Der Ordner
/plugins
wird mit Planern, Workern und Webservern synchronisiert.
Die folgenden Probleme können auftreten:
Sie haben gzip-komprimierte Dateien mit Komprimierungs-Transcodierung in die Ordner
/dags
und/plugins
hochgeladen. Das passiert in der Regel, wenn Sie das Flag--gzip-local-all
in einemgcloud storage cp
-Befehl verwenden, um Daten in den Bucket hochzuladen.Lösung: Lösche das Objekt, für das die Komprimierungstranscodierung verwendet wurde, und lade es noch einmal in den Bucket hoch.
Eines der Objekte heißt „.“. Ein solches Objekt wird nicht mit Schedulern und Workern synchronisiert und die Synchronisierung wird möglicherweise vollständig beendet.
Lösung: Benennen Sie das Objekt um.
Ein Ordner und eine DAG-Python-Datei haben denselben Namen, z. B.
a.py
. In diesem Fall wird die DAG-Datei nicht richtig mit den Airflow-Komponenten synchronisiert.Lösung: Entfernen Sie den Ordner mit demselben Namen wie die DAG-Python-Datei.
Eines der Objekte in den Ordnern
/dags
oder/plugins
enthält am Ende des Objektnamens das Symbol/
. Solche Objekte können den Synchronisierungsprozess beeinträchtigen, da das Symbol/
bedeutet, dass es sich bei einem Objekt um einen Ordner und nicht um eine Datei handelt.Lösung: Entfernen Sie das Symbol
/
aus dem Namen des problematischen Objekts.Speichern Sie keine unnötigen Dateien in den Ordnern
/dags
und/plugins
.Manchmal sind in den von Ihnen implementierten DAGs und Plug-ins zusätzliche Dateien enthalten, z. B. Dateien, in denen Tests für diese Komponenten gespeichert sind. Diese Dateien werden mit Workern und Planern synchronisiert und beeinflussen die Zeit, die zum Kopieren dieser Dateien auf Planer, Worker und Webserver benötigt wird.
Lösung: Speichern Sie keine zusätzlichen und nicht erforderlichen Dateien in den Ordnern
/dags
und/plugins
.
Fertig [Errno 21] Is a directory: '/home/airflow/gcs/dags/...' error is generated by schedulers and workers
Dieses Problem tritt auf, weil Objekte in Cloud Storage einen sich überschneidenden Namespace haben können, während Scheduler und Worker gleichzeitig traditionelle Dateisysteme verwenden. So ist es beispielsweise möglich, dem Bucket einer Umgebung sowohl einen Ordner als auch ein Objekt mit demselben Namen hinzuzufügen. Wenn der Bucket mit den Planern und Workern der Umgebung synchronisiert wird, wird dieser Fehler generiert, was zu Aufgabenausfällen führen kann.
Achten Sie darauf, dass sich im Bucket der Umgebung keine überlappenden Namespaces befinden, um dieses Problem zu beheben. Wenn sich beispielsweise sowohl /dags/misc
(eine Datei) als auch /dags/misc/example_file.txt
(eine andere Datei) in einem Bucket befinden, wird vom Scheduler ein Fehler generiert.
Vorübergehende Unterbrechungen bei der Verbindung zur Airflow-Metadaten-Datenbank
Cloud Composer wird auf einer verteilten Infrastruktur ausgeführt. Das bedeutet, dass gelegentlich vorübergehende Probleme auftreten können, die die Ausführung Ihrer Airflow-Aufgaben unterbrechen.
In solchen Fällen werden in den Logs der Airflow-Worker möglicherweise die folgenden Fehlermeldungen angezeigt:
"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"
oder
"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"
Solche sporadischen Probleme können auch durch Wartungsarbeiten an Ihren Cloud Composer-Umgebungen verursacht werden.
Normalerweise treten solche Fehler nur sporadisch auf. Wenn Ihre Airflow-Aufgaben idempotent sind und Sie Wiederholungsversuche konfiguriert haben, haben sie keine Auswirkungen auf Sie. Sie können auch Wartungsfenster definieren.
Ein weiterer Grund für solche Fehler kann der Mangel an Ressourcen im Cluster Ihrer Umgebung sein. In solchen Fällen können Sie Ihre Umgebung wie in den Anleitungen Umgebungen skalieren oder Umgebung optimieren beschrieben skalieren oder optimieren.
Eine DAG-Ausführung ist als erfolgreich gekennzeichnet, es wurden aber keine Aufgaben ausgeführt
Wenn das execution_date
eines DAG-Laufs vor dem start_date
des DAG liegt, sehen Sie möglicherweise DAG-Ausführungen, die keine Aufgabenausführungen haben, aber trotzdem als erfolgreich gekennzeichnet sind.

Ursache
Das kann in den folgenden Fällen passieren:
Die Abweichung ist auf die Zeitzonendifferenz zwischen
execution_date
undstart_date
des DAG zurückzuführen. Das kann beispielsweise passieren, wenn Siependulum.parse(...)
verwenden, umstart_date
festzulegen.start_date
der DAG ist auf einen dynamischen Wert festgelegt, z. B.airflow.utils.dates.days_ago(1)
.
Lösung
Achten Sie darauf, dass in
execution_date
undstart_date
dieselbe Zeitzone verwendet wird.Geben Sie ein statisches
start_date
an und kombinieren Sie es mitcatchup=False
, um zu vermeiden, dass DAGs mit vergangenen Startdaten ausgeführt werden.
Ein DAG ist in der Airflow-Benutzeroberfläche oder der DAG-Benutzeroberfläche nicht sichtbar und wird vom Planer nicht geplant
Der DAG-Prozessor parst jeden DAG, bevor er vom Planer geplant werden kann und bevor er in der Airflow-UI oder DAG-UI sichtbar wird.
Mit den folgenden Airflow-Konfigurationsoptionen werden Zeitüberschreitungen für das Parsen von DAGs definiert:
[core]dagrun_import_timeout
definiert, wie viel Zeit der DAG-Prozessor zum Parsen eines einzelnen DAG hat.[core]dag_file_processor_timeout
definiert die Gesamtzeit, die der DAG-Prozessor für das Parsen aller DAGs aufwenden kann.
Wenn ein DAG nicht in der Airflow-UI oder DAG-UI angezeigt wird:
Prüfen Sie in den Logs des DAG-Prozessors, ob der DAG-Prozessor Ihren DAG korrekt verarbeiten kann. Bei Problemen werden in den DAG-Prozessor- oder Scheduler-Logs möglicherweise die folgenden Logeinträge angezeigt:
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for /usr/local/airflow/dags/example_dag.py with PID 21903 started at 2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
Prüfen Sie die Scheduler-Logs, um festzustellen, ob der Scheduler ordnungsgemäß funktioniert. Bei Problemen werden in den Scheduler-Logs möglicherweise die folgenden Logeinträge angezeigt:
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it Process timed out, PID: 68496
Lösungen:
Beheben Sie alle Fehler beim DAG-Parsing. Der DAG-Prozessor parst mehrere DAGs. In seltenen Fällen können Parsingfehler bei einem DAG sich negativ auf das Parsen anderer DAGs auswirken.
Wenn das Parsen Ihres DAG länger als die in
[core]dagrun_import_timeout
angegebene Anzahl von Sekunden dauert, erhöhen Sie dieses Zeitlimit.Wenn das Parsen aller DAGs länger als die in
[core]dag_file_processor_timeout
angegebene Anzahl von Sekunden dauert, erhöhen Sie diese Zeitüberschreitung.Wenn das Parsen Ihres DAGs lange dauert, kann das auch daran liegen, dass er nicht optimal implementiert ist. Das ist beispielsweise der Fall, wenn viele Umgebungsvariablen gelesen oder externe Dienste oder die Airflow-Datenbank aufgerufen werden. Vermeiden Sie möglichst solche Vorgänge in globalen Abschnitten von DAGs.
Erhöhen Sie die CPU- und Arbeitsspeicherressourcen für den DAG-Prozessor, damit er schneller arbeiten kann.
Symptome für eine hohe Auslastung der Airflow-Datenbank
Weitere Informationen finden Sie unter Symptome für eine überlastete Airflow-Datenbank.
Nächste Schritte
- Fehlerbehebung bei der Installation von PyPI-Paketen
- Fehlerbehebung bei Umgebungsupdates und -upgrades