Airflow-Planerprobleme beheben

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Diese Seite enthält Schritte zur Fehlerbehebung und Informationen zu häufigen Problemen mit Airflow-Planern und DAG-Prozessoren.

Problemursache identifizieren

Ermitteln Sie zu Beginn der Fehlerbehebung, ob das Problem auftritt:

  • Zur DAG-Parsing-Zeit, während der DAG von einem Airflow-DAG-Prozessor geparst wird
  • Zur Ausführungszeit, während der DAG von einem Airflow-Planer verarbeitet wird

Weitere Informationen zur Parsing- und zur Ausführungszeit finden Sie unter Unterschied zwischen der DAG-Parsing-Zeit und der DAG-Ausführungszeit.

Probleme bei der DAG-Verarbeitung untersuchen

  1. DAG-Prozessor-Logs prüfen
  2. DAG-Parsing-Zeiten prüfen

Laufende und in der Warteschlange befindliche Aufgaben überwachen

So prüfen Sie, ob Aufgaben in einer Warteschlange hängen:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Rufen Sie den Tab Monitoring auf.

  4. Prüfen Sie auf dem Tab Monitoring das Diagramm Airflow-Aufgaben im Abschnitt DAG-Ausführungen und identifizieren Sie mögliche Probleme. Airflow-Aufgaben sind Aufgaben, die in Airflow in der Warteschlange stehen. Sie können entweder in die Broker-Warteschlange von Celery oder Kubernetes Executor verschoben werden. Tasks in der Celery-Warteschlange sind Taskinstanzen, die in die Celery-Broker-Warteschlange gestellt wurden.

Probleme zur DAG-Parsing-Zeit beheben

In folgenden Abschnitten werden Symptome und mögliche Lösungen für einige während der DAG-Parsing-Zeit häufig auftretende Probleme beschrieben.

Anzahl und Zeitverteilung der Aufgaben

Bei der gleichzeitigen Planung einer großen Anzahl von DAGs oder Aufgaben kann es in Airflow zu Problemen kommen. So kannst du Probleme bei der Planung vermeiden:

  • Passen Sie Ihre DAGs so an, dass sie eine kleinere Anzahl konsolidierter Aufgaben verwenden.
  • Passen Sie die Zeitplanintervalle Ihrer DAGs an, um DAG-Ausführungen gleichmäßiger über die Zeit zu verteilen.

Airflow-Konfiguration skalieren

Airflow bietet Konfigurationsoptionen, die steuern, wie viele Aufgaben und DAGs es gleichzeitig ausführen kann. Um diese Konfigurationsoptionen festzulegen, überschreiben Sie deren Werte für Ihre Umgebung. Einige dieser Werte können Sie auch auf DAG- oder Aufgabenebene festlegen.

  • Worker-Nebenläufigkeit

    Der Parameter [celery]worker_concurrency steuert die maximale Anzahl von Aufgaben, die ein Airflow-Worker gleichzeitig ausführen kann. Wenn Sie den Wert dieses Parameters mit der Anzahl der Airflow-Worker in Ihrer Cloud Composer-Umgebung multiplizieren, erhalten Sie die maximale Anzahl von Aufgaben, die zu einem bestimmten Zeitpunkt in Ihrer Umgebung ausgeführt werden können. Diese Zahl ist durch die Airflow-Konfigurationsoption [core]parallelism begrenzt, die unten weiter beschrieben wird.

    In Cloud Composer 3-Umgebungen wird der Standardwert von [celery]worker_concurrency automatisch anhand der Anzahl der nebenläufigen Lightweight-Aufgabeninstanzen berechnet, die ein Worker verarbeiten kann. Das bedeutet, dass der Wert von den Ressourcenlimits für Worker abhängt. Der Wert für die Worker-Nebenläufigkeit hängt nicht von der Anzahl der Worker in Ihrer Umgebung ab.

  • Maximale Anzahl aktiver DAG-Ausführungen

    Die Airflow-Konfigurationsoption [core]max_active_runs_per_dag steuert die maximale Anzahl aktiver DAG-Ausführungen pro DAG. Der Planer erstellt keine weiteren DAG-Ausführungen, wenn das Limit erreicht ist.

    Ist dieser Parameter falsch eingestellt, kann ein Problem auftreten, bei dem der Planer die DAG-Ausführung drosselt, da er zu einer bestimmten Zeit keine DAG-Ausführungsinstanzen mehr erstellen kann.

    Sie können diesen Wert auch auf DAG-Ebene mit dem Parameter max_active_runs festlegen.

  • Maximale Anzahl aktiver Aufgaben pro DAG

    Die Airflow-Konfigurationsoption [core]max_active_tasks_per_dag steuert die maximale Anzahl an Aufgabeninstanzen, die pro DAG gleichzeitig ausgeführt werden können.

    Ist dieser Parameter falsch festgelegt, kann ein Problem auftreten, bei dem die Ausführung einer einzelnen DAG-Instanz langsam läuft, da nur eine begrenzte Anzahl an DAG-Aufgaben zu einer bestimmten Zeit ausgeführt werden können. In diesem Fall können Sie den Wert dieser Konfigurationsoption erhöhen.

    Sie können diesen Wert auch auf DAG-Ebene mit dem Parameter max_active_tasks festlegen.

    Mit den Parametern max_active_tis_per_dag und max_active_tis_per_dagrun auf Aufgabenebene können Sie steuern, wie viele Instanzen mit einer bestimmten Aufgaben-ID pro DAG und pro DAG-Lauf ausgeführt werden dürfen.

  • Parallelität und Poolgröße

    Die Airflow-Konfigurationsoption [core]parallelism steuert, wie viele Aufgaben der Airflow-Planer in die Warteschlange des Executors stellen kann, nachdem alle Abhängigkeiten für diese Aufgaben erfüllt wurden.

    Dies ist ein globaler Parameter für die gesamte Airflow-Einrichtung.

    Aufgaben werden in die Warteschlange gestellt und in einem Pool ausgeführt. Cloud Composer-Umgebungen verwenden nur einen Pool. Die Größe dieses Pools steuert, wie viele Aufgaben der Planer in einem bestimmten Moment zur Ausführung in die Warteschlange stellen kann. Wenn die Poolgröße zu klein ist, kann der Planer Aufgaben nicht für die Ausführung in die Warteschlange stellen, auch wenn Grenzwerte, die durch die Konfigurationsoption [core]parallelism und die Konfigurationsoption [celery]worker_concurrency multipliziert mit der Anzahl der Airflow-Worker definiert sind, noch nicht erreicht sind.

    Sie können die Poolgröße in der Airflow-UI konfigurieren (Menü > Administrator > Pools). Passen Sie die Poolgröße an das Maß an Parallelität an, das Sie in Ihrer Umgebung erwarten.

    Normalerweise wird [core]parallelism als Produkt aus der maximalen Anzahl von Workern und [celery]worker_concurrency festgelegt.

Fehlerbehebung bei laufenden und in der Warteschlange befindlichen Aufgaben

In folgenden Abschnitten werden Symptome und mögliche Lösungen für einige bei laufenden und in der Warteschlange befindlichen Aufgaben häufig auftretenden Problemen beschrieben.

DAG-Ausführungen werden nicht ausgeführt

Symptom:

Wenn ein Zeitplandatum für einen DAG dynamisch festgelegt wird, kann dies zu verschiedenen unerwarteten Nebenwirkungen führen. Beispiel:

  • Eine DAG-Ausführung findet immer in der Zukunft statt und die DAG wird nie ausgeführt.

  • Vergangene DAG-Ausführungen werden als ausgeführt und erfolgreich markiert, obwohl sie nicht ausgeführt wurden.

Weitere Informationen finden Sie in der Apache Airflow-Dokumentation.

Mögliche Lösungen:

  • Folgen Sie den Empfehlungen in der Apache Airflow-Dokumentation.

  • Statische start_date für DAGs festlegen Alternativ können Sie catchup=False verwenden, um die Ausführung des DAG für vergangene Datumsangaben zu deaktivieren.

  • Vermeiden Sie die Verwendung von datetime.now() oder days_ago(<number of days>), es sei denn, Sie sind sich der Nebenwirkungen dieses Ansatzes bewusst.

TimeTable-Funktion des Airflow-Planers verwenden

Cloud Composer 3 unterstützt keine benutzerdefinierten Plug-ins für den Airflow-Scheduler, einschließlich Zeitplänen, die in DAGs implementiert sind. Plug-ins werden nicht mit den Schedulern in Ihrer Umgebung synchronisiert.

Sie können integrierte Zeitpläne weiterhin in Cloud Composer 3 verwenden.

Aufgabenplanung während Wartungsfenstern vermeiden

Sie können Wartungsfenster für Ihre Umgebung definieren, damit die Wartung der Umgebung außerhalb der Zeiten erfolgt, in denen Sie Ihre DAGs ausführen. Sie können Ihre DAGs weiterhin während Wartungsfenstern ausführen, sofern es akzeptabel ist, dass einige Aufgaben unterbrochen und wiederholt werden. Weitere Informationen dazu, wie sich Wartungsfenster auf Ihre Umgebung auswirken, finden Sie unter Wartungsfenster angeben.

Verwendung von "wait_for_downstream" in Ihren DAGs

Wenn Sie den Parameter wait_for_downstream in den DAGs auf True festlegen, müssen, damit eine Aufgabe erfolgreich ist, alle Aufgaben, die in Bezug auf diese Aufgabe unmittelbar nachgelagert sind, ebenfalls erfolgreich ausgeführt werden. Dies bedeutet, dass die Ausführung von Aufgaben, die zu einer bestimmten DAG-Ausführung gehören, durch die Ausführung von Aufgaben aus der vorherigen DAG-Ausführung verlangsamt werden kann. Weitere Informationen dazu finden Sie in der Airflow-Dokumentation.

Aufgaben, die zu lange in der Warteschlange stehen, werden abgebrochen und neu geplant

Wenn eine Airflow-Aufgabe zu lange in der Warteschlange verbleibt, plant der Planer sie nach Ablauf der in der Airflow-Konfigurationsoption [scheduler]task_queued_timeout festgelegten Zeit noch einmal für die Ausführung ein. Der Standardwert ist 2400.

Eine Möglichkeit, die Symptome dieser Situation zu beobachten, besteht darin, sich das Diagramm mit der Anzahl der in die Warteschlange eingereihten Aufgaben anzusehen (Tab „Monitoring“ in der Cloud Composer-UI). Wenn die Spitzen in diesem Diagramm nicht innerhalb von etwa zwei Stunden abnehmen, werden die Aufgaben höchstwahrscheinlich neu geplant (ohne Logs). Danach werden Logeinträge vom Typ „Adopted tasks were still pending ...“ in den Scheduler-Logs angezeigt. In solchen Fällen wird in den Airflow-Aufgabenlogs möglicherweise die Meldung „Logdatei nicht gefunden…“ angezeigt, da die Aufgabe nicht ausgeführt wurde.

Im Allgemeinen ist dieses Verhalten zu erwarten und die nächste Instanz der geplanten Aufgabe soll gemäß dem Zeitplan ausgeführt werden. Wenn Sie in Ihren Cloud Composer-Umgebungen viele solcher Fälle beobachten, bedeutet dies möglicherweise, dass nicht genügend Airflow-Worker in Ihrer Umgebung vorhanden sind, um alle geplanten Aufgaben zu verarbeiten.

Lösung: Um dieses Problem zu beheben, müssen Sie dafür sorgen, dass in Airflow-Workern immer Kapazität zum Ausführen von in die Warteschlange eingereihten Aufgaben vorhanden ist. Sie können beispielsweise die Anzahl der Worker oder die Nebenläufigkeit von Workern erhöhen. Sie können auch die Parallelität oder Pools anpassen, um zu verhindern, dass mehr Aufgaben in die Warteschlange gestellt werden, als Sie Kapazität haben.

Cloud Composer-Ansatz für den Parameter „min_file_process_interval“

Cloud Composer ändert die Art und Weise, wie [scheduler]min_file_process_interval vom Airflow-Planer verwendet wird.

Der Airflow-Planer wird nach einer bestimmten Anzahl von Malen neu gestartet, wenn alle DAGs geplant sind. Der Parameter [scheduler]num_runs steuert, wie oft dies vom Planer erfolgt. Wenn der Scheduler [scheduler]num_runs Planungsschleifen erreicht, wird er neu gestartet. Der Scheduler ist eine zustandslose Komponente. Ein Neustart ist ein Mechanismus zur automatischen Reparatur von Problemen, die beim Scheduler auftreten können. Der Standardwert von [scheduler]num_runs ist 5.000.

Mit [scheduler]min_file_process_interval kann konfiguriert werden, wie oft DAGs geparst werden. Dieser Parameter darf jedoch nicht länger sein als die Zeit, die ein Planer für die Ausführung von [scheduler]num_runs-Schleifen beim Planen Ihrer DAGs benötigt.

Aufgaben nach Erreichen von „dagrun_timeout“ als fehlgeschlagen markieren

Der Planer markiert Aufgaben, die nicht abgeschlossen sind (werden ausgeführt, geplant und in der Warteschlange), als fehlgeschlagen, wenn eine DAG-Ausführung nicht innerhalb von dagrun_timeout (ein DAG-Parameter) abgeschlossen wird.

Lösung:

Symptome einer hohen Last der Airflow-Datenbank

In den Airflow-Worker-Logs wird manchmal der folgende Warnungslogeintrag angezeigt:

psycopg2.OperationalError: connection to server at ... failed

Solche Fehler oder Warnungen können ein Symptom dafür sein, dass die Airflow-Datenbank durch die Anzahl der offenen Verbindungen oder die Anzahl der gleichzeitig ausgeführten Abfragen überlastet ist. Dies kann durch Planer oder andere Airflow-Komponenten wie Worker, Triggerer und Webserver verursacht werden.

Mögliche Lösungen:

Auf dem Webserver wird die Warnung „Der Planer scheint nicht zu laufen“ angezeigt.

Der Planer meldet seinen Heartbeat regelmäßig an die Airflow-Datenbank. Anhand dieser Informationen ermittelt der Airflow-Webserver, ob der Scheduler aktiv ist.

Wenn der Scheduler stark ausgelastet ist, kann es vorkommen, dass er seinen Heartbeat nicht alle [scheduler]scheduler_heartbeat_sec senden kann.

In diesem Fall wird auf dem Airflow-Webserver möglicherweise die folgende Warnung angezeigt:

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

Mögliche Lösungen:

  • Erhöhen Sie die CPU- und Arbeitsspeicherressourcen für den Planer.

  • Optimieren Sie Ihre DAGs so, dass sie schneller geparst und geplant werden und nicht zu viele Scheduler-Ressourcen verbrauchen.

  • Vermeiden Sie die Verwendung globaler Variablen in Airflow-DAGs. Verwenden Sie stattdessen Umgebungsvariablen und Airflow-Variablen.

  • Erhöhen Sie den Wert der Airflow-Konfigurationsoption [scheduler]scheduler_health_check_threshold, damit der Webserver länger wartet, bevor er die Nichtverfügbarkeit des Planers meldet.

Lösungen für Probleme, die beim Backfill von DAGs auftreten

Manchmal möchten Sie DAGs, die bereits ausgeführt wurden, noch einmal ausführen. Sie können dies mit einem Airflow-Befehlszeilenbefehl tun:

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

Wenn Sie nur fehlgeschlagene Aufgaben für einen bestimmten DAG noch einmal ausführen möchten, verwenden Sie auch das Argument --rerun-failed-tasks.

Ersetzen Sie:

  • ENVIRONMENT_NAME durch den Namen der Umgebung.
  • LOCATION durch die Region, in der sich die Umgebung befindet.
  • START_DATE mit einem Wert für den DAG-Parameter start_date im Format YYYY-MM-DD.
  • END_DATE mit einem Wert für den DAG-Parameter end_date im Format YYYY-MM-DD.
  • DAG_NAME durch den Namen des DAG.

Beim Backfill-Vorgang kann es manchmal zu einer Deadlock-Situation kommen, in der ein Backfill nicht möglich ist, weil eine Aufgabe gesperrt ist. Beispiel:

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

In einigen Fällen können Sie die folgenden Problemumgehungen verwenden, um Deadlocks zu vermeiden:

  • Deaktivieren Sie den Mini-Scheduler, indem Sie [core]schedule_after_task_execution überschreiben und auf False setzen.

  • Führen Sie Backfills für kürzere Zeiträume aus. Legen Sie beispielsweise START_DATE und END_DATE fest, um einen Zeitraum von nur einem Tag anzugeben.

Nächste Schritte