Cloud Composer 1 befindet sich im Modus „Nach der Wartung“. Google veröffentlicht keine weiteren Updates für Cloud Composer 1, einschließlich neuer Versionen von Airflow sowie Fehlerkorrekturen und Sicherheitsupdates. Wir empfehlen die Migration zu Cloud Composer 2.
Diese Seite enthält Schritte zur Fehlerbehebung und Informationen für gängige Workflows
Probleme.
Viele DAG-Ausführungsprobleme werden durch eine nicht optimale Umgebungsleistung verursacht.
Sie können Ihre Cloud Composer 2-Umgebung optimieren, indem Sie der Anleitung Optimize
Leitfaden zu Umgebungsleistung und -kosten.
Einige Probleme bei der DAG-Ausführung können durch den Airflow-Planer verursacht werden
nicht richtig oder optimal funktioniert. Bitte folgen Sie
Anleitung zur Fehlerbehebung für den Planer
um diese Probleme zu lösen.
Airflow ist ein verteiltes System mit vielen Entitäten wie Planer, Executor,
Worker, die über eine Aufgabenwarteschlange und den Airflow miteinander kommunizieren
und Signale senden (wie SIGTERM). Das folgende Diagramm zeigt eine
Übersicht über Verbindungen zwischen Airflow-Komponenten.
Abbildung 1. Interaktion zwischen Airflow-Komponenten (zum Vergrößern klicken)
In einem verteilten System wie Airflow gibt es möglicherweise
oder bei der zugrunde liegenden Infrastruktur zeitweise Probleme auftreten.
Dies kann zu Situationen führen, in denen Aufgaben fehlschlagen und neu geplant werden können.
oder Aufgaben nicht erfolgreich abgeschlossen werden (z. B. Zombie-
oder Aufgaben, deren Ausführung hängengeblieben ist). Airflow verfügt über Mechanismen,
in solchen Situationen und nimmt automatisch wieder den normalen Betrieb wieder auf. Folge ich
werden häufige Probleme bei der Ausführung von Aufgaben mit Airflow erläutert:
Zombie-Aufgaben, Giftpillen und SIGTERM-Signale.
Fehlerbehebung bei Zombie-Aufgaben
Airflow erkennt zwei Arten von Diskrepanzen zwischen einer Aufgabe und einem Prozess, der ausgeführt wird
die Aufgabe:
Zombie-Aufgaben sind Aufgaben, die eigentlich ausgeführt werden sollten, aber nicht
ausgeführt wird. Dies kann passieren, wenn der Prozess der Aufgabe beendet wurde oder nicht
antworten, wenn der Airflow-Worker den Aufgabenstatus nicht rechtzeitig gemeldet hat
weil sie überlastet ist oder die VM, auf der die Aufgabe ausgeführt wird, heruntergefahren wurde.
Airflow findet solche Aufgaben regelmäßig und schlägt entweder fehl oder wiederholt die Aufgabe.
je nach den Einstellungen der Aufgabe.
Untote Aufgaben sind Aufgaben, die normalerweise nicht ausgeführt werden. Airflow-Ergebnisse
und beendet sie regelmäßig.
Die häufigsten Gründe und Lösungen für Zombie-Aufgaben sind unten aufgeführt.
Nicht genügend Arbeitsspeicher für Airflow-Worker
Jeder Airflow-Worker könnte bis zu [celery]worker_concurrency Aufgabeninstanzen ausführen
gleichzeitig. Wenn der kumulative Arbeitsspeicherverbrauch dieser Aufgabeninstanzen
das Arbeitsspeicherlimit für einen Airflow-Worker überschreitet, wird ein zufälliger Prozess darauf ausgeführt.
um Ressourcen freizugeben.
Ereignisse mit unzureichendem Arbeitsspeicher in Airflow-Workern ermitteln
resource.type="k8s_node"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
log_id("events")
jsonPayload.message:"Killed process"
jsonPayload.message:("airflow task" OR "celeryd")
Aktualisieren Sie in Cloud Composer 2-Versionen vor 2.6.0
[celery]worker_concurrency unter Verwendung der aktuellen Formel, wenn
ist dieser Wert niedriger.
Pod-Bereinigungen sind ein normaler Teil der Ausführung von Arbeitslasten in Kubernetes.
GKE entfernt Pods, wenn kein Speicherplatz mehr vorhanden ist oder sie freigegeben werden.
und Ressourcen für Arbeitslasten
mit höherer Priorität bereitstellen.
Wenn eine Bereinigung durch zu wenig Speicherplatz verursacht wird, können Sie den Speicherplatz reduzieren
verwenden oder temporäre Dateien entfernen, sobald sie nicht mehr benötigt werden.
Alternativ können Sie
verfügbaren Speicher erweitern oder ausführen
Arbeitslasten in einem dedizierten Pod mithilfe von KubernetesPodOperator.
Airflow-Worker wurde beendet
Airflow-Worker können extern entfernt werden. Wenn derzeit ausgeführte Aufgaben
nach einer ordnungsgemäßen Kündigung beendet werden, werden sie unterbrochen und
als Zombies erkannt werden.
Sie können Wartungsfenster angeben, um
sich mit der Ausführung der kritischen Aufgaben überschneidet.
In Cloud Composer 2-Versionen vor 2.4.5: ein beendeter Airflow
kann der Worker das SIGTERM-Signal ignorieren und mit der Ausführung von Aufgaben fortfahren:
Herunterskalieren durch Composer-Autoscaling kennenlernen
Sie können ein Upgrade auf eine spätere Cloud Composer-Version durchführen, bei der diese
wurde behoben.
Airflow-Worker war stark ausgelastet
Die für einen Airflow-Worker verfügbare Menge an CPU- und Arbeitsspeicherressourcen ist begrenzt
der Umgebungskonfiguration. Wenn sich eine Auslastung den Limits nähert,
Dies würde zu Ressourcenkonflikten und unnötigen Verzögerungen während der Aufgabe führen.
Ausführung. In extremen Situationen, in denen während längerer Zeiträume nicht genügend Ressourcen zur Verfügung stehen
kann dies zu Zombie-Aufgaben führen.
Eine Datenbank wird von verschiedenen Airflow-Komponenten verwendet, um miteinander zu kommunizieren.
um Heartbeats von Aufgabeninstanzen zu speichern. Ressourcenmangel auf der
führen zu längeren Abfragezeiten
und können die Ausführung von Aufgaben beeinträchtigen.
Airflow-Datenbank war vorübergehend nicht verfügbar
Es kann einige Zeit dauern, bis Airflow-Worker zeitweise auftretende
wie temporäre Verbindungsprobleme. Möglicherweise wird der Standardwert überschritten
Schwellenwert für die Zombie-Erkennung.
Airflow-Heartbeat-Timeouts
resource.type="cloud_composer_environment"
resource.labels.environment_name="ENVIRONMENT_NAME"
log_id("airflow-worker")
textPayload:"Heartbeat time limit exceeded"
Lösungen:
Erhöhen Sie das Zeitlimit für Zombie-Aufgaben und überschreiben Sie
Wert von [scheduler]scheduler_zombie_task_threshold
Airflow-Konfigurationsoption:
Bereich
Schlüssel
Wert
Hinweise
scheduler
scheduler_zombie_task_threshold
Neu
Zeitlimit
(in
Sekunden)
Standardeinstellung
Wert ist
300
Fehlerbehebung bei der Giftpille
Poison Pill ist ein Mechanismus, mit dem Airflow-Aufgaben beendet werden.
Airflow verwendet Giftpille in folgenden Situationen:
Wenn ein Planer eine Aufgabe beendet, die nicht rechtzeitig abgeschlossen wurde.
Wenn eine Aufgabe das Zeitlimit überschreitet oder zu lange ausgeführt wird.
Wenn Airflow Poison Pill verwendet, können Sie in den Logs eines Airflow-Workers, der die Aufgabe ausgeführt hat, die folgenden Logeinträge sehen:
INFO - Subtask ... WARNING - State of this instance has been externally set
to success. Taking the poison pill.
INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.
Mögliche Lösungen:
Überprüfen Sie den Aufgabencode auf Fehler, die eine zu lange Ausführungszeit verursachen können.
Erhöhen Sie den Wert der
Airflow-Konfiguration für [celery_broker_transport_options]visibility-timeout
Option.
Daher wartet der Planer länger auf den Abschluss einer Aufgabe.
bevor er sie als Zombieaufgabe erachtet. Diese Option eignet sich besonders
nützlich für zeitaufwendige Aufgaben, die viele Stunden dauern. Wenn
der Wert zu niedrig ist (z. B. 3 Stunden), berücksichtigt der Planer
Aufgaben, die 5 oder 6 Stunden lang als „aufgehängt“ ausgeführt werden (Zombie-Aufgaben).
Wert von [core]killed_task_cleanup_time-Airflow erhöhen
Konfigurationsoption.
Mit einem längeren Wert haben Airflow-Worker mehr Zeit für die Ausführung ihrer Aufgaben.
anmutig. Wenn der Wert zu niedrig ist, werden Airflow-Aufgaben möglicherweise unterbrochen
abrupt, ohne genug Zeit zu haben, um ihre Arbeit anmutig zu beenden.
Fehlerbehebung bei SIGTERM-Signalen
SIGTERM-Signale werden von Linux verwendet,
Kubernetes, Airflow-Planer und Celery zum Beenden von Prozessen, die für
oder Airflow-Workern oder Airflow-Tasks.
Es kann mehrere Gründe geben, warum SIGTERM-Signale in einer Umgebung gesendet werden:
Eine Aufgabe wurde zur Zombieaufgabe und muss gestoppt werden.
Der Planer entdeckt ein Duplikat einer Aufgabe und schickt die Giftpille und
SIGTERM signalisiert der Aufgabe, sie zu stoppen.
Unter Horizontales Pod-Autoscaling hat die GKE
Die Steuerungsebene sendet SIGTERM-Signale, um nicht mehr vorhandene Pods zu entfernen
erforderlich.
Der Planer kann SIGTERM-Signale an den DagFileProcessorManager-Prozess senden.
Solche SIGTERM-Signale werden vom Planer zur Verwaltung
DagFileProcessorManager-Prozesslebenszyklus und kann ignoriert werden.
Beispiel:
Launched DagFileProcessorManager with pid: 353002
Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
Sending the signal Signals.SIGTERM to group 353002
Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
Race-Bedingung zwischen dem Heartbeat-Callback und den Exit-Callbacks im
local_task_job, der die Ausführung der Aufgabe überwacht. Wenn der Herzschlag
erkennt, dass eine Aufgabe als erfolgreich markiert wurde, kann jedoch nicht unterscheiden,
die Aufgabe selbst erfolgreich war oder dass Airflow angewiesen wurde, die Aufgabe
erfolgreich war. Trotzdem wird ein Task-Runner beendet, ohne zu warten.
damit es beendet wird.
Solche SIGTERM-Signale können ignoriert werden. Die Aufgabe befindet sich bereits im
und die Ausführung des gesamten DAG wird nicht
betroffen sind.
Der Logeintrag Received SIGTERM. ist der einzige Unterschied zwischen dem regulären
Exit und Beendigung der Aufgabe im Status „Erfolgreich“.
<ph type="x-smartling-placeholder"></ph>
Abbildung 2: Race-Bedingung zwischen Heartbeat und Exit-Callback (zum Vergrößern klicken)
Eine Airflow-Komponente nutzt mehr Ressourcen (CPU, Arbeitsspeicher) als vom
Clusterknoten.
Der GKE-Dienst führt Wartungsvorgänge
sendet SIGTERM-Signale an Pods, die auf einem Knoten ausgeführt werden, der demnächst aktualisiert wird.
Wenn eine Aufgabeninstanz mit SIGTERM beendet wird, wird das folgende Log angezeigt:
Einträge in den Logs eines Airflow-Workers, der die Aufgabe ausgeführt hat:
{local_task_job.py:211} WARNING - State of this instance has been externally
set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
with exception
Mögliche Lösungen:
Dieses Problem tritt auf, wenn eine VM, auf der die Aufgabe ausgeführt wird, nicht genügend Arbeitsspeicher hat. Dies ist nicht
die auf die Airflow-Konfigurationen, sondern auf den verfügbaren Arbeitsspeicher
VM:
Das Erhöhen des Arbeitsspeichers hängt von der Cloud Composer-Version ab
die Sie verwenden. Beispiel:
In Cloud Composer 2 können Sie Airflow weitere CPU- und Arbeitsspeicherressourcen zuweisen
Arbeiter.
Bei Cloud Composer 1 können Sie die Umgebung mithilfe eines
Maschinentyp mit mehr Leistung.
In beiden Versionen von Cloud Composer können Sie den Wert
Die Airflow-Konfigurationsoption für Nebenläufigkeit [celery]worker_concurrency.
Diese Option bestimmt, wie viele Aufgaben gleichzeitig von einer bestimmten
Airflow-Worker
Cloud Logging-Abfragen zur Ermittlung von Gründen für Pod-Neustarts oder -Bereinigungen
Cloud Composer-Umgebungen verwenden GKE-Cluster als Computing-Infrastruktur
Ebene. In diesem Abschnitt finden Sie nützliche Abfragen,
Gründe für Neustarts oder Bereinigungen von Airflow-Workern oder Airflow-Planern ermitteln.
Die unten dargestellten Abfragen könnten folgendermaßen optimiert werden:
können Sie einen für Sie interessanten Zeitplan in Cloud Logging festlegen.
z. B. die letzten 6 Stunden oder 3 Tage. Sie können auch einen benutzerdefinierten Zeitraum festlegen.
sollten Sie Cloud Composer
CLUSTER_NAME
Sie können die Suche auch auf einen bestimmten Pod beschränken, indem Sie den Parameter
POD_NAME
Neu gestartete Container ermitteln
resource.type="k8s_node"
log_id("kubelet")
jsonPayload.MESSAGE:"will be restarted"
resource.labels.cluster_name="CLUSTER_NAME"
Alternative Abfrage, um die Ergebnisse auf einen bestimmten Pod zu beschränken:
resource.type="k8s_node"
log_id("kubelet")
jsonPayload.MESSAGE:"will be restarted"
resource.labels.cluster_name="CLUSTER_NAME"
"POD_NAME"
Erkennen, dass Container aufgrund eines Ereignisses des unzureichenden Arbeitsspeichers heruntergefahren wurden
resource.type="k8s_node"
log_id("events")
(jsonPayload.reason:("OOMKilling" OR "SystemOOM")
OR jsonPayload.message:("OOM encountered" OR "out of memory"))
severity=WARNING
resource.labels.cluster_name="CLUSTER_NAME"
Alternative Abfrage, um die Ergebnisse auf einen bestimmten Pod zu beschränken:
resource.type="k8s_node"
log_id("events")
(jsonPayload.reason:("OOMKilling" OR "SystemOOM")
OR jsonPayload.message:("OOM encountered" OR "out of memory"))
severity=WARNING
resource.labels.cluster_name="CLUSTER_NAME"
"POD_NAME"
Container ermitteln, die nicht mehr ausgeführt werden
Auswirkungen von Aktualisierungs- oder Upgradevorgängen auf die Ausführung von Airflow-Aufgaben
Durch Aktualisierungs- oder Upgradevorgänge werden
aktuell ausgeführte Airflow-Aufgaben unterbrochen,
es sei denn, eine Aufgabe wird im deferrable-Modus ausgeführt.
Wir empfehlen, diese Vorgänge dann durchzuführen, wenn Sie minimale Auswirkungen erwarten
und richten Sie entsprechende Wiederholungsmechanismen in der
DAGs und Aufgaben.
Fehlerbehebung bei KubernetesExecutor-Aufgaben
CeleryKubernetesExecutor ist ein Executor-Typ in Cloud Composer 3
die CeleryExecutor und KubernetesExecutor gleichzeitig verwenden können,
.
Weitere Informationen finden Sie auf der Seite CeleryKubernetesExecutor verwenden.
Informationen zur Fehlerbehebung bei Aufgaben, die mit KubernetesExecutor ausgeführt werden.
Allgemeine Probleme
In den folgenden Abschnitten werden Symptome und mögliche Lösungen für einige häufig auftretende DAG-Probleme beschrieben.
Airflow-Task wurde von Negsignal.SIGKILL unterbrochen
Manchmal beansprucht Ihre Aufgabe mehr Arbeitsspeicher, als dem Airflow-Worker zugewiesen ist.
In einer solchen Situation kann es zu einer Unterbrechung durch Negsignal.SIGKILL kommen. Das System
sendet dieses Signal, um
weiteren Speicherverbrauch zu vermeiden,
anderer Airflow-Aufgaben ausgeführt werden. Im Log des Airflow-Workers sehen Sie
folgenden Logeintrag:
{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL
Negsignal.SIGKILL kann auch als Code -9 angezeigt werden.
Mögliche Lösungen:
Niedrigere worker_concurrency von Airflow-Workern.
Erhöhen Sie im Fall von Cloud Composer 2 den Arbeitsspeicher der Airflow-Worker.
Führen Sie für Cloud Composer 1 ein Upgrade auf einen größeren Maschinentyp aus, der in
Cloud Composer-Cluster.
Optimieren Sie Ihre Aufgaben, um weniger Arbeitsspeicher zu verbrauchen.
Verwalten Sie ressourcenintensive Aufgaben in Cloud Composer mit
KubernetesPodOperator
oder GKEStartPodOperator für
Aufgabenisolierung und
benutzerdefinierte Ressourcenzuweisung.
Aufgabe schlägt fehl, ohne Logs aufgrund von DAG-Parsing-Fehlern auszugeben
Manchmal können subtile DAG-Fehler auftreten,
Ein Airflow-Planer und DAG-Prozessor können Aufgaben für die Ausführung planen
und eine DAG-Datei zu parsen, aber der Airflow-Worker kann Aufgaben nicht ausführen
aus einem solchen DAG, da die Python-DAG-Datei
Programmierfehler enthält. Dies könnte
zu einer Situation führen, in der eine Airflow-Aufgabe als Failed markiert ist
und es gibt kein Protokoll
für die Ausführung.
Lösungen:
Prüfen Sie in den Airflow-Worker-Logs, dass keine Fehler auftreten,
Airflow-Worker im Zusammenhang mit fehlenden DAG- oder DAG-Parsing-Fehlern.
Anzahl der Parameter für das DAG-Parsing erhöhen:
Erhöhung
dagbag-import-timeout
auf mindestens 120 Sekunden (oder länger, falls erforderlich).
Erhöhung
dag-file-processor-timeout
auf mindestens 180 Sekunden (oder mehr, falls erforderlich). Dieser Wert muss
höher als dagbag-import-timeout.
Aufgabe schlägt fehl, ohne dass Logs aufgrund von Ressourcenmangel ausgegeben werden
Symptom: Während der Ausführung einer Aufgabe ist der Unterprozess des Airflow-Workers für
für die Airflow-Taskausführung abrupt unterbrochen wird.
Der im Log des Airflow-Workers angezeigte Fehler sieht in etwa so aus:
...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task R = retval = fun(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__ return self.run(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command _execute_in_fork(command_to_exec) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...
Lösung:
Erstellen Sie in Cloud Composer 1 eine neue Umgebung mit
einen größeren Maschinentyp als die aktuelle Maschine.
Typ. Erwägen Sie, Ihrer Umgebung mehr Knoten hinzuzufügen und den Wert [celery]worker_concurrency für Ihre Worker zu senken.
Aufgabe schlägt fehl, ohne dass Logs aufgrund der Pod-Bereinigung ausgegeben werden
Google Kubernetes Engine-Pods unterliegen den
Kubernetes-Pod-Lebenszyklus und Pod-Entfernung Aufgabe
Spitzen und eine gemeinsame Planung von Workern sind zwei der häufigsten Ursachen für die Bereinigung von Pods
in Cloud Composer.
Eine Pod-Bereinigung kann auftreten, wenn ein bestimmter Pod Ressourcen eines Knotens relativ zu den konfigurierten Erwartungen in Sachen Ressourcenverbrauch für den Knoten übernutzt. Für
z. B. wenn mehrere speicherintensive Aufgaben
in einem Pod ausgeführt werden,
und ihre kombinierte Last führt dazu,
dass der Knoten, auf dem dieser Pod ausgeführt wird,
die Arbeitsspeichernutzungsgrenze.
Wenn ein Airflow-Worker-Pod bereinigt wird, werden alle auf diesem Pod ausgeführten Aufgabeninstanzen unterbrochen und später von Airflow als fehlgeschlagen markiert.
Logs werden zwischengespeichert. Wenn ein Worker-Pod entfernt wird, bevor der Zwischenspeicher geleert wurde, werden keine Logs ausgegeben. Ein Aufgabenfehler ohne Logs ist ein Hinweis darauf, dass die Airflow-Worker aufgrund von zu wenig Arbeitsspeicher neu gestartet werden. In Cloud Logging sind möglicherweise einige Logs vorhanden, obwohl die Airflow-Logs nicht ausgegeben wurden.
So können die Logs angezeigt werden:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung.
Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Logs auf.
Logs einzelner Worker unter Alle Logs ansehen ->
Airflow-Logs -> Worker -> (einzelner Worker).
Die DAG-Ausführung ist arbeitsspeicherbegrenzt. Jede Ausführung einer Aufgabe beginnt mit zwei Airflow-Prozessen: Ausführung und Monitoring. Jeder Knoten kann bis zu 6 Aufgaben gleichzeitig ausführen (ungefähr 12 Prozesse mit Airflow-Modulen).
Je nach Art des DAG wird möglicherweise mehr Arbeitsspeicher belegt.
Symptom:
Rufen Sie in der Google Cloud Console die Seite Arbeitslasten auf.
Wenn airflow-worker-Pods vorhanden sind, die Evicted anzeigen, klicken Sie auf die einzelnen bereinigten Pods und suchen Sie oben im Fenster nach folgender Meldung: The node was low on resource: memory.
Lösung:
Erstellen Sie in Cloud Composer 1 eine neue Cloud Composer-Umgebung mit
einen größeren Maschinentyp als die aktuelle Maschine.
Typ.
Prüfen Sie die Logs von airflow-worker-Pods auf mögliche Ursachen für die Bereinigung. Weitere Informationen
Informationen zum Abrufen von Logs aus einzelnen Pods finden Sie unter
Fehlerbehebung bei bereitgestellten Arbeitslasten
Achten Sie darauf, dass die Aufgaben im DAG idempotent und wiederholbar sind.
Vermeiden Sie das Herunterladen unnötiger Dateien auf das lokale Dateisystem der Airflow-Worker.
Airflow-Worker haben eine begrenzte lokale Dateisystemkapazität. Beispiel:
Cloud Composer 2, ein Worker kann 1 GB bis 10 GB Speicherplatz haben. Wenn der Parameter
nicht genügend Speicherplatz vorhanden ist, wird der Airflow-Worker-Pod
GKE-Steuerungsebene. Dadurch schlägt alle bereinigten Aufgaben fehl.
ausgeführt wurde.
Beispiele für problematische Vorgänge:
Dateien oder Objekte herunterladen und lokal in einem Airflow speichern
Worker. Speichern Sie diese Objekte stattdessen direkt in einem geeigneten Dienst, z. B. in einem Cloud Storage-Bucket.
Über einen Airflow-Worker auf große Objekte im Ordner /data zugreifen
Der Airflow-Worker lädt das Objekt in sein lokales Dateisystem herunter. Implementieren Sie stattdessen Ihre DAGs, damit große Dateien außerhalb des Airflow-Worker-Pods verarbeitet werden.
Zeitüberschreitung beim Laden des DAG
Symptom:
Auf der Airflow-Weboberfläche wird oben auf der Seite mit der Liste der DAGs eine rote Warnung angezeigt.
Feld zeigt Broken DAG: [/path/to/dagfile] Timeout.
In Cloud Monitoring: Die airflow-scheduler-Logs enthalten Einträge
ähnlich wie:
ERROR - Process timed out
ERROR - Failed to import: /path/to/dagfile
AirflowTaskTimeout: Timeout
Lösung:
dag_file_processor_timeout-Airflow überschreiben
Konfigurationsoption und lassen Sie mehr Zeit für das DAG-Parsing:
Bereich
Schlüssel
Wert
core
dag_file_processor_timeout
Neuer Wert für die Zeitüberschreitung
DAG-Ausführung endet nicht innerhalb der erwarteten Zeit
Symptom:
Manchmal endet eine DAG-Ausführung nicht, weil Airflow-Aufgaben hängen bleiben und DAG ausgeführt werden
länger dauert als erwartet. Unter normalen Bedingungen bleiben
im Status „Warteschlange“ oder „Wird ausgeführt“ unbegrenzt, da Airflow
Bereinigungsverfahren, die dazu beitragen,
diese Situation zu vermeiden.
Lösung:
Verwenden Sie die Methode
dagrun_timeout
-Parameter für die DAGs. Beispiel:
dagrun_timeout=timedelta(minutes=120). Daher muss jede DAG-Ausführung
die innerhalb des Zeitlimits für die DAG-Ausführung abgeschlossen wurden und nicht abgeschlossene Aufgaben
als Failed oder Upstream Failed. Weitere Informationen zum Airflow-Taskstatus finden Sie unter
Apache Airflow-Dokumentation
Verwenden Sie die Methode
Zeitlimit für Aufgabenausführung
Parameter zum Definieren eines Standardzeitlimits für Aufgaben, die auf Apache-Basis ausgeführt werden
Airflow-Operatoren.
DAG-Ausführungen nicht ausgeführt
Symptom:
Wenn ein Planungsdatum für einen DAG dynamisch festgelegt wird, kann dies zu verschiedenen
unerwartete Nebenwirkungen. Beispiel:
Eine DAG-Ausführung liegt immer in der Zukunft und der DAG wird nie ausgeführt.
Frühere DAG-Ausführungen werden als ausgeführt und erfolgreich markiert, obwohl sie
ausgeführt haben.
Legen Sie den statischen start_date für DAGs fest. Optional können Sie catchup=False verwenden
um die Ausführung des DAG für vergangene Zeiträume zu deaktivieren.
Verwenden Sie datetime.now() oder days_ago(<number of days>) nur, wenn
sich der Nebenwirkungen dieses Ansatzes bewusst.
Erhöhter Netzwerktraffic zur und von der Airflow-Datenbank
Die Menge des Netzwerk-Traffics zwischen dem GKE-Cluster Ihrer Umgebung und der Airflow-Datenbank hängt von der Anzahl der DAGs, der Anzahl der Aufgaben in DAGs und der Art des Zugriffs der DAGs auf Daten in der Airflow-Datenbank ab. Folgende Faktoren können sich auf die Netzwerknutzung auswirken:
Abfragen an die Airflow-Datenbank Wenn Ihre DAGs viele Abfragen ausführen, generieren sie große Mengen an Traffic. Beispiele: Status prüfen, bevor andere Aufgaben ausgeführt werden, XCom-Tabelle abfragen und Inhalt der Airflow-Datenbank auslesen.
Große Anzahl von Aufgaben Je mehr Aufgaben geplant sind, desto mehr Netzwerktraffic wird generiert. Dieser Aspekt gilt sowohl für die Gesamtzahl der Aufgaben in Ihren DAGs als auch für die Planungshäufigkeit. Wenn der Airflow-Planer DAG-Ausführungen plant, sendet er Abfragen an die Airflow-Datenbank und generiert Traffic.
Die Airflow-Weboberfläche generiert Netzwerkverkehr, da sie Abfragen an die Airflow-Datenbank sendet. Die intensive Verwendung von Seiten mit Grafiken, Aufgaben und Diagrammen kann große Mengen an Netzwerkverkehr generieren.
DAG führt zum Absturz des Airflow-Webservers oder verursacht einen 502 gateway timeout-Fehler
Webserverausfälle können aus verschiedenen Gründen auftreten. Prüfen
Die airflow-webserver-Logs
Cloud Logging, um die Ursache des Fehlers
502 gateway timeout Fehler.
Komplexe Berechnungen
Dieser Abschnitt gilt nur für Cloud Composer 1.
Führen Sie keine komplexen Berechnungen während der DAG-Analyse durch.
Im Gegensatz zu den Worker- und Planerknoten, deren Maschinentypen
größere CPU- und Arbeitsspeicherkapazität haben, verwendet der Webserver einen festen Maschinentyp,
was zu Fehlern beim DAG-Parsing führen kann, wenn die
sehr schwer.
Beachten Sie, dass der Webserver 2 vCPUs und 2 GB Arbeitsspeicher hat.
Der Standardwert für core-dagbag_import_timeout beträgt 30 Sekunden. Dieser Zeitlimitwert ist die Obergrenze für den Zeitraum, den Airflow für das Laden eines Python-Moduls in den Ordner dags/ benötigt.
Falsche Berechtigungen
Dieser Abschnitt gilt nur für Cloud Composer 1.
Der Webserver wird nicht unter demselben Dienstkonto ausgeführt wie die Worker und der Planer. Worker und der Planer können somit gegebenenfalls auf vom Nutzer verwaltete Ressourcen zugreifen, auf die der Webserver keinen Zugriff hat.
Vermeiden Sie den Zugriff auf nicht öffentliche Ressourcen während der DAG-Analyse. Sollte dies nicht möglich sein, müssen Sie dem Dienstkonto des Webservers Berechtigungen erteilen. Der Name des Dienstkontos wird von der Webserverdomain abgeleitet. Wenn beispielsweise die Domain
ist example-tp.appspot.com, das Dienstkonto ist
example-tp@appspot.gserviceaccount.com.
DAG-Fehler
Dieser Abschnitt gilt nur für Cloud Composer 1.
Der Webserver wird in App Engine ausgeführt und ist vom GKE-Cluster Ihrer Umgebung getrennt. Der Webserver parst die DAG-Definitionsdateien. Wenn dabei Fehler im DAG auftreten, kann es zum 502 gateway timeout kommen. Airflow funktioniert normal ohne einen funktionierenden Webserver, wenn der
Der problematische DAG beeinträchtigt keine in GKE ausgeführten Prozesse.
In diesem Fall können Sie mit gcloud composer environments run abrufen,
Details aus Ihrer Umgebung und als Behelfslösung, falls der Webserver
nicht verfügbar.
In anderen Fällen können Sie die DAG-Analyse in GKE ausführen und nach DAGs suchen, die schwerwiegende Python-Ausnahmen oder eine Zeitüberschreitung auslösen (Standardeinstellung: 30 Sekunden).
Stellen Sie zur Fehlerbehebung eine Verbindung zu einer Remote-Shell in einem Airflow-Worker-Container her und testen Sie, ob Syntaxfehler vorliegen. Weitere Informationen finden Sie unter DAGs testen.
Eine große Anzahl von DAGs und Plug-ins in dags- und Plug-in-Ordnern verarbeiten
Der Inhalt der Ordner „/dags“ und „/plugins“ wird synchronisiert von
lokalen Dateisysteme von Airflow-Workern und
Planer.
Je mehr Daten in diesen Ordnern gespeichert sind, desto länger dauert es,
Synchronisierung. Gehen Sie dazu folgendermaßen vor:
Begrenzen Sie die Anzahl der Dateien in den Ordnern „/dags“ und „/plugins“. Nur die
mindestens erforderliche Dateien.
Erhöhen Sie nach Möglichkeit den Speicherplatz für Airflow-Planer und
Arbeiter.
Erhöhen Sie nach Möglichkeit CPU und Arbeitsspeicher von Airflow-Planern und -Workern, damit
dass die Synchronisierung schneller durchgeführt wird.
Bei einer sehr großen Anzahl von DAGs können Sie DAGs in Batches aufteilen,
in ZIP-Archiven und im Ordner /dags bereitstellen.
Dieser Ansatz beschleunigt den Synchronisierungsprozess von DAGs. Airflow-Komponenten
ZIP-Archive vor der Verarbeitung von DAGs dekomprimieren.
Auch das Generieren von DAGs in einer programmatischen
Die Anzahl der im Ordner /dags gespeicherten DAG-Dateien.
Lesen Sie den Abschnitt zu programmatischen DAGs, um zu vermeiden,
Probleme bei der Planung und Ausführung von programmatisch generierten DAGs.
Programmatisch generierte DAGs nicht gleichzeitig planen
Das programmatische Generieren von DAG-Objekten aus einer DAG-Datei ist eine effiziente Methode
zum Erstellen vieler ähnlicher DAGs,
die sich nur geringfügig unterscheiden.
Es ist wichtig, die Ausführung all dieser DAGs nicht sofort einzuplanen. Es
besteht eine hohe Wahrscheinlichkeit, dass die Airflow-Worker nicht genügend CPU und Arbeitsspeicher haben
Ressourcen zur Ausführung aller zur gleichen Zeit geplanten Aufgaben.
So vermeiden Sie Probleme bei der Planung programmatischer DAGs:
Erhöhen Sie die Nebenläufigkeit der Worker und skalieren Sie Ihre Umgebung, damit sie
um mehr Aufgaben gleichzeitig auszuführen.
DAGs so generieren, dass die Zeitpläne gleichmäßig über einen bestimmten Zeitraum verteilt werden,
müssen Sie nicht Hunderte von Aufgaben gleichzeitig planen, damit die Airflow-Worker
alle geplanten Aufgaben ausführen können.
Die Lost connection to Postgres server during query-Ausnahme wird während der Ausführung der Aufgabe oder unmittelbar danach ausgelöst.
Lost connection to Postgres server during query Ausnahmen
treten häufig auf, wenn die folgenden Bedingungen erfüllt sind:
Ihr DAG verwendet PythonOperator oder einen benutzerdefinierten Operator.
Ihr DAG stellt Abfragen an die Airflow-Datenbank.
Wenn mehrere Abfragen von einer aufrufbaren Funktion gesendet werden, verweisen Tracebacks möglicherweise fälschlicherweise auf die Zeile self.refresh_from_db(lock_for_update=True) im Airflow-Code. Sie ist die erste Datenbankabfrage nach der Aufgabenausführung. Die tatsächliche Ursache der Ausnahme tritt davor auf, wenn eine SQLAlchemy-Sitzung nicht ordnungsgemäß geschlossen wird.
SQLAlchemy-Sitzungen sind auf einen Thread beschränkt und werden in einer aufrufbaren Funktionssitzung erstellt, die später innerhalb des Airflow-Codes fortgesetzt werden kann. Wenn es signifikante
Verzögerungen zwischen Abfragen innerhalb einer Sitzung auftreten, ist die Verbindung möglicherweise bereits
durch den Postgres-Server geschlossen. Das Zeitlimit für die Verbindung in Cloud Composer-Umgebungen ist auf etwa zehn Minuten festgelegt.
Lösung:
Verwenden Sie den Decorator airflow.utils.db.provide_session. Dieser Decorator stellt eine gültige Sitzung für die Airflow-Datenbank im session-Parameter bereit und schließt die Sitzung am Ende der Funktion ordnungsgemäß.
Verwenden Sie keine einzelne Funktion mit langer Ausführungszeit. Verschieben Sie stattdessen alle Datenbankabfragen in separate Funktionen, sodass es mehrere Funktionen mit dem Decorator airflow.utils.db.provide_session gibt. In diesem Fall werden Sitzungen nach dem Abrufen von Abfrageergebnissen automatisch geschlossen.
Ausführungszeit von DAGs, Aufgaben und parallelen Ausführungen desselben DAG steuern
Wenn Sie steuern möchten, wie lange eine einzelne DAG-Ausführung für einen bestimmten DAG dauert
bleibt, können Sie
den DAG-Parameter dagrun_timeout,
also. Wenn Sie z. B. erwarten, dass ein einzelner DAG (unabhängig davon, ob
Ausführung mit Erfolg oder Misserfolg) darf nicht länger als eine Stunde dauern,
legen Sie diesen Parameter auf 3.600 Sekunden fest.
Sie können auch festlegen, wie lange eine einzelne Airflow-Aufgabe dauern soll. Aufgabe
Sie können also execution_timeout verwenden.
Wenn Sie steuern möchten, wie viele aktive DAG-Ausführungen
bestimmten DAG verwenden, können Sie den [core]max-active-runs-per-dagAirflow-Konfigurationsoption dazu.
Wenn nur eine einzige Instanz eines DAG gleichzeitig ausgeführt werden soll, legen Sie
max-active-runs-per-dag-Parameter auf 1.
Probleme bei der Synchronisierung von DAGs und Plug-ins mit Planern, Workern und Webservern
Cloud Composer synchronisiert den Inhalt der Ordner /dags und /plugins
für Planer und Worker. Bestimmte Objekte in den Ordnern „/dags“ und „/plugins“
kann diese Synchronisierung verhindern, dass sie richtig funktioniert, oder sie zumindest verlangsamen.
Der Ordner „/dags“ wird mit Planern und Workern synchronisiert. Dieser Ordner ist nicht synchronisiert
auf Webserver in Cloud Composer 2 oder wenn Sie DAG Serialization in Cloud Composer 1 aktivieren.
Der Ordner „/plugins“ wird mit Planern, Workern und Webservern synchronisiert.
Die folgenden Probleme können auftreten:
Sie haben mit GZIP komprimierte Dateien hochgeladen, die
Komprimierungstranscodierung in /dags und /plugins
Ordner. Das passiert in der Regel, wenn Sie zum Hochladen den Befehl gsutil cp -Z verwenden
in den Bucket.
Lösung: Löschen Sie das Objekt, das die Komprimierungstranscodierung verwendet, und laden Sie die Datei noch einmal hoch.
in den Bucket verschieben.
Eines der Objekte hat den Namen „.“, das heißt, ein Objekt wird nicht mit
Planungs- und Worker-Instanzen
und die Synchronisierung wird möglicherweise unterbrochen.
Lösung: Benennen Sie das problematische Objekt um.
Ein Ordner und eine Python-Datei in DAG haben denselben Namen, z. B. a.py.
In diesem Fall wird die DAG-Datei nicht richtig mit den Airflow-Komponenten synchronisiert.
Lösung: Entfernen Sie den Ordner, der denselben Namen wie eine DAG-Python-Datei hat.
Eines der Objekte in den Ordnern /dags oder /plugins enthält das Symbol /
am Ende des Objektnamens hinzu. Solche Objekte können den Synchronisierungsprozess in die Irre führen.
da das Symbol / bedeutet, dass ein Objekt ein Ordner und keine Datei ist.
Lösung: Entfernen Sie das Symbol / aus dem Namen des problematischen Objekts.
Speichern Sie keine unnötigen Dateien in den Ordnern /dags und /plugins.
Manchmal werden von Ihnen implementierte DAGs und Plug-ins
zusätzliche Dateien, z. B. Dateien,
die Tests für diese Komponenten speichern. Diese
Dateien mit Mitarbeitern und Planern synchronisiert, was sich auf die für die
kopieren Sie diese Dateien in Planer, Worker und Webserver.
Lösung: Speichern Sie keine zusätzlichen und unnötigen Dateien in /dags und
/plugins Ordner.
Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...' Fehler wird von Planern und Workern generiert
Dieses Problem tritt auf, weil Objekte
überlappenden Namespace in Cloud Storage
Planer und Worker verwenden
herkömmliche Dateisysteme. Zum Beispiel ist es möglich,
um einen Ordner und ein Objekt mit demselben Namen zum
Bucket. Wenn der Bucket mit den Planern und Workern der Umgebung synchronisiert ist,
wird dieser Fehler generiert, was
zu Aufgabenfehlern führen kann.
Um dieses Problem zu beheben, achten Sie darauf, dass sich im
im Bucket der Umgebung. Wenn beispielsweise sowohl /dags/misc (eine Datei) als auch
/dags/misc/example_file.txt (eine andere Datei) befinden sich in einem Bucket, wird folgender Fehler ausgegeben:
die vom Planer generiert wurden.
Vorübergehende Unterbrechungen beim Herstellen einer Verbindung zur Airflow-Metadatendatenbank
Cloud Composer wird auf einer verteilten Cloud-Infrastruktur ausgeführt.
Von Zeit zu Zeit können vorübergehende Probleme auftreten,
die Ausführung Ihrer Airflow-Aufgaben unterbrechen.
In solchen Fällen werden möglicherweise die folgenden Fehlermeldungen im Protokolle:
"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"
oder
"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"
Solche vorübergehenden Probleme können auch durch Wartungsvorgänge verursacht werden.
die für Ihre Cloud Composer-Umgebungen durchgeführt wurden.
Normalerweise treten solche Fehler zeitweise auf und wenn Ihre Airflow-Aufgaben idempotent sind
und Sie Wiederholungsversuche konfiguriert haben, sollten Sie dagegen immun sein. Sie können auch
erwägen Sie das Definieren von Wartungsfenstern.
Ein weiterer Grund für solche Fehler könnte ein Mangel an Ressourcen in Ihrem Projekt sein.
Cluster der Umgebung. In solchen Fällen können Sie Ihre Kampagnen
wie in den
Skalierungsumgebungen oder
Anleitung zur Optimierung der Umgebung
Eine DAG-Ausführung ist als erfolgreich markiert, enthält aber keine ausgeführten Aufgaben
Wenn die DAG-Ausführung execution_date vor der start_date des DAG liegt, gilt Folgendes:
werden DAG-Ausführungen angezeigt, die keine Aufgabenausführungen haben, aber dennoch als erfolgreich gekennzeichnet sind.
<ph type="x-smartling-placeholder"></ph>
Abbildung 3: Eine erfolgreiche DAG-Ausführung ohne ausgeführte Aufgaben (zum Vergrößern klicken)
Ursache
Diese Situation kann in einem der folgenden Fälle auftreten:
Eine Abweichung wird durch Zeitzonenunterschiede zwischen den
execution_date und start_date. Das kann zum Beispiel passieren,
mit pendulum.parse(...), um start_date festzulegen.
Der start_date des DAG ist auf einen dynamischen Wert festgelegt, z. B.
airflow.utils.dates.days_ago(1)
Lösung
Achten Sie darauf, dass execution_date und start_date dieselbe Zeitzone verwenden.
Geben Sie einen statischen start_date an und kombinieren Sie ihn mit catchup=False, um zu vermeiden,
Ausführen von DAGs mit vergangenen Startdaten
Ein DAG ist in der Airflow-UI oder der DAG-UI nicht sichtbar und wird vom Planer nicht geplant
Der DAG-Prozessor parst jeden DAG, bevor er vom Planer geplant werden kann
und bevor ein DAG in
der Airflow-UI oder der DAG-UI
Die folgenden Airflow-Konfigurationsoptionen definieren Zeitlimits für das Parsen von DAGs:
Wenn ein DAG in der Airflow- oder DAG-UI nicht sichtbar ist:
Prüfen Sie die DAG-Prozessorlogs, ob der DAG-Prozessor in der Lage ist,
Ihren DAG. Bei Problemen werden möglicherweise die folgenden Logeinträge angezeigt
in den DAG-Prozessor- oder Planerlogs:
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
/usr/local/airflow/dags/example_dag.py with PID 21903 started at
2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
Prüfen Sie die Planerlogs, um festzustellen, ob sie ordnungsgemäß funktionieren. Im Fall von
werden möglicherweise die folgenden Logeinträge in Planerlogs angezeigt:
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
Process timed out, PID: 68496
Lösungen:
Beheben Sie alle DAG-Parsing-Fehler. Der DAG-Prozessor parst mehrere DAGs und
in seltenen Fällen kann das Parsen von DAG-Fehlern
anderen DAGs.
Wenn das Parsen aller DAGs mehr als die festgelegte Anzahl von Sekunden dauert
in
[core]dag_file_processor_timeout,
erhöhen Sie dieses Zeitlimit.
Wenn das Parsen des DAG lange dauert, kann dies auch bedeuten,
optimal implementiert werden. Wenn z. B. viele
Umgebungsvariablen oder Aufrufe an externe Dienste oder Airflow
Datenbank. Vermeiden Sie es nach Möglichkeit, solche Vorgänge
globalen Abschnitten von DAGs.
Erhöhen Sie die CPU- und Arbeitsspeicherressourcen für den Scheduler, damit er schneller arbeitet.