Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Auf dieser Seite wird beschrieben, wie Sie Aufgaben in Ihren Airflow-Pipelines mithilfe der folgenden Designmuster gruppieren können:
- Gruppieren von Aufgaben in der DAG-Grafik
- Untergeordnete DAGs aus einem übergeordneten DAG auslösen.
- Aufgaben mit dem Operator
TaskGroup
gruppieren (nur mit Airflow 2).
Aufgaben in der DAG-Grafik gruppieren
Zum Gruppieren von Aufgaben in bestimmten Phasen Ihrer Pipeline können Sie Beziehungen zwischen den Aufgaben in Ihrer DAG-Datei verwenden.
Dazu ein Beispiel:
In diesem Workflow werden die Aufgaben op-1
und op-2
nach der ersten Aufgabe start
ausgeführt.
Dies erreichen Sie, indem Sie Aufgaben zusammen mit der Anweisung start >> [task_1, task_2]
gruppieren.
Der folgende Code bietet eine vollständige Implementierung des obigen DAG:
Airflow 2
Airflow 1
Untergeordnete DAGs aus einem übergeordneten DAG auslösen
Sie können mit dem TriggerDagRunOperator
einen DAG aus einem anderen DAG auslösen.
Abhängig von Ihrer Airflow-Version finden Sie den Operator TriggerDagRunOperator
in einem anderen Modul:
- Verwenden Sie für Airflow 1.10.*
airflow.operators.dagrun_operator
-Modul. - Verwenden Sie für Airflow 2 das Modul
airflow.operators.trigger_dagrun
.
Dazu ein Beispiel:
In diesem Workflow stellen die Blöcke dag_1
und dag_2
eine Reihe von Aufgaben dar, die in einem separaten DAG in der Cloud Composer-Umgebung gruppiert sind.
Die Implementierung dieses Workflows erfordert zwei separate DAG-Dateien. Die Steuerungs-DAG-Datei sieht so aus:
Airflow 2
Airflow 1
Die Implementierung des untergeordneten DAG, die vom Steuerungs-DAG ausgelöst wird, sieht so aus:
Airflow 2
Airflow 1
Damit der Workflow funktioniert, müssen Sie beide DAG-Dateien in Ihre Cloud Composer-Umgebung hochladen.
Aufgaben mit dem Operator TaskGroup
gruppieren
In Airflow 2 können Sie die Methode
TaskGroup
-Operator
um Aufgaben in Ihrem DAG zu gruppieren.
Aufgaben, die in einem TaskGroup
-Block definiert sind, sind weiterhin Teil des Haupt-DAG.
Dazu ein Beispiel:
Die Aufgaben op-1
und op-2
werden in einem Block mit der ID taskgroup_1
gruppiert.
Eine Implementierung dieses Workflows sieht so aus: