Airflow 스케줄러 문제 해결

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

이 페이지에서는 Airflow 스케줄러의 일반적인 문제에 대한 문제 해결 단계 및 정보를 제공합니다.

문제의 출처 식별

문제 해결을 시작하기 위해 DAG 파싱 시간에 또는 실행 시 작업을 처리하는 동안 문제가 발생하는지 확인합니다. 파싱 시간 및 실행 시간에 대한 자세한 내용은 DAG 파싱 시간과 DAG 실행 시간 사이의 차이점을 참조하세요.

DAG 프로세서 로그 검사

DAG가 복잡한 경우 스케줄러에서 실행하는 DAG 프로세서가 모든 DAG를 파싱하지 않을 수 있습니다. 이로 인해 다음과 같은 증상이 나타나는 많은 문제가 발생할 수 있습니다.

증상

  • DAG를 파싱할 때 DAG 프로세서에 문제가 발생하면 아래에 나열된 문제가 복합적으로 발생할 수 있습니다. DAG가 동적으로 생성되는 경우 이러한 문제는 정적 DAG에 비해 더 큰 영향을 미치기도 합니다.

  • DAG가 Airflow UI 및 DAG UI에 표시되지 않습니다.

  • DAG의 실행이 예약되지 않습니다.

  • DAG 프로세서 로그에 다음과 같은 오류가 있습니다.

    dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started
    at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
    

    또는

    dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py exited with return
    code 1.
    
  • Airflow 스케줄러에 스케줄러가 다시 시작되는 문제가 발생합니다.

  • 실행이 예약된 Airflow 태스크는 취소되고 파싱에 실패한 DAG의 DAG 실행은 failed로 표시될 수 있습니다. 예를 들면 다음과 같습니다.

    airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1
    manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag
    'dag-example'. Marking it as removed.
    

해결책:

  • DAG 파싱과 관련된 매개변수를 늘립니다.

  • DAG 프로세서에 문제를 일으키는 DAG를 수정하거나 삭제합니다.

DAG 파싱 시간 검사

DAG 파싱 시간에 문제가 발생하는지 확인하려면 다음 단계를 따르세요.

콘솔

Google Cloud 콘솔에서 Monitoring 페이지 및 로그 탭을 사용하여 DAG 파싱 시간을 검사할 수 있습니다.

Cloud Composer Monitoring 페이지에서 DAG 파싱 시간을 검사합니다.

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 목록에서 환경 이름을 클릭합니다. Monitoring 페이지가 열립니다.

  3. Monitoring 탭에서 DAG 실행 섹션에 있는 모든 DAG 파일의 총 파싱 시간 차트를 검토하고 발생 가능한 문제가 있는지 식별합니다.

    Composer Monitoring 탭의 DAG 실행 섹션에 해당 환경의 DAG에 대한 상태 측정항목이 표시됩니다.

Cloud Composer 로그 탭에서 DAG 파싱 시간을 검사합니다.

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 목록에서 환경 이름을 클릭합니다. Monitoring 페이지가 열립니다.

  3. 로그 탭으로 이동하고 모든 로그 탐색 트리에서 DAG 프로세서 관리자 섹션을 선택합니다.

  4. dag-processor-manager 로그를 검토하고 발생 가능한 문제를 식별합니다.

    DAG 파싱 시간이 표시된 DAG 프로세서 로그

gcloud

dags report 명령어를 사용하여 모든 DAG의 파싱 시간을 확인합니다.

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags report

다음과 같이 바꿉니다.

  • ENVIRONMENT_NAME을 환경 이름으로 바꿉니다.
  • LOCATION을 환경이 위치한 리전으로 바꿉니다.

명령어 출력은 다음과 비슷하게 표시됩니다.

Executing within the following Kubernetes cluster namespace: composer-2-0-31-airflow-2-3-3
file                  | duration       | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py       | 0:00:00.038334 | 2       | 10       | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1       | 1        | airflow_monitoring

표에 나열된 각 DAG의 duration 값을 찾습니다. 큰 값은 DAG 중 하나가 최적의 방식으로 구현되지 않았음을 나타낼 수 있습니다. 출력 테이블에서 파싱 시간이 긴 DAG를 식별할 수 있습니다.

Monitoring 실행 및 큐에 추가된 태스크

큐에서 막혀 있는 태스크가 있는지 확인하려면 다음 단계를 따르세요.

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 목록에서 환경 이름을 클릭합니다. 환경 세부정보 페이지가 열립니다.

  3. Monitoring 탭으로 이동합니다.

  4. Monitoring 탭에서 DAG 실행 섹션의 Airflow 태스크 차트를 검토하고 발생 가능한 문제가 있는지 식별합니다. Airflow 태스크는 Airflow에서 큐에 추가된 상태인 태스크이며 Celery 또는 Kubernetes Executor 브로커 큐로 이동할 수 있습니다. Celery 큐에 추가된 태스크는 Celery 브로커 큐에 배치되는 태스크 인스턴스입니다.

DAG 파싱 시간에 문제 해결

다음 섹션에서는 DAG 파싱 시간에 발생하는 몇 가지 일반적인 문제들에 대한 증상과 가능한 해결 방법을 설명합니다.

스레드 수 제한

DAG 파일을 처리하는 스케줄러의 일부인 DAG 프로세서 관리자가 제한된 스레드 수만 사용하도록 허용하면 DAG 파싱 시간에 영향을 줄 수 있습니다.

이 문제를 해결하려면 다음 Airflow 구성 옵션을 재정의하세요.

  • Airflow 1.10.12 및 이전 버전의 경우 max_threads 매개변수를 재정의합니다.

    섹션 참고
    scheduler max_threads NUMBER_OF_CORES_IN_MACHINE - 1 NUMBER_OF_CORES_IN_MACHINE을 워커 노드 머신에 있는
    코어 수로 바꿉니다.
  • Airflow 1.10.14 및 이후 버전의 경우 parsing_processes 매개변수를 재정의합니다.

    섹션 참고
    scheduler parsing_processes NUMBER_OF_CORES_IN_MACHINE - 1 NUMBER_OF_CORES_IN_MACHINE을 워커 노드 머신에 있는
    코어 수로 바꿉니다.

태스크 수 및 시간 분포

Airflow는 많은 수의 작은 태스크를 예약하는 데 문제가 있는 것으로 알려져 있습니다. 이러한 경우에는 더 적은 수로 보다 통합된 태스크를 사용해야 합니다.

동시에 많은 수의 DAG 또는 태스크를 예약하는 것도 문제의 원인이 될 수 있습니다. 이러한 문제를 방지하기 위해서는 시간 경과에 따라 태스크를 더 고르게 분포해야 합니다.

실행 및 큐에 추가된 태스크 문제 해결

다음 섹션에서는 실행 및 큐에 추가된 태스크의 몇 가지 일반적인 문제들에 대한 증상과 가능한 해결 방법을 설명합니다.

태스크 큐가 너무 긺

일부 경우에 태스크 큐가 스케줄러에 대해 너무 길 수 있습니다. 작업자 및 celery 매개변수를 최적화하는 방법은 비즈니스와 함께 Cloud Composer 환경 확장을 참조하세요.

Airflow 스케줄러의 TimeTable 기능 사용

Airflow 2.2부터 TimeTable이라는 새로운 기능을 사용하여 DAG의 시간 테이블을 정의할 수 있습니다.

다음 방법 중 하나를 사용하여 시간 테이블을 정의할 수 있습니다.

제한된 클러스터 리소스

이 섹션은 Cloud Composer 1에만 적용됩니다.

해당 환경의 GKE 클러스터가 너무 작아서 DAG 및 태스크를 모두 처리하지 못하는 경우 성능 문제가 발생할 수 있습니다. 이 경우 다음 솔루션 중 하나를 시도해보세요.

  • 더 많은 성능을 제공하는 머신 유형으로 새 환경을 만들고 DAG를 여기로 마이그레이션합니다.
  • Cloud Composer 환경을 더 만들고 이들 간에 DAG을 분할합니다.
  • GKE 노드의 머신 유형 업그레이드에 설명된 대로 GKE 노드의 머신 유형을 변경합니다. 이 절차는 오류가 발생하기 쉬우므로 가장 마지막에 사용하는 것이 좋습니다.
  • gcloud composer environments update 명령어를 사용하는 것과 같이 해당 환경에서 Airflow 데이터베이스를 실행하는 Cloud SQL 인스턴스의 머신 유형을 업그레이드합니다. Airflow 데이터베이스의 낮은 성능이 속도가 느린 스케줄러의 원인일 수 있습니다.

유지보수 기간 중 태스크 예약 방지

환경의 특정 유지보수 기간을 정의할 수 있습니다. 이 기간 동안 Cloud SQL 및 GKE의 유지보수 이벤트가 수행됩니다.

Airflow 스케줄러가 불필요한 파일을 무시하도록 설정

DAG 폴더에서 불필요한 파일을 건너뛰어 Airflow 스케줄러 성능을 향상시킬 수 있습니다. Airflow 스케줄러는 .airflowignore 파일에 지정된 파일 및 폴더를 무시합니다.

Airflow 스케줄러가 불필요한 파일을 무시하도록 설정하려면 다음 안내를 따르세요.

  1. .airflowignore 파일을 만듭니다.
  2. 이 파일에서 무시할 파일 및 폴더를 나열합니다.
  3. 환경 버킷의 /dags 폴더에 이 파일을 업로드합니다.

.airflowignore 파일 형식에 대한 자세한 내용은 Airflow 문서를 참조하세요.

Airflow 스케줄러가 일시중지된 DAG 처리

Airflow 사용자는 실행 방지를 위해 DAG를 일시중지합니다. 이렇게 하면 Airflow 작업자의 처리 주기가 줄어듭니다.

Airflow 스케줄러에서 일시중지된 DAG를 계속 파싱합니다. Airflow 스케줄러 성능을 실질적으로 개선하려면 .airflowignore를 사용하거나 DAG 폴더에서 일시중지된 DAG를 삭제합니다.

DAG에서 'wait_for_downstream' 사용

DAG에서 wait_for_downstream 매개변수를 True로 설정한 경우 태스크가 성공하려면 이 태스크의 즉시 다운스트림에 있는 모든 태스크도 성공해야 합니다. 즉, 이전 DAG 실행으로부터 태스크를 수행하면 특정 DAG 실행에 속하는 태스크 수행이 느려질 수 있습니다. 자세한 내용은 Airflow 문서를 참조하세요.

너무 오랫동안 큐에 추가된 작업은 취소되고 다시 예약됨

Airflow 태스크가 큐에 너무 오래 보관되면 스케줄러가 실행 시간을 다시 예약합니다. 또한 Airflow 버전 2.3.1 이전에서는 태스크가 실패로 표시되고 가능한 경우 재시도됩니다.

이러한 상황의 증상을 관측하는 한 가지 방법은 큐에 추가된 태스크 수가 표시된 차트(Cloud Composer UI의 "모니터링" 탭)를 살펴보는 것입니다. 이 차트에서 급증이 2시간 내로 하락하지 않으면 태스크가 다시 예약되고(로그 없음) 스케줄러 로그에 "채택된 태스크가 여전히 대기 중..." 로그 항목이 표시될 가능성이 높습니다. 이러한 경우 태스크가 실행되지 않기 때문에 Airflow 태스크 로그에 '로그 파일을 찾을 수 없습니다...' 메시지가 표시될 수 있습니다.

일반적으로 이러한 동작은 예상된 동작이며, 예약된 태스크의 다음 인스턴스가 일정에 따라 실행됩니다. Cloud Composer 환경에서 이러한 사례가 많이 관측된다면 예약된 모든 태스크를 처리하기에는 환경의 Airflow 작업자가 부족하다는 의미일 수 있습니다.

해결 방법: 이 문제를 해결하려면 큐에 추가된 태스크를 실행할 수 있는 Airflow 작업자 용량이 항상 있는지 확인해야 합니다. 예를 들어 workers 또는 worker_concurrency의 수를 늘릴 수 있습니다. 또한 용량보다 많은 태스크가 큐에 추가되지 않도록 동시 로드 또는 풀을 조정할 수도 있습니다.

비활성 태스크로 인한 특정 DAG 실행의 우발적인 차단

일반적인 경우 Airflow 스케줄러는 큐에 비활성 태스크가 있고 어떤 이유로 해당 태스크를 제대로 실행할 수 없는 상황(예: 비활성 태스크가 속한 DAG가 삭제되었음)을 처리할 수 있어야 합니다.

이러한 비활성 태스크가 스케줄러에 의해 삭제되지 않는 경우 수동으로 삭제해야 할 수 있습니다. 예를 들면 Airflow UI에서 삭제할 수 있습니다. 메뉴 > 브라우저 > 태스크 인스턴스로 이동하여 비활성 DAG에 속한 큐에 추가된 태스크를 찾아서 삭제하면 됩니다.

이 문제를 해결하려면 환경을 Cloud Composer 버전 2.1.12 이상으로 업그레이드합니다.

[scheduler]min_file_process_interval 매개변수에 대한 Cloud Composer 접근 방식

Airflow 스케줄러가 [scheduler]min_file_process_interval을 사용하는 방식이 Cloud Composer에서 변경되었습니다.

Airflow 1

Airflow 1을 사용하는 Cloud Composer의 경우 사용자가 [scheduler]min_file_process_interval 값을 0~600초로 설정할 수 있습니다. 600초보다 큰 값은 [scheduler]min_file_process_interval이 600초로 설정된 경우와 동일한 결과를 가져옵니다.

Airflow 2

Airflow 2에서 [scheduler]min_file_process_interval은 최신 버전 1.19.9 및 2.0.26 이상에서만 사용할 수 있습니다.

  • Cloud Composer 1.19.9 및 2.0.26 이전 버전

    이러한 버전에서 [scheduler]min_file_process_interval은 무시됩니다.

  • Cloud Composer 1.19.9 또는 2.0.26 이상 버전

    Airflow 스케줄러는 모든 DAG가 특정 횟수만큼 예약되면 다시 시작되고 [scheduler]num_runs 매개변수는 스케줄러에서 이를 수행하는 횟수를 제어합니다. 스케줄러는 [scheduler]num_runs 예약 루프에 도달하면 다시 시작됩니다. 스케줄러는 스테이트리스(Stateless) 구성요소이며 이러한 재시작은 스케줄러에서 발생할 수 있는 모든 문제를 해결하는 자동 복구 메커니즘입니다. 지정하지 않으면 [scheduler]num_runs의 기본값 5,000이 적용됩니다.

    [scheduler]min_file_process_interval은 DAG 파싱 발생 빈도를 구성하는 데 사용할 수 있지만, 이 매개변수는 스케줄러가 DAG를 예약할 때 [scheduler]num_runs 루프를 수행하는 데 필요한 시간보다 길 수 없습니다.

Airflow 구성 확장

Airflow는 Airflow에서 동시에 실행할 수 있는 태스크 및 DAG 수를 제어하는 Airflow 구성 옵션을 제공합니다. 이 구성 옵션을 설정하려면 사용자 환경의 값을 재정의합니다.

  • 작업자 동시 실행

    [celery]worker_concurrency 매개변수는 Airflow 작업자가 동시에 실행할 수 있는 태스크의 최대 개수를 제어합니다. 이 매개변수 값에 Cloud Composer 환경에 있는 Airflow 작업자 수를 곱하면 해당 환경의 특정 순간에 실행될 수 있는 최대 태스크 수를 얻을 수 있습니다. 이 수는 추가로 설명되는 [core]parallelism Airflow 구성 옵션에 따라 제한됩니다.

    Cloud Composer 2 환경에서 기본값 [celery]worker_concurrency는 자동으로 계산됩니다.

    • Airflow 버전 2.3.3 이상의 경우 [celery]worker_concurrency는 32, 12 * worker_CPU, 8 * worker_memory 중에서 가장 작은 값으로 설정됩니다.

    • Airflow 버전 2.2.5 이하의 경우 [celery]worker_concurrency는 12 * 작업자의 CPU 수로 설정됩니다.

  • 최대 활성 DAG 실행 수

    [core]max_active_runs_per_dag Airflow 구성 옵션은 DAG당 최대 활성 DAG 실행 수를 제어합니다. 이 한도에 도달해도 스케줄러에서 DAG 실행을 추가로 만들지 않습니다.

    이 매개변수를 잘못 설정하면 지정된 시점에 DAG 실행 인스턴스를 더 만들 수 없으므로 스케줄러가 DAG 실행을 제한하는 문제가 발생할 수 있습니다.

  • DAG당 최대 활성 태스크 수

    [core]max_active_tasks_per_dag Airflow 구성 옵션은 각 DAG에서 동시에 실행될 수 있는 최대 태스크 인스턴스 수를 제어합니다. DAG 수준 매개변수입니다.

    이 매개변수를 잘못 설정하면 지정된 시점에 실행할 수 있는 DAG 태스크 수가 제한적이므로 단일 DAG 인스턴스 실행에 문제가 발생할 수 있습니다.

    해결 방법: [core]max_active_tasks_per_dag를 늘립니다.

  • 동시 로드 및 풀 크기

    [core]parallelism Airflow 구성 옵션은 이러한 태스크의 모든 종속 항목이 충족된 후 Airflow 스케줄러가 Executor 큐에서 큐에 추가할 수 있는 태스크 수를 제어합니다.

    전체 Airflow 설정에 대한 전역 매개변수입니다.

    태스크가 한 풀에서 큐에 추가되고 실행됩니다. Cloud Composer 환경에서는 하나의 풀만 사용합니다. 이 풀의 크기로 스케줄러가 특정 시점에 실행을 위해 큐에 추가할 수 있는 태스크 수가 제어됩니다. 풀 크기가 너무 작으면 [core]parallelism 구성 옵션 및 [celery]worker_concurrency 구성 옵션과 Airflow 작업자 수를 곱한 값으로 정의되는 기준에 아직 도달하지 않았더라도 스케줄러에서 실행을 위한 태스크를 큐에 추가할 수 없습니다.

    Airflow UI에서 풀 크기를 구성할 수 있습니다(메뉴 > 관리자 > ). 해당 환경에서 예상되는 동시 로드 수준에 맞게 풀 크기를 조정하세요.

    일반적으로 [core]parallelism은 최대 작업자 수와 [celery]worker_concurrency의 곱으로 설정됩니다.

DAG 프로세서 시간 초과로 인해 DAG가 스케줄러를 통해 예약되지 않음

이 문제에 대한 자세한 내용은 DAG 문제 해결을 참조하세요.

dagrun_timeout에 도달한 후 태스크를 실패로 표시

스케줄러는 dagrun_timeout(DAG 매개변수) 내에서 DAG 실행이 완료되지 않으면 완료되지 않은(실행 중, 예약됨, 큐에 추가됨) 태스크를 실패한 것으로 표시합니다.

솔루션:

과부하 상태인 Airflow 데이터베이스의 증상

경우에 따라 Airflow 스케줄러 로그에 다음 경고 로그 항목이 표시될 수 있습니다.

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

Airflow 작업자 로그에서도 유사한 증상이 나타날 수 있습니다.

MySQL의 경우:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

PostgreSQL의 경우:

psycopg2.OperationalError: connection to server at ... failed

이러한 오류 또는 경고는 스케줄러나 작업자, 트리거, 웹 서버 등의 다른 Airflow 구성요소로 인해 Airflow 데이터베이스에서 열린 연결 개수 또는 동시 실행 쿼리 수가 과다하여 발생하는 증상일 수 있습니다.

제공 가능한 솔루션:

웹 서버에 '실행될 스케줄러가 표시되지 않음' 경고가 표시됨

스케줄러는 정기적으로 하트비트를 Airflow 데이터베이스에 보고합니다. 이 정보를 기반으로 Airflow 웹 서버는 스케줄러가 활성 상태인지를 확인합니다.

스케줄러가 과부하 상태인 경우 [scheduler]scheduler-heartbeat-sec마다 하트비트를 보고하지 못할 수 있습니다.

이러한 경우 Airflow 웹 서버에 다음 경고가 표시될 수 있습니다.

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

제공 가능한 솔루션:

  • 스케줄러의 리소스를 CPU 및 메모리에 늘립니다.

  • 더 빠른 경우 파싱 및 예약에서 스케줄러 리소스를 너무 많이 소비하지 않도록 DAG를 최적화합니다.

  • Airflow DAG에서 전역 변수(Cloud Composer 환경 변수Airflow 변수)를 사용하지 마세요.

  • 스케줄러를 사용할 수 없다고 보고하기 전에 웹 서버에서 더 오래 기다리도록 [scheduler]scheduler-health-check-threshold 값을 늘립니다.

DAG 백필 시 발생한 문제의 해결 방법

이미 실행된 DAG를 다시 실행해야 하는 경우가 있습니다. Airflow 명령줄 도구를 사용하여 수행할 수 있는 방법은 다음과 같습니다.

Airflow 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
  backfill -- -B \
  -s START_DATE \
  -e END_DATE \
  DAG_NAME

특정 DAG에서 실패한 태스크만 다시 실행하려면 --rerun_failed_tasks 인수도 사용합니다.

Airflow 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

특정 DAG에서 실패한 태스크만 다시 실행하려면 --rerun-failed-tasks 인수도 사용합니다.

다음과 같이 바꿉니다.

  • ENVIRONMENT_NAME을 환경 이름으로 바꿉니다.
  • LOCATION을 환경이 위치한 리전으로 바꿉니다.
  • START_DATEYYYY-MM-DD 형식의 start_date DAG 매개변수 값으로 바꿉니다.
  • END_DATEYYYY-MM-DD 형식의 end_date DAG 매개변수 값으로 바꿉니다.
  • DAG_NAME를 DAG의 이름으로 바꿉니다.

백필 작업을 수행하면 교착 상태가 발생하는 경우가 있습니다. 이 경우 태스크가 잠금 상태이므로 백필이 불가능합니다. 예를 들면 다음과 같습니다.

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

경우에 따라 다음 해결 방법을 사용하여 교착 상태를 해결할 수 있습니다.

  • [core]schedule-after-task-executionFalse재정의하여 mini-Scheduler를 사용 중지합니다.

  • 더 좁은 기간에 대해 백필을 실행합니다. 예를 들어 START_DATEEND_DATE를 설정하여 기간을 1일만 지정합니다.

다음 단계