DAG 내 태스크 그룹화

Cloud Composer 1 | Cloud Composer 2

이 페이지에서는 다음 설계 패턴을 사용하여 Airflow 파이프라인에서 태스크를 그룹화하는 방법을 설명합니다.

  • DAG 그래프의 태스크 그룹화
  • 상위 DAG에서 하위 DAG 트리거
  • TaskGroup 연산자로 태스크 그룹화(Airflow 2에만 해당)

DAG 그래프의 태스크 그룹화

파이프라인의 특정 단계에서 태스크를 그룹화하려면 DAG 파일의 태스크 사이의 관계를 사용하면 됩니다.

다음 예시를 참조하세요.

Airflow DAG에서 태스크를 그룹화할 수 있습니다.

이 워크플로에서 op-1 태스크와 op-2 태스크는 초기 태스크 start 후에 함께 실행됩니다. 이렇게 하려면 태스크를 start >> [task_1, task_2] 문과 함께 그룹화하면 됩니다.

다음 코드는 위의 DAG를 완전히 구현합니다.

Airflow 2

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "all_tasks_in_one_dag"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "@once"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    start = DummyOperator(task_id="start")

    task_1 = BashOperator(task_id="op-1", bash_command=":", dag=dag)

    task_2 = BashOperator(task_id="op-2", bash_command=":", dag=dag)

    some_other_task = DummyOperator(task_id="some-other-task")

    task_3 = BashOperator(task_id="op-3", bash_command=":", dag=dag)

    task_4 = BashOperator(task_id="op-4", bash_command=":", dag=dag)

    end = DummyOperator(task_id="end")

    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

Airflow 1


from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "all_tasks_in_one_dag"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "@once"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    start = DummyOperator(task_id="start")

    task_1 = BashOperator(task_id="op-1", bash_command=":", dag=dag)

    task_2 = BashOperator(task_id="op-2", bash_command=":", dag=dag)

    some_other_task = DummyOperator(task_id="some-other-task")

    task_3 = BashOperator(task_id="op-3", bash_command=":", dag=dag)

    task_4 = BashOperator(task_id="op-4", bash_command=":", dag=dag)

    end = DummyOperator(task_id="end")

    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

상위 DAG에서 하위 DAG 트리거

TriggerDagRunOperator를 사용하여 다른 DAG에서 하나의 DAG를 트리거할 수 있습니다. Airflow 버전에 따라 다른 모듈에서 TriggerDagRunOperator 연산자를 찾을 수 있습니다.

다음 예시를 참조하세요.

TriggerDagRunOperator를 사용해 DAG 내에서 DAG를 트리거할 수 있습니다.

이 워크플로에서 dag_1 블록과 dag_2 블록은 Cloud Composer 환경의 개별 DAG로 그룹화된 일련의 태스크를 나타냅니다.

이 워크플로를 구현하려면 DAG 파일 두 개가 필요합니다. 제어 DAG 파일은 다음과 같습니다.

Airflow 2

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(task_id="start")

    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )

    some_other_task = DummyOperator(task_id="some-other-task")

    end = DummyOperator(task_id="end")

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

Airflow 1

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(task_id="start")

    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )

    some_other_task = DummyOperator(task_id="some-other-task")

    end = DummyOperator(task_id="end")

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

제어 DAG에서 트리거하는 하위 DAG의 구현은 다음과 같습니다.

Airflow 2

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "dag-to-trigger"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "None"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    dag_task = DummyOperator(task_id="dag-task")

Airflow 1

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "dag-to-trigger"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "None"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    dag_task = DummyOperator(task_id="dag-task")

워크플로가 작동하려면 Cloud Composer 환경에 두 DAG 파일을 업로드해야 합니다.

TaskGroup 연산자로 태스크 그룹화

Airflow 2에서 TaskGroup 연산자를 사용하여 DAG에서 태스크를 함께 그룹화할 수 있습니다. TaskGroup 블록 내에 정의된 태스크는 여전히 기본 DAG의 일부입니다.

다음 예시를 참조하세요.

Airflow 2의 TaskGroup 연산자를 사용하여 UI에서 태스크를 시각적으로 그룹화할 수 있습니다

op-1 태스크와 op-2 태스크는 ID가 taskgroup_1인 블록으로 그룹화됩니다. 이 워크플로 구현은 다음 코드와 비슷합니다.

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:
    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="task group #1") as section_1:
        task_1 = BashOperator(task_id="op-1", bash_command=":")
        task_2 = BashOperator(task_id="op-2", bash_command=":")

    with TaskGroup("taskgroup_2", tooltip="task group #2") as section_2:
        task_3 = BashOperator(task_id="op-3", bash_command=":")
        task_4 = BashOperator(task_id="op-4", bash_command=":")

    some_other_task = DummyOperator(task_id="some-other-task")

    end = DummyOperator(task_id="end")

    start >> section_1 >> some_other_task >> section_2 >> end