Airflow 스케줄러 문제 해결

Cloud Composer 1 | Cloud Composer 2

이 페이지에서는 Airflow 스케줄러와 관련된 일반적인 문제의 문제 해결 단계와 정보를 제공합니다.

문제의 출처 식별

문제 해결을 시작하기 위해 DAG 파싱 시간에 또는 실행 시 작업을 처리하는 동안 문제가 발생하는지 확인합니다. 파싱 시간 및 실행 시간에 대한 자세한 내용은 DAG 파싱 시간과 DAG 실행 시간 사이의 차이점을 참조하세요.

DAG 파싱 시간 검사

DAG 파싱 시간에 문제가 발생하는지 확인하려면 다음 단계를 따르세요.

Console

  1. Google Cloud Console에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 목록에서 환경 이름을 클릭합니다. 환경 세부정보 페이지가 열립니다.

  3. Monitoring 탭으로 이동합니다.

  4. Monitoring 탭에서 DAG 실행 섹션에 있는 모든 DAG 파일의 총 파싱 시간 차트를 검토하고 발생 가능한 문제가 있는지 식별합니다.

    Composer Monitoring 탭의 DAG 실행 섹션에 해당 환경의 DAG에 대한 상태 측정항목이 표시됩니다.

gcloud

-r 플래그와 함께 list_dags 명령어를 사용하여 모든 DAG의 파싱 시간을 확인합니다.

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    list_dags -- -r

다음과 같이 바꿉니다.

  • ENVIRONMENT_NAME을 환경 이름으로 바꿉니다.
  • LOCATION을 환경이 위치한 리전으로 바꿉니다.

결과는 다음과 비슷하게 표시됩니다.

-------------------------------------------------------------------
DagBag loading stats for /home/airflow/gcs/dags
-------------------------------------------------------------------
Number of DAGs: 5
Total task number: 13
DagBag parsing time: 0.6765180000000001
-----------+----------+---------+----------+-----------------------
file       | duration | dag_num | task_num | dags
-----------+----------+---------+----------+-----------------------
/dag_1.py  | 0.6477   |       1 |        2 | ['dag_1']
/dag_2.py  | 0.018652 |       1 |        2 | ['dag_2']
/dag_3.py  | 0.004024 |       1 |        6 | ['dag_3']
/dag_4.py  | 0.003476 |       1 |        2 | ['dag_4']
/dag_5.py  | 0.002666 |       1 |        1 | ['dag_5']
-----------+----------+---------+----------+-----------------------

DagBag 파싱 시간 값을 찾습니다. 큰 값은 DAG 중 하나가 최적의 방식으로 구현되지 않았음을 나타낼 수 있습니다. 출력 테이블에서 파싱 시간이 긴 DAG를 식별할 수 있습니다.

Monitoring 실행 및 큐에 추가된 태스크

큐에서 막혀 있는 태스크가 있는지 확인하려면 다음 단계를 따르세요.

  1. Google Cloud Console에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 목록에서 환경 이름을 클릭합니다. 환경 세부정보 페이지가 열립니다.

  3. Monitoring 탭으로 이동합니다.

  4. Monitoring 탭에서 DAG 실행 섹션의 실행 및 큐에 추가된 태스크 차트를 검토하고 발생 가능한 문제가 있는지 식별합니다.

DAG 파싱 시간에 문제 해결

다음 섹션에서는 DAG 파싱 시간에 발생하는 몇 가지 일반적인 문제들에 대한 증상과 가능한 해결 방법을 설명합니다.

스레드 수 제한

DAG 파일을 처리하는 스케줄러의 일부인 DAG 프로세서 관리자가 제한된 스레드 수만 사용하도록 허용하면 DAG 파싱 시간에 영향을 줄 수 있습니다.

이 문제를 해결하려면 airflow.cfg 구성 파일에 다음 변경사항을 적용합니다.

  • Airflow 1.10.12 및 이전 버전의 경우 max_threads 매개변수를 사용합니다.

    [scheduler]
    max_threads = <NUMBER_OF_CORES_IN_MACHINE - 1>
    
  • Airflow 1.10.14 및 이후 버전의 경우 parsing_processes 매개변수를 사용합니다.

    [scheduler]
    parsing_processes = <NUMBER_OF_CORES_IN_MACHINE - 1>
    

NUMBER_OF_CORES_IN_MACHINE을 워커 노드 머신에 있는 코어 수로 바꿉니다.

태스크 수 및 시간 분포

Airflow는 많은 수의 작은 태스크를 예약하는 데 문제가 있는 것으로 알려져 있습니다. 이러한 경우에는 더 적은 수로 보다 통합된 태스크를 사용해야 합니다.

동시에 많은 수의 DAG 또는 태스크를 예약하는 것도 문제의 원인이 될 수 있습니다. 이러한 문제를 방지하기 위해서는 시간 경과에 따라 태스크를 더 고르게 분포해야 합니다.

실행 및 큐에 추가된 태스크 문제 해결

다음 섹션에서는 실행 및 큐에 추가된 태스크의 몇 가지 일반적인 문제들에 대한 증상과 가능한 해결 방법을 설명합니다.

태스크 큐가 너무 긺

일부 경우에 태스크 큐가 스케줄러에 대해 너무 길 수 있습니다. 작업자 및 celery 매개변수를 최적화하는 방법은 비즈니스와 함께 Cloud Composer 환경 확장을 참조하세요.

제한된 클러스터 리소스

이 섹션은 Cloud Composer 1에만 적용됩니다.

해당 환경의 GKE 클러스터가 너무 작아서 DAG 및 태스크를 모두 처리하지 못하는 경우 성능 문제가 발생할 수 있습니다. 이 경우 다음 솔루션 중 하나를 시도해보세요.

  • 더 많은 성능을 제공하는 머신 유형으로 새 환경을 만들고 DAG를 여기로 마이그레이션합니다.
  • Cloud Composer 환경을 더 만들고 이들 간에 DAG을 분할합니다.
  • GKE 노드의 머신 유형 업그레이드에 설명된 대로 GKE 노드의 머신 유형을 변경합니다. 이 절차는 오류가 발생하기 쉬우므로 가장 마지막에 사용하는 것이 좋습니다.
  • gcloud composer environments update 명령어를 사용하는 것과 같이 해당 환경에서 Airflow 데이터베이스를 실행하는 Cloud SQL 인스턴스의 머신 유형을 업그레이드합니다. Airflow 데이터베이스의 낮은 성능이 속도가 느린 스케줄러의 원인일 수 있습니다.

유지보수 기간 중 태스크 예약 방지

환경의 특정 유지보수 기간을 정의할 수 있습니다. 이 기간 동안 Cloud SQL 및 GKE의 유지보수 이벤트가 수행됩니다.

Airflow 스케줄러가 불필요한 파일을 무시하도록 설정

DAG 폴더에서 불필요한 파일을 건너뛰어 Airflow 스케줄러 성능을 향상시킬 수 있습니다. Airflow 스케줄러는 .airflowignore 파일에 지정된 파일 및 폴더를 무시합니다.

Airflow 스케줄러가 불필요한 파일을 무시하도록 설정하려면 다음 안내를 따르세요.

  1. .airflowignore 파일을 만듭니다.
  2. 이 파일에서 무시할 파일 및 폴더를 나열합니다.
  3. 환경 버킷의 /dags 폴더에 이 파일을 업로드합니다.

.airflowignore 파일 형식에 대한 자세한 내용은 Airflow 문서를 참조하세요.

Airflow 스케줄러가 일시중지된 DAG 처리

Airflow 사용자는 실행 방지를 위해 DAG를 일시중지합니다. 이렇게 하면 Airflow 작업자의 처리 주기가 줄어듭니다.

Airflow 스케줄러에서 일시중지된 DAG를 계속 파싱합니다. Airflow 스케줄러 성능을 실질적으로 개선하려면 .airflowignore를 사용하거나 DAG 폴더에서 일시중지된 DAG를 삭제합니다.

DAG에서 'wait_for_downstream' 사용

DAG에서 wait_for_downstream 매개변수를 True로 설정한 경우 태스크가 성공하려면 이 태스크의 즉시 다운스트림에 있는 모든 태스크도 성공해야 합니다. 즉, 이전 DAG 실행으로부터 태스크를 수행하면 특정 DAG 실행에 속하는 태스크 수행이 느려질 수 있습니다. 자세한 내용은 Airflow 문서를 참조하세요.

너무 오랫동안 큐에 추가된 작업은 취소되고 다시 예약됨

Airflow 태스크가 큐에 너무 오래 보관되면 스케줄러에서 이 태스크를 failed/up_for_retry로 표시하고 실행을 위해 다시 한 번 예약합니다. 이러한 상황의 증상을 관측하는 한 가지 방법은 큐에 추가된 태스크 수가 표시된 차트(Cloud Composer UI의 '모니터링' 탭)를 살펴보는 것입니다. 이 차트에서 급증이 최대 10분 내로 하락하지 않으면 태스크가 실패하고(로그 없음) 스케줄러 로그에 '채택된 태스크가 여전히 대기 중...' 로그 항목이 표시될 가능성이 높습니다. 이러한 경우 태스크가 실행되지 않기 때문에 Airflow 태스크 로그에 '로그 파일을 찾을 수 없습니다...' 메시지가 표시될 수 있습니다.

일반적으로 이러한 태스크 실패는 예상되며, 예약된 태스크의 다음 인스턴스가 일정에 따라 실행됩니다. Cloud Composer 환경에서 이러한 사례가 많이 관측된다면 예약된 모든 태스크를 처리하기에는 환경의 Airflow 작업자가 부족하다는 의미일 수 있습니다.

해결 방법: 이 문제를 해결하려면 큐에 추가된 태스크를 실행할 수 있는 Airflow 작업자 용량이 항상 있는지 확인해야 합니다. 예를 들어 workers 또는 worker_concurrency의 수를 늘릴 수 있습니다. 또한 용량보다 많은 태스크가 큐에 추가되지 않도록 동시 로드 또는 풀을 조정할 수도 있습니다.

비활성 태스크로 인한 특정 DAG 실행의 우발적인 차단

일반적인 경우 Airflow 스케줄러는 큐에 비활성 태스크가 있고 어떤 이유로 해당 태스크를 제대로 실행할 수 없는 상황(예: 비활성 태스크가 속한 DAG가 삭제되었음)을 처리할 수 있어야 합니다.

이러한 비활성 태스크가 스케줄러에 의해 삭제되지 않는 경우 수동으로 삭제해야 할 수 있습니다. 예를 들면 Airflow UI에서 삭제할 수 있습니다. 메뉴 > 브라우저 > 태스크 인스턴스로 이동하여 비활성 DAG에 속한 큐에 추가된 태스크를 찾아서 삭제하면 됩니다.

Cloud Composer의 min_file_process_interval 매개변수 접근 방식

Airflow 스케줄러가 min_file_process_interval을 사용하는 방식이 Cloud Composer에서 변경되었습니다.

Airflow 1을 사용하는 Cloud Composer의 경우 사용자가 min_file_process_interval 값을 0~600초로 설정할 수 있습니다. 값이 600초 이상이면 min_file_process_interval이 600초로 설정되었을 때와 동일한 결과를 가져옵니다.

Airflow 2를 사용하는 Cloud Composer의 경우 사용자가 min_file_process_interval 값을 0~1200초 사이의 값으로 설정할 수 있습니다. 값이 1,200초 이상이면 min_file_process_interval이 1,200초로 설정되었을 때와 동일한 결과를 가져옵니다.

Airflow 구성 확장

Airflow는 Airflow에서 동시에 실행할 수 있는 태스크 및 DAG 수를 제어하는 Airflow 구성 옵션을 제공합니다. 이 구성 옵션을 설정하려면 사용자 환경의 값을 재정의합니다.

  • 작업자 동시 실행

    [celery]worker_concurrency 매개변수는 Airflow 작업자가 동시에 실행할 수 있는 태스크의 최대 개수를 제어합니다. 이 매개변수 값에 Cloud Composer 환경에 있는 Airflow 작업자 수를 곱하면 해당 환경의 특정 순간에 실행될 수 있는 최대 태스크 수를 얻을 수 있습니다. 이 수는 추가로 설명되는 [core]parallelism Airflow 구성 옵션에 따라 제한됩니다.

  • 최대 활성 DAG 실행 수

    [core]max_active_runs_per_dag Airflow 구성 옵션은 DAG당 최대 활성 DAG 실행 수를 제어합니다. 이 한도에 도달해도 스케줄러에서 DAG 실행을 추가로 만들지 않습니다.

    이 매개변수를 잘못 설정하면 지정된 시점에 DAG 실행 인스턴스를 더 만들 수 없으므로 스케줄러가 DAG 실행을 제한하는 문제가 발생할 수 있습니다.

  • DAG 동시 실행

    [core]dag_concurrency Airflow 구성 옵션은 각 DAG에서 동시에 실행될 수 있는 최대 태스크 인스턴스 수를 제어합니다. DAG 수준 매개변수입니다.

    이 매개변수를 잘못 설정하면 지정된 시점에 실행할 수 있는 DAG 태스크 수가 제한적이므로 단일 DAG 인스턴스 실행에 문제가 발생할 수 있습니다.

  • 동시 로드 및 풀 크기

    [core]parallelism Airflow 구성 옵션은 이러한 태스크의 모든 종속 항목이 충족된 후 Airflow 스케줄러가 Executor 큐에서 큐에 추가할 수 있는 태스크 수를 제어합니다.

    전체 Airflow 설정에 대한 전역 매개변수입니다.

    태스크가 한 풀에서 큐에 추가되고 실행됩니다. Cloud Composer 환경에서는 하나의 풀만 사용합니다. 이 풀의 크기로 스케줄러가 특정 시점에 실행을 위해 큐에 추가할 수 있는 태스크 수가 제어됩니다. 풀 크기가 너무 작으면 [core]parallelism 구성 옵션 및 [celery]worker_concurrency 구성 옵션과 Airflow 작업자 수를 곱한 값으로 정의되는 기준에 아직 도달하지 않았더라도 스케줄러에서 실행을 위한 태스크를 큐에 추가할 수 없습니다.

    Airflow UI에서 풀 크기를 구성할 수 있습니다(메뉴 > 관리자 > ). 해당 환경에서 예상되는 동시 로드 수준에 맞게 풀 크기를 조정하세요.

부하 과부하가 걸리는 Airflow Database의 증상

Airflow 스케줄러 로그에 다음과 같은 경고 로그 항목이 표시될 수 있습니다.

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

이러한 오류 또는 경고는 작업에 과부하가 걸린 Airflow 메타데이터 데이터베이스의 증상일 수 있습니다.

제공 가능한 해결책:

다음 단계