Airflow 스케줄러 문제 해결

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

이 페이지에서는 Airflow 스케줄러 및 DAG 프로세서의 일반적인 문제에 대한 문제 해결 단계 및 정보를 제공합니다.

문제의 출처 식별

문제 해결을 시작하려면 다음 중 문제가 발생하는 경우를 확인하세요.

  • DAG 파싱 시간(Airflow DAG 프로세서가 DAG를 파싱하는 동안)
  • 실행 시간, DAG가 Airflow 스케줄러에 의해 처리되는 동안

파싱 시간 및 실행 시간에 대한 자세한 내용은 DAG 파싱 시간과 DAG 실행 시간 사이의 차이점을 참조하세요.

DAG 처리 문제 검사

  1. DAG 프로세서 로그를 검사합니다.
  2. DAG 파싱 시간 확인

Monitoring 실행 및 큐에 추가된 태스크

큐에서 막혀 있는 태스크가 있는지 확인하려면 다음 단계를 따르세요.

  1. 콘솔에서 Google Cloud 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 목록에서 환경 이름을 클릭합니다. 환경 세부정보 페이지가 열립니다.

  3. Monitoring 탭으로 이동합니다.

  4. Monitoring 탭에서 DAG 실행 섹션의 Airflow 태스크 차트를 검토하고 발생 가능한 문제가 있는지 식별합니다. Airflow 태스크는 Airflow에서 큐에 추가된 상태인 태스크이며 Celery 또는 Kubernetes Executor 브로커 큐로 이동할 수 있습니다. Celery 큐에 추가된 태스크는 Celery 브로커 큐에 배치되는 태스크 인스턴스입니다.

DAG 파싱 시간에 문제 해결

다음 섹션에서는 DAG 파싱 시간에 발생하는 몇 가지 일반적인 문제들에 대한 증상과 가능한 해결 방법을 설명합니다.

태스크 수 및 시간 분포

Airflow에서 동시에 많은 수의 DAG 또는 태스크를 예약할 때 문제가 발생할 수 있습니다. 예약 문제를 방지하려면 다음 단계를 따르세요.

  • 더 적은 수로 보다 통합된 태스크를 사용하도록 DAG를 조정합니다.
  • 시간이 지남에 따라 DAG 실행이 더 균등하게 분산되도록 DAG의 일정 간격을 조정합니다.

Airflow 구성 확장

Airflow는 Airflow에서 동시에 실행할 수 있는 태스크 및 DAG 수를 제어하는 Airflow 구성 옵션을 제공합니다. 이 구성 옵션을 설정하려면 사용자 환경의 값을 재정의합니다. DAG 또는 태스크 수준에서 이러한 값 중 일부를 설정할 수도 있습니다.

  • 작업자 동시 실행

    [celery]worker_concurrency 매개변수는 Airflow 작업자가 동시에 실행할 수 있는 태스크의 최대 개수를 제어합니다. 이 매개변수 값에 Cloud Composer 환경에 있는 Airflow 작업자 수를 곱하면 해당 환경의 특정 순간에 실행될 수 있는 최대 태스크 수를 얻을 수 있습니다. 이 수는 추가로 설명되는 [core]parallelism Airflow 구성 옵션에 따라 제한됩니다.

    Cloud Composer 3 환경에서 [celery]worker_concurrency의 기본값은 작업자가 수용할 수 있는 경량 동시 작업 인스턴스 수를 기반으로 자동으로 계산됩니다. 즉, 이 값은 작업자 리소스 한도에 따라 달라집니다. 작업자 동시 실행 값은 환경의 작업자 수에 따라 달라지지 않습니다.

  • 최대 활성 DAG 실행 수

    [core]max_active_runs_per_dag Airflow 구성 옵션은 DAG당 최대 활성 DAG 실행 수를 제어합니다. 이 한도에 도달해도 스케줄러에서 DAG 실행을 추가로 만들지 않습니다.

    이 매개변수를 잘못 설정하면 지정된 시점에 DAG 실행 인스턴스를 더 만들 수 없으므로 스케줄러가 DAG 실행을 제한하는 문제가 발생할 수 있습니다.

    max_active_runs 매개변수를 사용하여 DAG 수준에서 이 값을 설정할 수도 있습니다.

  • DAG당 최대 활성 태스크 수

    [core]max_active_tasks_per_dag Airflow 구성 옵션은 각 DAG에서 동시에 실행될 수 있는 최대 태스크 인스턴스 수를 제어합니다.

    이 매개변수를 잘못 설정하면 지정된 시점에 실행할 수 있는 DAG 태스크 수가 제한적이므로 단일 DAG 인스턴스 실행에 문제가 발생할 수 있습니다. 이 경우 이 구성 옵션의 값을 늘릴 수 있습니다.

    max_active_tasks 매개변수를 사용하여 DAG 수준에서 이 값을 설정할 수도 있습니다.

    태스크 수준에서 max_active_tis_per_dagmax_active_tis_per_dagrun 매개변수를 사용하여 DAG별 및 DAG 실행별로 특정 태스크 ID가 있는 인스턴스가 실행되도록 허용되는 수를 제어할 수 있습니다.

  • 동시 로드 및 풀 크기

    [core]parallelism Airflow 구성 옵션은 이러한 태스크의 모든 종속 항목이 충족된 후 Airflow 스케줄러가 Executor 큐에서 큐에 추가할 수 있는 태스크 수를 제어합니다.

    전체 Airflow 설정에 대한 전역 매개변수입니다.

    태스크가 한 풀에서 큐에 추가되고 실행됩니다. Cloud Composer 환경에서는 하나의 풀만 사용합니다. 이 풀의 크기로 스케줄러가 특정 시점에 실행을 위해 큐에 추가할 수 있는 태스크 수가 제어됩니다. 풀 크기가 너무 작으면 [core]parallelism 구성 옵션 및 [celery]worker_concurrency 구성 옵션과 Airflow 작업자 수를 곱한 값으로 정의되는 기준에 아직 도달하지 않았더라도 스케줄러에서 실행을 위한 태스크를 큐에 추가할 수 없습니다.

    Airflow UI에서 풀 크기를 구성할 수 있습니다 (메뉴 > 관리자 > ). 해당 환경에서 예상되는 동시 로드 수준에 맞게 풀 크기를 조정하세요.

    일반적으로 [core]parallelism은 최대 작업자 수와 [celery]worker_concurrency의 곱으로 설정됩니다.

실행 및 큐에 추가된 태스크 문제 해결

다음 섹션에서는 실행 및 큐에 추가된 태스크의 몇 가지 일반적인 문제들에 대한 증상과 가능한 해결 방법을 설명합니다.

DAG 실행이 실행되지 않음

증상

DAG의 예약 날짜를 동적으로 설정하면 예기치 않은 여러 부작용이 발생할 수 있습니다. 예를 들면 다음과 같습니다.

  • DAG 실행이 항상 미래이며 DAG는 실행되지 않습니다.

  • 과거 DAG 실행이 실행되지 않았는데도 성공적으로 실행된 것으로 표시됩니다.

자세한 내용은 Apache Airflow 문서를 참고하세요.

제공 가능한 솔루션:

  • Apache Airflow 문서의 권장사항을 따르세요.

  • DAG에 정적 start_date를 설정합니다. 또는 catchup=False를 사용하여 이전 날짜의 DAG 실행을 사용 중지할 수 있습니다.

  • 이 방법의 부작용을 인지하고 있지 않은 경우 datetime.now() 또는 days_ago(<number of days>)를 사용하지 마세요.

Airflow 스케줄러의 TimeTable 기능 사용

시간 테이블은 Airflow 2.2부터 사용할 수 있습니다.

다음 방법 중 하나를 사용하여 DAG의 시간 테이블을 정의할 수 있습니다.

내장 시간표를 사용할 수도 있습니다.

유지보수 기간 중 태스크 예약 방지

DAG를 실행하는 시간 외에 환경의 유지보수가 실행되도록 환경의 유지보수 기간을 정의할 수 있습니다. 일부 태스크가 중단되고 재시도될 수 있는 한 유지보수 기간 동안 DAG를 계속 실행할 수 있습니다. 유지보수 기간이 환경에 미치는 영향에 대한 자세한 내용은 유지보수 기간 지정을 참고하세요.

DAG에서 'wait_for_downstream' 사용

DAG에서 wait_for_downstream 매개변수를 True로 설정한 경우 태스크가 성공하려면 이 태스크의 즉시 다운스트림에 있는 모든 태스크도 성공해야 합니다. 즉, 이전 DAG 실행으로부터 태스크를 실행하면 특정 DAG 실행에 속하는 태스크 실행이 느려질 수 있습니다. 자세한 내용은 Airflow 문서를 참조하세요.

너무 오랫동안 큐에 추가된 작업은 취소되고 다시 예약됨

Airflow 태스크가 큐에 너무 오래 보관되면 [scheduler]task_queued_timeout Airflow 구성 옵션에 설정된 시간이 지난 후 스케줄러가 실행 시간을 다시 예약합니다. 기본값은 2400입니다. Airflow 버전 2.3.1 이전에서는 태스크가 실패로 표시되고 가능한 경우 재시도됩니다.

이 상황의 증상을 관측하는 한 가지 방법은 큐에 추가된 태스크 수가 있는 차트(Cloud Composer UI의 '모니터링' 탭)를 살펴보는 것입니다. 이 차트의 급상승 구간이 약 2시간 내로 하락하지 않을 경우 태스크가 다시 예약될 가능성이 높습니다 (로그 없음). 그 후 스케줄러 로그에 'Adopted tasks were still pending ...' 로그 항목이 표시됩니다. 이러한 경우 태스크가 실행되지 않기 때문에 Airflow 태스크 로그에 '로그 파일을 찾을 수 없습니다...' 메시지가 표시될 수 있습니다.

일반적으로 이러한 동작은 예상된 동작이며, 예약된 태스크의 다음 인스턴스가 일정에 따라 실행됩니다. Cloud Composer 환경에서 이러한 사례가 많이 관측된다면 예약된 모든 태스크를 처리하기에는 환경의 Airflow 작업자가 부족하다는 의미일 수 있습니다.

해결 방법: 이 문제를 해결하려면 큐에 추가된 태스크를 실행할 수 있는 Airflow 작업자 용량이 항상 있는지 확인해야 합니다. 예를 들어 workers 또는 worker_concurrency의 수를 늘릴 수 있습니다. 또한 용량보다 많은 태스크가 큐에 추가되지 않도록 동시 로드 또는 풀을 조정할 수도 있습니다.

큐에 멈춰 있는 태스크로 인해 특정 DAG의 실행이 차단될 수 있습니다.

이 문제를 해결하려면 환경을 Cloud Composer 버전 2.1.12 이상으로 업그레이드합니다.

일반적인 경우 Airflow 스케줄러는 큐에 태스크가 있고 어떤 이유로 해당 태스크를 제대로 실행할 수 없는 상황 (예: 이러한 태스크가 속한 DAG가 삭제되었음)을 처리할 수 있어야 합니다.

이러한 태스크가 스케줄러에 의해 삭제되지 않는 경우 수동으로 삭제해야 할 수 있습니다. 예를 들어 Airflow UI (메뉴 > 브라우저 > 태스크 인스턴스)에서 큐에 추가된 태스크를 찾아 삭제할 수 있습니다.

Cloud Composer의 min_file_process_interval 매개변수 접근 방식

Airflow 스케줄러가 [scheduler]min_file_process_interval을 사용하는 방식이 Cloud Composer에서 변경되었습니다.

Cloud Composer 2.0.26 이전 버전에서는 [scheduler]min_file_process_interval가 무시됩니다.

Cloud Composer 2.0.26 이후 버전:

Airflow 스케줄러는 모든 DAG가 특정 횟수만큼 예약되면 다시 시작되고 [scheduler]num_runs 매개변수는 스케줄러에서 이를 수행하는 횟수를 제어합니다. 스케줄러는 [scheduler]num_runs 예약 루프에 도달하면 다시 시작됩니다. 스케줄러는 스테이트리스(Stateless) 구성요소이며 이러한 재시작은 스케줄러에서 발생할 수 있는 모든 문제를 해결하는 자동 복구 메커니즘입니다. [scheduler]num_runs의 기본값은 5,000입니다.

[scheduler]min_file_process_interval은 DAG 파싱 발생 빈도를 구성하는 데 사용할 수 있지만, 이 매개변수는 스케줄러가 DAG를 예약할 때 [scheduler]num_runs 루프를 수행하는 데 필요한 시간보다 길 수 없습니다.

dagrun_timeout에 도달한 후 태스크를 실패로 표시

스케줄러는 dagrun_timeout(DAG 매개변수) 내에서 DAG 실행이 완료되지 않으면 완료되지 않은(실행 중, 예약됨, 큐에 추가됨) 태스크를 실패한 것으로 표시합니다.

해결책:

과부하 상태인 Airflow 데이터베이스의 증상

경우에 따라 Airflow 스케줄러 로그에 다음 경고 로그 항목이 표시될 수 있습니다.

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

Airflow 작업자 로그에서도 유사한 증상이 나타날 수 있습니다.

MySQL의 경우:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

PostgreSQL의 경우:

psycopg2.OperationalError: connection to server at ... failed

이러한 오류 또는 경고는 스케줄러나 작업자, 트리거, 웹 서버 등의 다른 Airflow 구성요소로 인해 Airflow 데이터베이스에서 열린 연결 개수 또는 동시 실행 쿼리 수가 과다하여 발생하는 증상일 수 있습니다.

제공 가능한 솔루션:

웹 서버에 '실행될 스케줄러가 표시되지 않음' 경고가 표시됨

스케줄러는 정기적으로 하트비트를 Airflow 데이터베이스에 보고합니다. 이 정보를 기반으로 Airflow 웹 서버는 스케줄러가 활성 상태인지를 확인합니다.

스케줄러가 과부하 상태인 경우 [scheduler]scheduler_heartbeat_sec마다 하트비트를 보고하지 못할 수 있습니다.

이러한 경우 Airflow 웹 서버에 다음 경고가 표시될 수 있습니다.

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

제공 가능한 솔루션:

  • 스케줄러의 리소스를 CPU 및 메모리에 늘립니다.

  • 더 빠른 경우 파싱 및 예약에서 스케줄러 리소스를 너무 많이 소비하지 않도록 DAG를 최적화합니다.

  • Airflow DAG에서 전역 변수를 사용하지 마세요. 대신 환경 변수Airflow 변수를 사용하세요.

  • 스케줄러를 사용할 수 없다고 보고하기 전에 웹 서버에서 더 오래 기다리도록 [scheduler]scheduler_health_check_threshold Airflow 구성 옵션의 값을 늘립니다.

DAG 백필 시 발생한 문제의 해결 방법

이미 실행된 DAG를 다시 실행해야 하는 경우가 있습니다. Airflow CLI 명령어를 사용하여 수행할 수 있는 방법은 다음과 같습니다.

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

특정 DAG에서 실패한 태스크만 다시 실행하려면 --rerun-failed-tasks 인수도 사용합니다.

다음과 같이 바꿉니다.

  • ENVIRONMENT_NAME: 환경 이름
  • LOCATION을 환경이 위치한 리전으로 바꿉니다.
  • START_DATEYYYY-MM-DD 형식의 start_date DAG 매개변수 값으로 바꿉니다.
  • END_DATEYYYY-MM-DD 형식의 end_date DAG 매개변수 값으로 바꿉니다.
  • DAG_NAME를 DAG의 이름으로 바꿉니다.

백필 작업을 수행하면 교착 상태가 발생하는 경우가 있습니다. 이 경우 태스크가 잠금 상태이므로 백필이 불가능합니다. 예를 들면 다음과 같습니다.

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

경우에 따라 다음 해결 방법을 사용하여 교착 상태를 해결할 수 있습니다.

  • [core]schedule_after_task_executionFalse재정의하여 mini-scheduler를 사용 중지합니다.

  • 더 좁은 기간에 대해 백필을 실행합니다. 예를 들어 START_DATEEND_DATE를 설정하여 기간을 1일만 지정합니다.

다음 단계