Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
In dieser Anleitung erfahren Sie, wie Sie Probleme mit der Aufgabenplanung und dem Parsen diagnostizieren und beheben, die zu Fehlfunktionen des Schedulers, zu Parsefehlern und Latenz sowie zu Aufgabenausfällen führen.
Einleitung
Der Airflow-Planer wird hauptsächlich von zwei Faktoren beeinflusst: Aufgabenplanung und DAG-Parsing. Probleme in einem dieser Faktoren können sich negativ auf das Zustand und Leistung der Umwelt.
Manchmal werden zu viele Aufgaben gleichzeitig geplant. In dieser Situation die Warteschlange ist voll und die Aufgaben verbleiben im Zeitplan "geplant" oder neu geplant, nachdem sie in die Warteschlange gestellt wurde, was zu einem Aufgabenfehler und einer höheren Leistung führen kann Latenz.
Ein weiteres häufiges Problem ist die Parselatenz und Fehler, die durch die Komplexität eines DAG-Codes verursacht werden. Beispiel: Ein DAG-Code mit Airflow-Variablen am Anfang Codeebene kann zu Verzögerungen beim Parsen, Datenbanküberlastung, Planung Fehler und DAG-Zeitüberschreitungen.
In dieser Anleitung diagnostizieren Sie die Beispiel-DAGs. Außerdem erfahren Sie, wie Sie Fehler bei der Planung und beim Parsen beheben, die DAG-Planung verbessern und Ihren DAG-Code und Ihre Umgebungskonfigurationen optimieren, um die Leistung zu verbessern.
Lernziele
In diesem Abschnitt werden Ziele als Beispiele in dieser Anleitung aufgeführt.
Beispiel: Fehlfunktion des Planers und Latenz aufgrund hoher Nebenläufigkeit von Aufgaben
Laden Sie den Beispiel-DAG hoch, der mehrmals gleichzeitig ausgeführt wird, und diagnostizieren Sie die Fehlfunktion des Schedulers und Latenzprobleme mit Cloud Monitoring.
Optimieren Sie Ihren DAG-Code, indem Sie die Aufgaben konsolidieren und den auf die Leistung auswirken.
Aufgaben gleichmäßiger über einen längeren Zeitraum verteilen und die Leistung bewerten Auswirkungen.
Airflow-Konfigurationen und Umgebungskonfigurationen optimieren und um die Auswirkungen zu bewerten.
Beispiel: DAG-Parsingfehler und Latenz durch komplexen Code
Beispiel-DAG mit Airflow-Variablen hochladen und Parsing-Fehler diagnostizieren bei Problemen mit Cloud Monitoring.
Optimieren Sie den DAG-Code, indem Sie Airflow-Variablen auf der obersten Codeebene vermeiden, und bewerten Sie die Auswirkungen auf die Parsezeit.
Airflow-Konfigurationen und Umgebungskonfigurationen zu optimieren die Auswirkungen auf die Parsing-Zeit.
Kosten
In dieser Anleitung werden die folgenden kostenpflichtigen Komponenten von Google Cloud verwendet:
Nach Abschluss dieser Anleitung können Sie weitere Kosten durch Löschen von erstellten Ressourcen vermeiden. Weitere Informationen finden Sie unter Bereinigen.
Hinweise
In diesem Abschnitt werden die Aktionen beschrieben, die vor dem Start der Anleitung erforderlich sind.
Projekt erstellen und konfigurieren
Für diese Anleitung benötigen Sie ein Google Cloud-Projekt. Konfigurieren Sie das Projekt so:
Wählen Sie in der Google Cloud Console ein Projekt aus oder erstellen Sie eines:
Die Abrechnung für Ihr Projekt muss aktiviert sein. Hier erfahren Sie, wie Sie prüfen, ob die Abrechnung für ein Projekt aktiviert ist.
Sorgen Sie dafür, dass Ihr Google Cloud-Projektnutzer die folgenden Rollen hat, um die erforderlichen Ressourcen zu erstellen:
- Administrator für Umgebung und Storage-Objekte
(
roles/composer.environmentAndStorageObjectAdmin
) - Compute-Administrator (
roles/compute.admin
)
- Administrator für Umgebung und Storage-Objekte
(
Die APIs für Ihr Projekt aktivieren
Enable the Cloud Composer API.
Cloud Composer-Umgebung erstellen
Erstellen Sie eine Cloud Composer 2-Umgebung.
Um die Umgebung zu schaffen,
Sie gewähren die Dienst-Agent-Erweiterung für die Cloud Composer v2 API.
Rolle (roles/composer.ServiceAgentV2Ext
) für den Composer-Dienst-Agent
Konto. Cloud Composer verwendet dieses Konto, um Vorgänge in Ihrem Google Cloud-Projekt auszuführen.
Beispiel: Fehlfunktion des Schedulers und Aufgabenfehler aufgrund von Problemen mit der Aufgabenplanung
In diesem Beispiel wird die Fehlerbehebung bei einem nicht funktionierenden Scheduler und die Latenz gezeigt, die durch eine hohe Aufgabenparallelität verursacht wird.
Beispiel-DAG in die Umgebung hochladen
Laden Sie den folgenden Beispiel-DAG in die Umgebung hoch.
die Sie in den vorherigen Schritten erstellt haben. In dieser Anleitung heißt dieser DAG
dag_10_tasks_200_seconds_1
Dieser DAG hat 200 Aufgaben. Jede Aufgabe wartet 1 Sekunde und gibt „Abgeschlossen!“ aus. Der DAG wird nach dem Hochladen automatisch ausgelöst. Cloud Composer führt diesen DAG zehnmal aus. Alle DAG-Ausführungen erfolgen parallel.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 200
seconds = 1
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2023, 11, 22, 20, 0),
end_date=datetime(2023, 11, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Fehlfunktionen des Schedulers und Aufgabenfehler diagnostizieren
Öffnen Sie nach Abschluss der DAG-Ausführung die Airflow-UI und klicken Sie auf
dag_10_tasks_200_seconds_1
-DAG. Sie sehen, dass insgesamt 10 DAG-Ausführungen
und jede hat 200 erfolgreich abgeschlossene Aufgaben.
Prüfen Sie die Airflow-Aufgabenlogs:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Logs auf und klicken Sie dann auf Alle Logs > Airflow-Logs > Worker > Im Log-Explorer ansehen
Im Log-Histogramm sind die rot markierten Fehler und Warnungen zu sehen. und orangefarbene Farben:
Der Beispiel-DAG führte zu etwa 130 Warnungen und 60 Fehlern. Klicken Sie auf eine Spalte mit gelben und roten Balken. Einige der folgenden Elemente werden angezeigt: Warnungen und Fehlermeldungen:
State of this instance has been externally set to success. Terminating
instance.
Received SIGTERM. Terminating subprocesses.
worker: Warm shutdown (MainProcess).
Diese Logs können darauf hinweisen, dass die Ressourcennutzung die Limits überschritten hat Worker neu gestartet.
Wenn eine Airflow-Aufgabe zu lange in der Warteschlange verbleibt, kennzeichnet der Scheduler sie als fehlgeschlagen und als „up_for_retry“ (Wiederholung möglich) und plant sie noch einmal für die Ausführung. Eine Möglichkeit zur Beobachtung der Symptome besteht darin, Diagramm mit der Anzahl der Aufgaben in der Warteschlange und wenn die Spitzen in diesem Diagramm nicht nach etwa 10 Minuten fallen, wird es bei Aufgaben zu Fehlern kommen (ohne Logs).
Prüfen Sie die Monitoring-Informationen:
Wechseln Sie zum Tab Monitoring und wählen Sie Übersicht aus.
Sehen Sie sich das Diagramm mit den Airflow-Aufgaben an.
In der Airflow-Tasks-Grafik gibt es eine Spitze bei den Aufgaben in der Warteschlange, die länger als 10 Minuten dauert, was bedeuten kann, dass nicht genügend Ressourcen in Ihrer Umgebung, um alle geplanten Aufgaben zu verarbeiten.
Sehen Sie sich das Diagramm Aktive Worker an:
Das Diagramm Aktive Worker zeigt an, dass der DAG das Autoscaling ausgelöst hat. während der DAG-Ausführung auf das maximal zulässige Limit von drei Workern begrenzt.
Diagramme zur Ressourcennutzung können darauf hinweisen, dass die Airflow-Worker nicht genügend Kapazität haben, um anstehende Aufgaben auszuführen. Wählen Sie auf dem Tab Monitoring die Option Workers und Sehen Sie sich die Diagramme Gesamte Worker-CPU-Nutzung und Gesamte Worker-Arbeitsspeichernutzung an.
Die Diagramme zeigen, dass zu viele Aufgaben gleichzeitig ausgeführt werden. das CPU-Limit erreicht hat. Die Ressourcen wurden bereits seit über 30 Jahren Minuten, was sogar länger ist als die Gesamtdauer von 200 Aufgaben in 10 Minuten DAG-Ausführungen werden nacheinander ausgeführt.
Dies sind die Indikatoren für das Auffüllen der Warteschlange und das Fehlen von Ressourcen. um alle geplanten Aufgaben zu verarbeiten.
Aufgaben zusammenführen
Mit dem aktuellen Code werden viele DAGs und Aufgaben ohne ausreichende Ressourcen erstellt, um alle Aufgaben parallel verarbeiten, was dazu führt, dass die Warteschlange gefüllt wird. Wenn Aufgaben zu lange in der Warteschlange aufbewahrt werden, werden Aufgaben möglicherweise verschoben oder fehlschlagen. In solchen Fällen sollten Sie eine kleinere Anzahl von konsolidierten Aufgaben.
Der folgende Beispiel-DAG ändert die Anzahl der Aufgaben im ersten Beispiel von 200 auf 20 und erhöhen die Wartezeit von 1 auf 10 Sekunden, um mehr zu imitieren. konsolidierten Aufgaben mit der gleichen Arbeit.
Laden Sie den folgenden Beispiel-DAG in die von Ihnen erstellte Umgebung hoch. In dieser Anleitung heißt dieser DAG
dag_10_tasks_20_seconds_10
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Bewerten Sie die Auswirkungen konsolidierter Aufgaben auf die Planungsprozesse:
Warten Sie, bis die DAG-Ausführungen abgeschlossen sind.
Klicken Sie in der Airflow-Benutzeroberfläche auf der Seite DAGs auf den DAG
dag_10_tasks_20_seconds_10
. Sie sehen zehn DAG-Ausführungen mit jeweils 20 Aufgaben, die erfolgreich waren.Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Logs auf und klicken Sie dann auf Alle Logs > Airflow-Logs > Worker > Im Log-Explorer ansehen
Das zweite Beispiel mit stärker konsolidierten Aufgaben führte zu etwa 10 Warnungen und 7 Fehler Im Histogramm können Sie die Anzahl der Fehler und Warnungen im ersten Beispiel (frühere Werte) mit dem zweiten Beispiel (spätere Werte) vergleichen.
Wenn Sie das erste Beispiel mit dem konsolidierten Beispiel vergleichen, sehen Sie, dass im zweiten Beispiel deutlich weniger Fehler und Warnungen auftreten. Aufgrund von Ressourcenüberlastung werden jedoch weiterhin dieselben Fehler im Zusammenhang mit dem Warmstart in den Protokollen angezeigt.
Wählen Sie auf dem Tab Monitoring die Option Worker aus und sehen Sie sich die Diagramme an.
Wenn Sie das Diagramm Airflow-Aufgaben mit dem ersten Beispiel (früher Werte) mit dem Diagramm für das zweite Beispiel mit stärker konsolidierten Aufgaben sehen Sie, dass der Anstieg der Aufgaben in der Warteschlange wenn die Aufgaben stärker konsolidiert wurden. Sie dauerte jedoch fast 10 Minuten, was immer noch nicht optimal ist.
Im Diagramm mit den aktiven Workern sehen Sie das erste Beispiel (links Seite des Diagramms) viel länger Ressourcen verwendet haben. als das zweite, obwohl beide Beispiele die gleiche Menge an Informationen arbeiten.
Sehen Sie sich die Diagramme zur Worker-Ressourcennutzung an. Auch wenn der Unterschied zwischen den im Beispiel verwendeten Ressourcen mit konsolidierteren Aufgaben und Das erste Beispiel ist ziemlich signifikant, die CPU-Auslastung steigt immer noch stark an 70% des Limits erreichen.
Aufgaben gleichmäßiger über einen längeren Zeitraum verteilen
Zu viele gleichzeitige Aufgaben führen dazu, dass die Warteschlange voll ist. Das wiederum kann dazu führen, dass Aufgaben in der Warteschlange hängen bleiben oder verschoben werden. In den vorherigen Schritten haben Sie die Anzahl der Aufgaben reduziert, indem Sie diese zusammengeführt haben. Die Ausgabeprotokolle und die Überwachung haben jedoch gezeigt, dass die Anzahl der gleichzeitigen Aufgaben immer noch nicht optimal ist.
Sie können die Anzahl der gleichzeitigen Aufgabenausführungen steuern, indem Sie einen Zeitplan implementieren oder Limits dafür festlegen, wie viele Aufgaben gleichzeitig ausgeführt werden können.
In dieser Anleitung verteilen Sie Aufgaben gleichmäßiger über die Zeit, indem Sie dem DAG dag_10_tasks_20_seconds_10
Parameter auf DAG-Ebene hinzufügen:
Fügen Sie dem DAG-Kontextmanager das Argument
max_active_runs=1
hinzu. Dieses Argument legt ein Limit von nur einer Instanz einer DAG-Ausführung fest, die in einem bestimmten Moment ausgeführt wird.Fügen Sie dem DAG-Kontextmanager das Argument
max_active_tasks=5
hinzu. Mit diesem Argument wird die maximale Anzahl von Aufgabeninstanzen gesteuert, die in jedem DAG gleichzeitig ausgeführt werden können.
Laden Sie den folgenden Beispiel-DAG in die Umgebung hoch.
die Sie erstellt haben. In dieser Anleitung heißt dieser DAG dag_10_tasks_20_seconds_10_scheduled.py
.
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.decorators import task
tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5
with DAG(
dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
start_date=datetime(2021, 12, 22, 20, 0),
end_date=datetime(2021, 12, 22, 20, 49),
schedule_interval=timedelta(minutes=minutes),
max_active_runs=active_runs,
max_active_tasks=active_tasks,
catchup=True,
) as dag:
@task
def create_subtasks(seconds: int) -> None:
time.sleep(seconds)
for i in range(tasks_amount):
create_subtasks(seconds)
Bewerten Sie die Auswirkungen der Verteilung von Aufgaben über einen längeren Zeitraum auf die Planungsprozesse:
Warten Sie, bis die DAG-Ausführungen abgeschlossen sind.
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Logs auf und klicken Sie dann auf Alle Logs > Airflow-Logs > Worker > Im Log-Explorer ansehen
Im Histogramm sehen Sie, dass der dritte DAG mit aktiven Aufgaben und Ausführungen keine Warnungen oder Fehler generiert haben und der die Verteilung der Logs im Vergleich zu den vorherigen Werten gleichmäßiger aussieht.
Die Aufgaben im Beispiel dag_10_tasks_20_seconds_10_scheduled
mit einem
eine begrenzte Anzahl aktiver Aufgaben
und Ausführungen nicht zu Ressourcenproblemen führten,
wurden die Aufgaben gleichmäßig
in die Warteschlange gestellt.
Nachdem Sie die beschriebenen Schritte ausgeführt haben, haben Sie die Ressourcennutzung optimiert, indem Sie kleine Aufgaben zusammengeführt und im Zeitverlauf gleichmäßiger verteilt haben.
Umgebungskonfigurationen optimieren
Sie können Ihre Umgebungskonfigurationen so anpassen, dass immer genügend Kapazität in den Airflow-Workern vorhanden ist, um Aufgaben in der Warteschlange auszuführen.
Anzahl der Worker und Worker-Nebenläufigkeit
Sie können die maximal zulässige Anzahl von Workern anpassen. damit Cloud Composer Ihre Umgebung automatisch die festgelegten Limits zu erreichen.
Der Parameter [celery]worker_concurrency
definiert die maximale Anzahl von Aufgaben.
die ein einzelner Worker
aus der Aufgabenwarteschlange übernehmen kann. Ändern dieses Parameters
passt die Anzahl der Aufgaben an, die ein einzelner Worker gleichzeitig ausführen kann.
Sie können diese Airflow-Konfigurationsoption ändern, indem Sie
überschreiben. Standardmäßig ist die Worker-Nebenläufigkeit auf eine
mindestens der folgenden Werte: 32, 12 * worker_CPU, 8 * worker_memory
, was bedeutet
abhängig von den Worker-Ressourcenlimits. Weitere Informationen zu den Standardwerten für die Worker-Gleichzeitigkeit finden Sie unter Umgebungen optimieren.
Die Anzahl der Worker und die Worker-Parallelität wirken zusammen und die Leistung Ihrer Umgebung hängt stark von beiden Parametern ab. Berücksichtigen Sie bei der Auswahl der richtigen Kombination die folgenden Überlegungen:
Mehrere schnelle Aufgaben, die parallel ausgeführt werden. Sie können die Anzahl der Worker erhöhen, Nebenläufigkeit, wenn sich Aufgaben in der Warteschlange befinden und Ihre Worker einen geringen Prozentsatz der CPUs und des Arbeitsspeichers. Unter kann die Warteschlange unter Umständen nie gefüllt werden. Dadurch wird das Autoscaling nie auslösen. Wenn die Ausführung kleiner Aufgaben bis zum Zeitpunkt der neuen Worker abgeschlossen ist kann ein vorhandener Worker die verbleibenden Aufgaben übernehmen werden keine Aufgaben für neu erstellte Worker sein.
In diesen Fällen wird empfohlen, die minimale Anzahl von Workern und die Nebenläufigkeit von Workern zu erhöhen, um eine zu schnelle Skalierung zu vermeiden.
Mehrere lange Aufgaben, die parallel ausgeführt werden. Die hohe Worker-Nebenläufigkeit verhindert, dass das System die Anzahl der Worker skaliert. Wenn mehrere Aufgaben ressourcenintensiv sind und lange dauern, kann eine hohe Worker-Parallelität dazu führen, dass die Warteschlange nie voll ist und alle Aufgaben nur von einem Worker übernommen werden, was zu Leistungsproblemen führt. In diesen ist es empfehlenswert, Maximale Anzahl von Workern erhöhen und Nebenläufigkeit von Workern verringern
Die Bedeutung der Parallelität
Airflow-Planer steuern die Planung von DAG-Ausführungen und einzelnen Aufgaben von DAGs. Mit der Airflow-Konfigurationsoption [core]parallelism
wird gesteuert,
Aufgaben, die der Airflow-Planer schließlich in die Warteschlange des Executors in die Warteschlange stellen kann
für diese Aufgaben erfüllt sind.
Parallelität ist ein Schutzmechanismus von Airflow, der bestimmt, wie viele Aufgaben können unabhängig von der Anzahl der Worker für jeden Planer zur gleichen Zeit ausgeführt werden. Der Parallelitätswert multipliziert mit der Anzahl der Planer in Ihrem Cluster ist die maximale Anzahl von Aufgabeninstanzen, die Ihre Umgebung in die Warteschlange stellen kann.
Normalerweise wird [core]parallelism
als Produkt einer maximalen Anzahl von Workern festgelegt
und [celery]worker_concurrency
. Außerdem wird sie vom Pool beeinflusst.
Sie können diese Airflow-Konfigurationsoption ändern, indem Sie sie überschreiben. Weitere Informationen zum Anpassen von Airflow
Konfigurationen für die Skalierung, siehe
Airflow-Konfiguration skalieren
Optimale Umgebungskonfigurationen finden
Wir empfehlen, kleinere Aufgaben in größere Aufgaben zusammenzuführen und Aufgaben gleichmäßiger über die Zeit zu verteilen, um Probleme mit der Planung zu beheben. Zusätzlich zu können Sie auch Umgebungskonfigurationen optimieren, um eine Ausreichend Kapazität zum gleichzeitigen Ausführen mehrerer Aufgaben.
Beispiel: Sie haben Aufgaben in Ihrem DAG konsolidiert. so viel wie möglich zu tun, aber aktive Aufgaben zu beschränken, um sie gleichmäßiger Zeit ist keine bevorzugte Lösung für Ihren speziellen Anwendungsfall.
Sie können die Parallelität, die Anzahl der Worker und die Worker-Nebenläufigkeit anpassen
Parameter, um den DAG dag_10_tasks_20_seconds_10
ohne aktive Begrenzung auszuführen
Aufgaben. In diesem Beispiel wird der DAG zehnmal ausgeführt und jede Ausführung enthält 20 kleine Aufgaben.
Wenn Sie alle gleichzeitig ausführen möchten:
Sie benötigen eine größere Umgebung, da diese die Leistung steuert Parameter der verwalteten Cloud Composer-Infrastruktur Ihrer Umgebung.
Airflow-Worker müssen in der Lage sein, 20 Aufgaben gleichzeitig auszuführen. müssen Sie die Worker-Parallelität auf 20 festlegen.
Die Worker benötigen ausreichend CPU und Arbeitsspeicher, um alle Aufgaben zu bewältigen. Die Worker-Parallelität wird von der CPU und dem Arbeitsspeicher der Worker beeinflusst. Sie benötigen daher mindestens
worker_concurrency / 12
CPU undleast worker_concurrency / 8
Arbeitsspeicher.Sie müssen die Parallelität erhöhen, um der höheren Worker-Nebenläufigkeit zu entsprechen. Damit die Worker 20 Aufgaben aus der Warteschlange aufnehmen können, führt der Planer diese 20 Aufgaben zuerst einplanen müssen.
Passen Sie die Umgebungskonfigurationen so an:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Umgebungskonfiguration auf.
Suchen Sie die Konfiguration unter Ressourcen > Arbeitslasten und Klicken Sie auf Bearbeiten.
Geben Sie im Abschnitt Worker im Feld Memory den neuen Arbeitsspeicher an. Limit für Airflow-Worker. Verwenden Sie in dieser Anleitung 4 GB.
Geben Sie im Feld CPU das neue CPU-Limit für Airflow-Worker an. In dieser Anleitung verwenden Sie zwei vCPUs.
Speichern Sie die Änderungen und warten Sie einige Minuten, bis Ihre Airflow-Worker neu starten.
Als Nächstes überschreiben Sie die Airflow-Konfigurationsoptionen für Parallelität und Worker-Nebenläufigkeit:
Rufen Sie den Tab Airflow-Konfigurationsüberschreibungen auf.
Klicken Sie auf Bearbeiten und dann auf Airflow-Konfigurationsüberschreibung hinzufügen.
Überschreiben Sie die Parallelitätskonfiguration:
Bereich Schlüssel Wert core
parallelism
20
Klicken Sie auf Airflow-Konfigurationsüberschreibung hinzufügen und überschreiben Sie die Konfiguration der Worker-Parallelität:
Bereich Schlüssel Wert celery
worker_concurrency
20
Klicken Sie auf Speichern und warten Sie, bis die Konfiguration der Umgebung aktualisiert wurde.
Lösen Sie denselben Beispiel-DAG noch einmal mit den angepassten Konfigurationen aus:
Rufen Sie in der Airflow-Benutzeroberfläche die Seite DAGs auf.
Suchen Sie den DAG
dag_10_tasks_20_seconds_10
und löschen Sie ihn.Nachdem der DAG gelöscht wurde, prüft Airflow den DAGs-Ordner in Ihrem und führt den DAG automatisch noch einmal aus.
Sehen Sie sich nach Abschluss der DAG-Ausführungen das Log-Histogramm noch einmal an. Im Diagramm
sehen Sie, dass das Beispiel dag_10_tasks_20_seconds_10
mit mehr
konsolidierten Aufgaben bei der Ausführung mit
der angepassten Umgebungskonfiguration. Vergleichen Sie die Ergebnisse mit den vorherigen Daten.
im Diagramm, wobei das gleiche Beispiel Fehler und Warnungen erzeugte,
wird mit der Konfiguration der Standardumgebung ausgeführt.
Umgebungskonfigurationen und Airflow-Konfigurationen spielen eine entscheidende Rolle bei Aufgabenplanung. Es ist jedoch nicht möglich, die Konfigurationen über bestimmte Grenzen hinaus.
Wir empfehlen, den DAG-Code zu optimieren, Aufgaben zu konsolidieren und eine Planung zu verwenden, um Leistung und Effizienz zu optimieren.
Beispiel: DAG-Parsingfehler und Latenz aufgrund komplexen DAG-Codes
In diesem Beispiel untersuchen Sie die Parsing-Latenz eines Beispiel-DAG, der ahmt eine Überschreitung von Airflow-Variablen nach.
Neue Airflow-Variable erstellen
Erstellen Sie vor dem Hochladen des Beispielcodes eine neue Airflow-Variable.
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.
Klicken Sie auf Verwaltung > Variablen > Neuen Datensatz hinzufügen.
Stellen Sie folgende Werte ein:
- Schlüssel:
example_var
- val:
test_airflow_variable
- Schlüssel:
Beispiel-DAG in die Umgebung hochladen
Laden Sie den folgenden Beispiel-DAG in die Umgebung hoch.
die Sie in den vorherigen Schritten erstellt haben. In dieser Anleitung heißt dieser DAG
dag_for_loop_airflow_variable
Dieser DAG enthält eine For-Schleife, die 1.000-mal ausgeführt wird und eine große Anzahl von Airflow-Variablen simuliert. Bei jeder Iteration wird die Variable example_var
gelesen und eine Aufgabe generiert. Jede Aufgabe enthält einen Befehl, der den Wert der Variablen ausgibt.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable',
default_args=default_args,
catchup=False,
schedule_interval="@daily"
)
for i in range(1000):
a = Variable.get('example_var', 'N/A')
task = BashOperator(
task_id=f'task_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': a}
)
Parsing-Probleme diagnostizieren
Die DAG-Analysezeit ist die Zeit, die der Airflow-Planer zum Lesen benötigt und parsen Sie sie. Bevor der Airflow-Planer Aufgaben aus einem DAG planen kann, muss er die DAG-Datei parsen, um die Struktur des DAG und die definierten Aufgaben zu ermitteln.
Wenn das Parsen eines DAG lange dauert, verbraucht dies die Kapazität des Planers. die Leistung von DAG-Ausführungen beeinträchtigen kann.
So überwachen Sie die DAG-Analysezeit:
Führen Sie den Befehl
dags report
Airflow-Befehlszeilenbefehl in der gcloud-Befehlszeile aus, um die Parsing-Zeit für alle DAGs anzuzeigen:gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION \ dags report
Ersetzen Sie Folgendes:
ENVIRONMENT_NAME
: der Name Ihrer UmgebungLOCATION
: die Region, in der sich die Umgebung befindet.
Suchen Sie in der Ausgabe des Befehls nach dem Wert für die Dauer des DAG
dag_for_loop_airflow_variables
. Ein hoher Wert kann darauf hinweisen, dass diese DAG nicht optimal implementiert ist. Wenn Sie mehrere DAGs haben, Anhand der Ausgabetabelle können Sie erkennen, welche DAGs lange Parsing-Zeiten haben.Beispiel:
file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:14.773594 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /airflow_monitoring | 0:00:00.003035 | 1 | 1 | airflow_monitoring .py
DAG-Parsing-Zeiten in der Google Cloud Console prüfen:
- Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Protokolle und dann Alle Protokolle > DAG-Prozessormanager auf.
Prüfen Sie die
dag-processor-manager
-Protokolle und identifizieren Sie mögliche Probleme.
Wenn die Gesamtzeit der DAG-Analyse ca. 10 Sekunden überschreitet, werden die Planer möglicherweise mit DAG-Parsing überlastet ist und DAGs nicht effektiv ausführen können.
DAG-Code optimieren
Es ist empfohlen um unnötige „oberste Ebene“ zu vermeiden, Python-Code in Ihren DAGs. DAGs mit vielen Importen, Variablen und Funktionen außerhalb des DAGs führen zu längeren Parsezeiten für den Airflow-Planer. Dadurch werden Leistung und Skalierbarkeit Cloud Composer und Airflow. Übermäßige Anzahl gelesener Airflow-Variablen führt zu langer Parsing-Zeit und hoher Datenbanklast. Wenn sich dieser Code in einer DAG-Datei befindet, werden diese Funktionen bei jedem Scheduler-Heartbeat ausgeführt, was zu Verzögerungen führen kann.
Mit den Vorlagenfeldern von Airflow können Sie Werte aus Airflow-Variablen und Jinja-Vorlagen in Ihre DAGs einbinden. So vermeiden Sie unnötige während Planer-Heartbeats ausgeführt werden.
Für eine bessere Implementierung des DAG-Beispiels sollten Sie auf die Verwendung von Airflow-Variablen den übergeordneten Python-Code von DAGs. Übergeben Sie stattdessen Airflow-Variablen an durch eine Jinja-Vorlage, was das Lesen des Werts der Aufgabenausführung.
Laden die neue Version des Beispiel-DAG in Ihren
zu verbessern. In dieser Anleitung heißt dieser DAG
dag_for_loop_airflow_variable_optimized
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 2, 17),
'retries': 0
}
dag = DAG(
'dag_for_loop_airflow_variable_optimized',
default_args=default_args,
catchup=False,
schedule_interval='@daily'
)
for i in range(1000):
task = BashOperator(
task_id=f'bash_use_variable_good_{i}',
bash_command='echo variable foo=${foo_env}',
dag=dag,
env={'foo_env': '{{ var.value.get("example_var") }}'},
)
Prüfen Sie die neue DAG-Analysezeit:
Warten Sie, bis die DAG-Ausführung abgeschlossen ist.
Führen Sie den Befehl
dags report
noch einmal aus, um die zum Parsen der Zeit für alle DAGs:file | duration | dag_num | task_num | dags ====================+================+=========+==========+===================== /dag_for_loop_airfl | 0:00:37.000369 | 1 | 1000 | dag_for_loop_airflow ow_variable.py | | | | _variable /dag_for_loop_airfl | 0:00:01.109457 | 1 | 1000 | dag_for_loop_airflow ow_variable_optimiz | | | | _variable_optimized ed.py | | | | /airflow_monitoring | 0:00:00.040510 | 1 | 1 | airflow_monitoring .py | | | |
Prüfen Sie
dag-processor-manager
-Logs noch einmal und die Dauer des Parsens.
Durch das Ersetzen der Umgebungsvariablen durch Airflow-Vorlagen haben Sie den DAG-Code vereinfacht und die Parsing-Latenz um etwa das Zehnfache reduziert.
Airflow-Umgebungskonfigurationen optimieren
Der Airflow-Planer versucht kontinuierlich, neue Aufgaben auszulösen und parst alle DAGs in Ihrem Umgebungs-Bucket. Wenn Ihre DAGs lange Parsing-Zeiten haben und der der Planer viele Ressourcen verbraucht, können Sie den Airflow-Planer optimieren. damit der Planer Ressourcen effizienter nutzen kann.
In dieser Anleitung benötigen die DAG-Dateien viel Zeit für das Parsen und das Parsen von Zyklen.
und die Kapazität des Planers
erschöpft ist. In unserem Beispiel dauert das Parsen des ersten Beispiel-DAG mehr als 5 Sekunden. Sie konfigurieren den Scheduler daher so, dass er seltener ausgeführt wird, um die Ressourcen effizienter zu nutzen. Ich
überschreiben die
scheduler_heartbeat_sec
Airflow-Konfigurationsoption. Diese Konfiguration legt fest, wie oft
Planer ausgeführt werden soll (in Sekunden). Standardmäßig ist der Wert auf 5 Sekunden festgelegt.
Sie können diese Airflow-Konfigurationsoption ändern, indem Sie sie überschreiben.
Überschreiben Sie die Airflow-Konfigurationsoption scheduler_heartbeat_sec
:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Airflow-Konfigurationsüberschreibungen auf.
Klicken Sie auf Bearbeiten und dann auf Airflow-Konfigurationsüberschreibung hinzufügen.
Überschreiben Sie die Airflow-Konfigurationsoption:
Bereich Schlüssel Wert scheduler
scheduler_heartbeat_sec
10
Klicken Sie auf Speichern und warten Sie, bis die Konfiguration der Umgebung aktualisiert wurde.
Prüfen Sie die Planermesswerte:
Wechseln Sie zum Tab Monitoring und wählen Sie Schedulers (Planer) aus.
Klicken Sie im Diagramm Scheduler Heartbeat auf die Schaltfläche More options (Weitere Optionen). (drei Punkte) und klicken Sie dann auf Im Metrics Explorer ansehen.
Im Diagramm sehen Sie, dass der Planer zweimal seltener ausgeführt wird, nachdem Sie die Standardkonfiguration von 5 Sekunden in 10 Sekunden geändert. Durch die Reduzierung der Häufigkeit von Herzschlägen einstellen, wird der Planer nicht gestartet, während der vorherige Parsing-Zyklus läuft, und der Ressourcenkapazität nicht erschöpft ist.
Dem Planer weitere Ressourcen zuweisen
In Cloud Composer 2 können Sie dem Scheduler mehr CPU- und Arbeitsspeicherressourcen zuweisen. So können Sie die Leistung Ihres Planers steigern und die Parsing-Zeit des DAG zu beschleunigen.
Weisen Sie dem Planer zusätzliche CPU und Arbeitsspeicher zu:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.
Rufen Sie den Tab Umgebungskonfiguration auf.
Suchen Sie die Konfiguration unter Ressourcen > Arbeitslasten und Klicken Sie auf Bearbeiten.
Geben Sie im Abschnitt Scheduler im Feld Memory das neue Arbeitsspeicherlimit an. Verwenden Sie in dieser Anleitung 4 GB.
Geben Sie im Feld CPU das neue CPU-Limit an. In dieser Anleitung verwenden Sie zwei vCPUs.
Speichern Sie die Änderungen und warten Sie einige Minuten, bis die Airflow-Planer neu gestartet wurden.
Rufen Sie den Tab Protokolle und dann Alle Protokolle > DAG-Prozessormanager auf.
Prüfen Sie die
dag-processor-manager
-Logs und vergleichen Sie die Parsing-Dauer für Beispiel-DAGs:
Durch die Zuweisung weiterer Ressourcen an den Scheduler haben Sie die Kapazität des Schedulers erhöht und die Parsing-Latenz im Vergleich zu den Standardumgebungskonfigurationen deutlich reduziert. Mit mehr Ressourcen kann der Scheduler die DAGs schneller parsen. Die mit Cloud Composer-Ressourcen verbundenen Kosten steigen jedoch ebenfalls. Außerdem ist es nicht möglich, den Wert über ein bestimmtes Limit hinaus zu arbeiten.
Wir empfehlen, Ressourcen erst nach dem möglichen DAG-Code und Airflow-Konfigurationsoptimierungen wurden implementiert.
Bereinigen
Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder Sie behalten das Projekt und löschen die einzelnen Ressourcen.
Projekt löschen
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Einzelne Ressourcen löschen
Wenn Sie mehrere Anleitungen und Kurzanleitungen durcharbeiten möchten, können Sie die Überschreitung von Projektkontingenten verhindern, indem Sie Projekte wiederverwenden.
Löschen Sie die Cloud Composer-Umgebung. Außerdem wird dabei der Bucket der Umgebung gelöscht.