Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Diese Seite enthält Schritte zur Fehlerbehebung und Informationen zu häufigen Problemen mit Airflow-Planern und DAG-Prozessoren.
Problemursache identifizieren
Ermitteln Sie zu Beginn der Fehlerbehebung, ob das Problem auftritt:
- Zur DAG-Parsing-Zeit, während der DAG von einem Airflow-DAG-Prozessor geparst wird
- Zur Ausführungszeit, während der DAG von einem Airflow-Planer verarbeitet wird
Weitere Informationen zur Parsing- und zur Ausführungszeit finden Sie unter Unterschied zwischen der DAG-Parsing-Zeit und der DAG-Ausführungszeit.
Probleme bei der DAG-Verarbeitung untersuchen
Laufende und in der Warteschlange befindliche Aufgaben überwachen
So prüfen Sie, ob Aufgaben in einer Warteschlange hängen:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Monitoring auf.
Prüfen Sie auf dem Tab Monitoring das Diagramm Airflow-Aufgaben im Abschnitt DAG-Ausführungen und identifizieren Sie mögliche Probleme. Airflow-Aufgaben sind Aufgaben, die in Airflow in der Warteschlange stehen. Sie können entweder in die Broker-Warteschlange von Celery oder Kubernetes Executor verschoben werden. Tasks in der Celery-Warteschlange sind Taskinstanzen, die in die Celery-Broker-Warteschlange gestellt wurden.
Probleme zur DAG-Parsing-Zeit beheben
In folgenden Abschnitten werden Symptome und mögliche Lösungen für einige während der DAG-Parsing-Zeit häufig auftretende Probleme beschrieben.
Anzahl und Zeitverteilung der Aufgaben
Bei der gleichzeitigen Planung einer großen Anzahl von DAGs oder Aufgaben kann es in Airflow zu Problemen kommen. So kannst du Probleme bei der Planung vermeiden:
- Passen Sie Ihre DAGs so an, dass sie eine kleinere Anzahl konsolidierter Aufgaben verwenden.
- Passen Sie die Zeitplanintervalle Ihrer DAGs an, um DAG-Ausführungen gleichmäßiger über die Zeit zu verteilen.
Airflow-Konfiguration skalieren
Airflow bietet Konfigurationsoptionen, die steuern, wie viele Aufgaben und DAGs es gleichzeitig ausführen kann. Um diese Konfigurationsoptionen festzulegen, überschreiben Sie deren Werte für Ihre Umgebung. Einige dieser Werte können Sie auch auf DAG- oder Aufgabenebene festlegen.
-
Der Parameter
[celery]worker_concurrency
steuert die maximale Anzahl von Aufgaben, die ein Airflow-Worker gleichzeitig ausführen kann. Wenn Sie den Wert dieses Parameters mit der Anzahl der Airflow-Worker in Ihrer Cloud Composer-Umgebung multiplizieren, erhalten Sie die maximale Anzahl von Aufgaben, die zu einem bestimmten Zeitpunkt in Ihrer Umgebung ausgeführt werden können. Diese Zahl ist durch die Airflow-Konfigurationsoption[core]parallelism
begrenzt, die unten weiter beschrieben wird.In Cloud Composer 3-Umgebungen wird der Standardwert von
[celery]worker_concurrency
automatisch anhand der Anzahl der nebenläufigen Lightweight-Aufgabeninstanzen berechnet, die ein Worker verarbeiten kann. Das bedeutet, dass der Wert von den Ressourcenlimits für Worker abhängt. Der Wert für die Worker-Nebenläufigkeit hängt nicht von der Anzahl der Worker in Ihrer Umgebung ab. Maximale Anzahl aktiver DAG-Ausführungen
Die Airflow-Konfigurationsoption
[core]max_active_runs_per_dag
steuert die maximale Anzahl aktiver DAG-Ausführungen pro DAG. Der Planer erstellt keine weiteren DAG-Ausführungen, wenn das Limit erreicht ist.Ist dieser Parameter falsch eingestellt, kann ein Problem auftreten, bei dem der Planer die DAG-Ausführung drosselt, da er zu einer bestimmten Zeit keine DAG-Ausführungsinstanzen mehr erstellen kann.
Sie können diesen Wert auch auf DAG-Ebene mit dem Parameter
max_active_runs
festlegen.Maximale Anzahl aktiver Aufgaben pro DAG
Die Airflow-Konfigurationsoption
[core]max_active_tasks_per_dag
steuert die maximale Anzahl an Aufgabeninstanzen, die pro DAG gleichzeitig ausgeführt werden können.Ist dieser Parameter falsch festgelegt, kann ein Problem auftreten, bei dem die Ausführung einer einzelnen DAG-Instanz langsam läuft, da nur eine begrenzte Anzahl an DAG-Aufgaben zu einer bestimmten Zeit ausgeführt werden können. In diesem Fall können Sie den Wert dieser Konfigurationsoption erhöhen.
Sie können diesen Wert auch auf DAG-Ebene mit dem Parameter
max_active_tasks
festlegen.Mit den Parametern
max_active_tis_per_dag
undmax_active_tis_per_dagrun
auf Aufgabenebene können Sie steuern, wie viele Instanzen mit einer bestimmten Aufgaben-ID pro DAG und pro DAG-Lauf ausgeführt werden dürfen.Parallelität und Poolgröße
Die Airflow-Konfigurationsoption
[core]parallelism
steuert, wie viele Aufgaben der Airflow-Planer in die Warteschlange des Executors stellen kann, nachdem alle Abhängigkeiten für diese Aufgaben erfüllt wurden.Dies ist ein globaler Parameter für die gesamte Airflow-Einrichtung.
Aufgaben werden in die Warteschlange gestellt und in einem Pool ausgeführt. Cloud Composer-Umgebungen verwenden nur einen Pool. Die Größe dieses Pools steuert, wie viele Aufgaben der Planer in einem bestimmten Moment zur Ausführung in die Warteschlange stellen kann. Wenn die Poolgröße zu klein ist, kann der Planer Aufgaben nicht für die Ausführung in die Warteschlange stellen, auch wenn Grenzwerte, die durch die Konfigurationsoption
[core]parallelism
und die Konfigurationsoption[celery]worker_concurrency
multipliziert mit der Anzahl der Airflow-Worker definiert sind, noch nicht erreicht sind.Sie können die Poolgröße in der Airflow-UI konfigurieren (Menü > Administrator > Pools). Passen Sie die Poolgröße an das Maß an Parallelität an, das Sie in Ihrer Umgebung erwarten.
Normalerweise wird
[core]parallelism
als Produkt aus der maximalen Anzahl von Workern und[celery]worker_concurrency
festgelegt.
Fehlerbehebung bei laufenden und in der Warteschlange befindlichen Aufgaben
In folgenden Abschnitten werden Symptome und mögliche Lösungen für einige bei laufenden und in der Warteschlange befindlichen Aufgaben häufig auftretenden Problemen beschrieben.
DAG-Ausführungen werden nicht ausgeführt
Symptom:
Wenn ein Zeitplandatum für einen DAG dynamisch festgelegt wird, kann dies zu verschiedenen unerwarteten Nebenwirkungen führen. Beispiel:
Eine DAG-Ausführung findet immer in der Zukunft statt und die DAG wird nie ausgeführt.
Vergangene DAG-Ausführungen werden als ausgeführt und erfolgreich markiert, obwohl sie nicht ausgeführt wurden.
Weitere Informationen finden Sie in der Apache Airflow-Dokumentation.
Mögliche Lösungen:
Folgen Sie den Empfehlungen in der Apache Airflow-Dokumentation.
Statische
start_date
für DAGs festlegen Alternativ können Siecatchup=False
verwenden, um die Ausführung des DAG für vergangene Datumsangaben zu deaktivieren.Vermeiden Sie die Verwendung von
datetime.now()
oderdays_ago(<number of days>)
, es sei denn, Sie sind sich der Nebenwirkungen dieses Ansatzes bewusst.
TimeTable-Funktion des Airflow-Planers verwenden
Cloud Composer 3 unterstützt keine benutzerdefinierten Plug-ins für den Airflow-Scheduler, einschließlich Zeitplänen, die in DAGs implementiert sind. Plug-ins werden nicht mit den Schedulern in Ihrer Umgebung synchronisiert.
Sie können integrierte Zeitpläne weiterhin in Cloud Composer 3 verwenden.
Aufgabenplanung während Wartungsfenstern vermeiden
Sie können Wartungsfenster für Ihre Umgebung definieren, damit die Wartung der Umgebung außerhalb der Zeiten erfolgt, in denen Sie Ihre DAGs ausführen. Sie können Ihre DAGs weiterhin während Wartungsfenstern ausführen, sofern es akzeptabel ist, dass einige Aufgaben unterbrochen und wiederholt werden. Weitere Informationen dazu, wie sich Wartungsfenster auf Ihre Umgebung auswirken, finden Sie unter Wartungsfenster angeben.
Verwendung von "wait_for_downstream" in Ihren DAGs
Wenn Sie den Parameter wait_for_downstream
in den DAGs auf True
festlegen, müssen, damit eine Aufgabe erfolgreich ist, alle Aufgaben, die in Bezug auf diese Aufgabe unmittelbar nachgelagert sind, ebenfalls erfolgreich ausgeführt werden. Dies bedeutet, dass die Ausführung von Aufgaben, die zu einer bestimmten DAG-Ausführung gehören, durch die Ausführung von Aufgaben aus der vorherigen DAG-Ausführung verlangsamt werden kann. Weitere Informationen dazu finden Sie in der Airflow-Dokumentation.
Aufgaben, die zu lange in der Warteschlange stehen, werden abgebrochen und neu geplant
Wenn eine Airflow-Aufgabe zu lange in der Warteschlange verbleibt, plant der Planer sie nach Ablauf der in der Airflow-Konfigurationsoption [scheduler]task_queued_timeout
festgelegten Zeit noch einmal für die Ausführung ein. Der Standardwert ist 2400
.
Eine Möglichkeit, die Symptome dieser Situation zu beobachten, besteht darin, sich das Diagramm mit der Anzahl der in die Warteschlange eingereihten Aufgaben anzusehen (Tab „Monitoring“ in der Cloud Composer-UI). Wenn die Spitzen in diesem Diagramm nicht innerhalb von etwa zwei Stunden abnehmen, werden die Aufgaben höchstwahrscheinlich neu geplant (ohne Logs). Danach werden Logeinträge vom Typ „Adopted tasks were still pending ...“ in den Scheduler-Logs angezeigt. In solchen Fällen wird in den Airflow-Aufgabenlogs möglicherweise die Meldung „Logdatei nicht gefunden…“ angezeigt, da die Aufgabe nicht ausgeführt wurde.
Im Allgemeinen ist dieses Verhalten zu erwarten und die nächste Instanz der geplanten Aufgabe soll gemäß dem Zeitplan ausgeführt werden. Wenn Sie in Ihren Cloud Composer-Umgebungen viele solcher Fälle beobachten, bedeutet dies möglicherweise, dass nicht genügend Airflow-Worker in Ihrer Umgebung vorhanden sind, um alle geplanten Aufgaben zu verarbeiten.
Lösung: Um dieses Problem zu beheben, müssen Sie dafür sorgen, dass in Airflow-Workern immer Kapazität zum Ausführen von in die Warteschlange eingereihten Aufgaben vorhanden ist. Sie können beispielsweise die Anzahl der Worker oder die Nebenläufigkeit von Workern erhöhen. Sie können auch die Parallelität oder Pools anpassen, um zu verhindern, dass mehr Aufgaben in die Warteschlange gestellt werden, als Sie Kapazität haben.
Cloud Composer-Ansatz für den Parameter „min_file_process_interval“
Cloud Composer ändert die Art und Weise, wie [scheduler]min_file_process_interval
vom Airflow-Planer verwendet wird.
Der Airflow-Planer wird nach einer bestimmten Anzahl von Malen neu gestartet, wenn alle DAGs geplant sind. Der Parameter [scheduler]num_runs
steuert, wie oft dies vom Planer erfolgt. Wenn der Scheduler [scheduler]num_runs
Planungsschleifen erreicht, wird er neu gestartet. Der Scheduler ist eine zustandslose Komponente. Ein Neustart ist ein Mechanismus zur automatischen Reparatur von Problemen, die beim Scheduler auftreten können. Der Standardwert von [scheduler]num_runs
ist 5.000.
Mit [scheduler]min_file_process_interval
kann konfiguriert werden, wie oft DAGs geparst werden. Dieser Parameter darf jedoch nicht länger sein als die Zeit, die ein Planer für die Ausführung von [scheduler]num_runs
-Schleifen beim Planen Ihrer DAGs benötigt.
Aufgaben nach Erreichen von „dagrun_timeout“ als fehlgeschlagen markieren
Der Planer markiert Aufgaben, die nicht abgeschlossen sind (werden ausgeführt, geplant und in der Warteschlange), als fehlgeschlagen, wenn eine DAG-Ausführung nicht innerhalb von dagrun_timeout
(ein DAG-Parameter) abgeschlossen wird.
Lösung:
Erweitern Sie
dagrun_timeout
, um das Zeitlimit zu erreichen.Erhöhen Sie die Anzahl der Worker oder erhöhen Sie die Leistungsparameter der Worker, damit die DAG schneller ausgeführt wird.
Symptome einer hohen Last der Airflow-Datenbank
In den Airflow-Worker-Logs wird manchmal der folgende Warnungslogeintrag angezeigt:
psycopg2.OperationalError: connection to server at ... failed
Solche Fehler oder Warnungen können ein Symptom dafür sein, dass die Airflow-Datenbank durch die Anzahl der offenen Verbindungen oder die Anzahl der gleichzeitig ausgeführten Abfragen überlastet ist. Dies kann durch Planer oder andere Airflow-Komponenten wie Worker, Triggerer und Webserver verursacht werden.
Mögliche Lösungen:
Skalieren Sie die Airflow-Datenbank, indem Sie die Größe Ihrer Umgebung anpassen.
Reduzieren Sie die Anzahl der Planer und DAG-Prozessoren. Beginnen Sie mit einem Planer und einem DAG-Prozessor und erhöhen Sie die Anzahl der Planer oder DAG-Prozessoren, wenn diese Komponenten sich ihren Ressourcengrenzen nähern.
Vermeiden Sie die Verwendung globaler Variablen in Airflow-DAGs. Verwenden Sie stattdessen Umgebungsvariablen und Airflow-Variablen.
Setzen Sie
[scheduler]scheduler_heartbeat_sec
auf einen höheren Wert, z. B. 15 Sekunden oder mehr.Setzen Sie
[scheduler]job_heartbeat_sec
auf einen höheren Wert, z. B. 30 Sekunden oder mehr.Legen Sie
[scheduler]scheduler_health_check_threshold
auf einen Wert fest, der gleich[scheduler]job_heartbeat_sec
multipliziert mit4
ist.
Auf dem Webserver wird die Warnung „Der Planer scheint nicht zu laufen“ angezeigt.
Der Planer meldet seinen Heartbeat regelmäßig an die Airflow-Datenbank. Anhand dieser Informationen ermittelt der Airflow-Webserver, ob der Scheduler aktiv ist.
Wenn der Scheduler stark ausgelastet ist, kann es vorkommen, dass er seinen Heartbeat nicht alle [scheduler]scheduler_heartbeat_sec
senden kann.
In diesem Fall wird auf dem Airflow-Webserver möglicherweise die folgende Warnung angezeigt:
The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.
Mögliche Lösungen:
Erhöhen Sie die CPU- und Arbeitsspeicherressourcen für den Planer.
Optimieren Sie Ihre DAGs so, dass sie schneller geparst und geplant werden und nicht zu viele Scheduler-Ressourcen verbrauchen.
Vermeiden Sie die Verwendung globaler Variablen in Airflow-DAGs. Verwenden Sie stattdessen Umgebungsvariablen und Airflow-Variablen.
Erhöhen Sie den Wert der Airflow-Konfigurationsoption
[scheduler]scheduler_health_check_threshold
, damit der Webserver länger wartet, bevor er die Nichtverfügbarkeit des Planers meldet.
Lösungen für Probleme, die beim Backfill von DAGs auftreten
Manchmal möchten Sie DAGs, die bereits ausgeführt wurden, noch einmal ausführen. Sie können dies mit einem Airflow-Befehlszeilenbefehl tun:
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
dags backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
Wenn Sie nur fehlgeschlagene Aufgaben für einen bestimmten DAG noch einmal ausführen möchten, verwenden Sie auch das Argument --rerun-failed-tasks
.
Ersetzen Sie:
ENVIRONMENT_NAME
durch den Namen der Umgebung.LOCATION
durch die Region, in der sich die Umgebung befindet.START_DATE
mit einem Wert für den DAG-Parameterstart_date
im FormatYYYY-MM-DD
.END_DATE
mit einem Wert für den DAG-Parameterend_date
im FormatYYYY-MM-DD
.DAG_NAME
durch den Namen des DAG.
Beim Backfill-Vorgang kann es manchmal zu einer Deadlock-Situation kommen, in der ein Backfill nicht möglich ist, weil eine Aufgabe gesperrt ist. Beispiel:
2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill
In einigen Fällen können Sie die folgenden Problemumgehungen verwenden, um Deadlocks zu vermeiden:
Deaktivieren Sie den Mini-Scheduler, indem Sie
[core]schedule_after_task_execution
überschreiben und aufFalse
setzen.Führen Sie Backfills für kürzere Zeiträume aus. Legen Sie beispielsweise
START_DATE
undEND_DATE
fest, um einen Zeitraum von nur einem Tag anzugeben.