Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
이 페이지에서는 Airflow 스케줄러 및 DAG 프로세서의 일반적인 문제에 대한 문제 해결 단계 및 정보를 제공합니다.
문제의 출처 식별
문제 해결을 시작하려면 다음 중 문제가 발생하는 경우를 확인하세요.
- DAG 파싱 시간(Airflow DAG 프로세서가 DAG를 파싱하는 동안)
- 실행 시간, DAG가 Airflow 스케줄러에 의해 처리되는 동안
파싱 시간 및 실행 시간에 대한 자세한 내용은 DAG 파싱 시간과 DAG 실행 시간 사이의 차이점을 참조하세요.
DAG 처리 문제 검사
Monitoring 실행 및 큐에 추가된 태스크
큐에서 막혀 있는 태스크가 있는지 확인하려면 다음 단계를 따르세요.
콘솔에서 Google Cloud 환경 페이지로 이동합니다.
환경 목록에서 환경 이름을 클릭합니다. 환경 세부정보 페이지가 열립니다.
Monitoring 탭으로 이동합니다.
Monitoring 탭에서 DAG 실행 섹션의 Airflow 태스크 차트를 검토하고 발생 가능한 문제가 있는지 식별합니다. Airflow 태스크는 Airflow에서 큐에 추가된 상태인 태스크이며 Celery 또는 Kubernetes Executor 브로커 큐로 이동할 수 있습니다. Celery 큐에 추가된 태스크는 Celery 브로커 큐에 배치되는 태스크 인스턴스입니다.
DAG 파싱 시간에 문제 해결
다음 섹션에서는 DAG 파싱 시간에 발생하는 몇 가지 일반적인 문제들에 대한 증상과 가능한 해결 방법을 설명합니다.
태스크 수 및 시간 분포
Airflow에서 동시에 많은 수의 DAG 또는 태스크를 예약할 때 문제가 발생할 수 있습니다. 예약 문제를 방지하려면 다음 단계를 따르세요.
- 더 적은 수로 보다 통합된 태스크를 사용하도록 DAG를 조정합니다.
- 시간이 지남에 따라 DAG 실행이 더 균등하게 분산되도록 DAG의 일정 간격을 조정합니다.
Airflow 구성 확장
Airflow는 Airflow에서 동시에 실행할 수 있는 태스크 및 DAG 수를 제어하는 Airflow 구성 옵션을 제공합니다. 이 구성 옵션을 설정하려면 사용자 환경의 값을 재정의합니다. DAG 또는 태스크 수준에서 이러한 값 중 일부를 설정할 수도 있습니다.
-
[celery]worker_concurrency
매개변수는 Airflow 작업자가 동시에 실행할 수 있는 태스크의 최대 개수를 제어합니다. 이 매개변수 값에 Cloud Composer 환경에 있는 Airflow 작업자 수를 곱하면 해당 환경의 특정 순간에 실행될 수 있는 최대 태스크 수를 얻을 수 있습니다. 이 수는 추가로 설명되는[core]parallelism
Airflow 구성 옵션에 따라 제한됩니다.Cloud Composer 3 환경에서
[celery]worker_concurrency
의 기본값은 작업자가 수용할 수 있는 경량 동시 작업 인스턴스 수를 기반으로 자동으로 계산됩니다. 즉, 이 값은 작업자 리소스 한도에 따라 달라집니다. 작업자 동시 실행 값은 환경의 작업자 수에 따라 달라지지 않습니다. -
[core]max_active_runs_per_dag
Airflow 구성 옵션은 DAG당 최대 활성 DAG 실행 수를 제어합니다. 이 한도에 도달해도 스케줄러에서 DAG 실행을 추가로 만들지 않습니다.이 매개변수를 잘못 설정하면 지정된 시점에 DAG 실행 인스턴스를 더 만들 수 없으므로 스케줄러가 DAG 실행을 제한하는 문제가 발생할 수 있습니다.
max_active_runs
매개변수를 사용하여 DAG 수준에서 이 값을 설정할 수도 있습니다. -
[core]max_active_tasks_per_dag
Airflow 구성 옵션은 각 DAG에서 동시에 실행될 수 있는 최대 태스크 인스턴스 수를 제어합니다.이 매개변수를 잘못 설정하면 지정된 시점에 실행할 수 있는 DAG 태스크 수가 제한적이므로 단일 DAG 인스턴스 실행에 문제가 발생할 수 있습니다. 이 경우 이 구성 옵션의 값을 늘릴 수 있습니다.
max_active_tasks
매개변수를 사용하여 DAG 수준에서 이 값을 설정할 수도 있습니다.태스크 수준에서
max_active_tis_per_dag
및max_active_tis_per_dagrun
매개변수를 사용하여 DAG별 및 DAG 실행별로 특정 태스크 ID가 있는 인스턴스가 실행되도록 허용되는 수를 제어할 수 있습니다. 동시 로드 및 풀 크기
[core]parallelism
Airflow 구성 옵션은 이러한 태스크의 모든 종속 항목이 충족된 후 Airflow 스케줄러가 Executor 큐에서 큐에 추가할 수 있는 태스크 수를 제어합니다.전체 Airflow 설정에 대한 전역 매개변수입니다.
태스크가 한 풀에서 큐에 추가되고 실행됩니다. Cloud Composer 환경에서는 하나의 풀만 사용합니다. 이 풀의 크기로 스케줄러가 특정 시점에 실행을 위해 큐에 추가할 수 있는 태스크 수가 제어됩니다. 풀 크기가 너무 작으면
[core]parallelism
구성 옵션 및[celery]worker_concurrency
구성 옵션과 Airflow 작업자 수를 곱한 값으로 정의되는 기준에 아직 도달하지 않았더라도 스케줄러에서 실행을 위한 태스크를 큐에 추가할 수 없습니다.Airflow UI에서 풀 크기를 구성할 수 있습니다 (메뉴 > 관리자 > 풀). 해당 환경에서 예상되는 동시 로드 수준에 맞게 풀 크기를 조정하세요.
일반적으로
[core]parallelism
은 최대 작업자 수와[celery]worker_concurrency
의 곱으로 설정됩니다.
실행 및 큐에 추가된 태스크 문제 해결
다음 섹션에서는 실행 및 큐에 추가된 태스크의 몇 가지 일반적인 문제들에 대한 증상과 가능한 해결 방법을 설명합니다.
DAG 실행이 실행되지 않음
증상
DAG의 예약 날짜를 동적으로 설정하면 예기치 않은 여러 부작용이 발생할 수 있습니다. 예를 들면 다음과 같습니다.
DAG 실행이 항상 미래이며 DAG는 실행되지 않습니다.
과거 DAG 실행이 실행되지 않았는데도 성공적으로 실행된 것으로 표시됩니다.
자세한 내용은 Apache Airflow 문서를 참고하세요.
제공 가능한 솔루션:
Apache Airflow 문서의 권장사항을 따르세요.
DAG에 정적
start_date
를 설정합니다. 또는catchup=False
를 사용하여 이전 날짜의 DAG 실행을 사용 중지할 수 있습니다.이 방법의 부작용을 인지하고 있지 않은 경우
datetime.now()
또는days_ago(<number of days>)
를 사용하지 마세요.
Airflow 스케줄러의 TimeTable 기능 사용
Cloud Composer 3은 DAG에 구현된 시간표를 비롯한 Airflow 스케줄러용 맞춤 플러그인을 지원하지 않습니다. 플러그인은 환경의 스케줄러와 동기화되지 않습니다.
Cloud Composer 3에서도 기본 제공 일정을 계속 사용할 수 있습니다.
유지보수 기간 중 태스크 예약 방지
DAG를 실행하는 시간 외에 환경의 유지보수가 실행되도록 환경의 유지보수 기간을 정의할 수 있습니다. 일부 태스크가 중단되고 재시도될 수 있는 한 유지보수 기간 동안 DAG를 계속 실행할 수 있습니다. 유지보수 기간이 환경에 미치는 영향에 대한 자세한 내용은 유지보수 기간 지정을 참고하세요.
DAG에서 'wait_for_downstream' 사용
DAG에서 wait_for_downstream
매개변수를 True
로 설정한 경우 태스크가 성공하려면 이 태스크의 즉시 다운스트림에 있는 모든 태스크도 성공해야 합니다. 즉, 이전 DAG 실행으로부터 태스크를 실행하면 특정 DAG 실행에 속하는 태스크 실행이 느려질 수 있습니다. 자세한 내용은 Airflow 문서를 참조하세요.
너무 오랫동안 큐에 추가된 작업은 취소되고 다시 예약됨
Airflow 태스크가 큐에 너무 오래 보관되면 [scheduler]task_queued_timeout
Airflow 구성 옵션에 설정된 시간이 지난 후 스케줄러가 실행 시간을 다시 예약합니다. 기본값은 2400
입니다.
이 상황의 증상을 관측하는 한 가지 방법은 큐에 추가된 태스크 수가 있는 차트(Cloud Composer UI의 '모니터링' 탭)를 살펴보는 것입니다. 이 차트의 급상승 구간이 약 2시간 내로 하락하지 않을 경우 태스크가 다시 예약될 가능성이 높습니다 (로그 없음). 그 후 스케줄러 로그에 'Adopted tasks were still pending ...' 로그 항목이 표시됩니다. 이러한 경우 태스크가 실행되지 않기 때문에 Airflow 태스크 로그에 '로그 파일을 찾을 수 없습니다...' 메시지가 표시될 수 있습니다.
일반적으로 이러한 동작은 예상된 동작이며, 예약된 태스크의 다음 인스턴스가 일정에 따라 실행됩니다. Cloud Composer 환경에서 이러한 사례가 많이 관측된다면 예약된 모든 태스크를 처리하기에는 환경의 Airflow 작업자가 부족하다는 의미일 수 있습니다.
해결 방법: 이 문제를 해결하려면 큐에 추가된 태스크를 실행할 수 있는 Airflow 작업자 용량이 항상 있는지 확인해야 합니다. 예를 들어 workers 또는 worker_concurrency의 수를 늘릴 수 있습니다. 또한 용량보다 많은 태스크가 큐에 추가되지 않도록 동시 로드 또는 풀을 조정할 수도 있습니다.
Cloud Composer의 min_file_process_interval 매개변수 접근 방식
Airflow 스케줄러가 [scheduler]min_file_process_interval
을 사용하는 방식이 Cloud Composer에서 변경되었습니다.
Airflow 스케줄러는 모든 DAG가 특정 횟수만큼 예약되면 다시 시작되고 [scheduler]num_runs
매개변수는 스케줄러에서 이를 수행하는 횟수를 제어합니다. 스케줄러는 [scheduler]num_runs
예약 루프에 도달하면 다시 시작됩니다. 스케줄러는 스테이트리스(Stateless) 구성요소이며 이러한 재시작은 스케줄러에서 발생할 수 있는 모든 문제를 해결하는 자동 복구 메커니즘입니다. [scheduler]num_runs
의 기본값은 5,000입니다.
[scheduler]min_file_process_interval
은 DAG 파싱 발생 빈도를 구성하는 데 사용할 수 있지만, 이 매개변수는 스케줄러가 DAG를 예약할 때 [scheduler]num_runs
루프를 수행하는 데 필요한 시간보다 길 수 없습니다.
dagrun_timeout에 도달한 후 태스크를 실패로 표시
스케줄러는 dagrun_timeout
(DAG 매개변수) 내에서 DAG 실행이 완료되지 않으면 완료되지 않은(실행 중, 예약됨, 큐에 추가됨) 태스크를 실패한 것으로 표시합니다.
해결책:
dagrun_timeout
을 연장하여 제한 시간을 충족시킵니다.DAG가 빠르게 실행되도록 작업자 수를 늘리거나 작업자 성능 매개변수를 늘립니다.
과부하 상태인 Airflow 데이터베이스의 증상
경우에 따라 Airflow 작업자 로그에 다음 경고 로그 항목이 표시될 수 있습니다.
psycopg2.OperationalError: connection to server at ... failed
이러한 오류 또는 경고는 스케줄러나 작업자, 트리거, 웹 서버 등의 다른 Airflow 구성요소로 인해 Airflow 데이터베이스에서 열린 연결 개수 또는 동시 실행 쿼리 수가 과다하여 발생하는 증상일 수 있습니다.
제공 가능한 솔루션:
환경 크기를 조정하여 Airflow 데이터베이스를 수직 확장합니다.
스케줄러 및 DAG 프로세서 수를 줄입니다. 스케줄러 1개와 DAG 프로세서 1개로 시작한 다음 이러한 구성요소가 리소스 한도에 가까워지면 스케줄러 또는 DAG 프로세서 수를 늘립니다.
Airflow DAG에서 전역 변수를 사용하지 마세요. 대신 환경 변수 및 Airflow 변수를 사용하세요.
[scheduler]scheduler_heartbeat_sec
를 더 높은 값으로 설정합니다(예: 15초 이상).[scheduler]job_heartbeat_sec
을 더 높은 값으로 설정합니다(예: 30초 이상).[scheduler]scheduler_health_check_threshold
을[scheduler]job_heartbeat_sec
에4
를 곱한 값과 같은 값으로 설정합니다.
웹 서버에 '실행될 스케줄러가 표시되지 않음' 경고가 표시됨
스케줄러는 정기적으로 하트비트를 Airflow 데이터베이스에 보고합니다. 이 정보를 기반으로 Airflow 웹 서버는 스케줄러가 활성 상태인지를 확인합니다.
스케줄러가 과부하 상태인 경우 [scheduler]scheduler_heartbeat_sec
마다 하트비트를 보고하지 못할 수 있습니다.
이러한 경우 Airflow 웹 서버에 다음 경고가 표시될 수 있습니다.
The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.
제공 가능한 솔루션:
스케줄러의 리소스를 CPU 및 메모리에 늘립니다.
더 빠른 경우 파싱 및 예약에서 스케줄러 리소스를 너무 많이 소비하지 않도록 DAG를 최적화합니다.
Airflow DAG에서 전역 변수를 사용하지 마세요. 대신 환경 변수 및 Airflow 변수를 사용하세요.
스케줄러를 사용할 수 없다고 보고하기 전에 웹 서버에서 더 오래 기다리도록
[scheduler]scheduler_health_check_threshold
Airflow 구성 옵션의 값을 늘립니다.
DAG 백필 시 발생한 문제의 해결 방법
이미 실행된 DAG를 다시 실행해야 하는 경우가 있습니다. Airflow CLI 명령어를 사용하여 수행할 수 있는 방법은 다음과 같습니다.
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
dags backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
특정 DAG에서 실패한 태스크만 다시 실행하려면 --rerun-failed-tasks
인수도 사용합니다.
다음과 같이 바꿉니다.
ENVIRONMENT_NAME
: 환경 이름LOCATION
을 환경이 위치한 리전으로 바꿉니다.START_DATE
를YYYY-MM-DD
형식의start_date
DAG 매개변수 값으로 바꿉니다.END_DATE
를YYYY-MM-DD
형식의end_date
DAG 매개변수 값으로 바꿉니다.DAG_NAME
를 DAG의 이름으로 바꿉니다.
백필 작업을 수행하면 교착 상태가 발생하는 경우가 있습니다. 이 경우 태스크가 잠금 상태이므로 백필이 불가능합니다. 예를 들면 다음과 같습니다.
2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill
경우에 따라 다음 해결 방법을 사용하여 교착 상태를 해결할 수 있습니다.
[core]schedule_after_task_execution
을False
로 재정의하여 mini-scheduler를 사용 중지합니다.더 좁은 기간에 대해 백필을 실행합니다. 예를 들어
START_DATE
및END_DATE
를 설정하여 기간을 1일만 지정합니다.