Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
이 가이드에서는 Cloud Composer 환경에서 실행되는 Apache Airflow 방향성 비순환 그래프(DAG)를 작성하는 방법을 보여줍니다.
Apache Airflow는 강력한 DAG 및 태스크 격리를 제공하지 않으므로 DAG 간섭이 방지되도록 프로덕션 환경과 테스트 환경을 별도로 사용하는 것이 좋습니다. 자세한 내용은 DAG 테스트를 참조하세요.
Airflow DAG 구성
Airflow DAG는 Python 파일에 정의되며 다음 요소로 구성되어 있습니다.
- DAG 정의
- Airflow 연산자
- 연산자 관계
다음 코드 스니펫은 컨텍스트 외부에 있는 각 구성요소의 예시를 보여줍니다.
DAG 정의
다음 예는 Airflow DAG 정의를 보여줍니다.
Airflow 2
Airflow 1
연산자 및 태스크
Airflow 연산자는 수행할 작업을 설명합니다. 태스크는 연산자의 특정 인스턴스입니다.
Airflow 2
Airflow 1
태스크 관계
태스크 관계는 작업을 완료해야 하는 순서를 설명합니다.
Airflow 2
Airflow 1
Python의 전체 DAG 워크플로 예시
다음 워크플로는 hello_python
태스크 및 goodbye_bash
태스크의 2개 태스크로 구성된 완전히 작동하는 DAG 템플릿입니다.
Airflow 2
Airflow 1
Airflow DAG를 정의하는 방법에 대한 자세한 내용은 Airflow 튜토리얼 및 Airflow 개념을 참조하세요.
Airflow 연산자
다음 예에서는 많이 사용되는 몇 가지 Airflow 연산자를 보여줍니다. Airflow 연산자에 대한 신뢰할 수 있는 참고 자료가 필요하면 연산자 및 후크 참조 및 제공업체 색인을 참고하세요.
BashOperator
명령줄 프로그램을 실행하려면 BashOperator를 사용합니다.
Airflow 2
Airflow 1
Cloud Composer는 Airflow 작업자의 Bash 스크립트에서 제공한 명령어를 실행합니다. 작업자는 Debian 기반 Docker 컨테이너이며 여러 패키지를 포함합니다.
- Cloud Storage 버킷을 사용하는
gcloud storage
하위 명령어를 포함한gcloud
명령어 bq
명령어kubectl
명령어
PythonOperator
임의의 Python 코드를 실행하려면 PythonOperator를 사용합니다.
Cloud Composer는 컨테이너에서 해당 환경에 사용되는 Cloud Composer 이미지 버전의 패키지를 포함하는 Python 코드를 실행합니다.
추가 Python 패키지를 설치하려면 Python 종속 항목 설치를 참조하세요.
Google Cloud 연산자
Google Cloud 제품을 사용하는 태스크를 실행하려면 Google Cloud Airflow 연산자를 사용하세요. 예를 들어 BigQuery 연산자는 BigQuery의 데이터를 쿼리하고 처리합니다.
Google Cloud 및 Google Cloud에서 제공하는 개별 서비스에 대한 Airflow 연산자가 더 많이 있습니다. 전체 목록은 Google Cloud 연산자를 참고하세요.
Airflow 2
Airflow 1
EmailOperator
DAG에서 이메일을 보내려면 EmailOperator를 사용합니다. Cloud Composer 환경에서 이메일을 보내려면 SendGrid를 사용하도록 환경을 구성해야 합니다.
Airflow 2
Airflow 1
연산자 오류 알림
DAG의 연산자가 실패할 때 이메일 알림을 보내려면 email_on_failure
를 True
로 설정합니다. Cloud Composer 환경에서 이메일 알림을 보내려면 SendGrid를 사용하도록 환경을 구성해야 합니다.
Airflow 2
Airflow 1
DAG 워크플로 가이드라인
중첩 디렉토리에 있는 DAG의 ZIP 파일에 커스텀 Python 라이브러리를 배치합니다. DAG 디렉터리의 최상위 수준에 라이브러리를 배치하지 마세요.
Airflow가
dags/
폴더를 검색할 때는 DAG 폴더의 최상위 수준에 있는 Python 모듈과 최상위dags/
폴더에도 있는 ZIP 파일의 최상위 수준에 있는 Python 모듈에서만 DAG를 확인합니다. Airflow는airflow
및DAG
하위 문자열을 모두 포함하지 않는 ZIP 파일에서 Python 모듈을 발견하면 ZIP 파일 처리를 중지합니다. Airflow는 해당 지점까지 발견된 DAG만 반환합니다.Airflow 1 대신 Airflow 2를 사용합니다.
Airflow 커뮤니티는 더 이상 Airflow 1의 새로운 부 출시 버전이나 패치 출시 버전을 게시하지 않습니다.
내결함성을 위해 동일한 Python 모듈에 여러 DAG 객체를 정의하지 않습니다.
SubDAG를 사용하지 마세요. 대신 DAG 내에서 태스크를 그룹화하세요.
DAG에서 시간을 파싱하는 데 필요한 파일을
data/
폴더가 아닌dags/
폴더에 배치합니다.DAG 테스트 안내에서 권장한 대로 개발되거나 수정된 DAG 테스트
개발된 DAG에서 DAG 파싱 시간을 너무 많이 늘리지 않는지 확인합니다.
Airflow 태스크는 여러 가지 이유로 인해 실패할 수 있습니다. 전체 DAG 실행이 실패하지 않도록 태스크 재시도를 사용 설정하는 것이 좋습니다. 최대 재시도 횟수를
0
으로 설정하면 재시도가 수행되지 않습니다.default_task_retries
옵션을0
이외의 태스크 재시도 값으로 재정의하는 것이 좋습니다. 또한 태스크 수준에서retries
매개변수를 설정할 수 있습니다.Airflow 태스크에 GPU를 사용하려면 GPU가 있는 머신을 사용하는 노드를 기반으로 별도의 GKE 클러스터를 만듭니다. GKEStartPodOperator를 사용하여 태스크를 실행합니다.
다른 Airflow 구성요소(스케줄러, 작업자, 웹 서버)가 실행 중인 클러스터의 노드 풀에서 CPU 및 메모리를 많이 사용하는 태스크를 실행하지 마세요. 대신 KubernetesPodOperator 또는 GKEStartPodOperator를 사용하세요.
DAG를 환경에 배포할 때 DAG를 해석하고 실행하는 데 반드시 필요한 파일만
/dags
폴더에 업로드합니다./dags
폴더의 DAG 파일 수를 제한합니다.Airflow는
/dags
폴더에서 DAG를 지속적으로 파싱합니다. 파싱은 DAG 폴더를 통해 반복되는 프로세스입니다. 종속 항목과 함께 로드해야 하는 파일 수는 DAG 파싱 및 태스크 예약 성능에 영향을 줍니다. 1개 DAG에 10,000개 파일을 사용하는 대신 각 100개 DAG에 100개 파일을 사용하는 것이 더 효율적이고, 이러한 방식으로 최적화하는 것이 좋습니다. 이 최적화는 파싱 시간과 DAG 작성 및 관리 효율 사이의 균형을 위한 것입니다.예를 들어 10,000개의 DAG 파일을 배포하려면 각각 100개의 DAG 파일이 포함된 100개의 ZIP 파일을 만들 수 있습니다.
위의 힌트 외에도 DAG 파일이 10,000개 이상 있으면 프로그래매틱 방식으로 DAG를 생성하는 것이 좋은 방법일 수 있습니다. 예를 들어 일정 개수의 DAG 객체를 생성하는 단일 Python DAG 파일을 구현할 수 있습니다(예: 20, 100개 DAG 객체).
지원 중단된 Airflow 연산자 사용 금지
다음 표에 나열된 연산자는 지원 중단되었습니다. 이러한 연산자 중 일부는 Cloud Composer 1의 초기 버전에서 지원되었습니다. DAG에서 사용하지 마세요. 대신 제공된 최신 대안을 사용하세요.
지원 중단된 연산자 | 사용할 연산자 |
---|---|
BigQueryExecuteQueryOperator | BigQueryInsertJobOperator |
BigQueryPatchDatasetOperator | BigQueryUpdateTableOperator |
DataflowCreateJavaJobOperator | BeamRunJavaPipelineOperator |
DataflowCreatePythonJobOperator | BeamRunPythonPipelineOperator |
DataprocScaleClusterOperator | DataprocUpdateClusterOperator |
DataprocSubmitPigJobOperator | DataprocSubmitJobOperator |
DataprocSubmitSparkSqlJobOperator | DataprocSubmitJobOperator |
DataprocSubmitSparkJobOperator | DataprocSubmitJobOperator |
DataprocSubmitHadoopJobOperator | DataprocSubmitJobOperator |
DataprocSubmitPySparkJobOperator | DataprocSubmitJobOperator |
MLEngineManageModelOperator | MLEngineCreateModelOperator, MLEngineGetModelOperator |
MLEngineManageVersionOperator | MLEngineCreateVersion, MLEngineSetDefaultVersion, MLEngineListVersions, MLEngineDeleteVersion |
GCSObjectsWtihPrefixExistenceSensor | GCSObjectsWithPrefixExistenceSensor |
DAG 작성 FAQ
여러 DAG에서 동일하거나 유사한 태스크를 실행하려 할 경우 코드 반복을 최소화하는 방법은 무엇인가요?
코드 반복을 최소화하려면 래퍼와 라이브러리를 정의하는 것이 좋습니다.
DAG 파일 간에 코드를 재사용하려면 어떻게 해야 하나요?
유틸리티 함수를 로컬 Python 라이브러리에 배치하고 함수를 가져옵니다. 환경 버킷의 dags/
폴더에 있는 모든 DAG에서 함수를 참조할 수 있습니다.
다른 정의가 발생할 위험을 최소화하려면 어떻게 해야 하나요?
예를 들어 원시 데이터를 수익 측정항목으로 집계하려는 두 팀이 있습니다. 두 팀은 목표는 동일하지만 약간 다른 2가지 작업을 작성합니다. DAG 구현자가 집계되는 수익 정의를 명확하게 설명하도록 수익 데이터와 함께 사용할 라이브러리를 정의하세요.
DAG 간의 종속 항목은 어떻게 설정하나요?
이는 종속성을 정의하고자 하는 방식에 따라 달라집니다.
DAG 두 개(DAG A 및 DAG B)가 있고 DAG B가 DAG A 다음에 트리거되도록 하려면 DAG A 끝 부분에 TriggerDagRunOperator
를 배치하면 됩니다.
DAG B가 DAG A에서 생성하는 아티팩트(예: Pub/Sub 메시지)에만 의존할 경우 센서가 보다 효율적으로 작동할 수 있습니다.
DAG B가 DAG A와 긴밀하게 통합되면 두 DAG를 하나의 DAG로 병합할 수 있습니다.
고유한 실행 ID를 DAG 및 해당 작업에 전달하려면 어떻게 해야 하나요?
예를 들어 Dataproc 클러스터 이름과 파일 경로를 전달하려고 합니다.
PythonOperator
에서 str(uuid.uuid4())
을 반환하면 임의의 고유 ID를 생성할 수 있습니다. 그러면 ID가 XComs
에 배치되므로 템플릿 필드를 통해 다른 연산자에서 ID를 참조할 수 있습니다.
uuid
를 생성하기 전에 DagRun 관련 ID가 더 중요한지 고려합니다. 또한 매크로를 사용하여 Jinja 대체에서 이러한 ID를 참조할 수 있습니다.
DAG에서 태스크를 분리하려면 어떻게 해야 하나요?
각 태스크는 멱등한 작업 단위여야 합니다. 따라서 다단계 워크플로를 단일 태스크 내에 캡슐화해서는 안 됩니다(예: PythonOperator
에서 복잡한 프로그램 실행).
여러 소스의 데이터를 집계하려면 단일 DAG에 여러 태스크를 정의해야 하나요?
예를 들어 원시 데이터가 있는 여러 테이블이 있고 각 테이블에 대한 일일 집계를 만들려고 합니다. 태스크는 서로 종속되지 않습니다. 테이블마다 태스크와 DAG를 하나씩 만들어야 하나요? 아니면 전체 DAG 하나를 만들어야 하나요?
각 태스크가 동일한 DAG 수준 속성(예: schedule_interval
)을 공유해도 괜찮다면 단일 DAG에 여러 태스크를 정의하는 것이 적합합니다. 그렇지 않을 경우 코드 반복이 최소화되도록 단일 Python 모듈에서 여러 DAG를 모듈 globals()
에 배치하여 생성할 수 있습니다
DAG에서 실행되는 동시 태스크 수를 제한하려면 어떻게 해야 하나요?
예를 들어 API 사용량 한도 또는 할당량을 초과하지 않거나 동시 프로세스를 너무 많이 실행하고 싶지 않습니다.
Airflow 웹 UI에서 Airflow 풀을 정의하고 태스크를 DAG의 기존 풀과 연결할 수 있습니다.
연산자 사용 FAQ
DockerOperator
를 사용해야 하나요?
DockerOperator
는 환경의 클러스터 내에서가 아닌 원격 Docker 설치에서 컨테이너를 실행하는 데 사용되는 경우가 아니면 사용하지 않는 것이 좋습니다. Cloud Composer 환경에서는 연산자가 Docker 데몬에 액세스할 수 없습니다.
대신 KubernetesPodOperator
또는 GKEStartPodOperator
를 사용합니다. 이러한 연산자는 각각 Kubernetes 또는 GKE 클러스터에서 Kubernetes 포드를 시작합니다. 환경의 클러스터에서 포드를 실행하면 리소스 경쟁이 발생할 수 있으므로 권장되지 않습니다.
SubDagOperator
를 사용해야 하나요?
SubDagOperator
를 사용하지 않는 것이 좋습니다.
그룹화 태스크 안내의 제안대로 대안을 사용합니다.
Python 연산자를 완전히 구분하려면 PythonOperators
에서만 Python 코드를 실행해야 하나요?
목표에 따라 몇 가지 옵션이 있습니다.
별도의 Python 종속 항목을 유지하는 것이 유일한 목적이면 PythonVirtualenvOperator
를 사용하면 됩니다.
KubernetesPodOperator
를 사용하는 것이 좋습니다. 이 연산자를 사용하면 Kubernetes 포드를 정의하고 다른 클러스터에서 포드를 실행할 수 있습니다.
커스텀 바이너리 또는 PyPI 이외 패키지를 추가하려면 어떻게 해야 하나요?
비공개 패키지 저장소에 호스팅된 패키지를 설치할 수 있습니다.
DAG 및 해당 태스크에 동일하게 인수를 전달하려면 어떻게 해야 하나요?
Airflow의 기본 제공 Jinja 템플릿을 사용하여 템플릿 필드에서 사용 가능한 인수를 전달할 수 있습니다.
템플릿 대체는 언제 발생하나요?
템플릿 대체는 연산자의 pre_execute
함수가 호출되기 바로 전에 Airflow 작업자에서 발생합니다. 실제로 이는 태스크가 실행되기 직전까지 템플릿이 대체되지 않는다는 것을 의미합니다.
템플릿 대체를 지원하는 연산자 인수를 어떻게 알 수 있나요?
Jinja2 템플릿 대체를 지원하는 연산자 인수는 명시적으로 표시됩니다.
연산자 정의에서 템플릿 대체를 수행할 인수 이름 목록이 포함된 template_fields
필드를 찾습니다.
예를 들어 bash_command
및 env
인수의 템플릿을 지원하는 BashOperator
를 확인하세요.