Probleme mit der Aufgabenplanung beheben

Cloud Composer 1 Cloud Composer 2

In dieser Anleitung erfahren Sie, wie Sie Probleme bei der Aufgabenplanung und beim Parsen diagnostizieren und beheben, die zu Fehlfunktionen des Planers, Analysefehlern und Latenz sowie Aufgabenfehlern führen.

Einführung

Der Airflow-Planer wird hauptsächlich von zwei Faktoren beeinflusst: der Aufgabenplanung und dem DAG-Parsing. Probleme in einem dieser Faktoren können sich negativ auf den Zustand und die Leistung der Umgebung auswirken.

Manchmal werden zu viele Aufgaben gleichzeitig geplant. In diesem Fall ist die Warteschlange gefüllt und Aufgaben verbleiben im Status „Geplant“ oder werden neu geplant, nachdem sie in die Warteschlange gestellt wurden. Dies kann zu einem Aufgabenfehler und einer Leistungslatenz führen.

Ein weiteres häufiges Problem sind beim Parsen von Latenz und Fehlern, die durch die Komplexität eines DAG-Codes verursacht werden. Beispielsweise kann ein DAG-Code, der Airflow-Variablen auf oberster Codeebene enthält, zu Verzögerungen beim Parsen, einer Datenbanküberlastung, Planungsfehlern und DAG-Zeitüberschreitungen führen.

In dieser Anleitung diagnostizieren Sie die Beispiel-DAGs und erfahren, wie Sie Planungs- und Parsing-Probleme beheben, die DAG-Planung verbessern und Ihren DAG-Code und die Umgebungskonfigurationen optimieren, um die Leistung zu verbessern.

Lernziele

In diesem Abschnitt sind Ziele für Beispiele in dieser Anleitung aufgeführt.

Beispiel: Fehlfunktion des Planers und Latenz aufgrund hoher Nebenläufigkeit von Aufgaben

  • Laden Sie den Beispiel-DAG hoch, der mehrmals gleichzeitig ausgeführt wird, und diagnostizieren Sie mit Cloud Monitoring Fehlfunktionen und Latenzprobleme des Planers.

  • Optimieren Sie den DAG-Code, indem Sie die Aufgaben konsolidieren und die Auswirkungen auf die Leistung bewerten.

  • Verteilen Sie die Aufgaben gleichmäßiger im Zeitverlauf und bewerten Sie die Auswirkungen auf die Leistung.

  • Optimieren Sie Ihre Airflow-Konfigurationen sowie Umgebungskonfigurationen und bewerten Sie die Auswirkungen.

Beispiel: DAG-Parsing-Fehler und durch komplexen Code verursachte Latenz

  • Laden Sie den Beispiel-DAG mit Airflow-Variablen hoch und diagnostizieren Sie mit Cloud Monitoring Parsing-Probleme.

  • Optimieren Sie den DAG-Code, indem Sie Airflow-Variablen auf oberster Codeebene vermeiden und die Auswirkungen auf die Parsing-Zeit bewerten.

  • Optimieren Sie Airflow-Konfigurationen und Umgebungskonfigurationen und bewerten Sie die Auswirkungen auf die Parsing-Zeit.

Kosten

In dieser Anleitung werden die folgenden kostenpflichtigen Komponenten von Google Cloud verwendet:

Nach Abschluss dieser Anleitung können Sie weitere Kosten durch Löschen von erstellten Ressourcen vermeiden. Weitere Informationen finden Sie im Hilfeartikel Bereinigen.

Hinweise

In diesem Abschnitt werden Aktionen beschrieben, die vor dem Start der Anleitung erforderlich sind.

Projekt erstellen und konfigurieren

Für diese Anleitung benötigen Sie ein Google Cloud-Projekt. Konfigurieren Sie das Projekt so:

  1. Wählen Sie ein Projekt in der Google Cloud Console aus oder erstellen Sie eines:

    Zur Projektauswahl

  2. Die Abrechnung für Ihr Projekt muss aktiviert sein. Hier erfahren Sie, wie Sie prüfen, ob die Abrechnung für ein Projekt aktiviert ist.

  3. Achten Sie darauf, dass der Nutzer Ihres Google Cloud-Projekts die folgenden Rollen hat, um die erforderlichen Ressourcen zu erstellen:

    • Administrator für Umgebungen und Storage-Objekte (roles/composer.environmentAndStorageObjectAdmin)
    • Compute-Administrator (roles/compute.admin)

Die APIs für Ihr Projekt aktivieren

Cloud Composer API aktivieren.

Aktivieren Sie die API

Cloud Composer-Umgebung erstellen

Erstellen Sie eine Cloud Composer 2-Umgebung.

Beim Erstellen der Umgebung weisen Sie dem Konto des Composer-Dienst-Agents die Rolle Dienst-Agent-Erweiterung für die Cloud Composer v2 API (roles/composer.ServiceAgentV2Ext) zu. Cloud Composer verwendet dieses Konto, um Vorgänge in Ihrem Google Cloud-Projekt auszuführen.

Beispiel: Fehlfunktion des Planers und Aufgabenfehler aufgrund von Problemen bei der Aufgabenplanung

Dieses Beispiel zeigt das Debugging von Fehlfunktionen und Latenz des Planers, die durch eine hohe Nebenläufigkeit von Aufgaben verursacht wird.

Beispiel-DAG in Ihre Umgebung hochladen

Laden Sie den folgenden Beispiel-DAG in die Umgebung hoch, die Sie in den vorherigen Schritten erstellt haben. In dieser Anleitung heißt dieser DAG dag_10_tasks_200_seconds_1.

Dieser DAG hat 200 Aufgaben. Jede Aufgabe wartet 1 Sekunde lang und gibt „Abgeschlossen!“ aus. Der DAG wird nach dem Hochladen automatisch ausgelöst. Cloud Composer führt diesen DAG zehnmal aus, wobei alle DAG-Ausführungen parallel ausgeführt werden.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

tasks_amount = 200
seconds = 1
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2023, 11, 22, 20, 0),
    end_date=datetime(2023, 11, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Fehlfunktion des Planers und Probleme mit Aufgabenfehlern diagnostizieren

Öffnen Sie nach Abschluss der DAG-Ausführung die Airflow-UI und klicken Sie auf den DAG dag_10_tasks_200_seconds_1. Sie werden feststellen, dass insgesamt 10 DAG-Ausführungen erfolgreich waren und jede davon 200 erfolgreiche Aufgaben umfasst.

Prüfen Sie die Airflow-Tasklogs:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Wechseln Sie zum Tab Logs und wählen Sie Alle Logs > Airflow-Logs > Worker > Im Log-Explorer ansehen aus.

Im Log-Histogramm sehen Sie die Fehler und Warnungen in Rot und Orange:

Das Histogramm der Airflow-Worker-Logs mit rot und orangefarbenen Fehlern und Warnungen
Abbildung 1. Histogramm zu Airflow-Worker-Logs (zum Vergrößern klicken)

Der Beispiel-DAG führte zu etwa 130 Warnungen und 60 Fehlern. Klicken Sie auf eine Spalte mit gelben und roten Balken. In den Logs werden einige der folgenden Warnungen und Fehler angezeigt:

State of this instance has been externally set to success. Terminating
instance.

Received SIGTERM. Terminating subprocesses.

worker: Warm shutdown (MainProcess).

Diese Logs können darauf hinweisen, dass die Ressourcennutzung die Limits überschritten hat und der Worker sich selbst neu gestartet hat.

Wenn eine Airflow-Aufgabe zu lange in der Warteschlange verbleibt, markiert der Planer sie als fehlgeschlagen und up_for_retry und plant sie noch einmal zur Ausführung neu. Eine Möglichkeit, die Symptome dieser Situation zu beobachten, besteht darin, sich das Diagramm mit der Anzahl der Aufgaben in der Warteschlange anzusehen. Wenn die Spitzen in diesem Diagramm nicht in etwa 10 Minuten abfallen, treten wahrscheinlich Fehler bei Aufgaben (ohne Logs) auf.

Prüfen Sie die Monitoring-Informationen:

  1. Rufen Sie den Tab Monitoring auf und wählen Sie Übersicht aus.

  2. Überprüfen Sie das Diagramm Airflow-Aufgaben.

    Das Diagramm der Airflow-Aufgaben im Zeitverlauf, das einen Anstieg der Anzahl der Aufgaben in der Warteschlange zeigt
    Abbildung 2: Airflow-Aufgabendiagramm (zum Vergrößern klicken)

    Im Airflow-Aufgabendiagramm gibt es eine Spitze bei den Aufgaben in der Warteschlange, die länger als 10 Minuten andauert. Dies kann bedeuten, dass in Ihrer Umgebung nicht genügend Ressourcen zur Verarbeitung aller geplanten Aufgaben vorhanden sind.

  3. Sehen Sie sich das Diagramm Aktive Worker an:

    Das Diagramm der aktiven Airflow-Worker im Zeitverlauf zeigt, dass die Anzahl der aktiven Worker bis zum Höchstwert erhöht wurde
    Abbildung 3: Grafik zu aktiven Workern (zum Vergrößern klicken)

    Das Diagramm Aktive Worker zeigt an, dass der DAG während der DAG-Ausführung das Autoscaling bis zur maximal zulässigen Anzahl von drei Workern ausgelöst hat.

  4. Diagramme zur Ressourcennutzung können darauf hinweisen, dass die Airflow-Worker nicht genügend Kapazität zum Ausführen von Aufgaben in der Warteschlange haben. Wählen Sie auf dem Tab Monitoring die Option Worker aus und sehen Sie sich die Diagramme Gesamte CPU-Nutzung pro Worker und Gesamte Worker-Arbeitsspeichernutzung an.

    Das Diagramm der CPU-Nutzung durch Airflow-Worker zeigt, wie die CPU-Nutzung bis zum Höchstlimit ansteigt
    Abbildung 4. Grafik zur CPU-Nutzung der gesamten Worker (zum Vergrößern klicken)
    Das Diagramm der Arbeitsspeichernutzung durch Airflow-Worker zeigt, wie die Arbeitsspeichernutzung steigt, aber das maximale Limit nicht erreicht hat
    Abbildung 5. Diagramm zur gesamten Arbeitsspeichernutzung der Worker (zum Vergrößern klicken)

    Die Diagramme zeigen, dass die gleichzeitige Ausführung von zu vielen Aufgaben zum Erreichen des CPU-Limits geführt hat. Die Ressourcen wurden mehr als 30 Minuten verwendet, was noch länger als die Gesamtdauer von 200 Aufgaben in 10 DAG-Ausführungen ist, die nacheinander ausgeführt werden.

Dies sind Indikatoren dafür, dass die Warteschlange gefüllt ist und nicht genügend Ressourcen für die Verarbeitung aller geplanten Aufgaben vorhanden sind.

Aufgaben zusammenfassen

Mit dem aktuellen Code werden viele DAGs und Aufgaben ohne ausreichende Ressourcen erstellt, um alle Aufgaben parallel zu verarbeiten. Dadurch wird die Warteschlange gefüllt. Wenn Aufgaben zu lange in der Warteschlange verbleiben, kann dies dazu führen, dass die Aufgaben verschoben werden oder fehlschlagen. In solchen Situationen sollten Sie sich für eine kleinere Anzahl konsolidierter Aufgaben entscheiden.

Der folgende Beispiel-DAG ändert die Anzahl der Aufgaben im ersten Beispiel von 200 auf 20 und erhöht die Wartezeit von 1 auf 10 Sekunden, um stärker konsolidierte Aufgaben zu imitieren, die die gleiche Menge an Arbeit ausführen.

Laden Sie den folgenden Beispiel-DAG in die von Ihnen erstellte Umgebung hoch. In dieser Anleitung heißt dieser DAG dag_10_tasks_20_seconds_10.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

tasks_amount = 20
seconds = 10
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Bewerten Sie die Auswirkungen konsolidierter Aufgaben auf die Planung von Prozessen:

  1. Warten Sie, bis die DAG-Ausführungen abgeschlossen sind.

  2. Klicken Sie in der Airflow-UI auf der Seite DAGs auf den DAG dag_10_tasks_20_seconds_10. Sie sehen 10 DAG-Ausführungen, von denen jede 20 erfolgreiche Aufgaben umfasst.

  3. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  4. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  5. Wechseln Sie zum Tab Logs und wählen Sie Alle Logs > Airflow-Logs > Worker > Im Log-Explorer ansehen aus.

    Das zweite Beispiel mit konsolidierteren Aufgaben führte zu etwa 10 Warnungen und 7 Fehlern. Im Histogramm können Sie die Anzahl der Fehler und Warnungen im ersten Beispiel (frühere Werte) und im zweiten Beispiel (spätere Werte) vergleichen.

    Das Histogramm der Airflow-Worker-Logs mit Fehlern und Warnungen zeigt die geringere Anzahl von Fehlern und Warnungen nach der Konsolidierung von Aufgaben.
    Abbildung 6. Histogramm der Airflow-Worker-Logs nach der Konsolidierung der Aufgaben (zum Vergrößern klicken)

    Wenn Sie das erste mit dem stärker konsolidierten Beispiel vergleichen, sehen Sie, dass es im zweiten Beispiel deutlich weniger Fehler und Warnungen gibt. Die gleichen Fehler im Zusammenhang mit dem Warm-Herunterfahren werden jedoch aufgrund der Ressourcenüberlastung weiterhin in den Logs angezeigt.

  6. Wählen Sie auf dem Tab Monitoring die Option Worker aus und überprüfen Sie die Grafiken.

    Wenn Sie das Airflow-Aufgabendiagramm für das erste Beispiel (frühere Werte) mit dem Diagramm für das zweite Beispiel mit konsolidierteren Aufgaben vergleichen, sehen Sie, dass der Anstieg der Aufgaben in der Warteschlange für einen kürzeren Zeitraum andauerte, als die Aufgaben konsolidiert waren. Er dauerte jedoch fast 10 Minuten, was immer noch nicht optimal ist.

    Das Diagramm der Airflow-Aufgaben im Zeitverlauf zeigt, dass die Spitze der Airflow-Aufgaben kürzer als zuvor angehalten wurde.
    Abbildung 7. Grafik der Airflow-Aufgaben nach der Konsolidierung der Aufgaben (zum Vergrößern klicken)

    In der Grafik „Aktive Worker“ sehen Sie, dass das erste Beispiel (links in der Grafik) viel länger Ressourcen verwendet hat als das zweite Beispiel, obwohl beide Beispiele den gleichen Arbeitsumfang nachahmen.

    Das Diagramm der aktiven Airflow-Worker im Zeitverlauf zeigt, dass die Anzahl der aktiven Worker kürzer als zuvor erhöht wurde.
    Abbildung 8. Grafik zu aktiven Workern nach der Konsolidierung der Aufgaben (zum Vergrößern klicken)

    Sehen Sie sich die Diagramme zum Verbrauch der Worker-Ressourcen an. Obwohl der Unterschied zwischen den in diesem Beispiel mit konsolidierten Aufgaben und dem ursprünglichen Beispiel verwendeten Ressourcen erheblich ist, ist die CPU-Auslastung immer noch auf 70% des Limits angestiegen.

    Das Diagramm der CPU-Nutzung durch Airflow-Worker zeigt, dass die CPU-Nutzung um bis zu 70% des maximalen Limits steigt
    Abbildung 9. Diagramm zur CPU-Nutzung der Gesamtzahl der Worker nach der Konsolidierung der Aufgaben (zum Vergrößern klicken)
    Das Diagramm der Arbeitsspeichernutzung von Airflow-Workern zeigt, wie die Arbeitsspeichernutzung steigt, aber das maximale Limit nicht erreicht hat
    Abbildung 10. Diagramm zur gesamten Arbeitsspeichernutzung der Worker nach der Konsolidierung der Aufgaben (zum Vergrößern klicken)

Aufgaben gleichmäßiger im Zeitverlauf verteilen

Zu viele gleichzeitige Aufgaben führen dazu, dass die Warteschlange gefüllt wird, was dazu führt, dass Aufgaben in der Warteschlange hängen bleiben oder neu geplant werden. In den vorherigen Schritten haben Sie die Anzahl der Aufgaben durch Konsolidieren dieser Aufgaben verringert. Aus den Ausgabelogs und dem Monitoring ging jedoch hervor, dass die Anzahl der gleichzeitigen Aufgaben noch nicht optimal ist.

Sie können die Anzahl der gleichzeitigen Aufgabenausführungen steuern, indem Sie einen Zeitplan implementieren oder Limits dafür festlegen, wie viele Aufgaben gleichzeitig ausgeführt werden können.

In dieser Anleitung werden Aufgaben gleichmäßiger über einen Zeitraum verteilt. Dazu fügen Sie dem DAG dag_10_tasks_20_seconds_10 Parameter auf DAG-Ebene hinzu:

  1. Fügen Sie dem DAG-Kontextmanager das Argument max_active_runs=1 hinzu. Dieses Argument legt ein Limit von nur einer Instanz einer DAG-Ausführung zu einem bestimmten Zeitpunkt fest.

  2. Fügen Sie dem DAG-Kontextmanager das Argument max_active_tasks=5 hinzu. Dieses Argument steuert die maximale Anzahl von Aufgabeninstanzen, die in jedem DAG gleichzeitig ausgeführt werden können.

Laden Sie den folgenden Beispiel-DAG in die von Ihnen erstellte Umgebung hoch. In dieser Anleitung heißt dieser DAG dag_10_tasks_20_seconds_10_scheduled.py.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    max_active_runs=active_runs,
    max_active_tasks=active_tasks,
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Bewerten Sie die Auswirkungen der Verteilung von Aufgaben im Zeitverlauf auf die Planung von Prozessen:

  1. Warten Sie, bis die DAG-Ausführungen abgeschlossen sind.

  2. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  3. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  4. Wechseln Sie zum Tab Logs und wählen Sie Alle Logs > Airflow-Logs > Worker > Im Log-Explorer ansehen aus.

  5. Im Histogramm können Sie sehen, dass der dritte DAG mit einer begrenzten Anzahl aktiver Aufgaben und Ausführungen keine Warnungen oder Fehler generiert hat und die Verteilung der Logs im Vergleich zu den vorherigen Werten gleichmäßiger aussieht.

    Das Histogramm der Airflow-Worker-Logs mit Fehlern und Warnungen zeigt keine Fehler oder Warnungen an, nachdem Aufgaben konsolidiert und über einen bestimmten Zeitraum verteilt wurden.
    Abbildung 11. Histogramm der Airflow-Worker-Logs, nachdem die Aufgaben konsolidiert und über die Zeit verteilt wurden (zum Vergrößern klicken)

Die Aufgaben im Beispiel dag_10_tasks_20_seconds_10_scheduled, die eine begrenzte Anzahl aktiver Aufgaben und Ausführungen haben, verursachten keine Ressourcenauslastung, da die Aufgaben gleichmäßig in die Warteschlange gestellt wurden.

Nachdem Sie die beschriebenen Schritte ausgeführt haben, haben Sie die Ressourcennutzung optimiert, indem Sie kleine Aufgaben konsolidiert und gleichmäßiger im Laufe der Zeit verteilt haben.

Umgebungskonfigurationen optimieren

Sie können Ihre Umgebungskonfigurationen so anpassen, dass die Airflow-Worker immer Kapazitäten zum Ausführen von Aufgaben in der Warteschlange haben.

Anzahl der Worker und Worker-Nebenläufigkeit

Sie können die maximale Anzahl von Workern anpassen, damit Cloud Composer Ihre Umgebung innerhalb der festgelegten Limits automatisch skaliert.

Der Parameter [celery]worker_concurrency definiert die maximale Anzahl von Aufgaben, die ein einzelner Worker aus der Aufgabenwarteschlange übernehmen kann. Wenn Sie diesen Parameter ändern, wird die Anzahl der Aufgaben angepasst, die ein einzelner Worker gleichzeitig ausführen kann. Sie können diese Airflow-Konfigurationsoption ändern, indem Sie sie überschreiben. Standardmäßig ist die Nebenläufigkeit von Workern auf einen der folgenden Mindestwerte festgelegt: 32, 12 * worker_CPU, 8 * worker_memory. Das bedeutet, dass sie von den Limits für Worker-Ressourcen abhängt. Weitere Informationen zu den Standard-Gleichzeitigkeitswerten für Worker finden Sie unter Umgebungen optimieren.

Die Anzahl der Worker und die Nebenläufigkeit der Worker hängen zusammen und die Leistung Ihrer Umgebung hängt stark von beiden Parametern ab. Beachten Sie die folgenden Hinweise zur Auswahl der richtigen Kombination:

  • Mehrere schnelle Tasks werden parallel ausgeführt. Sie können die Nebenläufigkeit von Workern erhöhen, wenn sich Aufgaben in der Warteschlange befinden und Ihre Worker gleichzeitig einen geringen Prozentsatz ihrer CPUs und Ihres Arbeitsspeichers verwenden. Unter bestimmten Umständen füllt sich die Warteschlange jedoch möglicherweise nie, sodass das Autoscaling nie ausgelöst wird. Wenn die Ausführung kleiner Aufgaben abgeschlossen ist, bevor die neuen Worker bereit sind, kann ein vorhandener Worker die verbleibenden Aufgaben übernehmen. Es gibt dann keine Aufgaben für neu erstellte Worker.

    In diesen Situationen wird empfohlen, die Mindestanzahl von Workern und gleichzeitig die Worker-Parallelität zu erhöhen, um eine übermäßige Skalierung zu vermeiden.

  • Mehrere lange Tasks werden parallel ausgeführt. Die hohe Worker-Nebenläufigkeit verhindert, dass das System die Anzahl der Worker skaliert. Wenn mehrere Aufgaben ressourcenintensiv sind und lange dauern, kann eine hohe Nebenläufigkeit von Workern dazu führen, dass die Warteschlange nie gefüllt wird und alle Aufgaben von nur einem Worker übernommen werden. Dies führt zu Leistungsproblemen. In diesen Situationen wird empfohlen, die maximale Anzahl von Workern zu erhöhen und gleichzeitig die Worker-Parallelität zu verringern.

Die Bedeutung der Parallelität

Airflow-Planer steuern die Planung von DAG-Ausführungen und einzelne Aufgaben aus DAGs. Die Airflow-Konfigurationsoption [core]parallelism steuert, wie viele Aufgaben der Airflow-Planer in der Warteschlange des Executors in die Warteschlange stellen kann, nachdem alle Abhängigkeiten für diese Aufgaben erfüllt sind.

Parallelität ist ein Schutzmechanismus von Airflow, der unabhängig von der Anzahl der Worker bestimmt, wie viele Aufgaben pro Planer gleichzeitig ausgeführt werden können. Der Wert der Parallelität multipliziert mit der Anzahl der Planer im Cluster gibt die maximale Anzahl von Aufgabeninstanzen an, die Ihre Umgebung in die Warteschlange stellen kann.

Normalerweise ist [core]parallelism das Produkt aus der maximalen Anzahl von Workern und [celery]worker_concurrency. Sie ist auch vom Pool betroffen. Sie können diese Airflow-Konfigurationsoption ändern, indem Sie sie überschreiben. Weitere Informationen zum Anpassen von Airflow-Konfigurationen für die Skalierung finden Sie unter Airflow-Konfiguration skalieren.

Optimale Umgebungskonfigurationen ermitteln

Die empfohlene Methode zur Behebung von Planungsproblemen besteht darin, kleine Aufgaben in größere Aufgaben zu konsolidieren und die Aufgaben gleichmäßiger im Zeitverlauf zu verteilen. Neben der Optimierung von DAG-Code können Sie auch Umgebungskonfigurationen so optimieren, dass eine ausreichende Kapazität zum gleichzeitigen Ausführen mehrerer Aufgaben vorhanden ist.

Angenommen, Sie konsolidieren Aufgaben in Ihrem DAG so weit wie möglich, aber es ist für Ihren speziellen Anwendungsfall keine bevorzugte Lösung, die aktiven Aufgaben auf eine gleichmäßigere Verteilung über einen bestimmten Zeitraum zu beschränken.

Sie können die Parallelität, die Anzahl der Worker und die Worker-Nebenläufigkeitsparameter anpassen, um den DAG dag_10_tasks_20_seconds_10 auszuführen, ohne aktive Aufgaben einzuschränken. In diesem Beispiel wird ein DAG zehnmal ausgeführt und jede Ausführung enthält 20 kleine Aufgaben. So können Sie sie alle gleichzeitig ausführen:

  • Sie benötigen eine größere Umgebungsgröße, da sie die Leistungsparameter der verwalteten Cloud Composer-Infrastruktur Ihrer Umgebung steuert.

  • Airflow-Worker müssen 20 Aufgaben gleichzeitig ausführen können. Dies bedeutet, dass Sie die Worker-Parallelität auf 20 festlegen müssen.

  • Die Worker benötigen genügend CPU und Arbeitsspeicher, um alle Aufgaben zu erledigen. Die Nebenläufigkeit der Worker wird von der CPU und dem Arbeitsspeicher der Worker beeinflusst. Daher benötigen Sie mindestens worker_concurrency / 12 in der CPU und least worker_concurrency / 8 im Arbeitsspeicher.

  • Sie müssen die Parallelität erhöhen, um der höheren Parallelität der Worker zu entsprechen. Damit Worker 20 Aufgaben aus der Warteschlange übernehmen können, muss der Planer diese 20 Aufgaben zuerst planen.

Passen Sie die Umgebungskonfigurationen so an:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Rufen Sie den Tab Umgebungskonfiguration auf.

  4. Suchen Sie die Konfiguration unter Ressourcen > Arbeitslasten und klicken Sie auf Bearbeiten.

  5. Geben Sie im Abschnitt Worker im Feld Arbeitsspeicher das neue Arbeitsspeicherlimit für Airflow-Worker an. In dieser Anleitung werden 4 GB genutzt.

  6. Geben Sie im Feld CPU das neue CPU-Limit für Airflow-Worker an. Verwenden Sie in dieser Anleitung 2 vCPUs.

  7. Speichern Sie die Änderungen und warten Sie einige Minuten, bis Ihre Airflow-Worker neu gestartet sind.

Als Nächstes überschreiben Sie die Airflow-Konfigurationsoptionen für Parallelität und Worker-Parallelität:

  1. Rufen Sie den Tab Airflow-Konfigurationsüberschreibungen auf.

  2. Klicken Sie auf Bearbeiten und dann auf Airflow-Konfigurationsüberschreibung hinzufügen.

  3. Überschreiben Sie die Konfiguration des Parralismus:

    Bereich Schlüssel Wert
    core parallelism 20
  4. Klicken Sie auf Airflow-Konfigurationsüberschreibung hinzufügen und überschreiben Sie die Konfiguration der Nebenläufigkeit der Worker:

    Bereich Schlüssel Wert
    celery worker_concurrency 20
  5. Klicken Sie auf Speichern und warten Sie, bis die Konfiguration der Umgebung aktualisiert wurde.

Lösen Sie denselben Beispiel-DAG noch einmal mit den angepassten Konfigurationen aus:

  1. Rufen Sie in der Airflow-UI die Seite DAGs auf.

  2. Suchen Sie den DAG dag_10_tasks_20_seconds_10 und löschen Sie ihn.

    Nachdem der DAG gelöscht wurde, prüft Airflow den DAGs-Ordner im Bucket Ihrer Umgebung und führt den DAG automatisch noch einmal aus.

Sehen Sie sich nach Abschluss der DAG-Ausführungen das Log-Histogramm noch einmal an. Im Diagramm können Sie sehen, dass das Beispiel dag_10_tasks_20_seconds_10 mit konsolidierteren Aufgaben beim Ausführen mit der angepassten Umgebungskonfiguration keine Fehler und Warnungen generiert hat. Vergleichen Sie die Ergebnisse mit den vorherigen Daten im Diagramm, in denen dasselbe Beispiel beim Ausführen mit der tge-Standardumgebungskonfiguration Fehler und Warnungen erzeugt hat.

Das Histogramm der Airflow-Worker-Logs mit Fehlern und Warnungen zeigt keine Fehler und Warnungen an, nachdem die Umgebungskonfiguration angepasst wurde
Abbildung 12. Histogramm der Airflow-Worker-Logs nach Anpassung der Umgebungskonfiguration (zum Vergrößern klicken)

Umgebungs- und Airflow-Konfigurationen spielen bei der Aufgabenplanung eine entscheidende Rolle. Es ist jedoch nicht möglich, die Konfigurationen über bestimmte Limits hinaus zu erhöhen.

Wir empfehlen, den DAG-Code zu optimieren, Aufgaben zu konsolidieren und mit Planung die Leistung und Effizienz zu optimieren.

Beispiel: DAG-Parsing-Fehler und Latenz aufgrund von komplexem DAG-Code

In diesem Beispiel untersuchen Sie die Parsing-Latenz eines Beispiel-DAG, der zu viele Airflow-Variablen imitiert.

Neue Airflow-Variable erstellen

Erstellen Sie eine neue Airflow-Variable, bevor Sie den Beispielcode hochladen.

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.

  3. Klicken Sie auf Admin > Variablen > Neuen Eintrag hinzufügen.

  4. Stellen Sie folgende Werte ein:

    • Schlüssel: example_var
    • Wert: test_airflow_variable

Beispiel-DAG in Ihre Umgebung hochladen

Laden Sie den folgenden Beispiel-DAG in die Umgebung hoch, die Sie in den vorherigen Schritten erstellt haben. In dieser Anleitung heißt dieser DAG dag_for_loop_airflow_variable.

Dieser DAG enthält eine for-Schleife,die 1.000 Mal ausgeführt wird und einen Überschuss an Airflow-Variablen imitiert. Bei jedem Durchlauf wird die Variable example_var gelesen und eine Aufgabe generiert. Jede Aufgabe enthält einen Befehl, der den Wert der Variablen ausgibt.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable',
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily"
)

for i in range(1000):
    a = Variable.get('example_var', 'N/A')
    task = BashOperator(
        task_id=f'task_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': a}
    )

Parsing-Probleme diagnostizieren

Die DAG-Analysezeit ist die Zeit, die der Airflow-Planer benötigt, um eine DAG-Datei zu lesen und zu parsen. Bevor der Airflow-Planer eine Aufgabe aus einem DAG planen kann, muss der Planer die DAG-Datei parsen, um die Struktur des DAG und der definierten Aufgaben zu ermitteln.

Wenn das Parsen eines DAG lange dauert, wird die Kapazität des Planers verbraucht und die Leistung der DAG-Ausführungen verringert sich.

So überwachen Sie die DAG-Analysezeit:

  1. Führen Sie den dags report Airflow-Befehlszeilenbefehl in der gcloud CLI aus, um die Parsing-Zeit für alle Ihre DAGs anzuzeigen:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        dags report
    

    Ersetzen Sie Folgendes:

    • ENVIRONMENT_NAME: der Name Ihrer Umgebung
    • LOCATION: Die Region, in der sich die Umgebung befindet.
  2. Suchen Sie in der Ausgabe des Befehls nach dem Wert für die Dauer des DAG dag_for_loop_airflow_variables. Ein hoher Wert kann darauf hinweisen, dass dieser DAG nicht optimal implementiert ist. Wenn Sie mehrere DAGs haben, können Sie anhand der Ausgabetabelle feststellen, für welche DAGs eine lange Parsing-Zeit hat.

    Beispiel:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:14.773594 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /airflow_monitoring | 0:00:00.003035 | 1       | 1        | airflow_monitoring
    .py
    
    
  3. Prüfen Sie die DAG-Analysezeiten in der Google Cloud Console:

    1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  4. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  5. Wechseln Sie zum Tab Logs und dann zu Alle Logs > DAG-Prozessormanager.

  6. Prüfen Sie dag-processor-manager-Logs und identifizieren Sie mögliche Probleme.

    Ein Protokolleintrag für den Beispiel-DAG zeigt, dass die DAG-Analysezeit 46,3 Sekunden beträgt
    Abbildung 13. In Logs des DAG-Prozessormanagers werden die DAG-Analysezeiten angezeigt (zum Vergrößern klicken).

Wenn die DAG-Analyse insgesamt etwa 10 Sekunden überschreitet, sind Ihre Planer möglicherweise mit der DAG-Analyse überlastet und können DAGs nicht effektiv ausführen.

DAG-Code optimieren

Es wird empfohlen, unnötigen Python-Code der obersten Ebene in Ihren DAGs zu vermeiden. DAGs mit vielen Importen, Variablen und Funktionen außerhalb des DAG führen zu längeren Parsing-Zeiten für den Airflow-Planer. Dies reduziert die Leistung und Skalierbarkeit von Cloud Composer und Airflow. Wenn zu viele Airflow-Variablen gelesen werden, dauert das Parsen und die Datenbanklast hoch. Wenn sich dieser Code in einer DAG-Datei befindet, werden diese Funktionen bei jedem Planerherzschlag ausgeführt, was möglicherweise langsam ist.

Mit den Vorlagenfeldern von Airflow können Sie Werte aus Airflow-Variablen und Jinja-Vorlagen in Ihre DAGs einbinden. Dies verhindert eine unnötige Funktionsausführung während Planerherzschlägen.

Vermeiden Sie die Verwendung von Airflow-Variablen im Python-Code der obersten Ebene von DAGs, um das DAG-Beispiel besser zu implementieren. Übergeben Sie stattdessen Airflow-Variablen über eine Jinja-Vorlage an vorhandene Operatoren, wodurch das Lesen des Werts bis zur Ausführung der Aufgabe verzögert wird.

Laden Sie die neue Version des Beispiel-DAG in Ihre Umgebung hoch. In dieser Anleitung heißt dieser DAG dag_for_loop_airflow_variable_optimized.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable_optimized',
    default_args=default_args,
    catchup=False,
    schedule_interval='@daily'
)

for i in range(1000):
    task = BashOperator(
        task_id=f'bash_use_variable_good_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': '{{ var.value.get("example_var") }}'},
    )

Prüfen Sie die neue DAG-Analysezeit:

  1. Warten Sie, bis die DAG-Ausführung abgeschlossen ist.

  2. Führen Sie den Befehl dags report noch einmal aus, um die Parsing-Zeit für alle Ihre DAGs zu sehen:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:37.000369 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /dag_for_loop_airfl | 0:00:01.109457 | 1       | 1000     | dag_for_loop_airflow
    ow_variable_optimiz |                |         |          | _variable_optimized
    ed.py               |                |         |          |
    /airflow_monitoring | 0:00:00.040510 | 1       | 1        | airflow_monitoring
    .py                 |                |         |          |
    
  3. Sehen Sie sich die dag-processor-manager-Logs noch einmal an und analysieren Sie die Analysedauer.

    Ein Logeintrag für den Beispiel-DAG zeigt, dass die DAG-Analysezeit 4,21 Sekunden beträgt
    Abbildung 14. Logs des DAG-Prozessormanagers zeigen die DAG-Analysezeiten nach der Optimierung des DAG-Codes an (zum Vergrößern klicken).

Durch das Ersetzen der Umgebungsvariablen durch Airflow-Vorlagen haben Sie den DAG-Code vereinfacht und die Parsing-Latenz um das Zehnfache reduziert.

Airflow-Umgebungskonfigurationen optimieren

Der Airflow-Planer versucht ständig, neue Aufgaben auszulösen und alle DAGs in Ihrem Umgebungs-Bucket zu parsen. Wenn das Parsen Ihrer DAGs lange dauert und der Planer viele Ressourcen beansprucht, können Sie die Konfigurationen des Airflow-Planers optimieren, damit der Planer Ressourcen effizienter nutzen kann.

In dieser Anleitung benötigen die DAG-Dateien viel Zeit zum Parsen. Die Parsing-Zyklen beginnen sich zu überschneiden, wodurch dann die Kapazität des Planers erschöpft ist. In unserem Beispiel dauert das Parsen des ersten Beispiel-DAG länger als 5 Sekunden. Daher konfigurieren Sie den Planer so, dass er seltener ausgeführt wird, um Ressourcen effizienter zu nutzen. Sie überschreiben die Airflow-Konfigurationsoption scheduler_heartbeat_sec. Diese Konfiguration definiert, wie oft der Planer ausgeführt werden soll (in Sekunden). Standardmäßig ist der Wert auf 5 Sekunden eingestellt. Sie können diese Airflow-Konfigurationsoption ändern, indem Sie sie überschreiben.

Überschreiben Sie die Airflow-Konfigurationsoption scheduler_heartbeat_sec:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Rufen Sie den Tab Airflow-Konfigurationsüberschreibungen auf.

  4. Klicken Sie auf Bearbeiten und dann auf Airflow-Konfigurationsüberschreibung hinzufügen.

  5. Überschreiben Sie die Airflow-Konfigurationsoption:

    Bereich Schlüssel Wert
    scheduler scheduler_heartbeat_sec 10
  6. Klicken Sie auf Speichern und warten Sie, bis die Konfiguration der Umgebung aktualisiert wurde.

Prüfen Sie die Planermesswerte:

  1. Wechseln Sie zum Tab Monitoring und wählen Sie Planer aus.

  2. Klicken Sie im Diagramm Scheduler Heartbeat auf die Schaltfläche Weitere Optionen (drei Punkte) und dann auf Im Metrics Explorer ansehen.

Das Diagramm des Planers zeigt, dass der Heartbeat seltener auftritt.
Abbildung 15. Heartbeat-Diagramm des Planers (zum Vergrößern klicken)

In der Grafik sehen Sie, dass der Planer zweimal seltener ausgeführt wird, nachdem Sie die Standardkonfiguration von 5 Sekunden auf 10 Sekunden geändert haben. Durch Verringern der Häufigkeit von Herzschlägen sorgen Sie dafür, dass der Planer nicht ausgeführt wird, während der vorherige Parsing-Zyklus noch läuft und die Ressourcenkapazität des Planers nicht erschöpft ist.

Dem Planer weitere Ressourcen zuweisen

In Cloud Composer 2 können Sie dem Planer mehr CPU- und Arbeitsspeicherressourcen zuweisen. Auf diese Weise können Sie die Leistung Ihres Planers erhöhen und die Parsing-Zeit für Ihren DAG beschleunigen.

Weisen Sie dem Planer zusätzliche CPU und Arbeitsspeicher zu:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Rufen Sie den Tab Umgebungskonfiguration auf.

  4. Suchen Sie die Konfiguration unter Ressourcen > Arbeitslasten und klicken Sie auf Bearbeiten.

  5. Geben Sie im Abschnitt Planer im Feld Arbeitsspeicher das neue Arbeitsspeicherlimit an. In dieser Anleitung werden 4 GB genutzt.

  6. Geben Sie im Feld CPU das neue CPU-Limit an. Verwenden Sie in dieser Anleitung 2 vCPUs.

  7. Speichern Sie die Änderungen und warten Sie einige Minuten, bis die Airflow-Planer neu gestartet werden.

  8. Wechseln Sie zum Tab Logs und dann zu Alle Logs > DAG-Prozessormanager.

  9. Überprüfen Sie dag-processor-manager-Logs und vergleichen Sie die Parsing-Dauer für die Beispiel-DAGs:

    Ein Protokolleintrag für den Beispiel-DAG zeigt, dass die DAG-Analysezeit für den optimierten DAG 1,5 Sekunden beträgt. Für den nicht optimierten DAG beträgt die Parsing-Zeit 28, 71 Sekunden
    Abbildung 16. Logs des DAG-Prozessormanagers zeigen die DAG-Analysezeiten an, nachdem dem Planer weitere Ressourcen zugewiesen wurden (zum Vergrößern klicken).

Indem Sie dem Planer weitere Ressourcen zuweisen, haben Sie die Kapazität des Planers erhöht und die Parsing-Latenz im Vergleich zu den Konfigurationen der Standardumgebung erheblich reduziert. Mit mehr Ressourcen kann der Planer die DAGs schneller parsen. Allerdings steigen die mit Cloud Composer-Ressourcen verbundenen Kosten. Außerdem ist es nicht möglich, die Ressourcenanzahl über ein bestimmtes Limit hinaus zu erhöhen.

Wir empfehlen, Ressourcen erst zuzuweisen, nachdem die möglichen Optimierungen für DAG-Code und Airflow-Konfiguration implementiert wurden.

Bereinigen

Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder behalten Sie das Projekt bei und löschen Sie die einzelnen Ressourcen.

Projekt löschen

  1. Wechseln Sie in der Google Cloud Console zur Seite Ressourcen verwalten.

    Zur Seite „Ressourcen verwalten“

  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie dann auf Löschen.
  3. Geben Sie im Dialogfeld die Projekt-ID ein und klicken Sie auf Shut down (Beenden), um das Projekt zu löschen.

Einzelne Ressourcen löschen

Wenn Sie mehrere Anleitungen und Kurzanleitungen durcharbeiten möchten, können Sie die Überschreitung von Projektkontingenten verhindern, indem Sie Projekte wiederverwenden.

Löschen Sie die Cloud Composer-Umgebung. Bei diesem Vorgang löschen Sie auch den Bucket der Umgebung.

Nächste Schritte