Probleme bei der Aufgabenplanung beheben

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Diese Anleitung führt Sie durch die Diagnose und Fehlerbehebung bei der Aufgabenplanung und Parsing-Probleme, die zu Fehlfunktionen des Planers, Parsing-Fehlern Latenz und Aufgabenfehlern.

Einleitung

Der Airflow-Planer wird hauptsächlich von zwei Faktoren beeinflusst: Aufgabenplanung und DAG-Parsing. Probleme in einem dieser Faktoren können sich negativ auf das Zustand und Leistung der Umwelt.

Manchmal werden zu viele Aufgaben gleichzeitig geplant. In dieser Situation die Warteschlange ist voll und die Aufgaben verbleiben im Zeitplan "geplant" oder neu geplant, nachdem sie in die Warteschlange gestellt wurde, was zu einem Aufgabenfehler und einer höheren Leistung führen kann Latenz.

Ein weiteres häufiges Problem sind die Parsing-Latenz und Fehler, die durch die Komplexität einen DAG-Code. Beispiel: Ein DAG-Code mit Airflow-Variablen am Anfang Codeebene kann zu Verzögerungen beim Parsen, Datenbanküberlastung, Planung Fehler und DAG-Zeitüberschreitungen.

In dieser Anleitung diagnostizieren Sie die Beispiel-DAGs und erfahren, wie Sie Probleme mit der Planung und dem Parsen beheben, die DAG-Planung verbessern und Ihren DAG-Code und Ihre Umgebungskonfigurationen optimieren, um die Leistung zu verbessern.

Lernziele

In diesem Abschnitt werden Ziele als Beispiele in dieser Anleitung aufgeführt.

Beispiel: Fehlfunktion des Schedulers und Latenz aufgrund hoher Aufgabenparallelität

  • Beispiel-DAG hochladen, der mehrmals gleichzeitig ausgeführt wird, und Diagnose stellen Planerfehlfunktion und Latenzprobleme bei Cloud Monitoring.

  • Optimieren Sie Ihren DAG-Code, indem Sie die Aufgaben konsolidieren, und bewerten Sie die Auswirkungen auf die Leistung.

  • Verteilen Sie die Aufgaben gleichmäßiger über die Zeit und bewerten Sie die Auswirkungen auf die Leistung.

  • Optimieren Sie Ihre Airflow- und Umgebungskonfigurationen und bewerten Sie die Auswirkungen.

Beispiel: DAG-Parsingfehler und Latenz durch komplexen Code

  • Beispiel-DAG mit Airflow-Variablen hochladen und Parsing-Fehler diagnostizieren Probleme mit Cloud Monitoring.

  • Optimieren Sie den DAG-Code, indem Sie Airflow-Variablen auf oberster Ebene von und die Auswirkungen auf die Parsing-Zeit zu bewerten.

  • Optimieren Sie Airflow- und Umgebungskonfigurationen und bewerten Sie die Auswirkungen auf die Parsezeit.

Kosten

In dieser Anleitung werden die folgenden kostenpflichtigen Komponenten von Google Cloud verwendet:

Nach Abschluss dieser Anleitung können Sie weitere Kosten durch Löschen von erstellten Ressourcen vermeiden. Weitere Informationen finden Sie unter Bereinigen.

Hinweise

In diesem Abschnitt werden die Aktionen beschrieben, die vor dem Start der Anleitung erforderlich sind.

Projekt erstellen und konfigurieren

Für diese Anleitung benötigen Sie eine Google Cloud Projekt erstellen. Konfigurieren Sie das Projekt so:

  1. Wählen Sie in der Google Cloud Console ein Projekt aus oder erstellen Sie ein Projekt.

    Zur Projektauswahl

  2. Die Abrechnung für Ihr Projekt muss aktiviert sein. Hier erfahren Sie, wie Sie prüfen, ob die Abrechnung für ein Projekt aktiviert ist.

  3. Sorgen Sie dafür, dass Ihr Google Cloud-Projektnutzer die folgenden Rollen hat, um die erforderlichen Ressourcen zu erstellen:

    • Administrator für Umgebung und Storage-Objekte (roles/composer.environmentAndStorageObjectAdmin)
    • Compute-Administrator (roles/compute.admin)

Die APIs für Ihr Projekt aktivieren

Enable the Cloud Composer API.

Enable the API

Cloud Composer-Umgebung erstellen

Erstellen Sie eine Cloud Composer 2-Umgebung.

Beim Erstellen der Umgebung gewähren Sie dem Konto des Composer-Dienst-Agents die Rolle Dienst-Agent-Erweiterung für die Cloud Composer v2 API (roles/composer.ServiceAgentV2Ext). Cloud Composer verwendet dieses Konto, um Vorgänge auszuführen in Ihrem Google Cloud-Projekt.

Beispiel: Fehlfunktion des Planers und Aufgabenfehler aufgrund von Problemen bei der Aufgabenplanung

Dieses Beispiel zeigt Fehlfunktionen des Debugging-Planers und Latenz, durch hohe Nebenläufigkeit von Aufgaben.

Beispiel-DAG in Ihre Umgebung hochladen

Laden Sie den folgenden Beispiel-DAG in die Umgebung hoch, die Sie in den vorherigen Schritten erstellt haben. In dieser Anleitung heißt dieser DAG dag_10_tasks_200_seconds_1

Dieser DAG hat 200 Aufgaben. Jede Aufgabe wartet 1 Sekunde und gibt „Abgeschlossen!“ aus. Der DAG wird nach dem Hochladen automatisch ausgelöst. Cloud Composer führt diesen DAG zehnmal aus und alle DAG-Ausführungen finden parallel statt.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 200
seconds = 1
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2023, 11, 22, 20, 0),
    end_date=datetime(2023, 11, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Fehlfunktionen des Planers und Probleme mit Aufgabenfehlern diagnostizieren

Öffnen Sie nach Abschluss der DAG-Ausführung die Airflow-UI und klicken Sie auf dag_10_tasks_200_seconds_1-DAG. Sie sehen, dass insgesamt 10 DAG-Ausführungen erfolgreich waren und dass bei jeder 200 Aufgaben erfolgreich waren.

Prüfen Sie die Airflow-Aufgabenprotokolle:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Rufen Sie den Tab Protokolle auf und klicken Sie dann auf Alle Protokolle > Airflow-Protokolle > Worker > Im Log-Explorer ansehen.

Im Log-Histogramm sind die rot markierten Fehler und Warnungen zu sehen. und orangefarbene Farben:

Histogramm der Airflow-Worker-Protokolle mit Fehlern und Warnungen in roter und orangefarbener Schrift
Abbildung 1: Histogramm der Airflow-Worker-Logs (zum Vergrößern anklicken)

Der Beispiel-DAG führte zu etwa 130 Warnungen und 60 Fehlern. Klicken Sie auf eine beliebige die gelbe und rote Balken enthält. Sie sehen einige der folgenden Optionen Warnungen und Fehlermeldungen:

State of this instance has been externally set to success. Terminating
instance.

Received SIGTERM. Terminating subprocesses.

worker: Warm shutdown (MainProcess).

Diese Protokolle können darauf hinweisen, dass die Ressourcennutzung die Limits überschritten hat und der Worker neu gestartet wurde.

Wenn eine Airflow-Aufgabe zu lange in der Warteschlange verbleibt, kennzeichnet der Scheduler sie als fehlgeschlagen und als „up_for_retry“ (Wiederholung möglich) und plant sie noch einmal für die Ausführung. Eine Möglichkeit, die Symptome dieser Situation zu beobachten, besteht darin, sich das Diagramm mit der Anzahl der Aufgaben in der Warteschlange anzusehen. Wenn die Spitzen in diesem Diagramm nach etwa 10 Minuten nicht abfallen, kommt es wahrscheinlich zu Aufgabenausfällen (ohne Protokolle).

Prüfen Sie die Monitoring-Informationen:

  1. Rufen Sie den Tab Monitoring auf und wählen Sie Übersicht aus.

  2. Sehen Sie sich das Diagramm Airflow-Tasks an.

    <ph type="x-smartling-placeholder"></ph> Das Diagramm der Airflow-Aufgaben im Zeitverlauf mit einer Spitze im
    Anzahl der Aufgaben in der Warteschlange
    Abbildung 2: Grafik der Airflow-Aufgaben (zum Vergrößern klicken)

    Im Diagramm „Airflow-Aufgaben“ ist ein Anstieg der Anzahl der Aufgaben in der Warteschlange zu sehen, der länger als 10 Minuten anhält. Das kann bedeuten, dass in Ihrer Umgebung nicht genügend Ressourcen vorhanden sind, um alle geplanten Aufgaben zu verarbeiten.

  3. Sehen Sie sich das Diagramm Aktive Worker an:

    <ph type="x-smartling-placeholder"></ph> Das Diagramm mit den aktiven Airflow-Workern im Zeitverlauf zeigt, dass die Anzahl der aktiven Worker auf die maximale Anzahl erhöht wurde.
    Abbildung 3: Diagramm zu aktiven Workern (zum Vergrößern klicken)

    Das Diagramm Aktive Worker zeigt an, dass der DAG das Autoscaling ausgelöst hat. während der DAG-Ausführung auf das maximal zulässige Limit von drei Workern begrenzt.

  4. Diagramme zur Ressourcennutzung können darauf hinweisen, dass die Airflow-Worker nicht genügend Kapazität haben, um anstehende Aufgaben auszuführen. Wählen Sie auf dem Tab Monitoring die Option Workers und Sehen Sie sich die Diagramme Gesamte Worker-CPU-Nutzung und Gesamte Worker-Arbeitsspeichernutzung an.

    Das Diagramm der CPU-Nutzung durch Airflow-Worker zeigt die CPU-Nutzung
    Ansteigend bis zur Höchstgrenze
    Abbildung 4: Diagramm zur CPU-Nutzung der gesamten Worker (zum Vergrößern klicken)
    Das Diagramm der Arbeitsspeichernutzung durch Airflow-Worker zeigt die Arbeitsspeichernutzung
    steigt an, aber erreicht nicht den Höchstwert
    Abbildung 5: Diagramm zur Arbeitsspeichernutzung der gesamten Worker (zum Vergrößern klicken)

    Die Diagramme zeigen, dass zu viele Aufgaben gleichzeitig ausgeführt werden. das CPU-Limit erreicht hat. Die Ressourcen wurden über 30 Minuten lang verwendet, was länger ist als die Gesamtdauer von 200 Aufgaben in 10 DAG-Ausführungen, die nacheinander ausgeführt wurden.

Dies sind die Indikatoren für das Auffüllen der Warteschlange und das Fehlen von Ressourcen. um alle geplanten Aufgaben zu verarbeiten.

Aufgaben konsolidieren

Der aktuelle Code erstellt viele DAGs und Aufgaben, für die nicht genügend Ressourcen vorhanden sind, um alle Aufgaben parallel zu verarbeiten. Das führt dazu, dass die Warteschlange voll ist. Wenn Aufgaben zu lange in der Warteschlange aufbewahrt werden, werden Aufgaben möglicherweise verschoben oder fehlschlagen. In solchen Fällen sollten Sie eine kleinere Anzahl von konsolidierten Aufgaben.

Im folgenden Beispiel-DAG wird die Anzahl der Aufgaben im ursprünglichen Beispiel von 200 auf 20 geändert und die Wartezeit von 1 auf 10 Sekunden erhöht, um mehr konsolidierte Aufgaben zu simulieren, die dieselbe Arbeitsmenge ausführen.

Laden Sie den folgenden Beispiel-DAG in die Umgebung hoch. die Sie erstellt haben. In dieser Anleitung heißt dieser DAG dag_10_tasks_20_seconds_10.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Bewerten Sie die Auswirkungen konsolidierter Aufgaben auf die Planungsprozesse:

  1. Warten Sie, bis die DAG-Ausführungen abgeschlossen sind.

  2. Klicken Sie in der Airflow-Benutzeroberfläche auf der Seite DAGs auf den DAG dag_10_tasks_20_seconds_10. Sie sehen zehn DAG-Ausführungen mit jeweils 20 Aufgaben, die erfolgreich waren.

  3. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  4. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  5. Rufen Sie den Tab Protokolle auf und klicken Sie dann auf Alle Protokolle > Airflow-Protokolle > Worker > Im Log-Explorer ansehen.

    Das zweite Beispiel mit stärker konsolidierten Aufgaben führte zu etwa 10 Warnungen und 7 Fehler Im Histogramm können Sie die Anzahl der Fehler und Warnungen im ersten Beispiel (frühere Werte) mit dem zweiten Beispiel (spätere Werte) vergleichen.

    Das Histogramm der Airflow-Worker-Logs mit Fehlern und Warnungen zeigt die geringere Anzahl von Fehlern und Warnungen nach der Zusammenführung von Aufgaben.
    Abbildung 6: Histogramm der Airflow-Worker-Logs nach dem Aufgaben wurden konsolidiert (zum Vergrößern klicken)

    Wenn Sie das erste Beispiel mit dem konsolidierten Beispiel vergleichen, sehen Sie, dass im zweiten Beispiel deutlich weniger Fehler und Warnungen auftreten. Die gleichen Fehler in Bezug auf das Abschalten mit Warmwasser treten jedoch weiterhin in die Logs aufgrund von Ressourcenüberlastung.

  6. Wählen Sie auf dem Tab Monitoring die Option Worker aus und sehen Sie sich die Diagramme an.

    Wenn Sie das Diagramm für die Airflow-Aufgaben für das erste Beispiel (frühere Werte) mit dem Diagramm für das zweite Beispiel mit mehr konsolidierten Aufgaben vergleichen, sehen Sie, dass der Anstieg der Aufgaben in der Warteschlange bei konsolidierten Aufgaben kürzer war. Sie dauerte jedoch fast 10 Minuten, was immer noch nicht optimal ist.

    Das Diagramm mit den Airflow-Aufgaben im Zeitverlauf zeigt, dass der Anstieg der Airflow-Aufgaben kürzer als zuvor war.
    Abbildung 7. Diagramm der Airflow-Aufgaben nach der Zusammenführung der Aufgaben (zum Vergrößern klicken)

    Im Diagramm mit den aktiven Workern sehen Sie das erste Beispiel (links) Seite des Diagramms) viel länger Ressourcen verwendet haben. als das zweite, obwohl beide Beispiele die gleiche Menge an Informationen arbeiten.

    <ph type="x-smartling-placeholder"></ph> Das Diagramm der aktiven Airflow-Worker im Zeitverlauf zeigt,
    Anzahl der aktiven Beschäftigten wurde für einen kürzeren Zeitraum erhöht
    als zuvor.
    Abbildung 8: Diagramm zu aktiven Workern nach der Zusammenführung von Aufgaben (zum Vergrößern anklicken)

    Sehen Sie sich die Diagramme zur Worker-Ressourcennutzung an. Obwohl der Unterschied zwischen den Ressourcen, die im Beispiel mit mehr konsolidierten Aufgaben verwendet werden, und dem ursprünglichen Beispiel recht groß ist, steigt die CPU-Auslastung immer noch auf 70 % des Limits.

    <ph type="x-smartling-placeholder"></ph> Das Diagramm der CPU-Nutzung durch Airflow-Worker zeigt die CPU-Nutzung
    eine Erhöhung um bis zu 70% des Höchstwerts
    Abbildung 9. Grafik zur CPU-Nutzung insgesamt nach dem Aufgaben wurden konsolidiert (zum Vergrößern klicken)
    Das Diagramm der Arbeitsspeichernutzung durch Airflow-Worker zeigt, dass die Arbeitsspeichernutzung zunimmt, aber das Limit nicht erreicht
    Abbildung 10: Diagramm zur Arbeitsspeichernutzung der gesamten Worker nach dem Die Aufgaben wurden konsolidiert (zum Vergrößern klicken)

Aufgaben gleichmäßiger über einen längeren Zeitraum verteilen

Zu viele gleichzeitige Aufgaben führen dazu, dass die Warteschlange voll ist. Das wiederum kann dazu führen, dass Aufgaben in der Warteschlange hängen bleiben oder verschoben werden. In den vorherigen Schritten durch die Konsolidierung dieser Aufgaben die Anzahl der Aufgaben verringert. Die Ausgabe Logs und Monitoring anzeigten, dass die Anzahl gleichzeitiger Aufgaben weiterhin nicht optimal ist.

Sie können die Anzahl der gleichzeitigen Aufgabenausführungen steuern, indem Sie einen Zeitplan implementieren oder Limits dafür festlegen, wie viele Aufgaben gleichzeitig ausgeführt werden können.

In dieser Anleitung verteilen Sie Aufgaben gleichmäßiger über einen Zeitraum, indem Sie Parameter auf DAG-Ebene im DAG dag_10_tasks_20_seconds_10:

  1. Fügen Sie dem DAG-Kontextmanager das Argument max_active_runs=1 hinzu. Dieses Argument legt ein Limit von nur einer Instanz einer DAG-Ausführung fest, die in einem bestimmten Moment ausgeführt wird.

  2. Fügen Sie dem DAG-Kontextmanager das Argument max_active_tasks=5 hinzu. Mit diesem Argument wird die maximale Anzahl von Aufgabeninstanzen gesteuert, die in jedem DAG gleichzeitig ausgeführt werden können.

Laden Sie den folgenden Beispiel-DAG in die von Ihnen erstellte Umgebung hoch. In dieser Anleitung heißt dieser DAG dag_10_tasks_20_seconds_10_scheduled.py

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5


with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    max_active_runs=active_runs,
    max_active_tasks=active_tasks,
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

Bewerten Sie die Auswirkungen der Verteilung von Aufgaben im Zeitverlauf auf die Planungsabläufe:

  1. Warten Sie, bis die DAG-Ausführungen abgeschlossen sind.

  2. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  3. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  4. Rufen Sie den Tab Logs auf und klicken Sie dann auf Alle Logs &gt; Airflow-Logs &gt; Worker &gt; Im Log-Explorer ansehen

  5. Im Histogramm sehen Sie, dass der dritte DAG mit aktiven Aufgaben und Ausführungen keine Warnungen oder Fehler generiert haben und der die Verteilung der Logs im Vergleich zu den vorherigen Werten gleichmäßiger aussieht.

    Das Histogramm von Airflow-Worker-Logs mit Fehlern und Warnungen
    Es werden keine Fehler oder Warnungen angezeigt, nachdem Aufgaben konsolidiert wurden und
    im Lauf der Zeit verteilt sind.
    Abbildung 11. Histogramm der Airflow-Worker-Logs, nachdem die Aufgaben konsolidiert und im Zeitverlauf verteilt wurden (zum Vergrößern klicken)

Die Aufgaben im Beispiel dag_10_tasks_20_seconds_10_scheduled mit einem eine begrenzte Anzahl aktiver Aufgaben und Ausführungen nicht zu Ressourcenproblemen führten, wurden die Aufgaben gleichmäßig in die Warteschlange gestellt.

Nachdem Sie die beschriebenen Schritte ausgeführt haben, haben Sie die Ressourcennutzung optimiert, indem Sie kleine Aufgaben zusammengeführt und im Zeitverlauf gleichmäßiger verteilt haben.

Umgebungskonfigurationen optimieren

Sie können Ihre Umgebungskonfigurationen so anpassen, dass immer genügend Kapazität in den Airflow-Workern vorhanden ist, um Aufgaben in der Warteschlange auszuführen.

Anzahl der Worker und Nebenläufigkeit von Workern

Sie können die maximal zulässige Anzahl von Workern anpassen. damit Cloud Composer Ihre Umgebung automatisch die festgelegten Limits zu erreichen.

Der Parameter [celery]worker_concurrency definiert die maximale Anzahl von Aufgaben, die ein einzelner Worker aus der Aufgabenwarteschlange abholen kann. Ändern dieses Parameters passt die Anzahl der Aufgaben an, die ein einzelner Worker gleichzeitig ausführen kann. Sie können diese Airflow-Konfigurationsoption ändern, indem Sie überschreiben. Standardmäßig ist die Worker-Nebenläufigkeit auf eine mindestens der folgenden Werte: 32, 12 * worker_CPU, 8 * worker_memory, was bedeutet abhängig von den Worker-Ressourcenlimits. Weitere Informationen finden Sie unter Unter Umgebungen optimieren finden Sie weitere Informationen zur Standardeinstellung. Worker-Gleichzeitigkeitswerte.

Die Anzahl der Worker und die Parallelität der Worker wirken zusammen und die Leistung Ihrer Umgebung hängt stark von beiden Parametern ab. Berücksichtigen Sie bei der Auswahl der richtigen Kombination die folgenden Überlegungen:

  • Mehrere schnelle Aufgaben, die parallel ausgeführt werden. Sie können die Worker-Parallelität erhöhen, wenn sich Aufgaben in der Warteschlange befinden und Ihre Worker einen geringen Prozentsatz ihrer CPUs und des Arbeitsspeichers gleichzeitig verwenden. Unter bestimmten Umständen wird die Warteschlange jedoch möglicherweise nie voll, sodass das Autoscaling nie ausgelöst wird. Wenn die Ausführung kleiner Aufgaben bis zum Zeitpunkt der neuen Worker abgeschlossen ist kann ein vorhandener Worker die verbleibenden Aufgaben übernehmen werden keine Aufgaben für neu erstellte Worker sein.

    In diesen Fällen wird empfohlen, die minimale Anzahl von Workern und die Nebenläufigkeit von Workern zu erhöhen, um eine zu schnelle Skalierung zu vermeiden.

  • Mehrere lange Aufgaben, die parallel ausgeführt werden. Die hohe Nebenläufigkeit der Worker verhindert, dass das System die Anzahl der Worker skaliert. Wenn mehrere Aufgaben ressourcenintensiv sind und viel Zeit in Anspruch nehmen. Nebenläufigkeit kann dazu führen, dass die Warteschlange nie gefüllt und alle Aufgaben nur von einem Worker übernommen werden, was zu Leistungsproblemen führt. In diesen Fällen wird empfohlen, die maximale Anzahl von Workern zu erhöhen und die Nebenläufigkeit von Workern zu verringern.

Die Bedeutung der Parallelität

Airflow-Planer steuern die Planung von DAG-Ausführungen und einzelnen Aufgaben DAGs. Mit der Airflow-Konfigurationsoption [core]parallelism wird gesteuert, Aufgaben, die der Airflow-Planer schließlich in die Warteschlange des Executors in die Warteschlange stellen kann für diese Aufgaben erfüllt sind.

Parallelität ist ein Schutzmechanismus von Airflow, der bestimmt, wie viele Aufgaben können unabhängig von der Anzahl der Worker für jeden Planer zur gleichen Zeit ausgeführt werden. Der Parallelitätswert multipliziert mit der Anzahl der Planer in Ihrem Cluster ist die maximale Anzahl von Aufgabeninstanzen, die Ihre Umgebung in die Warteschlange stellen kann.

Normalerweise wird [core]parallelism als Produkt aus der maximalen Anzahl von Workern und [celery]worker_concurrency festgelegt. Außerdem wird sie vom Pool beeinflusst. Sie können diese Airflow-Konfigurationsoption ändern, indem Sie sie überschreiben. Weitere Informationen zum Anpassen von Airflow Konfigurationen für die Skalierung, siehe Airflow-Konfiguration skalieren

Optimale Umgebungskonfigurationen finden

Wir empfehlen, kleinere Aufgaben in größere Aufgaben zusammenzuführen und Aufgaben gleichmäßiger über die Zeit zu verteilen, um Probleme mit der Planung zu beheben. Sie können nicht nur den DAG-Code optimieren, sondern auch die Umgebungskonfigurationen, um eine ausreichende Kapazität für die gleichzeitige Ausführung mehrerer Aufgaben zu haben.

Angenommen, Sie konsolidieren Aufgaben in Ihrer DAG so weit wie möglich, aber die Begrenzung aktiver Aufgaben, um sie gleichmäßiger über die Zeit zu verteilen, ist für Ihren speziellen Anwendungsfall nicht die bevorzugte Lösung.

Sie können die Parallelität, die Anzahl der Worker und die Worker-Nebenläufigkeit anpassen Parameter, um den DAG dag_10_tasks_20_seconds_10 ohne aktive Begrenzung auszuführen Aufgaben. In diesem Beispiel wird der DAG zehnmal ausgeführt und jede Ausführung enthält 20 kleine Aufgaben. So führen Sie alle gleichzeitig aus:

  • Sie benötigen eine größere Umgebung, da diese die Leistung steuert Parameter der verwalteten Cloud Composer-Infrastruktur Ihrer Umgebung.

  • Airflow-Worker müssen 20 Aufgaben gleichzeitig ausführen können. Sie müssen also die Worker-Parallelität auf 20 festlegen.

  • Die Worker benötigen ausreichend CPU und Arbeitsspeicher, um alle Aufgaben zu bewältigen. Die Worker-Parallelität wird von der CPU und dem Arbeitsspeicher der Worker beeinflusst. Sie benötigen daher mindestens worker_concurrency / 12 CPU und least worker_concurrency / 8 Arbeitsspeicher.

  • Sie müssen die Parallelität erhöhen, um der höheren Worker-Nebenläufigkeit gerecht zu werden. Damit die Worker 20 Aufgaben aus der Warteschlange aufnehmen können, führt der Planer diese 20 Aufgaben zuerst einplanen müssen.

Passen Sie die Umgebungskonfigurationen so an:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Rufen Sie den Tab Umgebungskonfiguration auf.

  4. Klicken Sie unter Ressourcen > Arbeitslasten auf Bearbeiten.

  5. Geben Sie im Abschnitt Worker im Feld Memory das neue Arbeitsspeicherlimit für Airflow-Worker an. Verwenden Sie in dieser Anleitung 4 GB.

  6. Geben Sie im Feld CPU das neue CPU-Limit für Airflow-Worker an. In dieser Anleitung verwenden Sie zwei vCPUs.

  7. Speichern Sie die Änderungen und warten Sie einige Minuten, bis Ihre Airflow-Worker neu starten.

Überschreiben Sie als Nächstes die Airflow-Konfigurationsoptionen für Parallelität und Worker-Parallelität:

  1. Rufen Sie den Tab Airflow-Konfigurationsüberschreibungen auf.

  2. Klicken Sie auf Bearbeiten und dann auf Airflow-Konfigurationsüberschreibung hinzufügen.

  3. Überschreiben Sie die Parallelitätskonfiguration:

    Bereich Schlüssel Wert
    core parallelism 20
  4. Klicken Sie auf Airflow-Konfigurationsüberschreibung hinzufügen und überschreiben Sie die Konfiguration der Worker-Parallelität:

    Bereich Schlüssel Wert
    celery worker_concurrency 20
  5. Klicken Sie auf Speichern und warten Sie, bis die Umgebung ihre Konfiguration aktualisiert hat.

Lösen Sie denselben Beispiel-DAG noch einmal mit den angepassten Konfigurationen aus:

  1. Rufen Sie in der Airflow-Benutzeroberfläche die Seite DAGs auf.

  2. Suchen Sie den DAG dag_10_tasks_20_seconds_10 und löschen Sie ihn.

    Nachdem der DAG gelöscht wurde, prüft Airflow den DAGs-Ordner im Bucket Ihrer Umgebung und führt den DAG automatisch noch einmal aus.

Sehen Sie sich das Histogramm für Protokolle noch einmal an, nachdem die DAG-Ausführungen abgeschlossen sind. Im Diagramm sehen Sie, dass das Beispiel dag_10_tasks_20_seconds_10 mit mehr konsolidierten Aufgaben bei der Ausführung mit der angepassten Umgebungskonfiguration. Vergleichen Sie die Ergebnisse mit den vorherigen Daten. im Diagramm, wobei das gleiche Beispiel Fehler und Warnungen erzeugte, wird mit der Konfiguration der Standardumgebung ausgeführt.

Das Histogramm von Airflow-Worker-Logs mit Fehlern und Warnungen
        Es werden keine Fehler und keine Warnung angezeigt, nachdem die Umgebungskonfiguration
        angepasst
Abbildung 12. Histogramm der Airflow-Worker-Logs nach Anpassung der Umgebungskonfiguration (zum Vergrößern klicken)

Umgebungskonfigurationen und Airflow-Konfigurationen spielen eine entscheidende Rolle bei Aufgabenplanung. Es ist jedoch nicht möglich, die Konfigurationen über bestimmte Grenzen hinaus.

Wir empfehlen, den DAG-Code zu optimieren, Aufgaben zu konsolidieren und die Planung zu verwenden. für eine optimale Leistung und Effizienz.

Beispiel: DAG-Parsing-Fehler und Latenz aufgrund von komplexen DAG-Code

In diesem Beispiel untersuchen Sie die Parsing-Latenz eines Beispiel-DAG, der ahmt eine Überschreitung von Airflow-Variablen nach.

Neue Airflow-Variable erstellen

Bevor Sie den Beispielcode hochladen, erstellen Sie eine neue Airflow-Variable.

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  2. Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.

  3. Klicken Sie auf Verwaltung > Variablen > Neuen Datensatz hinzufügen.

  4. Stellen Sie folgende Werte ein:

    • Schlüssel: example_var
    • Wert: test_airflow_variable

Beispiel-DAG in Ihre Umgebung hochladen

Laden Sie den folgenden Beispiel-DAG in die Umgebung hoch, die Sie in den vorherigen Schritten erstellt haben. In dieser Anleitung heißt dieser DAG dag_for_loop_airflow_variable.

Dieser DAG enthält eine For-Schleife, die 1.000-mal ausgeführt wird und eine große Anzahl von Airflow-Variablen simuliert. Jede Iteration liest die Variable example_var und eine Aufgabe generiert. Jede Aufgabe enthält einen Befehl, mit dem die Variable Wert.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable',
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily"
)

for i in range(1000):
    a = Variable.get('example_var', 'N/A')
    task = BashOperator(
        task_id=f'task_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': a}
    )

Parsing-Probleme diagnostizieren

Die DAG-Analysezeit ist die Zeit, die der Airflow-Planer zum Lesen benötigt und parsen Sie sie. Bevor der Airflow-Planer eine Aufgabe planen kann aus einem DAG entfernt, muss der Planer die DAG-Datei parsen, um die Struktur der den DAG und die definierten Aufgaben.

Wenn das Parsen eines DAG lange dauert, verbraucht dies die Kapazität des Planers. die Leistung von DAG-Ausführungen beeinträchtigen kann.

So überwachen Sie die DAG-Parsingzeit:

  1. Führen Sie den Befehl dags report Airflow-Befehlszeilenbefehl in der gcloud-Befehlszeile aus, um die Parsing-Zeit für alle DAGs zu sehen:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        dags report
    

    Ersetzen Sie Folgendes:

    • ENVIRONMENT_NAME: der Name Ihrer Umgebung
    • LOCATION: die Region, in der sich die Umgebung befindet.
  2. Suchen Sie in der Ausgabe des Befehls nach dem Wert für die Dauer des DAG dag_for_loop_airflow_variables. Ein hoher Wert weist möglicherweise darauf hin, dieser DAG nicht optimal implementiert ist. Wenn Sie mehrere DAGs haben, Anhand der Ausgabetabelle können Sie erkennen, welche DAGs lange Parsing-Zeiten haben.

    Beispiel:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:14.773594 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /airflow_monitoring | 0:00:00.003035 | 1       | 1        | airflow_monitoring
    .py
    
    
  3. DAG-Parsing-Zeiten in der Google Cloud Console prüfen:

    1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  4. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  5. Rufen Sie den Tab Logs auf und klicken Sie dann auf Alle Logs &gt; DAG-Prozessormanager:

  6. Prüfen Sie dag-processor-manager-Logs, um mögliche Probleme zu ermitteln.

    <ph type="x-smartling-placeholder"></ph> Ein Logeintrag für den Beispiel-DAG zeigt, dass die DAG-Parsing-Zeit 46,3 Sekunden beträgt.
    Abbildung 13. Logs des DAG-Prozessormanagers mit DAG-Parsingzeiten (zum Vergrößern klicken)

Wenn die Gesamtzeit der DAG-Analyse ca. 10 Sekunden überschreitet, werden die Planer möglicherweise mit DAG-Parsing überlastet ist und DAGs nicht effektiv ausführen können.

DAG-Code optimieren

Es ist empfohlen um unnötige „oberste Ebene“ zu vermeiden, Python-Code in Ihren DAGs. DAGs mit vielen Importe, Variablen und Funktionen außerhalb des DAG führen zu einer besseren Analyse für den Airflow-Planer festlegen. Dadurch werden Leistung und Skalierbarkeit Cloud Composer und Airflow. Das Lesen zu vieler Airflow-Variablen führt zu einer langen Parsezeit und einer hohen Datenbanklast. Wenn sich dieser Code in einem DAG befindet werden diese Funktionen bei jedem Planer-Heartbeat ausgeführt, was langsam sein kann.

Mit den Vorlagenfeldern von Airflow können Sie Werte aus Airflow-Variablen und Jinja-Vorlagen in Ihre DAGs einbinden. So wird verhindert, dass Funktionen während der Heartbeats des Schedulers unnötig ausgeführt werden.

Verwenden Sie für eine bessere Implementierung des DAG-Beispiels keine Airflow-Variablen im Python-Code der obersten Ebene von DAGs. Übergeben Sie stattdessen Airflow-Variablen an durch eine Jinja-Vorlage, was das Lesen des Werts der Aufgabenausführung.

Laden die neue Version des Beispiel-DAG in Ihren zu verbessern. In dieser Anleitung heißt dieser DAG dag_for_loop_airflow_variable_optimized

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable_optimized',
    default_args=default_args,
    catchup=False,
    schedule_interval='@daily'
)

for i in range(1000):
    task = BashOperator(
        task_id=f'bash_use_variable_good_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': '{{ var.value.get("example_var") }}'},
    )

Prüfen Sie die neue DAG-Analysezeit:

  1. Warten Sie, bis die DAG-Ausführung abgeschlossen ist.

  2. Führen Sie den Befehl dags report noch einmal aus, um die zum Parsen der Zeit für alle DAGs:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:37.000369 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /dag_for_loop_airfl | 0:00:01.109457 | 1       | 1000     | dag_for_loop_airflow
    ow_variable_optimiz |                |         |          | _variable_optimized
    ed.py               |                |         |          |
    /airflow_monitoring | 0:00:00.040510 | 1       | 1        | airflow_monitoring
    .py                 |                |         |          |
    
  3. Prüfen Sie dag-processor-manager-Logs noch einmal und die Dauer des Parsens.

    <ph type="x-smartling-placeholder"></ph> Ein Logeintrag für den Beispiel-DAG zeigt, dass die DAG-Parsing-Zeit 4,21 beträgt.
    Sekunden
    Abbildung 14. DAG-Prozessor-Manager-Logs zeigen DAG Zeit nach der Optimierung des DAG-Codes parsen (zum Vergrößern klicken)

Durch das Ersetzen der Umgebungsvariablen durch Airflow-Vorlagen haben Sie den DAG-Code vereinfacht und die Parsing-Latenz um etwa das Zehnfache reduziert.

Airflow-Umgebungskonfigurationen optimieren

Der Airflow-Planer versucht ständig, neue Aufgaben auszulösen, und parst alle DAGs im Bucket Ihrer Umgebung. Wenn Ihre DAGs lange Parsing-Zeiten haben und der der Planer viele Ressourcen verbraucht, können Sie den Airflow-Planer optimieren. damit der Planer Ressourcen effizienter nutzen kann.

In dieser Anleitung benötigen die DAG-Dateien viel Zeit für das Parsen und das Parsen von Zyklen. und die Kapazität des Planers erschöpft ist. In unserem Beispiel dauert das Parsen des ersten Beispiel-DAG mehr als 5 Sekunden. Sie konfigurieren den Scheduler daher so, dass er seltener ausgeführt wird, um die Ressourcen effizienter zu nutzen. Ich überschreiben die scheduler_heartbeat_sec Airflow-Konfigurationsoption. Diese Konfiguration legt fest, wie oft Planer ausgeführt werden soll (in Sekunden). Der Standardwert ist 5 Sekunden. Sie können diese Airflow-Konfigurationsoption ändern, indem Sie sie überschreiben.

Überschreiben Sie die Airflow-Konfigurationsoption scheduler_heartbeat_sec:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Rufen Sie den Tab Airflow-Konfigurationsüberschreibungen auf.

  4. Klicken Sie auf Bearbeiten und dann auf Airflow-Konfigurationsüberschreibung hinzufügen.

  5. Überschreiben Sie die Airflow-Konfigurationsoption:

    Bereich Schlüssel Wert
    scheduler scheduler_heartbeat_sec 10
  6. Klicken Sie auf Speichern und warten Sie, bis die Umgebung ihre Konfiguration aktualisiert hat.

Prüfen Sie die Planermesswerte:

  1. Rufen Sie den Tab Monitoring auf und wählen Sie Planer aus.

  2. Klicken Sie im Diagramm Scheduler Heartbeat auf die Schaltfläche More options (Weitere Optionen). (drei Punkte) und klicken Sie dann auf Im Metrics Explorer ansehen.

Das Diagramm für den Scheduler-Heartbeat zeigt, dass der Heartbeat seltener auftritt.
Abbildung 15. Heartbeat-Diagramm des Planers (zum Vergrößern klicken)

Im Diagramm sehen Sie, dass der Planer zweimal seltener ausgeführt wird, nachdem Sie die Standardkonfiguration von 5 Sekunden in 10 Sekunden geändert. Wenn Sie die Häufigkeit der Heartbeats reduzieren, wird verhindert, dass der Scheduler gestartet wird, während der vorherige Parsezyklus noch läuft und die Ressourcenkapazität des Schedulers nicht ausgeschöpft ist.

Dem Planer weitere Ressourcen zuweisen

In Cloud Composer 2 können Sie dem Scheduler mehr CPU- und Arbeitsspeicherressourcen zuweisen. So können Sie die Leistung Ihres Planers steigern und die Parsing-Zeit des DAG zu beschleunigen.

Weisen Sie dem Planer zusätzliche CPU und Arbeitsspeicher zu:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Rufen Sie den Tab Umgebungskonfiguration auf.

  4. Klicken Sie unter Ressourcen > Arbeitslasten auf Bearbeiten.

  5. Geben Sie im Abschnitt Scheduler im Feld Memory das neue Arbeitsspeicherlimit an. Verwenden Sie in dieser Anleitung 4 GB.

  6. Geben Sie im Feld CPU das neue CPU-Limit an. In dieser Anleitung verwenden Sie zwei vCPUs.

  7. Speichern Sie die Änderungen und warten Sie einige Minuten, bis die Airflow-Planer neu gestartet wurden.

  8. Rufen Sie den Tab Logs auf und klicken Sie dann auf Alle Logs &gt; DAG-Prozessormanager:

  9. Prüfen Sie die dag-processor-manager-Logs und vergleichen Sie die Parsingdauer für die Beispiel-DAGs:

    Ein Logeintrag für den Beispiel-DAG zeigt, dass die DAG-Parsing-Zeit für den optimierten DAG 1,5 Sekunden beträgt. Für den nicht optimierten DAG beträgt die Parsing-Zeit 28, 71 Sekunden
    Abbildung 16 In den Logs des DAG-Prozessormanagers sind die DAG-Parsingzeiten zu sehen, nachdem dem Scheduler mehr Ressourcen zugewiesen wurden (zum Vergrößern klicken)

Durch die Zuweisung weiterer Ressourcen an den Scheduler haben Sie die Kapazität des Schedulers erhöht und die Parsing-Latenz im Vergleich zu den Standardumgebungskonfigurationen deutlich reduziert. Mit mehr Ressourcen kann der Scheduler die DAGs schneller parsen. Die mit Cloud Composer-Ressourcen verbundenen Kosten steigen jedoch ebenfalls. Außerdem ist es nicht möglich, den Wert über ein bestimmtes Limit hinaus zu arbeiten.

Wir empfehlen, Ressourcen erst zuzuweisen, nachdem die möglichen Optimierungen des DAG-Codes und der Airflow-Konfiguration implementiert wurden.

Bereinigen

Um zu vermeiden, dass Ihrem Google Cloud-Konto die Ressourcen in Rechnung gestellt werden Löschen Sie entweder das Projekt, das die Ressourcen enthält, oder das Projekt behalten und die einzelnen Ressourcen löschen.

Projekt löschen

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Einzelne Ressourcen löschen

Wenn Sie mehrere Anleitungen und Kurzanleitungen durcharbeiten möchten, können Sie die Überschreitung von Projektkontingenten verhindern, indem Sie Projekte wiederverwenden.

Löschen Sie die Cloud Composer-Umgebung. Außerdem wird dabei der Bucket der Umgebung gelöscht.

Nächste Schritte