Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
In dieser Anleitung erfahren Sie, wie Sie Probleme bei der Aufgabenplanung und beim Parsen diagnostizieren und beheben, die zu Fehlfunktionen des Schedulers, Parsing-Fehlern, Latenz und Aufgabenfehlern führen.
Einführung
Der Airflow-Planer wird hauptsächlich von zwei Faktoren beeinflusst: der Aufgabenplanung und dem DAG-Parsing. Probleme bei einem dieser Faktoren können sich negativ auf die Umwelt und die Leistung auswirken.
Manchmal werden zu viele Aufgaben gleichzeitig geplant. In diesem Fall ist die Warteschlange voll und Aufgaben bleiben im Status „Geplant“ oder werden nach dem Eintragen in die Warteschlange neu geplant, was zu Aufgabenfehlern und Leistungslatenz führen kann.
Ein weiteres häufiges Problem sind Parsing-Latenz und Fehler, die durch die Komplexität eines DAG-Codes verursacht werden. Wenn DAG-Code beispielsweise Airflow-Variablen auf der obersten Ebene des Codes enthält, kann dies zu Parsing-Verzögerungen, Datenbanküberlastung, Planungsfehlern und DAG-Timeouts führen.
In dieser Anleitung diagnostizieren Sie die Beispiel-DAGs und erfahren, wie Sie Probleme bei der Planung und beim Parsen beheben, die DAG-Planung verbessern und Ihren DAG-Code und Ihre Umgebungskonfigurationen optimieren, um die Leistung zu steigern.
Lernziele
In diesem Abschnitt werden die Ziele für die Beispiele in dieser Anleitung aufgeführt.
Beispiel: Scheduler-Fehler und Latenz aufgrund hoher Task-Nebenläufigkeit
Laden Sie die Beispiel-DAG hoch, die mehrmals gleichzeitig ausgeführt wird, und diagnostizieren Sie die Fehlfunktion des Schedulers und Latenzprobleme mit Cloud Monitoring.
Optimieren Sie Ihren DAG-Code, indem Sie die Aufgaben konsolidieren, und bewerten Sie die Auswirkungen auf die Leistung.
Verteilen Sie die Aufgaben gleichmäßiger über die Zeit und bewerten Sie die Auswirkungen auf die Leistung.
Optimieren Sie Ihre Airflow- und Umgebungskonfigurationen und bewerten Sie die Auswirkungen.
Beispiel: DAG-Parsing-Fehler und Latenz aufgrund von komplexem Code
Laden Sie die Beispiel-DAG mit Airflow-Variablen hoch und beheben Sie Parsing-Probleme mit Cloud Monitoring.
Optimieren Sie den DAG-Code, indem Sie Airflow-Variablen auf der obersten Ebene des Codes vermeiden, und bewerten Sie die Auswirkungen auf die Parsing-Zeit.
Optimieren Sie die Airflow- und Umgebungskonfigurationen und bewerten Sie die Auswirkungen auf die Parsing-Zeit.
Kosten
In dieser Anleitung werden die folgenden kostenpflichtigen Komponenten von Google Cloudverwendet:
Nach Abschluss dieser Anleitung können Sie weitere Kosten durch Löschen von erstellten Ressourcen vermeiden. Weitere Informationen finden Sie unter Bereinigen.
Hinweise
In diesem Abschnitt werden die Aktionen beschrieben, die vor Beginn des Tutorials erforderlich sind.
Projekt erstellen und konfigurieren
Für diese Anleitung benötigen Sie ein Google Cloud-Projekt. Konfigurieren Sie das Projekt so:
Wählen Sie in der Google Cloud -Console ein Projekt aus oder erstellen Sie ein Projekt:
Die Abrechnung für Ihr Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist.
Achten Sie darauf, dass der Nutzer Ihres Google Cloud Projekts die folgenden Rollen hat, um die erforderlichen Ressourcen zu erstellen:
- Administrator für Umgebung und Storage-Objekte (
roles/composer.environmentAndStorageObjectAdmin
) - Compute-Administrator (
roles/compute.admin
)
- Administrator für Umgebung und Storage-Objekte (
Die APIs für Ihr Projekt aktivieren
Enable the Cloud Composer API.
Cloud Composer-Umgebung erstellen
Cloud Composer 2-Umgebung erstellen
Beim Erstellen der Umgebung weisen Sie dem Composer-Dienst-Agent-Konto die Rolle Dienst-Agent-Erweiterung für die Cloud Composer v2 API (roles/composer.ServiceAgentV2Ext
) zu. Cloud Composer verwendet dieses Konto, um Vorgänge in Ihrem Google Cloud -Projekt auszuführen.
Beispiel: Planer funktioniert nicht richtig und Aufgabe schlägt aufgrund von Problemen bei der Aufgabenplanung fehl
In diesem Beispiel wird gezeigt, wie Sie Fehler im Scheduler und Latenz, die durch eine hohe Anzahl gleichzeitig ausgeführter Aufgaben verursacht wird, beheben können.
Beispiel-DAG in Ihre Umgebung hochladen
Laden Sie den folgenden Beispiel-DAG in die Umgebung hoch, die Sie in den vorherigen Schritten erstellt haben. In dieser Anleitung heißt der DAG dag_10_tasks_200_seconds_1
.
Dieser DAG hat 200 Aufgaben. Jede Aufgabe wartet eine Sekunde und gibt dann „Complete!“ aus. Der DAG wird nach dem Hochladen automatisch ausgelöst. Cloud Composer führt diesen DAG zehnmal aus. Alle DAG-Ausführungen erfolgen parallel.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 200
seconds = 1
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2023, 11, 22, 20, 0),
end_date=datetime(2023, 11, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Probleme mit dem Scheduler und Aufgabenfehler diagnostizieren
Öffnen Sie nach Abschluss der DAG-Ausführungen die Airflow-UI und klicken Sie auf den dag_10_tasks_200_seconds_1
-DAG. Sie sehen, dass insgesamt 10 DAG-Ausführungen erfolgreich waren und jede 200 erfolgreiche Aufgaben enthält.
Airflow-Aufgabenlogs ansehen:
Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Logs auf und wählen Sie dann Alle Logs > Airflow-Logs > Worker > Im Log-Explorer ansehen aus.
Im Log-Histogramm werden Fehler und Warnungen rot und orange dargestellt:

Die Beispiel-DAG hat zu etwa 130 Warnungen und 60 Fehlern geführt. Klicken Sie auf eine beliebige Spalte mit gelben und roten Balken. In den Logs werden einige der folgenden Warnungen und Fehler angezeigt:
State of this instance has been externally set to success. Terminating
instance.
Received SIGTERM. Terminating subprocesses.
worker: Warm shutdown (MainProcess).
Diese Logs können darauf hinweisen, dass die Ressourcennutzung die Grenzwerte überschritten hat und der Worker neu gestartet wurde.
Wenn eine Airflow-Aufgabe zu lange in der Warteschlange verbleibt, markiert der Scheduler sie als fehlgeschlagen und als „up_for_retry“ (zur Wiederholung anstehend) und plant sie noch einmal für die Ausführung. Eine Möglichkeit, die Symptome dieser Situation zu beobachten, besteht darin, sich das Diagramm mit der Anzahl der Aufgaben in der Warteschlange anzusehen. Wenn die Spitzen in diesem Diagramm nicht innerhalb von etwa 10 Minuten abnehmen, kommt es wahrscheinlich zu Fehlern bei Aufgaben (ohne Logs).
Überprüfen Sie die Monitoring-Informationen:
Rufen Sie den Tab Monitoring auf und wählen Sie Übersicht aus.
Sehen Sie sich das Diagramm Airflow-Tasks an.
Abbildung 2. Diagramm für Airflow-Aufgaben (zum Vergrößern klicken) Im Diagramm für Airflow-Aufgaben ist ein Anstieg der Aufgaben in der Warteschlange zu sehen, der länger als 10 Minuten dauert. Das kann darauf hindeuten, dass in Ihrer Umgebung nicht genügend Ressourcen vorhanden sind, um alle geplanten Aufgaben zu verarbeiten.
Sehen Sie sich das Diagramm Aktive Worker an:
Abbildung 3. Diagramm zu aktiven Workern (zum Vergrößern klicken) Das Diagramm Aktive Worker zeigt, dass durch den DAG während der DAG-Ausführung Autoscaling bis zum maximal zulässigen Limit von drei Workern ausgelöst wurde.
Diagramme zur Ressourcennutzung können auf einen Mangel an Kapazität in Airflow-Workern zum Ausführen von in die Warteschlange gestellten Aufgaben hinweisen. Wählen Sie auf dem Tab Monitoring die Option Worker aus und sehen Sie sich die Diagramme Gesamte CPU-Auslastung der Worker und Gesamte Arbeitsspeichernutzung der Worker an.
Abbildung 4. Diagramm zur gesamten CPU-Nutzung der Worker (zum Vergrößern klicken) Abbildung 5. Diagramm zur gesamten Arbeitsspeichernutzung der Worker (zum Vergrößern klicken) Die Diagramme zeigen, dass die Ausführung zu vieler Aufgaben gleichzeitig dazu geführt hat, dass das CPU-Limit erreicht wurde. Die Ressourcen wurden über 30 Minuten lang verwendet, was sogar länger ist als die Gesamtdauer von 200 Aufgaben in 10 DAG-Ausführungen, die nacheinander ausgeführt werden.
Dies sind die Anzeichen dafür, dass die Warteschlange voll ist und nicht genügend Ressourcen zur Verarbeitung aller geplanten Aufgaben vorhanden sind.
Aufgaben zusammenfassen
Im aktuellen Code werden viele DAGs und Aufgaben erstellt, ohne dass genügend Ressourcen vorhanden sind, um alle Aufgaben parallel zu verarbeiten. Dadurch wird die Warteschlange gefüllt. Wenn Aufgaben zu lange in der Warteschlange verbleiben, kann es sein, dass sie neu geplant werden oder fehlschlagen. In solchen Situationen sollten Sie eine kleinere Anzahl konsolidierter Aufgaben verwenden.
In der folgenden Beispiel-DAG wird die Anzahl der Aufgaben im ursprünglichen Beispiel von 200 auf 20 reduziert und die Wartezeit von 1 auf 10 Sekunden erhöht, um konsolidiertere Aufgaben zu simulieren, die dieselbe Menge an Arbeit verrichten.
Laden Sie den folgenden Beispiel-DAG in die von Ihnen erstellte Umgebung hoch. In dieser Anleitung heißt der DAG dag_10_tasks_20_seconds_10
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Bewerten Sie die Auswirkungen von konsolidierten Aufgaben auf die Planungsprozesse:
Warten Sie, bis die DAG-Ausführungen abgeschlossen sind.
Klicken Sie in der Airflow-UI auf der Seite DAGs auf den
dag_10_tasks_20_seconds_10
-DAG. Sie sehen 10 DAG-Ausführungen mit jeweils 20 erfolgreichen Aufgaben.Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Logs auf und wählen Sie dann Alle Logs > Airflow-Logs > Worker > Im Log-Explorer ansehen aus.
Das zweite Beispiel mit konsolidierteren Aufgaben führte zu etwa 10 Warnungen und 7 Fehlern. Im Histogramm können Sie die Anzahl der Fehler und Warnungen im ersten Beispiel (frühere Werte) und im zweiten Beispiel (spätere Werte) vergleichen.
Abbildung 6. Histogramm der Airflow-Worker-Logs nach der Konsolidierung der Aufgaben (zum Vergrößern klicken) Wenn Sie das erste Beispiel mit dem konsolidierteren vergleichen, sehen Sie, dass im zweiten Beispiel deutlich weniger Fehler und Warnungen enthalten sind. Dieselben Fehler im Zusammenhang mit dem Warm Shutdown werden jedoch aufgrund von Ressourcenüberlastung weiterhin in den Logs angezeigt.
Wählen Sie auf dem Tab Monitoring die Option Worker aus und sehen Sie sich die Diagramme an.
Wenn Sie das Diagramm Airflow-Aufgaben für das erste Beispiel (frühere Werte) mit dem Diagramm für das zweite Beispiel mit konsolidierteren Aufgaben vergleichen, sehen Sie, dass der Anstieg der in die Warteschlange gestellten Aufgaben bei konsolidierteren Aufgaben kürzer war. Es dauerte jedoch fast 10 Minuten, was immer noch suboptimal ist.
Abbildung 7. Diagramm der Airflow-Aufgaben nach der Konsolidierung der Aufgaben (zum Vergrößern klicken) Im Diagramm „Aktive Worker“ sehen Sie, dass im ersten Beispiel (auf der linken Seite des Diagramms) Ressourcen über einen viel längeren Zeitraum verwendet wurden als im zweiten, obwohl in beiden Beispielen die gleiche Menge an Arbeit simuliert wird.
Abbildung 8. Diagramm zu aktiven Workern nach der Konsolidierung der Aufgaben (zum Vergrößern klicken) Sehen Sie sich die Diagramme zum Ressourcenverbrauch der Worker an. Obwohl der Unterschied zwischen den Ressourcen, die im Beispiel mit konsolidierteren Aufgaben verwendet werden, und dem ursprünglichen Beispiel recht groß ist, steigt die CPU-Auslastung immer noch auf 70% des Limits.
Abbildung 9. Diagramm zur gesamten CPU-Nutzung der Worker nach der Konsolidierung der Aufgaben (zum Vergrößern klicken) Abbildung 10. Diagramm zur gesamten Arbeitsspeichernutzung der Worker nach der Konsolidierung der Aufgaben (zum Vergrößern klicken)
Aufgaben gleichmäßiger über die Zeit verteilen
Zu viele gleichzeitige Aufgaben führen dazu, dass die Warteschlange voll ist. Das kann dazu führen, dass Aufgaben in der Warteschlange hängen bleiben oder neu geplant werden. In den vorherigen Schritten haben Sie die Anzahl der Aufgaben durch Konsolidierung reduziert. Die Ausgabelogs und das Monitoring haben jedoch gezeigt, dass die Anzahl der gleichzeitig ausgeführten Aufgaben immer noch suboptimal ist.
Sie können die Anzahl der gleichzeitigen Aufgabenläufe steuern, indem Sie einen Zeitplan implementieren oder Grenzwerte für die Anzahl der Aufgaben festlegen, die gleichzeitig ausgeführt werden können.
In dieser Anleitung verteilen Sie Aufgaben gleichmäßiger über die Zeit, indem Sie Parameter auf DAG-Ebene in den dag_10_tasks_20_seconds_10
-DAG einfügen:
Fügen Sie dem DAG-Kontextmanager das Argument
max_active_runs=1
hinzu. Mit diesem Argument wird ein Limit von nur einer Instanz eines DAG-Laufs zu einem bestimmten Zeitpunkt festgelegt.Fügen Sie dem DAG-Kontextmanager das Argument
max_active_tasks=5
hinzu. Mit diesem Argument wird die maximale Anzahl von Aufgabeninstanzen gesteuert, die in jedem DAG gleichzeitig ausgeführt werden können.
Laden Sie den folgenden Beispiel-DAG in die von Ihnen erstellte Umgebung hoch. In dieser Anleitung heißt der DAG dag_10_tasks_20_seconds_10_scheduled.py
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
max_active_runs=active_runs,
max_active_tasks=active_tasks,
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Auswirkungen der Verteilung von Aufgaben im Zeitverlauf auf die Planungsprozesse bewerten:
Warten Sie, bis die DAG-Ausführungen abgeschlossen sind.
Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Logs auf und wählen Sie dann Alle Logs > Airflow-Logs > Worker > Im Log-Explorer ansehen aus.
Im Histogramm sehen Sie, dass für den dritten DAG mit einer begrenzten Anzahl aktiver Aufgaben und Ausführungen keine Warnungen oder Fehler generiert wurden und die Verteilung der Logs im Vergleich zu den vorherigen Werten gleichmäßiger aussieht.
Abbildung 11. Histogramm der Airflow-Worker-Logs nach der Konsolidierung und Verteilung der Aufgaben über die Zeit (zum Vergrößern klicken)
Die Aufgaben im Beispiel dag_10_tasks_20_seconds_10_scheduled
, bei dem die Anzahl der aktiven Aufgaben und Ausführungen begrenzt ist, haben keinen Ressourcenengpass verursacht, da die Aufgaben gleichmäßig in die Warteschlange gestellt wurden.
Nachdem Sie die beschriebenen Schritte ausgeführt haben, haben Sie die Ressourcennutzung optimiert, indem Sie kleine Aufgaben zusammengefasst und gleichmäßiger über die Zeit verteilt haben.
Umgebungskonfigurationen optimieren
Sie können die Konfigurationen Ihrer Umgebung anpassen, um sicherzustellen, dass in Airflow-Workern immer Kapazität zum Ausführen von in die Warteschlange gestellten Aufgaben vorhanden ist.
Anzahl der Worker und Worker-Nebenläufigkeit
Sie können die maximale Anzahl von Workern anpassen, damit Cloud Composer Ihre Umgebung automatisch innerhalb der festgelegten Limits skaliert.
Der Parameter [celery]worker_concurrency
definiert die maximale Anzahl von Aufgaben, die ein einzelner Worker aus der Aufgabenwarteschlange abrufen kann. Wenn Sie diesen Parameter ändern, wird die Anzahl der Aufgaben angepasst, die ein einzelner Worker gleichzeitig ausführen kann.
Sie können diese Airflow-Konfigurationsoption überschreiben. Standardmäßig wird die Worker-Nebenläufigkeit basierend auf der Anzahl der leichten, nebenläufigen Aufgabeninstanzen festgelegt, die ein Worker verarbeiten kann. Das bedeutet, dass der Wert von den Ressourcenlimits für Worker abhängt.
Der Wert für die Worker-Nebenläufigkeit hängt nicht von der Anzahl der Worker in Ihrer Umgebung ab.
Die Anzahl der Worker und die Worker-Concurrency arbeiten zusammen und die Leistung Ihrer Umgebung hängt stark von beiden Parametern ab. Die folgenden Überlegungen können Ihnen bei der Auswahl der richtigen Kombination helfen:
Mehrere Schnellaufgaben werden parallel ausgeführt. Sie können die Worker-Parallelität erhöhen, wenn sich Aufgaben in der Warteschlange befinden und Ihre Worker einen geringen Prozentsatz ihrer CPUs und des Arbeitsspeichers gleichzeitig verwenden. Unter bestimmten Umständen füllt sich die Warteschlange jedoch möglicherweise nie, sodass das Autoscaling nie ausgelöst wird. Wenn kleine Aufgaben abgeschlossen sind, bevor die neuen Worker bereit sind, kann ein vorhandener Worker die verbleibenden Aufgaben übernehmen. In diesem Fall gibt es keine Aufgaben für neu erstellte Worker.
In diesen Fällen empfiehlt es sich, die Mindestanzahl von Workern und die Nebenläufigkeit von Workern zu erhöhen, um eine übereifrige Skalierung zu vermeiden.
Mehrere lange Aufgaben werden parallel ausgeführt. Die hohe Nebenläufigkeit der Worker verhindert, dass das System die Anzahl der Worker skaliert. Wenn mehrere Aufgaben ressourcenintensiv sind und lange dauern, kann eine hohe Worker-Nebenläufigkeit dazu führen, dass die Warteschlange nie gefüllt wird und alle Aufgaben von nur einem Worker übernommen werden, was zu Leistungsproblemen führt. In diesen Fällen empfiehlt es sich, die maximale Anzahl von Workern zu erhöhen und die Nebenläufigkeit von Workern zu verringern.
Die Bedeutung der Parallelität
Airflow-Planer steuern die Planung von DAG-Ausführungen und einzelnen Aufgaben von DAGs. Die Airflow-Konfigurationsoption [core]parallelism
steuert, wie viele Aufgaben der Airflow-Planer in die Warteschlange des Executors stellen kann, nachdem alle Abhängigkeiten für diese Aufgaben erfüllt wurden.
Die Parallelität ist ein Schutzmechanismus von Airflow, der bestimmt, wie viele Aufgaben gleichzeitig pro Planer ausgeführt werden können, unabhängig von der Anzahl der Worker. Der Parallelitätswert multipliziert mit der Anzahl der Planer in Ihrem Cluster ergibt die maximale Anzahl von Task-Instanzen, die in Ihrer Umgebung in die Warteschlange gestellt werden können.
Normalerweise wird [core]parallelism
als Produkt aus der maximalen Anzahl von Workern und [celery]worker_concurrency
festgelegt. Er wird auch vom Pool beeinflusst.
Sie können diese Airflow-Konfigurationsoption überschreiben. Weitere Informationen zum Anpassen von Airflow-Konfigurationen in Bezug auf die Skalierung finden Sie unter Airflow-Konfiguration skalieren.
Optimale Umgebungskonfigurationen finden
Die empfohlene Methode zur Behebung von Planungsproblemen besteht darin, kleine Aufgaben in größere Aufgaben zusammenzufassen und Aufgaben gleichmäßiger über die Zeit zu verteilen. Neben der Optimierung von DAG-Code können Sie auch Umgebungskonfigurationen optimieren, um genügend Kapazität für die gleichzeitige Ausführung mehrerer Aufgaben zu haben.
Angenommen, Sie konsolidieren Aufgaben in Ihrem DAG so weit wie möglich, aber die Begrenzung aktiver Aufgaben, um sie gleichmäßiger über die Zeit zu verteilen, ist für Ihren speziellen Anwendungsfall keine bevorzugte Lösung.
Sie können die Parameter für Parallelität, Anzahl der Worker und Worker-Concurrency anpassen, um den dag_10_tasks_20_seconds_10
-DAG auszuführen, ohne aktive Aufgaben einzuschränken. In diesem Beispiel wird der DAG zehnmal ausgeführt und jeder Lauf enthält 20 kleine Aufgaben.
So führen Sie alle gleichzeitig aus:
Sie benötigen eine größere Umgebungsgröße, da sie die Leistungsparameter der verwalteten Cloud Composer-Infrastruktur Ihrer Umgebung steuert.
Airflow-Worker müssen 20 Aufgaben gleichzeitig ausführen können. Daher müssen Sie die Worker-Concurrency auf 20 festlegen.
Die Worker benötigen ausreichend CPU und Arbeitsspeicher, um alle Aufgaben zu verarbeiten. Die Worker-Parallelität wird durch die Worker-CPU und den Worker-Arbeitsspeicher beeinflusst. Daher benötigen Sie mindestens
worker_concurrency / 12
CPU undleast worker_concurrency / 8
Arbeitsspeicher.Sie müssen die Parallelität erhöhen, um der höheren Nebenläufigkeit der Worker gerecht zu werden. Damit Worker 20 Aufgaben aus der Warteschlange abrufen können, muss der Scheduler diese 20 Aufgaben zuerst planen.
Passen Sie die Konfigurationen Ihrer Umgebung so an:
Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Umgebungskonfiguration auf.
Suchen Sie nach der Konfiguration Ressourcen > Arbeitslasten und klicken Sie auf Bearbeiten.
Geben Sie im Abschnitt Worker im Feld Arbeitsspeicher das neue Arbeitsspeicherlimit für Airflow-Worker an. Verwenden Sie in dieser Anleitung 4 GB.
Geben Sie im Feld CPU das neue CPU-Limit für Airflow-Worker an. Verwenden Sie in dieser Anleitung 2 vCPUs.
Speichern Sie die Änderungen und warten Sie einige Minuten, bis die Airflow-Worker neu gestartet wurden.
Überschreiben Sie als Nächstes die Airflow-Konfigurationsoptionen für Parallelität und Worker-Concurrency:
Rufen Sie den Tab Airflow-Konfigurationsüberschreibungen auf.
Klicken Sie auf Bearbeiten und dann auf Airflow-Konfigurationsüberschreibung hinzufügen.
Parallelitätskonfiguration überschreiben:
Bereich Schlüssel Wert core
parallelism
20
Klicken Sie auf Airflow-Konfigurationsüberschreibung hinzufügen und überschreiben Sie die Worker-Concurrency-Konfiguration:
Bereich Schlüssel Wert celery
worker_concurrency
20
Klicken Sie auf Speichern und warten Sie, bis die Umgebung ihre Konfiguration aktualisiert hat.
Lösen Sie den gleichen Beispiel-DAG noch einmal mit den angepassten Konfigurationen aus:
Rufen Sie in der Airflow-UI die Seite DAGs auf.
Suchen Sie den
dag_10_tasks_20_seconds_10
-DAG und löschen Sie ihn.Nachdem der DAG gelöscht wurde, prüft Airflow den DAG-Ordner im Bucket Ihrer Umgebung und führt den DAG automatisch noch einmal aus.
Sehen Sie sich das Histogramm für Logs noch einmal an, nachdem die DAG-Ausführungen abgeschlossen sind. Im Diagramm sehen Sie, dass beim dag_10_tasks_20_seconds_10
-Beispiel mit mehr konsolidierten Aufgaben bei der Ausführung mit der angepassten Umgebungskonfiguration keine Fehler und Warnungen generiert wurden. Vergleichen Sie die Ergebnisse mit den früheren Daten im Diagramm, bei denen für dasselbe Beispiel Fehler und Warnungen generiert wurden, als es mit der Standardumgebungskonfiguration ausgeführt wurde.

Umgebungskonfigurationen und Airflow-Konfigurationen spielen eine entscheidende Rolle bei der Planung von Aufgaben. Es ist jedoch nicht möglich, die Konfigurationen über bestimmte Grenzwerte hinaus zu erhöhen.
Wir empfehlen, den DAG-Code zu optimieren, Aufgaben zusammenzufassen und die Planung zu verwenden, um Leistung und Effizienz zu optimieren.
Beispiel: Fehler beim Parsen von DAGs und Latenz aufgrund von komplexem DAG-Code
In diesem Beispiel untersuchen Sie die Parsing-Latenz eines Beispiel-DAG, das eine übermäßige Anzahl von Airflow-Variablen imitiert.
Neue Airflow-Variable erstellen
Bevor Sie den Beispielcode hochladen, erstellen Sie eine neue Airflow-Variable.
Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.
Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.
Klicken Sie auf Verwaltung > Variablen > Neuen Datensatz hinzufügen.
Legen Sie die folgenden Werte fest:
- Schlüssel:
example_var
- val:
test_airflow_variable
- Schlüssel:
Beispiel-DAG in Ihre Umgebung hochladen
Laden Sie den folgenden Beispiel-DAG in die Umgebung hoch, die Sie in den vorherigen Schritten erstellt haben. In dieser Anleitung heißt der DAG dag_for_loop_airflow_variable
.
Dieser DAG enthält eine For-Schleife,die 1.000 Mal ausgeführt wird und eine große Anzahl von Airflow-Variablen imitiert. Bei jeder Iteration wird die Variable example_var
gelesen und eine Aufgabe generiert. Jede Aufgabe enthält einen Befehl, der den Wert der Variablen ausgibt.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable',
default_args=default_args,
catchup=False,
schedule_interval="@daily"
)
for i in range(1000):
a = Variable.get('example_var', 'N/A')
task = BashOperator(
task_id=f'task_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': a}
)
Parsing-Probleme diagnostizieren
Die DAG-Parsing-Zeit ist die Zeit, die der Airflow-Planer benötigt, um eine DAG-Datei zu lesen und zu parsen. Bevor der Airflow-Planer Aufgaben aus einem DAG planen kann, muss er die DAG-Datei parsen, um die Struktur des DAG und die definierten Aufgaben zu ermitteln.
Wenn das Parsen eines DAG sehr lange dauert, wird dadurch Kapazität des Planers verbraucht und die Leistung von DAG-Ausführungen reduziert.
So überwachen Sie die DAG-Parsing-Zeit:
Führen Sie den
dags report
Airflow-Befehlszeilenbefehl in der gcloud CLI aus, um die Parsing-Zeit für alle Ihre DAGs zu sehen:gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ dags report
Ersetzen Sie Folgendes:
ENVIRONMENT_NAME
: der Name Ihrer UmgebungLOCATION
: die Region, in der sich die Umgebung befindet.
Suchen Sie in der Ausgabe des Befehls nach dem Dauerwert für den DAG
dag_for_loop_airflow_variables
. Ein hoher Wert kann darauf hinweisen, dass dieser DAG nicht optimal implementiert ist. Wenn Sie mehrere DAGs haben, können Sie in der Ausgabetabelle ermitteln, welche DAGs eine lange Parsing-Zeit haben.Beispiel:
file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:14.773594 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /airflow_monitoring | 0:00:00.003035 | 1 | 1 | airflow_monitoring .py
So prüfen Sie die DAG-Parsing-Zeiten in der Google Cloud -Konsole:
- Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Klicken Sie auf den Tab Logs und dann auf Alle Logs > DAG-Prozessormanager.
Prüfen Sie die
dag-processor-manager
-Logs und ermitteln Sie mögliche Probleme.Abbildung 13. Die Logs des DAG-Prozessormanagers zeigen die DAG-Parsingzeiten (zum Vergrößern klicken)
Wenn die Gesamtzeit für das DAG-Parsing etwa 10 Sekunden überschreitet, sind Ihre Planer möglicherweise mit dem DAG-Parsing überlastet und können DAGs nicht effektiv ausführen.
DAG-Code optimieren
Es wird empfohlen, unnötigen Python-Code der obersten Ebene in Ihren DAGs zu vermeiden. DAGs mit vielen Importen, Variablen und Funktionen außerhalb des DAG führen zu längeren Parsing-Zeiten für den Airflow-Planer. Dadurch werden Leistung und Skalierbarkeit von Cloud Composer und Airflow beeinträchtigt. Wenn zu viele Airflow-Variablen gelesen werden, führt das zu einer langen Parsing-Zeit und einer hohen Datenbanklast. Wenn sich dieser Code in einer DAG-Datei befindet, werden diese Funktionen bei jedem Scheduler-Heartbeat ausgeführt, was langsam sein kann.
Mit den Vorlagenfeldern von Airflow können Sie Werte aus Airflow-Variablen und Jinja-Vorlagen in Ihre DAGs einfügen. Dadurch wird die unnötige Ausführung von Funktionen während der Scheduler-Heartbeats verhindert.
Um das DAG-Beispiel besser zu implementieren, sollten Sie Airflow-Variablen nicht im Python-Code der obersten Ebene von DAGs verwenden. Übergeben Sie stattdessen Airflow-Variablen über eine Jinja-Vorlage an vorhandene Operatoren. Dadurch wird das Lesen des Werts bis zur Ausführung des Tasks verzögert.
Laden Sie die neue Version des Beispiel-DAG in Ihre Umgebung hoch. In dieser Anleitung heißt der DAG dag_for_loop_airflow_variable_optimized
.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable_optimized',
default_args=default_args,
catchup=False,
schedule_interval='@daily'
)
for i in range(1000):
task = BashOperator(
task_id=f'bash_use_variable_good_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': '{{ var.value.get("example_var") }}'},
)
Sehen Sie sich die neue DAG-Parsing-Zeit an:
Warten Sie, bis der DAG-Lauf abgeschlossen ist.
Führen Sie den Befehl
dags report
noch einmal aus, um die Parsing-Zeit für alle Ihre DAGs zu sehen:file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:37.000369 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /dag_for_loop_airfl | 0:00:01.109457 | 1 | 1000 | dag_for_loop_airflow ow_variable_optimiz | | | | _variable_optimized ed.py | | | | /airflow_monitoring | 0:00:00.040510 | 1 | 1 | airflow_monitoring .py | | | |
dag-processor-manager
-Logs noch einmal prüfen und die Parsing-Dauer analysieren.Abbildung 14. Die Logs des DAG-Prozessormanagers zeigen die DAG-Parsingzeiten nach der Optimierung des DAG-Codes (zum Vergrößern klicken)
Durch das Ersetzen der Umgebungsvariablen durch Airflow-Vorlagen haben Sie den DAG-Code vereinfacht und die Parsing-Latenz um das Zehnfache reduziert.
Airflow-Umgebungskonfigurationen optimieren
Der Airflow-Planer versucht ständig, neue Aufgaben auszulösen, und parst alle DAGs in Ihrem Umgebungsbucket. Wenn das Parsen Ihrer DAGs lange dauert und der Planer viele Ressourcen verbraucht, können Sie die Airflow-Planerkonfigurationen optimieren, damit der Planer Ressourcen effizienter nutzt.
In dieser Anleitung dauert das Parsen der DAG-Dateien sehr lange und die Parsingzyklen überschneiden sich, wodurch die Kapazität des Planers erschöpft wird. In unserem Beispiel dauert das Parsen des ersten Beispiel-DAGs mehr als 5 Sekunden. Daher konfigurieren Sie den Scheduler so, dass er seltener ausgeführt wird, um Ressourcen effizienter zu nutzen. Sie überschreiben die Airflow-Konfigurationsoption scheduler_heartbeat_sec
. Mit dieser Konfiguration wird festgelegt, wie oft der Scheduler ausgeführt werden soll (in Sekunden). Standardmäßig ist der Wert auf 5 Sekunden festgelegt.
Sie können diese Airflow-Konfigurationsoption überschreiben.
Überschreiben Sie die Airflow-Konfigurationsoption scheduler_heartbeat_sec
:
Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Airflow-Konfigurationsüberschreibungen auf.
Klicken Sie auf Bearbeiten und dann auf Airflow-Konfigurationsüberschreibung hinzufügen.
Überschreiben Sie die Airflow-Konfigurationsoption:
Bereich Schlüssel Wert scheduler
scheduler_heartbeat_sec
10
Klicken Sie auf Speichern und warten Sie, bis die Umgebung ihre Konfiguration aktualisiert hat.
Planermesswerte prüfen:
Rufen Sie den Tab Monitoring auf und wählen Sie Schedulers (Planer) aus.
Klicken Sie im Diagramm Scheduler-Heartbeat auf die Schaltfläche Weitere Optionen (drei Punkte) und dann auf Im Metrics Explorer ansehen.

Im Diagramm sehen Sie, dass der Scheduler nach der Änderung der Standardkonfiguration von 5 Sekunden auf 10 Sekunden zweimal seltener ausgeführt wird. Wenn Sie die Häufigkeit von Heartbeats verringern, wird der Scheduler nicht gestartet, während der vorherige Parsing-Zyklus noch läuft, und die Ressourcenkapazität des Schedulers wird nicht erschöpft.
Scheduler mehr Ressourcen zuweisen
In Cloud Composer 2 können Sie dem Scheduler mehr CPU- und Arbeitsspeicherressourcen zuweisen. So können Sie die Leistung Ihres Planers steigern und die Parsing-Zeit für Ihren DAG verkürzen.
Weisen Sie dem Planer zusätzliche CPU und Arbeitsspeicher zu:
Rufen Sie in der Google Cloud -Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Umgebungskonfiguration auf.
Suchen Sie nach der Konfiguration Ressourcen > Arbeitslasten und klicken Sie auf Bearbeiten.
Geben Sie im Abschnitt Scheduler im Feld Memory (Arbeitsspeicher) das neue Arbeitsspeicherlimit an. Verwenden Sie in dieser Anleitung 4 GB.
Geben Sie im Feld CPU das neue CPU-Limit an. Verwenden Sie in dieser Anleitung 2 vCPUs.
Speichern Sie die Änderungen und warten Sie einige Minuten, bis die Airflow-Planer neu gestartet werden.
Klicken Sie auf den Tab Logs und dann auf Alle Logs > DAG-Prozessormanager.
Prüfen Sie die
dag-processor-manager
-Logs und vergleichen Sie die Parsing-Dauer für die Beispiel-DAGs:Abbildung 16. Die Logs des DAG-Prozessormanagers zeigen die DAG-Parsingzeiten, nachdem dem Scheduler mehr Ressourcen zugewiesen wurden (zum Vergrößern klicken).
Durch die Zuweisung von mehr Ressourcen an den Scheduler haben Sie die Kapazität des Schedulers erhöht und die Parsing-Latenz im Vergleich zu den Standardumgebungskonfigurationen deutlich reduziert. Mit mehr Ressourcen kann der Scheduler die DAGs schneller parsen. Die Kosten für Cloud Composer-Ressourcen steigen jedoch auch. Außerdem ist es nicht möglich, die Ressourcen über ein bestimmtes Limit hinaus zu erhöhen.
Wir empfehlen, Ressourcen erst zuzuweisen, nachdem die möglichen Optimierungen des DAG-Codes und der Airflow-Konfiguration implementiert wurden.
Bereinigen
Damit Ihrem Google Cloud -Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, können Sie entweder das Projekt löschen, das die Ressourcen enthält, oder das Projekt beibehalten und die einzelnen Ressourcen löschen.
Projekt löschen
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Einzelne Ressourcen löschen
Wenn Sie mehrere Anleitungen und Kurzanleitungen durcharbeiten möchten, können Sie die Überschreitung von Projektkontingenten verhindern, indem Sie Projekte wiederverwenden.
Löschen Sie die Cloud Composer-Umgebung. Bei diesem Vorgang löschen Sie auch den Bucket der Umgebung.