Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
이 페이지에서는 다음 설계 패턴을 사용하여 Airflow 파이프라인에서 태스크를 그룹화하는 방법을 설명합니다.
- DAG 그래프의 태스크 그룹화
- 상위 DAG에서 하위 DAG 트리거
TaskGroup
연산자로 태스크 그룹화(Airflow 2에만 해당)
DAG 그래프의 태스크 그룹화
파이프라인의 특정 단계에서 태스크를 그룹화하려면 DAG 파일의 태스크 사이의 관계를 사용하면 됩니다.
다음 예시를 참조하세요.
이 워크플로에서 op-1
태스크와 op-2
태스크는 초기 태스크 start
후에 함께 실행됩니다.
이렇게 하려면 태스크를 start >> [task_1, task_2]
문과 함께 그룹화하면 됩니다.
다음 코드는 위의 DAG를 완전히 구현합니다.
Airflow 2
Airflow 1
상위 DAG에서 하위 DAG 트리거
TriggerDagRunOperator
를 사용하여 다른 DAG에서 하나의 DAG를 트리거할 수 있습니다.
Airflow 버전에 따라 다른 모듈에서 TriggerDagRunOperator
연산자를 찾을 수 있습니다.
- Airflow 1.10.*의 경우
airflow.operators.dagrun_operator
모듈을 사용합니다. - Airflow 2의 경우
airflow.operators.trigger_dagrun
모듈을 사용합니다.
다음 예시를 참조하세요.
이 워크플로에서 dag_1
블록과 dag_2
블록은 Cloud Composer 환경의 개별 DAG로 그룹화된 일련의 태스크를 나타냅니다.
이 워크플로를 구현하려면 DAG 파일 두 개가 필요합니다. 제어 DAG 파일은 다음과 같습니다.
Airflow 2
Airflow 1
제어 DAG에서 트리거하는 하위 DAG의 구현은 다음과 같습니다.
Airflow 2
Airflow 1
워크플로가 작동하려면 Cloud Composer 환경에 두 DAG 파일을 업로드해야 합니다.
TaskGroup
연산자로 태스크 그룹화
Airflow 2에서 TaskGroup
연산자를 사용하여 DAG에서 태스크를 함께 그룹화할 수 있습니다.
TaskGroup
블록 내에 정의된 태스크는 여전히 기본 DAG의 일부입니다.
다음 예시를 참조하세요.
op-1
태스크와 op-2
태스크는 ID가 taskgroup_1
인 블록으로 그룹화됩니다.
이 워크플로 구현은 다음 코드와 비슷합니다.