Aufgaben in DAGs gruppieren

Cloud Composer 1 | Cloud Composer 2

Auf dieser Seite wird beschrieben, wie Sie Aufgaben in Ihren Airflow-Pipelines mithilfe der folgenden Designmuster gruppieren können:

  • Gruppiert Aufgaben in der DAG-Grafik
  • Untergeordnete DAGs von einem übergeordneten DAG auslösen
  • Gruppierung von Aufgaben mit dem Operator TaskGroup (nur mit Airflow 2).

Gruppierung von Aufgaben in der DAG-Grafik

Wenn Sie Aufgaben in bestimmten Phasen Ihrer Pipeline gruppieren möchten, können Sie Beziehungen zwischen den Aufgaben in Ihrer DAG-Datei verwenden.

Dazu ein Beispiel:

Aufgaben können in einem Airflow-DAG gruppiert werden

In diesem Workflow werden die Aufgaben op-1 und op-2 nach der ersten Aufgabe start ausgeführt. Dazu fassen Sie Aufgaben mit der Anweisung start >> [task_1, task_2] zusammen.

Der folgende Code bietet eine vollständige Implementierung des obigen DAG:

Luftstrom 2

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'all_tasks_in_one_dag'

args = {'owner': 'airflow', 'start_date': days_ago(1), 'schedule_interval': "@once"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:

    start = DummyOperator(
        task_id='start'
    )

    task_1 = BashOperator(
        task_id='op-1',
        bash_command=':',
        dag=dag)

    task_2 = BashOperator(
        task_id='op-2',
        bash_command=':',
        dag=dag)

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    task_3 = BashOperator(
        task_id='op-3',
        bash_command=':',
        dag=dag)

    task_4 = BashOperator(
        task_id='op-4',
        bash_command=':',
        dag=dag)

    end = DummyOperator(
        task_id='end'
    )

    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

Luftstrom 1


from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'all_tasks_in_one_dag'

args = {'owner': 'airflow', 'start_date': days_ago(1), 'schedule_interval': "@once"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:

    start = DummyOperator(
        task_id='start'
    )

    task_1 = BashOperator(
        task_id='op-1',
        bash_command=':',
        dag=dag)

    task_2 = BashOperator(
        task_id='op-2',
        bash_command=':',
        dag=dag)

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    task_3 = BashOperator(
        task_id='op-3',
        bash_command=':',
        dag=dag)

    task_4 = BashOperator(
        task_id='op-4',
        bash_command=':',
        dag=dag)

    end = DummyOperator(
        task_id='end'
    )

    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

Untergeordnete DAGs von einem übergeordneten DAG auslösen

Sie können einen DAG mit einem anderen DAG mit TriggerDagRunOperator auslösen. Je nach Airflow-Version finden Sie den Operator TriggerDagRunOperator in einem anderen Modul:

Dazu ein Beispiel:

DAGs können in einem DAG mit dem TriggerDagRunOperator ausgelöst werden

In diesem Workflow stellen die Blöcke dag_1 und dag_2 eine Reihe von Aufgaben dar, die in einem separaten DAG in der Cloud Composer-Umgebung gruppiert werden.

Die Implementierung dieses Workflows erfordert zwei separate DAG-Dateien. Die steuernde DAG-Datei sieht so aus:

Luftstrom 2

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(
        task_id='start'
    )

    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"}
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"}
    )

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    end = DummyOperator(
        task_id='end'
    )

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

Luftstrom 1

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(
        task_id='start'
    )

    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"}
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"}
    )

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    end = DummyOperator(
        task_id='end'
    )

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

Die Implementierung des untergeordneten DAG, der vom steuernden DAG ausgelöst wird, sieht so aus:

Luftstrom 2

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'dag-to-trigger'

args = {'owner': 'airflow', 'start_date': days_ago(1), 'schedule_interval': "None"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:

    dag_task = DummyOperator(
        task_id='dag-task'
    )

Luftstrom 1

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'dag-to-trigger'

args = {'owner': 'airflow', 'start_date': days_ago(1), 'schedule_interval': "None"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    dag_task = DummyOperator(
        task_id='dag-task'
    )

Sie müssen beide DAG-Dateien in Ihre Cloud Composer-Umgebung hochladen, damit der Workflow funktioniert.

Aufgaben mit dem Operator TaskGroup gruppieren

In Airflow 2 können Sie mit dem TaskGroup-Operator Aufgaben in Ihrem DAG gruppieren. Aufgaben, die in einem TaskGroup-Block definiert sind, sind weiterhin Teil des Haupt-DAG.

Dazu ein Beispiel:

Aufgaben können in UI mit dem TaskGroup-Operator in Airflow 2 visuell gruppiert werden

Die Aufgaben op-1 und op-2 sind in einem Block mit der ID taskgroup_1 gruppiert. Eine Implementierung dieses Workflows sieht wie der folgende Code aus:

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:
    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="task group #1") as section_1:
        task_1 = BashOperator(task_id="op-1", bash_command=':')
        task_2 = BashOperator(task_id="op-2", bash_command=':')

    with TaskGroup("taskgroup_2", tooltip="task group #2") as section_2:
        task_3 = BashOperator(task_id="op-3", bash_command=':')
        task_4 = BashOperator(task_id="op-4", bash_command=':')

    some_other_task = DummyOperator(
        task_id='some-other-task'
    )

    end = DummyOperator(task_id='end')

    start >> section_1 >> some_other_task >> section_2 >> end