Airflow-Planerprobleme beheben

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Auf dieser Seite finden Sie Schritte zur Fehlerbehebung und Informationen für häufige Probleme mit Airflow-Planern.

Problemursache identifizieren

Ermitteln Sie zu Beginn der Fehlerbehebung, ob das Problem zur DAG-Parsing-Zeit oder bei der Verarbeitung von Aufgaben zur Ausführungszeit auftritt. Weitere Informationen zur Parsing- und zur Ausführungszeit finden Sie unter Unterschied zwischen der DAG-Parsing-Zeit und der DAG-Ausführungszeit.

DAG-Prozessorlogs prüfen

Bei komplexen DAGs ist der DAG-Prozessor, der vom parsen Sie möglicherweise nicht alle DAGs. Dies kann zu vielen Problemen führen, die folgenden Symptome haben.

Symptome:

  • Wenn der DAG-Prozessor beim Parsen Ihrer DAGs auf Probleme stößt, kann dies zu einer Kombination der unten aufgeführten Probleme führen. Wenn DAGs dynamisch generiert werden, können diese Probleme im Vergleich zu statischen DAGs schwerwiegender sein.

  • DAGs sind in der Airflow-UI und der DAG-UI nicht sichtbar.

  • Die Ausführung von DAGs ist nicht geplant.

  • Die DAG-Prozessorlogs enthalten beispielsweise Fehler:

    dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started
    at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
    

    oder

    dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py exited with return
    code 1.
    
  • Bei Airflow-Planern treten Probleme auf, die zu Neustarts von Planern führen.

  • Zur Ausführung geplante Airflow-Aufgaben werden abgebrochen und DAG-Ausführungen ausgeführt für DAGs, die nicht geparst werden konnten, sind sie möglicherweise als failed gekennzeichnet. Beispiel:

    airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1
    manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag
    'dag-example'. Marking it as removed.
    

Lösung

  • Anzahl der Parameter für das DAG-Parsing erhöhen:

    • Erhöhung dagbag-import-timeout auf mindestens 120 Sekunden (oder länger, falls erforderlich).

    • Erhöhung dag-file-processor-timeout auf mindestens 180 Sekunden (oder mehr, falls erforderlich). Dieser Wert muss höher als dagbag-import-timeout.

  • Korrigieren oder entfernen Sie DAGs, die Probleme beim DAG-Prozessor verursachen.

DAG-Parsing-Zeiten prüfen

Führen Sie folgende Schritte aus, um festzustellen, ob das Problem beim DAG-Parsen auftritt.

Console

In der Google Cloud Console können Sie die Seite Monitoring verwenden und den Tab Logs, um die DAG-Analysezeiten zu überprüfen.

Prüfen Sie die DAG-Analysezeiträume auf der Monitoring-Seite von Cloud Composer:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Monitoring wird geöffnet.

  3. Prüfen Sie im Tab Monitoring das Diagramm Gesamte Parsing-Zeit für alle DAG-Dateien im Abschnitt DAG-Ausführungen und identifizieren Sie mögliche Probleme.

    Im Bereich „DAG-Ausführungen“ auf dem Tab „Monitoring“ in Composer werden Statusmesswerte für die DAGs in Ihrer Umgebung angezeigt

Prüfen Sie die DAG-Analysezeiten auf dem Tab Cloud Composer-Logs:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Monitoring wird geöffnet.

  3. Wechseln Sie zum Tab Logs und im Navigationsbaum Alle Logs. Wählen Sie den Bereich DAG-Prozessormanager aus.

  4. Prüfen Sie dag-processor-manager-Logs, um mögliche Probleme zu ermitteln.

    Die DAG-Prozessorlogs zeigen die DAG-Parsing-Zeiten

gcloud – Airflow 1

Verwenden Sie den list_dags-Befehl mit dem -r-Flag, um die Parsing-Zeit für alle DAGs anzuzeigen.

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    list_dags -- -r

Ersetzen Sie:

  • ENVIRONMENT_NAME durch den Namen der Umgebung.
  • LOCATION durch die Region, in der sich die Umgebung befindet.

Die Ausgabe sieht dann ungefähr so aus:

-------------------------------------------------------------------
DagBag loading stats for /home/airflow/gcs/dags
-------------------------------------------------------------------
Number of DAGs: 5
Total task number: 13
DagBag parsing time: 0.6765180000000001
-----------+----------+---------+----------+-----------------------
file       | duration | dag_num | task_num | dags
-----------+----------+---------+----------+-----------------------
/dag_1.py  | 0.6477   |       1 |        2 | ['dag_1']
/dag_2.py  | 0.018652 |       1 |        2 | ['dag_2']
/dag_3.py  | 0.004024 |       1 |        6 | ['dag_3']
/dag_4.py  | 0.003476 |       1 |        2 | ['dag_4']
/dag_5.py  | 0.002666 |       1 |        1 | ['dag_5']
-----------+----------+---------+----------+-----------------------

Suchen Sie nach dem Wert DagBag-Parsing-Zeit. Ein hoher Wert kann darauf hinweisen, dass einer Ihrer DAGs nicht optimal implementiert ist. In der Ausgabetabelle können Sie ermitteln, welche DAGs eine lange Parsing-Zeit haben.

gcloud – Airflow 2

Verwenden Sie den Befehl dags report, um die Parsing-Zeit für alle DAGs aufzurufen.

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags report

Ersetzen Sie:

  • ENVIRONMENT_NAME durch den Namen der Umgebung.
  • LOCATION durch die Region, in der sich die Umgebung befindet.

Die Ausgabe sieht dann ungefähr so aus:

Executing within the following Kubernetes cluster namespace: composer-2-0-31-airflow-2-3-3
file                  | duration       | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py       | 0:00:00.038334 | 2       | 10       | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1       | 1        | airflow_monitoring

Suchen Sie nach dem Wert duration für jeden der in der Tabelle aufgeführten Tage. Ein hoher Wert kann darauf hinweisen, dass einer Ihrer DAGs nicht implementiert ist. optimal nutzen können. Anhand der Ausgabetabelle können Sie erkennen, für welche DAGs lange Parsing-Zeit.

Laufende und in der Warteschlange befindliche Aufgaben überwachen

So prüfen Sie, ob Aufgaben in einer Warteschlange hängen:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Rufen Sie den Tab Monitoring auf.

  4. Sehen Sie sich auf dem Tab Monitoring das Diagramm Airflow-Aufgaben an. im Bereich DAG-Ausführungen und identifizieren Sie mögliche Probleme. Airflow-Aufgaben Aufgaben, die sich in Airflow in einer Warteschlange befinden, können entweder Celery- oder Kubernetes Executor-Broker-Warteschlange. Aufgaben in der Celery-Warteschlange sind Aufgaben Instanzen, die in die Celery-Broker-Warteschlange gestellt werden.

Probleme zur DAG-Parsing-Zeit beheben

In folgenden Abschnitten werden Symptome und mögliche Lösungen für einige während der DAG-Parsing-Zeit häufig auftretende Probleme beschrieben.

DAG-Parsing und Planung in Cloud Composer 1 und Airflow 1

Die DAG-Parsing-Effizienz wurde in Airflow 2 erheblich verbessert. Wenn Sie Probleme mit der DAG-Analyse und -Planung haben, zu Airflow 2 migrieren.

In Cloud Composer 1 wird der Planer zusammen mit anderen Knoten auf Clusterknoten ausgeführt. Cloud Composer-Komponenten. Daher ist die Auslastung der einzelnen Clusterknoten können im Vergleich zu anderen Knoten höher oder niedriger sein. Der Planer Leistung (DAG-Parsing und -Planung) kann je nach Knoten variieren. auf dem der Planer ausgeführt wird. Darüber hinaus wird ein einzelner Knoten, Planerausführungen können sich infolge von Upgrade- oder Wartungsvorgängen ändern. Diese Einschränkung wurde in Cloud Composer 2 behoben, wo Sie CPU- und Arbeitsspeicherressourcen für den Planer und die Leistung des Planers hängen nicht von der Last der Clusterknoten ab.

Begrenzte Anzahl an Threads

Der DAG-Prozessormanager (der Teil des Planers, der verarbeitet DAG-Dateien), sodass nur eine begrenzte Anzahl von Threads verwendet wird. die Zeit der DAG-Analyse.

Überschreiben Sie die folgenden Airflow-Konfigurationsoptionen, um das Problem zu beheben:

  • Für Airflow 1.10.12 und frühere Versionen überschreiben Sie die max_threads-Parameter:

    Bereich Schlüssel Wert Hinweise
    scheduler max_threads NUMBER_OF_CORES_IN_MACHINE - 1 Ersetzen Sie NUMBER_OF_CORES_IN_MACHINE durch die Anzahl der Kerne
    auf den Maschinen mit Worker-Knoten.
  • Für Airflow-Versionen ab 1.10.14 überschreiben Sie parsing_processes-Parameter:

    Bereich Schlüssel Wert Hinweise
    scheduler parsing_processes NUMBER_OF_CORES_IN_MACHINE - 1 Ersetzen Sie NUMBER_OF_CORES_IN_MACHINE durch die Anzahl der Kerne
    auf den Maschinen mit Worker-Knoten.

Anzahl und Zeitverteilung der Aufgaben

Airflow ist dafür bekannt, dass bei der Planung einer großen Zahl kleiner Aufgaben Probleme auftreten. In solchen Situationen sollten Sie eine kleinere Anzahl konsolidierter Aufgaben verwenden.

Die gleichzeitige Planung einer großen Zahl an DAGs oder Aufgaben kann ebenfalls zu Problemen führen. Um dieses Problem zu vermeiden, verteilen Sie Ihre Aufgaben gleichmäßiger über die Zeit.

Fehlerbehebung bei laufenden und in der Warteschlange befindlichen Aufgaben

In folgenden Abschnitten werden Symptome und mögliche Lösungen für einige bei laufenden und in der Warteschlange befindlichen Aufgaben häufig auftretenden Problemen beschrieben.

Die Aufgabenwarteschlangen sind zu lang

In einigen Fällen ist eine Aufgabenwarteschlange für den Planer zu lang. Informationen zum Optimieren von Worker- und Celery-Parametern finden Sie unter Cloud Composer-Umgebung mit Ihrem Unternehmen skalieren.

Zeitachsenfunktion des Airflow-Planers verwenden

Ab Airflow 2.2 können Sie eine Zeitachse für einen DAG mithilfe eines eine neue Funktion namens TimeTable.

Sie können einen Zeitplan mit einer der folgenden Methoden definieren:

Eingeschränkte Clusterressourcen

Dieser Abschnitt gilt nur für Cloud Composer 1.

Wenn der GKE-Cluster Ihrer Umgebung zu klein ist, um alle DAGs und Aufgaben zu verarbeiten, können Leistungsprobleme auftreten. Versuchen Sie in diesem Fall eine der folgenden Lösungen:

  • Erstellen Sie eine neue Umgebung mit einem Maschinentyp, der mehr Leistung bietet, und migrieren Sie Ihre DAGs dorthin.
  • Erstellen Sie weitere Cloud Composer-Umgebungen und teilen Sie die DAGs zwischen ihnen auf.
  • Ändern Sie den Maschinentyp für GKE-Knoten wie unter Maschinentyp für GKE-Knoten aktualisieren beschrieben. Da dieses Verfahren fehleranfällig ist, ist es die am wenigsten empfohlene Option.
  • Führen Sie ein Upgrade des Maschinentyps der Cloud SQL-Instanz durch, auf der die Airflow-Datenbank in Ihrer Umgebung ausgeführt wird, z. B. mit den gcloud composer environments update-Befehlen. Niedrige Leistung der Airflow-Datenbank könnte der Grund sein ist der Planer langsam.

Aufgabenplanung während Wartungsfenstern vermeiden

Sie können bestimmte Wartungsfenster für Ihr zu verbessern. In diesen Zeiträumen werden Wartungsereignisse für Cloud SQL und GKE ausgeführt.

So bestimmen Sie, dass der Airflow-Planer unnötige Dateien ignoriert:

Sie können die Leistung des Airflow-Planers verbessern, wenn Sie unnötige Dateien im DAGs-Ordner überspringen. Der Airflow-Planer ignoriert Dateien und Ordner, die in der Datei .airflowignore angegeben sind.

So bestimmen Sie, dass der Airflow-Planer unnötige Dateien ignoriert:

  1. Erstellen Sie eine .airflowignore-Datei.
  2. Listen Sie in dieser Datei Dateien und Ordner auf, die ignoriert werden sollen.
  3. Laden Sie diese Datei in den Ordner /dags in Ihrem im Bucket der Umgebung.

Weitere Informationen zum Dateiformat .airflowignore findest du unter Airflow-Dokumentation.

Airflow-Planer verarbeitet pausierte DAGs

Airflow-Nutzer pausieren DAGs, um ihre Ausführung zu vermeiden. Dadurch werden Airflow-Worker eingespart Verarbeitungszyklen.

Der Airflow-Planer parst pausierte DAGs weiter. Wenn Sie wirklich Leistung des Airflow-Planers verbessern, .airflowignore verwenden oder pausierte Daten löschen DAGs aus dem Ordner „DAGs“.

Verwendung von "wait_for_downstream" in Ihren DAGs

Wenn Sie den Parameter wait_for_downstream in den DAGs auf True festlegen, müssen, damit eine Aufgabe erfolgreich ist, alle Aufgaben, die in Bezug auf diese Aufgabe unmittelbar nachgelagert sind, ebenfalls erfolgreich ausgeführt werden. Dies bedeutet, dass die Ausführung von Aufgaben, die zu einer bestimmten DAG-Ausführung gehören, durch die Ausführung von Aufgaben aus der vorherigen DAG-Ausführung verlangsamt werden kann. Weitere Informationen dazu finden Sie in der Airflow-Dokumentation

Aufgaben, die zu lange in die Warteschlange gestellt wurden, werden abgebrochen und neu geplant

Wenn eine Airflow-Aufgabe zu lange in der Warteschlange aufbewahrt wird, wird die Ausführung noch einmal geplant (in Airflow-Versionen vor 2.3.1 Die Aufgabe wird außerdem als fehlgeschlagen markiert und wiederholt, wenn sie für einen Wiederholungsversuch infrage kommt).

Eine Möglichkeit, die Symptome dieser Situation zu beobachten, sich das Diagramm mit der Anzahl der Aufgaben in der Warteschlange anzusehen, Tab „Monitoring“ in der Cloud Composer-UI Und wenn die Spitzen in diesem Diagramm nicht innerhalb von etwa zwei Stunden abfallen, werden die Aufgaben höchstwahrscheinlich neu geplant (ohne Logs) gefolgt von „Die übernommenen Aufgaben standen noch aus ...“ Logeinträge in den Planerlogs. In solchen Fällen wird möglicherweise die Meldung „Protokolldatei wurde nicht gefunden...“ angezeigt. Nachricht in Airflow-Tasks-Logs angezeigt, da die Aufgabe nicht ausgeführt wurde.

Dieses Verhalten ist im Allgemeinen zu erwarten und die nächste Instanz der geplanten die Aufgabe gemäß Zeitplan ausgeführt werden soll. Wenn Sie viele in Ihren Cloud Composer-Umgebungen zu platzieren, dass es in Ihrer Umgebung nicht genügend Airflow-Worker gibt, um die geplanten Aufgaben.

Lösung: Um dieses Problem zu beheben, müssen Sie sicherstellen, dass immer genügend Kapazitäten in Airflow-Workern, um Aufgaben in der Warteschlange auszuführen. Zum Beispiel können Sie die Anzahl der „worker_concurrency“ oder „worker_concurrency“. Sie können auch die Parallelität oder Pools anpassen, um Aufgaben in die Warteschlange zu stellen, als Sie haben.

Gelegentlich können veraltete Aufgaben die Ausführung eines bestimmten DAG blockieren

In regulären Fällen sollte der Airflow-Planer in der Lage sein, in denen sich veraltete Aufgaben in der Warteschlange befinden, die Möglichkeit, sie korrekt auszuführen (z. B. ein DAG, zu dem die veralteten Aufgaben gehören). wurde gelöscht).

Wenn diese veralteten Aufgaben vom Planer nicht dauerhaft gelöscht werden, müssen Sie möglicherweise manuell löschen. Das können Sie zum Beispiel in der Airflow-UI tun. Rufen Sie dazu Menü &gt; Browser &gt; auf. Aufgabeninstanzen), suchen Sie nach Aufgaben in der Warteschlange, die zu einem veralteten DAG gehören, und löschen Sie sie.

Führen Sie ein Upgrade Ihrer Umgebung auf Cloud Composer durch, um dieses Problem zu beheben. Version 2.1.12 oder höher.

Cloud Composer-Ansatz für den Parameter [scheduler]min_file_process_interval

Cloud Composer ändert die Art und Weise, wie [scheduler]min_file_process_interval vom Airflow-Planer verwendet wird.

Airflow 1

Wenn Cloud Composer Airflow 1 verwendet, können Nutzer den Wert festlegen. von [scheduler]min_file_process_interval zwischen 0 und 600 Sekunden. Werte höher als 600 Sekunden liefern die gleichen Ergebnisse wie bei einer Einstellung von [scheduler]min_file_process_interval auf 600 Sekunden.

Airflow 2

In Airflow 2 kann [scheduler]min_file_process_interval nur mit Versionen 1.19.9 und 2.0.26 oder aktueller

  • Cloud Composer-Versionen vor 1.19.9 und 2.0.26

    In diesen Versionen wird [scheduler]min_file_process_interval ignoriert.

  • Cloud Composer-Versionen 1.19.9 oder 2.0.26 oder neuere Versionen

    Der Airflow-Planer wird nach einer bestimmten Anzahl von Malen aller DAGs neu gestartet sind geplant und der Parameter [scheduler]num_runs steuert, wie oft dies vom Planer ausgeführt wird. Wenn Planer [scheduler]num_runs Planungsschleifen erreicht, ist es neu gestartet – der Scheduler ist eine zustandslose Komponente, einen Mechanismus zur automatischen Reparatur für alle Probleme, die in Scheduler auftreten könnten. Wenn keine Angabe erfolgt, wird die Standardeinstellung Wert von [scheduler]num_runs angewendet wird, also 5.000.

    Mit [scheduler]min_file_process_interval lässt sich konfigurieren, wie häufig Das DAG-Parsing erfolgt, aber dieser Parameter darf nicht länger als die erforderliche Zeit sein damit ein Planer [scheduler]num_runs ausführt. bei der Planung Ihrer DAGs.

Airflow-Konfiguration skalieren

Airflow bietet Konfigurationsoptionen, die steuern, wie viele Aufgaben und DAGs es gleichzeitig ausführen kann. Um diese Konfigurationsoptionen festzulegen, ihre Werte für Ihre Umgebung überschreiben.

  • Worker-Gleichzeitigkeit

    Der Parameter [celery]worker_concurrency steuert die maximale Anzahl von Aufgaben, die ein Airflow-Worker gleichzeitig ausführen kann. Wenn Sie den Wert dieses Parameters mit der Anzahl der Airflow-Worker in Ihrer Cloud Composer-Umgebung multiplizieren, erhalten Sie die maximale Anzahl von Aufgaben, die zu einem bestimmten Zeitpunkt in Ihrer Umgebung ausgeführt werden können. Dieses wird durch die Airflow-Konfigurationsoption [core]parallelism begrenzt. die näher beschrieben wird.

    In Cloud Composer 2-Umgebungen wird der Standardwert [celery]worker_concurrency wird automatisch berechnet

    • Für die Airflow-Versionen 2.3.3 und höher ist [celery]worker_concurrency festgelegt. auf einen Mindestwert von 32, 12 * worker_CPU und 8 * worker_memory.

    • Für Airflow-Versionen bis 2.2.5 ist [celery]worker_concurrency festgelegt auf 12 * Anzahl der Worker CPUs.

  • Maximale Anzahl aktiver DAG-Ausführungen

    Die Airflow-Konfigurationsoption [core]max_active_runs_per_dag steuert die maximale Anzahl aktiver DAG-Ausführungen pro DAG. Der Planer erstellt keine weiteren DAG-Ausführungen, wenn das Limit erreicht ist.

    Ist dieser Parameter falsch eingestellt, kann ein Problem auftreten, bei dem der Planer die DAG-Ausführung drosselt, da er zu einer bestimmten Zeit keine DAG-Ausführungsinstanzen mehr erstellen kann.

  • Max. aktive Aufgaben pro DAG

    Die Airflow-Konfigurationsoption [core]max_active_tasks_per_dag steuert die maximale Anzahl an Aufgabeninstanzen, die pro DAG gleichzeitig ausgeführt werden können. Dies ist ein Parameter auf DAG-Ebene.

    Ist dieser Parameter falsch festgelegt, kann ein Problem auftreten, bei dem die Ausführung einer einzelnen DAG-Instanz langsam läuft, da nur eine begrenzte Anzahl an DAG-Aufgaben zu einer bestimmten Zeit ausgeführt werden können.

    Lösung: [core]max_active_tasks_per_dag erhöhen.

  • Parallelität und Poolgröße

    Die Airflow-Konfigurationsoption [core]parallelism steuert, wie viele Aufgaben der Airflow-Planer in die Warteschlange des Executors stellen kann, nachdem alle Abhängigkeiten für diese Aufgaben erfüllt wurden.

    Dies ist ein globaler Parameter für die gesamte Airflow-Einrichtung.

    Aufgaben werden in die Warteschlange gestellt und in einem Pool ausgeführt. Cloud Composer-Umgebungen verwenden nur einen Pool. Die Größe dieses Pools steuert, wie viele Aufgaben der Planer in einem bestimmten Moment zur Ausführung in die Warteschlange stellen kann. Wenn die Poolgröße zu klein ist, kann der Planer Aufgaben nicht für die Ausführung in die Warteschlange stellen, auch wenn Grenzwerte, die durch die Konfigurationsoption [core]parallelism und die Konfigurationsoption [celery]worker_concurrency multipliziert mit der Anzahl der Airflow-Worker definiert sind, noch nicht erreicht sind.

    Sie können die Poolgröße in der Airflow-UI konfigurieren (Menü > Administrator > Pools). Passen Sie die Poolgröße an das Maß an Parallelität an, das Sie in Ihrer Umgebung erwarten.

    Normalerweise ist [core]parallelism ein Produkt der maximalen Anzahl von Workern und [celery]worker_concurrency.

DAGs werden aufgrund von Zeitüberschreitungen des DAG-Prozessors nicht vom Planer geplant

Weitere Informationen zu diesem Problem finden Sie unter Fehlerbehebung bei DAGs.

Aufgaben werden nach Erreichen von dagrun_timeout als fehlgeschlagen markiert

Der Planer markiert Aufgaben, die noch nicht abgeschlossen sind (ausgeführt, geplant und in die Warteschlange gestellt). als fehlgeschlagen, wenn eine DAG-Ausführung nicht innerhalb dagrun_timeout (ein DAG-Parameter).

Lösung:

Symptome einer Airflow-Datenbank, die unter Lastdruck steht

Manchmal sehen Sie in den Airflow-Planerlogs folgenden Warnlogeintrag:

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

Ähnliche Symptome können auch in Airflow-Worker-Logs auftreten:

Für MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

Für PostgreSQL:

psycopg2.OperationalError: connection to server at ... failed

Solche Fehler oder Warnungen können ein Hinweis darauf sein, dass die Airflow-Datenbank mit der Anzahl offener Verbindungen oder Abfragen überfordert. gleichzeitig von Planern oder anderen Airflow-Komponenten ausgeführt werden. wie Worker, Trigger und Webserver.

Mögliche Lösungen:

Der Webserver zeigt "Der Planer wird anscheinend nicht ausgeführt." Warnung

Der Planer meldet seinen Heartbeat regelmäßig an Airflow Datenbank. Anhand dieser Informationen bestimmt der Airflow-Webserver, Planer ist aktiv.

Wenn der Planer stark ausgelastet ist, kann er seinen Herzschlag melden alle [scheduler]scheduler-heartbeat-sec.

In einer solchen Situation zeigt der Airflow-Webserver möglicherweise die folgende Warnung an:

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

Mögliche Lösungen:

  • Erhöhen Sie die CPU und den Arbeitsspeicher der Ressourcen für den Planer.

  • Optimieren Sie Ihre DAGs, sodass deren Parsen und Planung schneller erfolgen zu viele Planerressourcen verbrauchen.

  • Vermeiden Sie die Verwendung globaler Variablen in Airflow-DAGs: Cloud Composer-Umgebungsvariablen und Airflow-Variablen.

  • Erhöhen Sie den Wert der [scheduler]scheduler-health-check-threshold damit der Webserver länger wartet, bevor er die Nichtverfügbarkeit den Planer.

Problemumgehungen für Probleme beim Backfill von DAGs

Manchmal möchten Sie DAGs, die bereits ausgeführt wurden, möglicherweise noch einmal ausführen. Mit dem Airflow-Befehlszeilentool können Sie dies so tun:

Airflow 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
  backfill -- -B \
  -s START_DATE \
  -e END_DATE \
  DAG_NAME

Wenn Sie nur fehlgeschlagene Aufgaben für einen bestimmten DAG noch einmal ausführen möchten, verwenden Sie den --rerun_failed_tasks-Argument.

Airflow 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

Wenn Sie nur fehlgeschlagene Aufgaben für einen bestimmten DAG noch einmal ausführen möchten, verwenden Sie den --rerun-failed-tasks-Argument.

Ersetzen Sie:

  • ENVIRONMENT_NAME durch den Namen der Umgebung.
  • LOCATION durch die Region, in der sich die Umgebung befindet.
  • START_DATE durch einen Wert für den DAG-Parameter start_date in YYYY-MM-DD-Format.
  • END_DATE durch einen Wert für den DAG-Parameter end_date in YYYY-MM-DD-Format.
  • DAG_NAME durch den Namen des DAG.

Backfill-Vorgänge können manchmal zu einem Deadlock führen, in dem ein Backfill ist nicht möglich, da eine Aufgabe gesperrt ist. Beispiel:

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

In einigen Fällen können Sie die folgenden Problemumgehungen verwenden, um Deadlocks zu überwinden:

  • Deaktivieren Sie den Miniplaner, indem Sie den [core]schedule-after-task-execution an False.

  • Führen Sie Backfills für kürzere Zeiträume aus. Legen Sie beispielsweise START_DATE fest. und END_DATE, um einen Zeitraum von nur 1 Tag festzulegen.

Nächste Schritte