In dieser Anleitung erfahren Sie, wie Sie einen gerichteten azyklischen Graphen (Directed Acyclic Graph; DAG) in Apache Airflow für die Ausführung in einer Cloud Composer-Umgebung schreiben.
Airflow-DAG strukturieren
Ein DAG wird in einer Python-Datei definiert und besteht aus einer DAG-Definition, Operatoren und Operatorbeziehungen. Die folgenden Code-Snippets zeigen Beispiele der einzelnen Komponenten außerhalb des Kontexts:
Eine DAG-Definition.
Airflow 2
Airflow 1
Operatoren, mit denen die auszuführende Arbeit beschrieben wird. Eine Instanziierung eines Operators wird als Aufgabe bezeichnet.
Airflow 2
Airflow 1
Aufgabenbeziehungen, die beschreiben, in welcher Reihenfolge die Ausführung erfolgen soll:
Airflow 2
Airflow 1
Beispiel für einen vollständigen DAG-Workflow in Python
Der folgende Workflow ist eine vollständige funktionierende DAG-Vorlage, die sich aus zwei Aufgaben zusammensetzt: einer hello_python
-Aufgabe und einer goodbye_bash
-Aufgabe:
Airflow 2
Airflow 1
Airflow-Anleitungen und -Konzepte
Weitere Informationen zum Definieren von Airflow-DAGs finden Sie in der Airflow-Anleitung und in den Airflow-Konzepten.
Airflow-Operatoren
Die folgenden Beispiele enthalten einige beliebte Airflow-Operatoren. Informationen zu weiteren Airflow-Operatoren finden Sie in der Apache Airflow API-Referenz und im Quellcode der Operatoren core, contrib und providers.
BashOperator
Mit dem BashOperator können Sie Befehlszeilenprogramme ausführen.
Airflow 2
Airflow 1
Cloud Composer führt die bereitgestellten Befehle in einem Bash-Skript auf einem Worker aus. Der Worker ist ein Docker-Container auf Basis von Debian und enthält mehrere Pakete.
PythonOperator
Verwenden Sie den PythonOperator, um beliebigen Python-Code auszuführen.
Cloud Composer führt den Python-Code in einem Container aus, der Pakete für die in Ihrer Umgebung verwendete Cloud Composer-Image-Version enthält.
Informationen zum Installieren weiterer Python-Pakete finden Sie unter Python-Abhängigkeiten installieren.
Google Cloud-Operatoren
Mit den Google Cloud-Airflow-Operatoren können Sie Aufgaben ausführen, die Google Cloud-Produkte nutzen. Cloud Composer konfiguriert automatisch eine Airflow-Verbindung zum Projekt der Umgebung.
BigQuery-Operatoren fragen Daten in BigQuery ab und verarbeiten sie.
Airflow 2
Airflow 1
Mit Cloud Dataflow-Operatoren werden Apache Beam-Jobs in Dataflow ausgeführt.
Mit Cloud Data Fusion-Operatoren werden Cloud Data Fusion-Pipelines verwaltet und ausgeführt.
Mit Dataproc-Operatoren werden Hadoop- und Spark-Jobs in Dataproc ausgeführt.
Mit Datastore-Operatoren werden Daten in Datastore gelesen und geschrieben.
Mit AI Platform-Operatoren werden Trainings- und Vorhersagejobs in AI Platform ausgeführt.
Mit Cloud Storage-Operatoren werden Daten in Cloud Storage gelesen und geschrieben.
EmailOperator
Verwenden Sie den EmailOperator, um E-Mails von einem DAG zu senden. Wenn Sie E-Mails aus einer Cloud Composer-Umgebung senden möchten, müssen Sie die Verwendung von SendGrid in der Umgebung konfigurieren.
Airflow 2
Airflow 1
Benachrichtigungen
Zum Senden einer E-Mail-Benachrichtigung, wenn ein Operator im DAG fehlerhaft ist, legen Sie für email_on_failure
den Wert True
fest. Zum Senden von E-Mails aus einer Cloud Composer-Umgebung müssen Sie Ihre Umgebung für die Verwendung von SendGrid konfigurieren.
Airflow 2
Airflow 1
DAG-Workflowrichtlinien
Airflow 2 statt Airflow 1 verwenden.
Die Airflow-Community veröffentlicht keine neuen Neben- oder Patchreleases für Airflow 1 mehr.
Platzieren Sie benutzerdefinierte Python-Bibliotheken im ZIP-Archiv eines DAG in einem verschachtelten Verzeichnis. Platzieren Sie Bibliotheken nicht auf der obersten Ebene des DAG-Verzeichnisses.
Airflow prüft den Ordner
dags/
nur auf DAGs in Python-Modulen, die sich auf der obersten Ebene des Ordners "DAGs" und auf der obersten Ebene eines ZIP-Archivs befinden, das ebenfalls im Ordnerdags/
auf oberster Ebene enthalten ist. Wenn Airflow in einem ZIP-Archiv ein Python-Modul ermittelt, das wederairflow
- nochDAG
-Teilstrings enthält, beendet Airflow die Verarbeitung des ZIP-Archivs. Airflow liefert nur diejenigen DAGs, die bis zum jeweiligen Zeitpunkt ermittelt wurden.Achten Sie aus Gründen der Fehlertoleranz darauf, nicht mehrere DAG-Objekte im gleichen Python-Modul zu definieren.
Verwenden Sie keine SubDAGs. Verwenden Sie stattdessen Alternativen.
Legen Sie Dateien, die zum Zeitpunkt der DAG-Analyse erforderlich sind, im Ordner
dags/
und nicht im Verzeichnisdata/
ab.Testen Sie entwickelte oder geänderte DAGs wie in der Anleitung zum Testen von DAGs empfohlen.
Achten Sie darauf, dass die entwickelten DAGs die DAG-Analysezeiten nicht zu stark erhöhen.
Airflow-Aufgaben können aus mehreren Gründen fehlschlagen. Wir empfehlen, Aufgabenwiederholungen zu aktivieren, um Fehler bei ganzen DAG-Ausführungen zu vermeiden. Wenn Sie maximale Wiederholungen auf
0
festlegen, werden keine Wiederholungen ausgeführt.Wir empfehlen, die Option
default_task_retries
mit einem Wert für die Aufgabe außer0
zu überschreiben. Darüber hinaus können Sie den Parameter für Wiederholungsversuche auf Aufgabenebene festlegen (falls erforderlich).Wenn Sie GPU in Ihren Airflow-Aufgaben verwenden möchten, erstellen Sie einen separaten GKE-Cluster basierend auf Knoten, die Maschinen mit GPUs verwenden. Führen Sie Ihre Aufgaben mit GKEStartPodOperator aus.
Vermeiden Sie CPU- und Arbeitsspeicher-intensive Aufgaben im Knotenpool des Clusters, in dem andere Airflow-Komponenten (Planer, Worker, Webserver) ausgeführt werden. Verwenden Sie stattdessen KubernetesPodOperator oder GKEPodOperator. Auswählen
Wenn Sie DAGs in einer Umgebung bereitstellen, laden Sie nur die Dateien hoch, die für die Interpretation und Ausführung von DAGs im Ordner
/dags
unbedingt erforderlich sind.Anzahl der DAG-Dateien im Ordner
/dags
begrenzenAirflow parst DAGs kontinuierlich im Ordner
/dags
. Das Parsing ist ein Prozess, der den DAGs-Ordner durchläuft und die Anzahl der Dateien, die (mit ihren Abhängigkeiten) geladen werden müssen, die Leistung des DAG-Parsings und der Aufgabenplanung beeinflusst. Es ist viel effizienter, 100 Dateien mit jeweils 100 DAGs mit jeweils 10.000 Dateien mit einem DAG zu verwenden. Daher wird eine solche Optimierung empfohlen. Diese Optimierung ist ein Kompromiss zwischen Parsing-Zeit und Effizienz der DAG-Erstellung und -Verwaltung.Sie können beispielsweise auch 100 ZIP-Dateien mit 100 DAG-Dateien erstellen, um 10.000 DAG-Dateien bereitzustellen.
Wenn Sie mehr als 10.000 DAG-Dateien haben, kann es sinnvoll sein, DAGs programmatisch zu generieren. Sie können beispielsweise eine einzelne Python-DAG-Datei implementieren, die eine bestimmte Anzahl von DAG-Objekten generiert (z.B. 20, 100 DAG-Objekte).
FAQs zum Schreiben von DAGs
Wie minimiere ich Codewiederholungen, wenn ich die gleichen oder ähnliche Aufgaben in mehreren DAGs ausführen möchte?
Wir empfehlen die Definition von Bibliotheken und Wrappern, um Codewiederholungen zu minimieren.
Wie kann ich Code in mehreren DAG-Dateien wiederverwenden?
Binden Sie Hilfsfunktionen in eine lokale Python-Bibliothek ein und importieren Sie die Funktionen. Sie können in allen DAGs, die sich im dags/
-Ordner Ihres Buckets befinden, auf die Funktionen verweisen.
Wie minimiere ich das Risiko unterschiedlicher Definitionen?
Angenommen, es gibt zwei Teams, die Rohdaten zu Umsatzkennzahlen zusammenfassen möchten. Die Teams schreiben zwei geringfügig unterschiedliche Aufgaben für den gleichen Sachverhalt. Definieren Sie Bibliotheken für die Arbeit mit den Umsatzdaten, sodass diejenigen, die DAGs implementieren, die Definition des zusammengefassten Umsatzes eindeutig festlegen müssen.
Wie lege ich Abhängigkeiten zwischen DAGs fest?
Das hängt davon ab, wie Sie die Abhängigkeit definieren möchten.
Wenn Sie zwei DAGs haben (DAG A und DAG B) und DAG B nach DAG A ausgelöst werden soll, können Sie einen TriggerDagRunOperator
am Ende von DAG A platzieren.
Wenn DAG B nur von einem von DAG A generierten Artefakt abhängt (z. B. eine Pub/Sub-Meldung), ist ein Sensor möglicherweise besser geeignet.
Wenn DAG B eng mit DAG A integriert ist, können Sie die beiden DAGs möglicherweise in einen DAG zusammenführen.
Wie übergebe ich eindeutige Ausführungs-IDs an einen DAG und die zugehörigen Aufgaben?
Angenommen, es sollen Dataproc-Clusternamen und -Dateipfade übergeben werden.
In diesem Fall können Sie eine zufällige eindeutige ID generieren und dafür str(uuid.uuid4())
in einem PythonOperator
zurückgeben. Dadurch wird die ID in XComs
abgelegt, sodass Sie in anderen Operatoren über Vorlagenfelder darauf verweisen können.
Prüfen Sie vor dem Generieren einer uuid
, ob eine DagRun-spezifische ID sinnvoller wäre. Sie können auf diese IDs in Jinja-Substitutionen auch mit Makros verweisen.
Wie trenne ich Aufgaben in einem DAG?
Eine Aufgabe sollte eine idempotente Arbeitseinheit sein. Vermeiden Sie es deshalb, einen aus mehreren Schritten bestehenden Workflow in eine einzelne Aufgabe aufzunehmen, z. B. in ein komplexes Programm, das in einem PythonOperator
ausgeführt wird.
Soll ich mehrere Aufgaben in einem einzelnen DAG definieren, um Daten aus mehreren Quellen zusammenzufassen?
Angenommen, ich habe mehrere Tabellen mit Rohdaten und möchte tägliche Zusammenfassungen für jede einzelne Tabelle erstellen. Die Aufgaben sind nicht voneinander abhängig. Soll ich eine Aufgabe und einen DAG für jede Tabelle oder einen allgemeinen DAG erstellen?
Wenn es für Sie kein Problem ist, dass jede Aufgabe die gleichen Attribute auf DAG-Ebene verwendet (z. B. schedule_interval
), ist es sinnvoll, mehrere Aufgaben in einem einzigen DAG zu definieren. Andernfalls können zur Minimierung der Codewiederholung mehrere DAGs aus einem einzigen Python-Modul generiert werden. Dazu platzieren Sie diese in den globalen globals()
des Moduls.
Wie beschränke ich die Anzahl gleichzeitiger Aufgaben, die in einem DAG ausgeführt werden?
Ich möchte z. B. vermeiden, dass API-Nutzungslimits und -kontingente überschritten oder zu viele Prozesse gleichzeitig ausgeführt werden.
Sie können dazu Airflow-Pools in der Airflow-Weboberfläche definieren und in Ihren DAGs Aufgaben mit vorhandenen Pools verknüpfen.
FAQs zur Verwendung von Operatoren
Soll ich den DockerOperator
verwenden?
Wir raten von der Verwendung des DockerOperator
ab, es sei denn, er wird zum Starten von Containern in einer Remote-Docker-Installation verwendet (nicht im Cluster einer Umgebung). In einer Cloud Composer-Umgebung hat der Operator keinen Zugriff auf Docker-Daemons.
Verwenden Sie stattdessen KubernetesPodOperator
oder GKEStartPodOperator
. Mit diesen Operatoren werden Kubernetes-Pods in Kubernetes- oder GKE-Clustern gestartet. Wir raten davon ab, Pods im Cluster einer Umgebung zu starten, da dies zu einem Ressourcenwettbewerb führen kann.
Soll ich den SubDagOperator
verwenden?
Die Verwendung von SubDagOperator
wird nicht empfohlen.
Verwenden Sie Alternativen wie in Anleitung für Gruppierungsaufgaben beschrieben.
Soll ich Python-Code nur in PythonOperators
ausführen, um Python-Operatoren vollständig zu trennen?
Abhängig von Ihrem Ziel haben Sie mehrere Optionen.
Falls Ihr einziges Ziel ist, separate Python-Abhängigkeiten beizubehalten, können Sie PythonVirtualenvOperator
verwenden.
Erwägen Sie, den KubernetesPodOperator
zu verwenden.
Mit diesem Operator können Sie Kubernetes-Pods definieren und die Pods in anderen Clustern ausführen.
Wie füge ich benutzerdefinierte binäre oder Nicht-PyPI-Pakete hinzu?
Sie können dazu Pakete installieren, die in privaten Paket-Repositories gehostet werden,
Sie können auch den KubernetesPodOperator
verwenden, um einen Kubernetes-Pod mit einem eigenen Image auszuführen, das Sie aus benutzerdefinierten Paketen erstellt haben.
Wie übergebe ich Argumente einheitlich an einen DAG und die zugehörigen Aufgaben?
Sie können die integrierte Airflow-Unterstützung für Jinja-Vorlagen nutzen, um Argumente zu übergeben, die in Vorlagenfeldern verwendet werden können.
Wann findet die Vorlagenersetzung statt?
Die Vorlagen werden auf den Airflow-Workern unmittelbar vor dem Aufruf der pre_execute
-Funktion eines Operators ersetzt. In der Praxis bedeutet dies, dass Vorlagen erst unmittelbar vor der Ausführung einer Aufgabe ersetzt werden.
Wie kann ich erkennen, welche Operatorargumente die Vorlagenersetzung unterstützen?
Operatorargumente, die die Jinja2-Vorlagenersetzung unterstützen, sind explizit entsprechend gekennzeichnet.
Suchen Sie in der Operatordefinition nach dem Feld template_fields
. Es enthält eine Liste der Argumentnamen, für die die Vorlagenersetzung verwendet wird.
Dazu gehört beispielsweise der BashOperator
, mit dem Vorlagen für die Argumente bash_command
und env
unterstützt werden.
Veraltete Airflow-Operatoren vermeiden
Die in der folgenden Tabelle aufgeführten Operatoren wurden verworfen. Vermeiden Sie die Verwendung in Ihren DAGs. Verwenden Sie stattdessen aktuelle Alternativen.
Verworfener Operator | Operator |
---|---|
Operator „BigQueryExecuteQuery“ | BigQueryInsertJobOperator |
Operator „BigQueryPatchDataset“ | BigQueryUpdateTableOperator |
DataflowCreateJavaJobOperator | BeamRunJavaPipelineOperator |
DataflowCreatePythonJobOperator | BeamRunPythonPipelineOperator |
DataprocScaleClusterOperator | DataprocUpdateClusterOperator |
DataprocSubmitPigJobOperator | DataprocSubmitJobOperator |
DataprocSubmitSparkSqlJobOperator | DataprocSubmitJobOperator |
DataprocSubmitSparkJobOperator | DataprocSubmitJobOperator |
DataprocSubmitHadoopJobOperator | DataprocSubmitJobOperator |
DataprocSubmitPySparkJobOperator | DataprocSubmitJobOperator |
MLEngineManageModelOperator | MLEngineCreateModelOperator, MLEngineGetModelOperator |
Operator von MLEngineManageVersion | MLEngineCreateVersion, MLEngineSetDefaultVersion, MLEngineListVersions, MLEngineDeleteVersion |
GCSObjectsWtihPrefixExistenceSensor | GCSObjectsWithPrefixExistenceSensor |
Nächste Schritte
- Fehlerbehebung bei DAGs
- Fehlerbehebung für Planer
- Google-Operatoren
- Google Cloud Platform-Operatoren
- Apache Airflow-Anleitung
- Referenz zur Apache Airflow API
- Core-Airflow-Operatoren auf GitHub. Lesen Sie die Informationen zum Branch für Ihr Airflow-Release.
- Contrib-Airflow-Operatoren auf GitHub. (siehe Informationen zum Zweig für Ihr Airflow-Release)