Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
In dieser Anleitung erfahren Sie, wie Sie Probleme mit der Aufgabenplanung und dem Parsen diagnostizieren und beheben, die zu Fehlfunktionen des Schedulers, zu Parsefehlern und Latenz sowie zu Aufgabenausfällen führen.
Einführung
Der Airflow-Planer wird hauptsächlich von zwei Faktoren beeinflusst: der Aufgabenplanung und dem DAG-Parsen. Probleme mit einem dieser Faktoren können sich negativ auf die Gesundheit und Leistung der Umgebung auswirken.
Manchmal werden zu viele Aufgaben gleichzeitig geplant. In diesem Fall ist die Warteschlange voll und Aufgaben bleiben im Status „Geplant“ oder werden nach der Aufnahme in die Warteschlange neu geplant. Dies kann zu Aufgabenausfällen und Leistungslatenzen führen.
Ein weiteres häufiges Problem ist die Parselatenz und Fehler, die durch die Komplexität eines DAG-Codes verursacht werden. Ein DAG-Code, der Airflow-Variablen auf der obersten Codeebene enthält, kann beispielsweise zu Parseverzögerungen, Datenbanküberlastungen, Planungsfehlern und DAG-Zeitüberschreitungen führen.
In dieser Anleitung diagnostizieren Sie die Beispiel-DAGs und erfahren, wie Sie Probleme mit der Planung und dem Parsen beheben, die DAG-Planung verbessern und Ihren DAG-Code und Ihre Umgebungskonfigurationen optimieren, um die Leistung zu verbessern.
Lernziele
In diesem Abschnitt sind die Ziele für die Beispiele in dieser Anleitung aufgeführt.
Beispiel: Fehlfunktion des Schedulers und Latenz aufgrund hoher Aufgabenparallelität
Laden Sie den Beispiel-DAG hoch, der mehrmals gleichzeitig ausgeführt wird, und diagnostizieren Sie die Fehlfunktion des Schedulers und Latenzprobleme mit Cloud Monitoring.
Optimieren Sie Ihren DAG-Code, indem Sie die Aufgaben konsolidieren, und bewerten Sie die Auswirkungen auf die Leistung.
Verteilen Sie die Aufgaben gleichmäßiger über die Zeit und bewerten Sie die Auswirkungen auf die Leistung.
Optimieren Sie Ihre Airflow- und Umgebungskonfigurationen und bewerten Sie die Auswirkungen.
Beispiel: DAG-Parsingfehler und Latenz durch komplexen Code
Laden Sie den Beispiel-DAG mit Airflow-Variablen hoch und diagnostizieren Sie Probleme beim Parsen mit Cloud Monitoring.
Optimieren Sie den DAG-Code, indem Sie Airflow-Variablen auf der obersten Codeebene vermeiden, und bewerten Sie die Auswirkungen auf die Parsezeit.
Optimieren Sie Airflow- und Umgebungskonfigurationen und bewerten Sie die Auswirkungen auf die Parsezeit.
Kosten
In dieser Anleitung werden die folgenden kostenpflichtigen Komponenten von Google Cloudverwendet:
Nach Abschluss dieser Anleitung können Sie weitere Kosten durch Löschen von erstellten Ressourcen vermeiden. Weitere Informationen finden Sie unter Bereinigen.
Hinweis
In diesem Abschnitt werden die Schritte beschrieben, die Sie ausführen müssen, bevor Sie mit der Anleitung beginnen.
Projekt erstellen und konfigurieren
Für diese Anleitung benötigen Sie ein Google Cloud Projekt. Konfigurieren Sie das Projekt so:
Wählen Sie in der Google Cloud Console ein Projekt aus oder erstellen Sie ein Projekt.
Die Abrechnung für Ihr Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist
Der Google Cloud Nutzer Ihres Projekts muss die folgenden Rollen haben, um die erforderlichen Ressourcen zu erstellen:
- Administrator für Umgebung und Storage-Objekte
(
roles/composer.environmentAndStorageObjectAdmin
) - Compute-Administrator (
roles/compute.admin
)
- Administrator für Umgebung und Storage-Objekte
(
Die APIs für Ihr Projekt aktivieren
Enable the Cloud Composer API.
Cloud Composer-Umgebung erstellen
Erstellen Sie eine Cloud Composer 2-Umgebung.
Beim Erstellen der Umgebung gewähren Sie dem Konto des Composer-Dienst-Agents die Rolle Dienst-Agent-Erweiterung für die Cloud Composer v2 API (roles/composer.ServiceAgentV2Ext
). Cloud Composer verwendet dieses Konto, um Vorgänge in Ihrem Google Cloud Projekt auszuführen.
Beispiel: Fehlfunktion des Schedulers und Aufgabenfehler aufgrund von Problemen mit der Aufgabenplanung
In diesem Beispiel wird die Fehlerbehebung bei einem nicht funktionierenden Scheduler und die Latenz gezeigt, die durch eine hohe Aufgabenparallelität verursacht wird.
Beispiel-DAG in die Umgebung hochladen
Laden Sie den folgenden Beispiel-DAG in die Umgebung hoch, die Sie in den vorherigen Schritten erstellt haben. In dieser Anleitung heißt dieser DAG dag_10_tasks_200_seconds_1
.
Dieser DAG hat 200 Aufgaben. Bei jeder Aufgabe wird eine Sekunde gewartet und „Fertig!“ ausgegeben. Der DAG wird nach dem Hochladen automatisch ausgelöst. Cloud Composer führt diesen DAG zehnmal aus. Alle DAG-Ausführungen erfolgen parallel.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 200
seconds = 1
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2023, 11, 22, 20, 0),
end_date=datetime(2023, 11, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Fehlfunktionen des Schedulers und Aufgabenfehler diagnostizieren
Öffnen Sie nach Abschluss der DAG-Ausführung die Airflow-Benutzeroberfläche und klicken Sie auf den DAG dag_10_tasks_200_seconds_1
. Sie sehen, dass insgesamt 10 DAG-Ausführungen erfolgreich waren und dass bei jeder 200 Aufgaben erfolgreich waren.
Prüfen Sie die Airflow-Aufgabenprotokolle:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Protokolle und dann Alle Protokolle > Airflow-Protokolle > Worker > Im Log-Explorer ansehen auf.
Im Histogramm der Protokolle sind Fehler und Warnungen rot und orange gekennzeichnet:

Der Beispiel-DAG führte zu etwa 130 Warnungen und 60 Fehlern. Klicken Sie auf eine Spalte mit gelben und roten Balken. In den Protokollen werden möglicherweise einige der folgenden Warnungen und Fehler angezeigt:
State of this instance has been externally set to success. Terminating
instance.
Received SIGTERM. Terminating subprocesses.
worker: Warm shutdown (MainProcess).
Diese Protokolle können darauf hinweisen, dass die Ressourcennutzung die Limits überschritten hat und der Worker neu gestartet wurde.
Wenn eine Airflow-Aufgabe zu lange in der Warteschlange verbleibt, kennzeichnet der Scheduler sie als fehlgeschlagen und als „up_for_retry“ (Wiederholung möglich) und plant sie noch einmal für die Ausführung. Eine Möglichkeit, die Symptome dieser Situation zu beobachten, besteht darin, sich das Diagramm mit der Anzahl der Aufgaben in der Warteschlange anzusehen. Wenn die Spitzen in diesem Diagramm nach etwa 10 Minuten nicht abfallen, kommt es wahrscheinlich zu Aufgabenausfällen (ohne Protokolle).
Prüfen Sie die Monitoring-Informationen:
Rufen Sie den Tab Monitoring auf und wählen Sie Übersicht aus.
Sehen Sie sich das Diagramm Airflow-Tasks an.
Abbildung 2. Diagramm zu Airflow-Aufgaben (zum Vergrößern anklicken) Im Diagramm „Airflow-Aufgaben“ ist ein Anstieg der Anzahl der Aufgaben in der Warteschlange zu sehen, der länger als 10 Minuten anhält. Das kann bedeuten, dass in Ihrer Umgebung nicht genügend Ressourcen vorhanden sind, um alle geplanten Aufgaben zu verarbeiten.
Sehen Sie sich das Diagramm Aktive Worker an:
Abbildung 3. Diagramm zu aktiven Workern (zum Vergrößern klicken) Das Diagramm Aktive Worker zeigt, dass der DAG während der Ausführung das Autoscaling auf die maximal zulässige Anzahl von drei Workern ausgelöst hat.
Diagramme zur Ressourcennutzung können darauf hinweisen, dass die Airflow-Worker nicht genügend Kapazität haben, um anstehende Aufgaben auszuführen. Wählen Sie auf dem Tab Monitoring die Option Worker aus und sehen Sie sich die Diagramme Gesamte CPU-Auslastung der Worker und Gesamte Arbeitsspeichernutzung der Worker an.
Abbildung 4: Diagramm zur CPU-Nutzung aller Worker (zum Vergrößern anklicken) Abbildung 5. Diagramm zur Gesamtarbeitsspeichernutzung der Worker (zum Vergrößern anklicken) Die Grafiken zeigen, dass durch die Ausführung zu vieler Aufgaben gleichzeitig das CPU-Limit erreicht wurde. Die Ressourcen wurden über 30 Minuten lang verwendet, was länger ist als die Gesamtdauer von 200 Aufgaben in 10 DAG-Ausführungen, die nacheinander ausgeführt wurden.
Dies sind Anzeichen dafür, dass die Warteschlange voll ist und nicht genügend Ressourcen vorhanden sind, um alle geplanten Aufgaben zu verarbeiten.
Aufgaben zusammenführen
Der aktuelle Code erstellt viele DAGs und Aufgaben, für die nicht genügend Ressourcen vorhanden sind, um alle Aufgaben parallel zu verarbeiten. Das führt dazu, dass die Warteschlange voll ist. Wenn Aufgaben zu lange in der Warteschlange bleiben, werden sie möglicherweise neu geplant oder fehlschlagen. In solchen Situationen sollten Sie eine kleinere Anzahl konsolidierter Aufgaben verwenden.
Im folgenden Beispiel-DAG wird die Anzahl der Aufgaben im ursprünglichen Beispiel von 200 auf 20 geändert und die Wartezeit von 1 auf 10 Sekunden erhöht, um mehr konsolidierte Aufgaben zu simulieren, die dieselbe Arbeitsmenge ausführen.
Laden Sie den folgenden Beispiel-DAG in die von Ihnen erstellte Umgebung hoch. In dieser Anleitung heißt dieser DAG dag_10_tasks_20_seconds_10
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Bewerten Sie die Auswirkungen von konsolidierten Aufgaben auf die Planungsabläufe:
Warten Sie, bis die DAG-Ausführungen abgeschlossen sind.
Klicken Sie in der Airflow-Benutzeroberfläche auf der Seite DAGs auf den DAG
dag_10_tasks_20_seconds_10
. Sie sehen zehn DAG-Ausführungen mit jeweils 20 erfolgreichen Aufgaben.Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Protokolle und dann Alle Protokolle > Airflow-Protokolle > Worker > Im Log-Explorer ansehen auf.
Das zweite Beispiel mit mehr konsolidierten Aufgaben führte zu etwa 10 Warnungen und 7 Fehlern. Im Histogramm können Sie die Anzahl der Fehler und Warnungen im ersten Beispiel (frühere Werte) mit dem zweiten Beispiel (spätere Werte) vergleichen.
Abbildung 6: Histogramm der Airflow-Worker-Protokolle nach der Zusammenführung der Aufgaben (zum Vergrößern anklicken) Wenn Sie das erste Beispiel mit dem konsolidierten Beispiel vergleichen, sehen Sie, dass im zweiten Beispiel deutlich weniger Fehler und Warnungen auftreten. Dieselben Fehler im Zusammenhang mit dem Warmstart werden jedoch aufgrund von Ressourcenüberlastung weiterhin in den Protokollen angezeigt.
Wählen Sie auf dem Tab Monitoring die Option Worker aus und sehen Sie sich die Diagramme an.
Wenn Sie das Diagramm für die Airflow-Aufgaben für das erste Beispiel (frühere Werte) mit dem Diagramm für das zweite Beispiel mit mehr konsolidierten Aufgaben vergleichen, sehen Sie, dass der Anstieg der Aufgaben in der Warteschlange bei konsolidierten Aufgaben kürzer war. Es dauerte jedoch fast 10 Minuten, was immer noch suboptimal ist.
Abbildung 7. Diagramm der Airflow-Aufgaben nach der Zusammenführung der Aufgaben (zum Vergrößern klicken) Im Diagramm „Aktive Worker“ sehen Sie, dass im ersten Beispiel (links im Diagramm) Ressourcen über einen viel längeren Zeitraum verwendet wurden als im zweiten Beispiel, obwohl beide Beispiele dieselbe Arbeitsmenge simulieren.
Abbildung 8. Diagramm zu aktiven Workern nach der Zusammenführung von Aufgaben (zum Vergrößern anklicken) Sehen Sie sich die Diagramme zum Ressourcenverbrauch der Worker an. Obwohl der Unterschied zwischen den Ressourcen, die im Beispiel mit mehr konsolidierten Aufgaben verwendet werden, und dem ursprünglichen Beispiel recht groß ist, steigt die CPU-Auslastung immer noch auf 70% des Limits.
Abbildung 9. Diagramm zur CPU-Auslastung der Worker nach der Zusammenführung der Aufgaben (zum Vergrößern anklicken) Abbildung 10. Diagramm zur Arbeitsspeichernutzung der Worker nach der Zusammenführung der Aufgaben (zum Vergrößern anklicken)
Aufgaben gleichmäßiger über die Zeit verteilen
Zu viele gleichzeitige Aufgaben führen dazu, dass die Warteschlange voll ist, was dazu führt, dass Aufgaben in der Warteschlange hängen bleiben oder verschoben werden. In den vorherigen Schritten haben Sie die Anzahl der Aufgaben reduziert, indem Sie diese zusammengeführt haben. Die Ausgabeprotokolle und die Überwachung haben jedoch gezeigt, dass die Anzahl der gleichzeitigen Aufgaben immer noch nicht optimal ist.
Sie können die Anzahl der gleichzeitigen Aufgabenausführungen steuern, indem Sie einen Zeitplan implementieren oder ein Limit für die Anzahl der Aufgaben festlegen, die gleichzeitig ausgeführt werden können.
In dieser Anleitung verteilen Sie Aufgaben gleichmäßiger über die Zeit, indem Sie dem DAG dag_10_tasks_20_seconds_10
Parameter auf DAG-Ebene hinzufügen:
Fügen Sie dem DAG-Kontextmanager das Argument
max_active_runs=1
hinzu. Mit diesem Argument wird festgelegt, dass zu einem bestimmten Zeitpunkt nur eine einzige Instanz eines DAG-Laufs ausgeführt wird.Fügen Sie dem DAG-Kontextmanager das Argument
max_active_tasks=5
hinzu. Mit diesem Argument wird die maximale Anzahl von Aufgabeninstanzen gesteuert, die in jedem DAG gleichzeitig ausgeführt werden können.
Laden Sie den folgenden Beispiel-DAG in die von Ihnen erstellte Umgebung hoch. In dieser Anleitung heißt dieser DAG dag_10_tasks_20_seconds_10_scheduled.py
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
max_active_runs=active_runs,
max_active_tasks=active_tasks,
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Bewerten Sie die Auswirkungen der Verteilung von Aufgaben im Zeitverlauf auf die Planungsabläufe:
Warten Sie, bis die DAG-Ausführungen abgeschlossen sind.
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Protokolle und dann Alle Protokolle > Airflow-Protokolle > Worker > Im Log-Explorer ansehen auf.
Im Histogramm sehen Sie, dass der dritte DAG mit einer begrenzten Anzahl aktiver Aufgaben und Ausführungen keine Warnungen oder Fehler generiert hat. Die Verteilung der Protokolle ist im Vergleich zu den vorherigen Werten gleichmäßiger.
Abbildung 11. Histogramm der Airflow-Worker-Logs, nachdem die Aufgaben konsolidiert und im Zeitverlauf verteilt wurden (zum Vergrößern klicken)
Die Aufgaben im Beispiel dag_10_tasks_20_seconds_10_scheduled
mit einer begrenzten Anzahl aktiver Aufgaben und Ausführungen haben keinen Ressourcendruck verursacht, da die Aufgaben gleichmäßig in die Warteschlange gestellt wurden.
Nachdem Sie die beschriebenen Schritte ausgeführt haben, haben Sie die Ressourcennutzung optimiert, indem Sie kleine Aufgaben zusammengeführt und im Zeitverlauf gleichmäßiger verteilt haben.
Umgebungskonfigurationen optimieren
Sie können die Umgebungskonfigurationen so anpassen, dass die Airflow-Worker immer genügend Kapazität haben, um anstehende Aufgaben auszuführen.
Anzahl der Worker und Nebenläufigkeit von Workern
Sie können die maximale Anzahl von Workern anpassen, damit Cloud Composer Ihre Umgebung automatisch innerhalb der festgelegten Limits skaliert.
Der Parameter [celery]worker_concurrency
definiert die maximale Anzahl von Aufgaben, die ein einzelner Worker aus der Aufgabenwarteschlange abholen kann. Wenn Sie diesen Parameter ändern, wird die Anzahl der Aufgaben angepasst, die ein einzelner Worker gleichzeitig ausführen kann.
Sie können diese Airflow-Konfigurationsoption ändern, indem Sie sie überschreiben. Standardmäßig ist die Worker-Parallelität auf ein Minimum von 32, 12 * worker_CPU, 8 * worker_memory
festgelegt. Das bedeutet, dass sie von den Ressourcenlimits der Worker abhängt. Weitere Informationen zu den Standardwerten für die Worker-Gleichzeitigkeit finden Sie unter Umgebungen optimieren.
Die Anzahl der Worker und die Worker-Parallelität wirken zusammen und die Leistung Ihrer Umgebung hängt stark von beiden Parametern ab. Anhand der folgenden Überlegungen können Sie die richtige Kombination auswählen:
Mehrere schnelle Aufgaben, die parallel ausgeführt werden Sie können die Worker-Parallelität erhöhen, wenn sich Aufgaben in der Warteschlange befinden und Ihre Worker einen geringen Prozentsatz ihrer CPUs und des Arbeitsspeichers gleichzeitig verwenden. Unter bestimmten Umständen wird die Warteschlange jedoch möglicherweise nie voll, sodass das Autoscaling nie ausgelöst wird. Wenn kleine Aufgaben bis zum Zeitpunkt der Bereitschaft der neuen Worker abgeschlossen sind, kann ein vorhandener Worker die verbleibenden Aufgaben übernehmen. Für die neu erstellten Worker gibt es dann keine Aufgaben.
In diesen Fällen wird empfohlen, die minimale Anzahl von Workern und die Nebenläufigkeit von Workern zu erhöhen, um eine zu schnelle Skalierung zu vermeiden.
Mehrere lange Aufgaben, die parallel ausgeführt werden Die hohe Nebenläufigkeit der Worker verhindert, dass das System die Anzahl der Worker skaliert. Wenn mehrere Aufgaben ressourcenintensiv sind und lange dauern, kann eine hohe Worker-Parallelität dazu führen, dass die Warteschlange nie voll ist und alle Aufgaben nur von einem Worker übernommen werden, was zu Leistungsproblemen führt. In diesen Fällen wird empfohlen, die maximale Anzahl von Workern zu erhöhen und die Nebenläufigkeit von Workern zu verringern.
Die Bedeutung der Parallelität
Airflow-Planer steuern die Planung von DAG-Ausführungen und einzelnen Aufgaben von DAGs. Die Airflow-Konfigurationsoption [core]parallelism
steuert, wie viele Aufgaben der Airflow-Planer in die Warteschlange des Executors stellen kann, nachdem alle Abhängigkeiten für diese Aufgaben erfüllt wurden.
Die Parallelität ist ein Schutzmechanismus von Airflow, der bestimmt, wie viele Aufgaben pro Planer gleichzeitig ausgeführt werden können, unabhängig von der Anzahl der Worker. Der Wert für die Parallelität multipliziert mit der Anzahl der Planer in Ihrem Cluster ergibt die maximale Anzahl von Aufgabeninstanzen, die in Ihrer Umgebung in die Warteschlange gestellt werden können.
Normalerweise wird [core]parallelism
als Produkt aus der maximalen Anzahl von Workern und [celery]worker_concurrency
festgelegt. Außerdem wird sie vom Pool beeinflusst.
Sie können diese Airflow-Konfigurationsoption ändern, indem Sie sie überschreiben. Weitere Informationen zum Anpassen von Airflow-Konfigurationen im Zusammenhang mit der Skalierung finden Sie unter Airflow-Konfiguration skalieren.
Optimale Umgebungskonfigurationen finden
Wir empfehlen, kleinere Aufgaben in größere Aufgaben zusammenzuführen und Aufgaben gleichmäßiger über die Zeit zu verteilen, um Probleme mit der Planung zu beheben. Sie können nicht nur den DAG-Code optimieren, sondern auch die Umgebungskonfigurationen, um eine ausreichende Kapazität für die gleichzeitige Ausführung mehrerer Aufgaben zu haben.
Angenommen, Sie konsolidieren Aufgaben in Ihrer DAG so weit wie möglich, aber die Begrenzung aktiver Aufgaben, um sie gleichmäßiger über die Zeit zu verteilen, ist für Ihren speziellen Anwendungsfall nicht die bevorzugte Lösung.
Sie können die Parameter für Parallelität, Anzahl der Worker und Worker-Parallelität anpassen, um den dag_10_tasks_20_seconds_10
-DAG auszuführen, ohne die aktiven Aufgaben einzuschränken. In diesem Beispiel wird der DAG zehnmal ausgeführt und jede Ausführung enthält 20 kleine Aufgaben.
So führen Sie sie alle gleichzeitig aus:
Sie benötigen eine größere Umgebungsgröße, da damit die Leistungsparameter der verwalteten Cloud Composer-Infrastruktur Ihrer Umgebung gesteuert werden.
Airflow-Worker müssen 20 Aufgaben gleichzeitig ausführen können. Sie müssen also die Worker-Parallelität auf 20 festlegen.
Die Worker benötigen ausreichend CPU und Arbeitsspeicher, um alle Aufgaben zu bewältigen. Die Worker-Parallelität wird von der CPU und dem Arbeitsspeicher der Worker beeinflusst. Sie benötigen daher mindestens
worker_concurrency / 12
CPU undleast worker_concurrency / 8
Arbeitsspeicher.Sie müssen die Parallelität erhöhen, um der höheren Worker-Nebenläufigkeit gerecht zu werden. Damit Worker 20 Aufgaben aus der Warteschlange abholen können, müssen diese 20 Aufgaben zuerst vom Scheduler geplant werden.
Passen Sie die Umgebungskonfigurationen so an:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Umgebungskonfiguration auf.
Klicken Sie unter Ressourcen > Arbeitslasten auf Bearbeiten.
Geben Sie im Abschnitt Worker im Feld Memory das neue Arbeitsspeicherlimit für Airflow-Worker an. Verwenden Sie in dieser Anleitung 4 GB.
Geben Sie im Feld CPU das neue CPU-Limit für Airflow-Worker an. In dieser Anleitung verwenden Sie zwei vCPUs.
Speichern Sie die Änderungen und warten Sie einige Minuten, bis die Airflow-Worker neu gestartet wurden.
Überschreiben Sie als Nächstes die Airflow-Konfigurationsoptionen für Parallelität und Worker-Parallelität:
Rufen Sie den Tab Airflow-Konfigurationsüberschreibungen auf.
Klicken Sie auf Bearbeiten und dann auf Airflow-Konfigurationsüberschreibung hinzufügen.
Überschreiben Sie die Parallelitätskonfiguration:
Bereich Schlüssel Wert core
parallelism
20
Klicken Sie auf Airflow-Konfigurationsüberschreibung hinzufügen und überschreiben Sie die Konfiguration der Worker-Parallelität:
Bereich Schlüssel Wert celery
worker_concurrency
20
Klicken Sie auf Speichern und warten Sie, bis die Konfiguration der Umgebung aktualisiert wurde.
Lösen Sie denselben Beispiel-DAG noch einmal mit den angepassten Konfigurationen aus:
Rufen Sie in der Airflow-Benutzeroberfläche die Seite DAGs auf.
Suchen Sie den DAG
dag_10_tasks_20_seconds_10
und löschen Sie ihn.Nachdem der DAG gelöscht wurde, prüft Airflow den DAGs-Ordner im Bucket Ihrer Umgebung und führt den DAG automatisch noch einmal aus.
Sehen Sie sich das Histogramm für Protokolle noch einmal an, nachdem die DAG-Ausführungen abgeschlossen sind. Im Diagramm sehen Sie, dass im Beispiel dag_10_tasks_20_seconds_10
mit mehr konsolidierten Aufgaben keine Fehler und Warnungen generiert wurden, als es mit der angepassten Umgebungskonfiguration ausgeführt wurde. Vergleichen Sie die Ergebnisse mit den vorherigen Daten im Diagramm, in denen dasselbe Beispiel bei der Ausführung mit der Standardumgebungskonfiguration Fehler und Warnungen generiert hat.

Umgebungskonfigurationen und Airflow-Konfigurationen spielen eine wichtige Rolle bei der Aufgabenplanung. Es ist jedoch nicht möglich, die Konfigurationen über bestimmte Grenzen hinaus zu erhöhen.
Wir empfehlen, den DAG-Code zu optimieren, Aufgaben zu konsolidieren und eine Planung zu verwenden, um Leistung und Effizienz zu optimieren.
Beispiel: DAG-Parsingfehler und Latenz aufgrund komplexen DAG-Codes
In diesem Beispiel untersuchen Sie die Parselatenz eines Beispiel-DAGs, das eine übermäßige Anzahl von Airflow-Variablen simuliert.
Neue Airflow-Variable erstellen
Bevor Sie den Beispielcode hochladen, erstellen Sie eine neue Airflow-Variable.
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.
Klicken Sie auf Verwaltung > Variablen > Neuen Datensatz hinzufügen.
Stellen Sie folgende Werte ein:
- Taste:
example_var
- val:
test_airflow_variable
- Taste:
Beispiel-DAG in die Umgebung hochladen
Laden Sie den folgenden Beispiel-DAG in die Umgebung hoch, die Sie in den vorherigen Schritten erstellt haben. In dieser Anleitung heißt dieser DAG dag_for_loop_airflow_variable
.
Dieser DAG enthält eine For-Schleife,die 1.000-mal ausgeführt wird und eine große Anzahl von Airflow-Variablen simuliert. Bei jeder Iteration wird die Variable example_var
gelesen und eine Aufgabe generiert. Jede Aufgabe enthält einen Befehl, der den Wert der Variablen ausgibt.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable',
default_args=default_args,
catchup=False,
schedule_interval="@daily"
)
for i in range(1000):
a = Variable.get('example_var', 'N/A')
task = BashOperator(
task_id=f'task_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': a}
)
Probleme beim Parsen diagnostizieren
Die DAG-Parsing-Zeit ist die Zeit, die der Airflow-Planer benötigt, um eine DAG-Datei zu lesen und zu parsen. Bevor der Airflow-Planer Aufgaben aus einem DAG planen kann, muss er die DAG-Datei parsen, um die Struktur des DAG und die definierten Aufgaben zu ermitteln.
Wenn das Parsen eines DAGs sehr lange dauert, wird dadurch Kapazität des Planers verbraucht und die Leistung von DAG-Ausführungen reduziert.
So überwachen Sie die DAG-Parsingzeit:
Führen Sie den Befehl
dags report
Airflow-Befehlszeilenbefehl in der gcloud-Befehlszeile aus, um die Parsing-Zeit für alle DAGs anzuzeigen:gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ dags report
Ersetzen Sie Folgendes:
ENVIRONMENT_NAME
: der Name Ihrer UmgebungLOCATION
: die Region, in der sich die Umgebung befindet.
Suchen Sie in der Ausgabe des Befehls nach dem Wert für die Dauer des DAG
dag_for_loop_airflow_variables
. Ein hoher Wert kann darauf hinweisen, dass diese DAG nicht optimal implementiert ist. Wenn Sie mehrere DAGs haben, können Sie in der Ausgabetabelle ermitteln, welche DAGs eine lange Parsing-Zeit haben.Beispiel:
file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:14.773594 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /airflow_monitoring | 0:00:00.003035 | 1 | 1 | airflow_monitoring .py
DAG-Parsing-Zeiten in der Google Cloud Console prüfen:
- Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Protokolle und dann Alle Protokolle > DAG-Prozessormanager auf.
Prüfen Sie die
dag-processor-manager
-Protokolle und identifizieren Sie mögliche Probleme.Abbildung 13. Logs des DAG-Prozessormanagers mit DAG-Parsingzeiten (zum Vergrößern klicken)
Wenn die Gesamtzeit für das DAG-Parsen etwa 10 Sekunden überschreitet, sind Ihre Planer möglicherweise mit dem DAG-Parsing überlastet und können DAGs nicht effektiv ausführen.
DAG-Code optimieren
Es wird empfohlen, unnötigen Python-Code auf oberster Ebene in Ihren DAGs zu vermeiden. DAGs mit vielen Importen, Variablen und Funktionen außerhalb des DAGs führen zu längeren Parsezeiten für den Airflow-Planer. Dies reduziert die Leistung und Skalierbarkeit von Cloud Composer und Airflow. Das Lesen zu vieler Airflow-Variablen führt zu einer langen Parsezeit und einer hohen Datenbanklast. Wenn sich dieser Code in einer DAG-Datei befindet, werden diese Funktionen bei jedem Scheduler-Heartbeat ausgeführt, was zu Verzögerungen führen kann.
Mit den Vorlagenfeldern von Airflow können Sie Werte aus Airflow-Variablen und Jinja-Vorlagen in Ihre DAGs einbinden. So wird verhindert, dass Funktionen während der Heartbeats des Schedulers unnötig ausgeführt werden.
Verwenden Sie für eine bessere Implementierung des DAG-Beispiels keine Airflow-Variablen im Python-Code der obersten Ebene von DAGs. Übergeben Sie stattdessen Airflow-Variablen über eine Jinja-Vorlage an vorhandene Operatoren. Dadurch wird das Lesen des Werts bis zur Ausführung der Aufgabe verzögert.
Laden Sie die neue Version des Beispiel-DAG in Ihre Umgebung hoch. In dieser Anleitung heißt dieser DAG dag_for_loop_airflow_variable_optimized
.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable_optimized',
default_args=default_args,
catchup=False,
schedule_interval='@daily'
)
for i in range(1000):
task = BashOperator(
task_id=f'bash_use_variable_good_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': '{{ var.value.get("example_var") }}'},
)
Prüfen Sie die neue DAG-Parsing-Zeit:
Warten Sie, bis die DAG-Ausführung abgeschlossen ist.
Führen Sie den Befehl
dags report
noch einmal aus, um die Parsing-Zeit für alle DAGs anzuzeigen:file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:37.000369 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /dag_for_loop_airfl | 0:00:01.109457 | 1 | 1000 | dag_for_loop_airflow ow_variable_optimiz | | | | _variable_optimized ed.py | | | | /airflow_monitoring | 0:00:00.040510 | 1 | 1 | airflow_monitoring .py | | | |
Prüfen Sie die
dag-processor-manager
-Logs noch einmal und analysieren Sie die Parsedauer.Abbildung 14. In den Logs des DAG-Prozessormanagers sind die DAG-Parsingzeiten nach der Optimierung des DAG-Codes zu sehen (zum Vergrößern klicken)
Durch das Ersetzen der Umgebungsvariablen durch Airflow-Vorlagen haben Sie den DAG-Code vereinfacht und die Parsing-Latenz um etwa das Zehnfache reduziert.
Airflow-Umgebungskonfigurationen optimieren
Der Airflow-Planer versucht ständig, neue Aufgaben auszulösen, und parst alle DAGs im Bucket Ihrer Umgebung. Wenn das Parsen Ihrer DAGs lange dauert und der Scheduler viele Ressourcen verbraucht, können Sie die Airflow-Schedulerkonfigurationen optimieren, damit der Scheduler die Ressourcen effizienter nutzt.
In dieser Anleitung dauert das Parsen der DAG-Dateien sehr lange und die Parsingzyklen beginnen sich zu überschneiden, wodurch die Kapazität des Planers erschöpft wird. In unserem Beispiel dauert das Parsen des ersten Beispiel-DAG mehr als 5 Sekunden. Sie konfigurieren den Scheduler daher so, dass er seltener ausgeführt wird, um die Ressourcen effizienter zu nutzen. Sie überschreiben die Airflow-Konfigurationsoption scheduler_heartbeat_sec
. Mit dieser Konfiguration wird festgelegt, wie oft der Scheduler ausgeführt werden soll (in Sekunden). Standardmäßig ist der Wert auf 5 Sekunden festgelegt.
Sie können diese Airflow-Konfigurationsoption ändern, indem Sie sie überschreiben.
Überschreiben Sie die Airflow-Konfigurationsoption scheduler_heartbeat_sec
:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Airflow-Konfigurationsüberschreibungen auf.
Klicken Sie auf Bearbeiten und dann auf Airflow-Konfigurationsüberschreibung hinzufügen.
Überschreiben Sie die Airflow-Konfigurationsoption:
Bereich Schlüssel Wert scheduler
scheduler_heartbeat_sec
10
Klicken Sie auf Speichern und warten Sie, bis die Konfiguration der Umgebung aktualisiert wurde.
Prüfen Sie die Planermesswerte:
Rufen Sie den Tab Monitoring auf und wählen Sie Planer aus.
Klicken Sie im Diagramm Scheduler-Herzschlag auf das Dreipunkt-Menü Weitere Optionen und dann auf Im Metrics Explorer ansehen.

In der Grafik sehen Sie, dass der Scheduler nach der Änderung der Standardkonfiguration von 5 Sekunden auf 10 Sekunden doppelt so selten ausgeführt wird. Wenn Sie die Häufigkeit der Heartbeats verringern, wird sichergestellt, dass der Scheduler nicht gestartet wird, während der vorherige Parsezyklus noch läuft und die Ressourcenkapazität des Schedulers nicht ausgeschöpft ist.
Dem Scheduler mehr Ressourcen zuweisen
In Cloud Composer 2 können Sie dem Scheduler mehr CPU- und Arbeitsspeicherressourcen zuweisen. So können Sie die Leistung Ihres Planers steigern und die Parsingzeit für Ihren DAG verkürzen.
Weisen Sie dem Planer zusätzliche CPU und Arbeitsspeicher zu:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Umgebungskonfiguration auf.
Klicken Sie unter Ressourcen > Arbeitslasten auf Bearbeiten.
Geben Sie im Abschnitt Scheduler im Feld Memory das neue Arbeitsspeicherlimit an. Verwenden Sie in dieser Anleitung 4 GB.
Geben Sie im Feld CPU das neue CPU-Limit an. In dieser Anleitung verwenden Sie zwei vCPUs.
Speichern Sie die Änderungen und warten Sie einige Minuten, bis die Airflow-Planer neu gestartet wurden.
Rufen Sie den Tab Protokolle und dann Alle Protokolle > DAG-Prozessormanager auf.
Prüfen Sie die
dag-processor-manager
-Logs und vergleichen Sie die Parsingdauer für die Beispiel-DAGs:Abbildung 16 In den Logs des DAG-Prozessormanagers sind die DAG-Parsingzeiten zu sehen, nachdem dem Scheduler mehr Ressourcen zugewiesen wurden (zum Vergrößern klicken)
Durch die Zuweisung weiterer Ressourcen an den Scheduler haben Sie die Kapazität des Schedulers erhöht und die Parsing-Latenz im Vergleich zu den Standardumgebungskonfigurationen deutlich reduziert. Mit mehr Ressourcen kann der Scheduler die DAGs schneller parsen. Die mit Cloud Composer-Ressourcen verbundenen Kosten steigen jedoch ebenfalls. Außerdem ist es nicht möglich, die Ressourcen über ein bestimmtes Limit hinaus zu erhöhen.
Wir empfehlen, Ressourcen erst dann zuzuweisen, wenn alle möglichen Optimierungen am DAG-Code und an der Airflow-Konfiguration implementiert wurden.
Bereinigen
Damit Ihrem Google Cloud -Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, können Sie entweder das Projekt löschen, das die Ressourcen enthält, oder das Projekt beibehalten und die einzelnen Ressourcen löschen.
Projekt löschen
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Einzelne Ressourcen löschen
Wenn Sie mehrere Anleitungen und Kurzanleitungen durcharbeiten möchten, können Sie die Überschreitung von Projektkontingenten verhindern, indem Sie Projekte wiederverwenden.
Löschen Sie die Cloud Composer-Umgebung. Außerdem wird dabei der Bucket der Umgebung gelöscht.