DAG 문제 해결

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

이 페이지에서는 일반적인 워크플로 문제의 문제해결 단계와 정보를 제공합니다.

많은 DAG 실행 문제가 최적이 아닌 환경 성능으로 인해 발생합니다. 환경 성능 및 비용 최적화 가이드에 따라 Cloud Composer 2 환경을 최적화할 수 있습니다.

일부 DAG 실행 문제는 올바르거나 최적이 아닌 방식으로 작동하는 Airflow 스케줄러로 인해 발생할 수 있습니다. 이러한 문제를 해결하려면 스케줄러 문제 해결 안내를 따르세요.

워크플로 문제해결

문제해결을 시작하려면 다음 안내를 따르세요.

  1. Airflow 로그를 확인합니다.

    다음 Airflow 구성 옵션을 재정의하여 Airflow의 로깅 수준을 높일 수 있습니다.

    Airflow 2

    섹션
    logging logging_level 기본값은 INFO입니다. 로그 메시지의 세부정보 수준을 더 높이려면 DEBUG로 설정합니다.

    Airflow 1

    섹션
    core logging_level 기본값은 INFO입니다. 로그 메시지의 세부정보 수준을 더 높이려면 DEBUG로 설정합니다.
  2. Monitoring 대시보드를 확인합니다.

  3. Cloud Monitoring을 검토합니다.

  4. Google Cloud 콘솔에서 환경 구성요소 페이지에 오류가 있는지 확인합니다.

  5. Airflow 웹 인터페이스의 DAG 그래프 뷰에서 실패한 태스크 인스턴스가 있는지 확인합니다.

    섹션
    webserver dag_orientation LR, TB, RL 또는 BT

연산자 오류 디버깅

연산자 오류를 디버깅하려면 다음 안내를 따르세요.

  1. 작업 관련 오류가 있는지 확인합니다.
  2. Airflow 로그를 확인합니다.
  3. Cloud Monitoring을 검토합니다.
  4. 연산자 관련 로그를 확인합니다.
  5. 오류를 수정합니다.
  6. dags/ 폴더에 DAG를 업로드합니다.
  7. Airflow 웹 인터페이스에서 DAG의 이전 상태를 지웁니다.
  8. DAG를 다시 시작하거나 실행합니다.

태스크 실행 문제 해결

Airflow는 태스크 큐와 Airflow 데이터베이스를 통해 서로 통신하고 신호(예: SIGTERM)를 보내는 스케줄러, 실행자, 작업자와 같은 여러 항목이 있는 분산 시스템입니다. 다음 다이어그램은 Airflow 구성요소 간의 상호 연결 개요를 보여줍니다.

Airflow 구성요소 간의 상호작용
그림 1. Airflow 구성요소 간의 상호작용(확대하려면 클릭)

Airflow와 같은 분산 시스템에서 일부 네트워크 연결 문제가 발생하거나 기본 인프라가 간헐적인 문제를 일으킬 수 있습니다. 이로 인해 태스크가 실패하고 실행을 위해 다시 예약되거나 태스크가 성공적으로 완료되지 않을 수 있는 상황(예: 좀비 태스크 또는 실행에 멈춘 태스크)이 발생할 수 있습니다. Airflow에는 이러한 상황을 처리하고 정상적인 기능을 자동으로 재개하는 메커니즘이 있습니다. 다음 섹션에서는 좀비 태스크, 독약 및 SIGTERM 신호 등 Airflow에서 태스크 실행 중에 발생하는 일반적인 문제를 설명합니다.

좀비 태스크 문제 해결

Airflow는 태스크와 태스크를 실행하는 프로세스 간에 두 가지 불일치를 감지합니다.

  • 좀비 태스크는 실행 중이어야 하지만, 실행되지 않는 태스크입니다. 태스크 프로세스가 종료되었거나 응답하지 않는 경우, Airflow 작업자가 과부하로 인해 태스크 상태를 제시간에 보고하지 않은 경우, 태스크가 실행된 VM이 종료된 경우 발생할 수 있습니다. Airflow는 이러한 태스크를 주기적으로 찾고 태스크 설정에 따라 작업에 실패하거나 작업을 다시 시도합니다.

    좀비 태스크 찾기

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-scheduler")
    textPayload:"Detected zombie job"
  • 언데드 태스크는 실행하지 않아야 하는 태스크입니다. Airflow는 이러한 태스크를 주기적으로 찾고 종료합니다.

좀비 태스크의 가장 일반적인 원인과 해결 방법은 다음과 같습니다.

Airflow 작업자의 메모리 부족

각 Airflow 작업자는 최대 [celery]worker_concurrency 태스크 인스턴스까지 동시에 실행할 수 있습니다. 이러한 태스크 인스턴스의 누적 메모리 소비량이 Airflow 작업자의 메모리 한도를 초과하면 리소스를 확보하기 위해 임의의 프로세스가 종료됩니다.

Airflow 작업자의 메모리 부족 이벤트 살펴보기

resource.type="k8s_node"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
log_id("events")
jsonPayload.message:"Killed process"
jsonPayload.message:("airflow task" OR "celeryd")

솔루션:

Airflow 작업자가 제거됨

포드 제거는 Kubernetes에서 워크로드 실행의 일반적인 부분입니다. GKE는 스토리지가 부족한 경우나 우선순위가 더 높은 워크로드의 리소스를 확보하기 위해 포드를 삭제합니다.

Airflow 작업자 제거 살펴보기

resource.type="k8s_pod"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
resource.labels.pod_name:"airflow-worker"
log_id("events")
jsonPayload.reason="Evicted"

솔루션:

Airflow 작업자가 종료됨

Airflow 작업자가 외부에서 삭제될 수 있습니다. 현재 실행 중인 태스크가 단계적 종료 기간 중에 완료되지 않으면 태스크가 중단되고 좀비로 감지될 수 있습니다.

Airflow 작업자 포드 종료 살펴보기

resource.type="k8s_cluster"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
protoPayload.methodName:"pods.delete"
protoPayload.response.metadata.name:"airflow-worker"

가능한 시나리오 및 솔루션:

  • 업그레이드 또는 패키지 설치와 같은 환경 수정 중에 Airflow 작업자가 다시 시작됩니다.

    Composer 환경 수정사항 살펴보기

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("cloudaudit.googleapis.com%2Factivity")

    중요한 태스크가 실행 중이 아니거나 태스크 재시도를 사용 설정하면 이러한 작업을 수행할 수 있습니다.

  • 유지보수 작업 중에는 다음과 같은 다양한 구성요소를 일시적으로 사용하지 못할 수 있습니다.

    GKE 유지보수 작업 살펴보기

    resource.type="gke_nodepool"
    resource.labels.cluster_name="GKE_CLUSTER_NAME"
    protoPayload.metadata.operationType="UPGRADE_NODES"

    유지보수 기간을 지정하여 중요한 태스크 실행 중복을 최소화할 수 있습니다.

  • 2.4.5 이전의 Cloud Composer 2 버전에서는 종료 중인 Airflow 작업자가 SIGTERM 신호를 무시하고 태스크를 계속 실행할 수 있습니다.

    Composer 자동 확장으로 축소 살펴보기

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-worker-set")
    textPayload:"Workers deleted"

    이 문제가 수정된 Cloud Composer 이후 버전으로 업그레이드할 수 있습니다.

Airflow 작업자가 과부하 상태임

Airflow 작업자가 사용할 수 있는 CPU 및 메모리 리소스의 양은 환경 구성에 따라 제한됩니다. 사용률이 한도에 가까워지면 태스크 실행 중에 리소스 경합과 불필요한 지연이 발생합니다. 극단적인 상황에서는 장시간 리소스가 부족하면 좀비 태스크가 발생할 수 있습니다.

솔루션:

Airflow 데이터베이스가 과부하 상태임

다양한 Airflow 구성요소에서 데이터베이스를 사용하여 서로 통신하며 특히 태스크 인스턴스의 하트비트를 저장합니다. 데이터베이스에서 리소스가 부족하면 쿼리 시간이 길어져 태스크 실행이 영향을 받을 수 있습니다.

솔루션:

Airflow 데이터베이스를 일시적으로 사용할 수 없음

Airflow 작업자가 일시적인 연결 문제와 같은 간헐적인 오류를 감지하고 적절하게 처리하는 데 시간이 걸릴 수 있습니다. 기본 좀비 감지 기준점을 초과할 수 있습니다.

Airflow 하트비트 제한 시간 살펴보기

resource.type="cloud_composer_environment"
resource.labels.environment_name="ENVIRONMENT_NAME"
log_id("airflow-worker")
textPayload:"Heartbeat time limit exceeded"

솔루션:

  • 좀비 태스크의 제한 시간을 늘리고 [scheduler]scheduler_zombie_task_threshold Airflow 구성 옵션의 값을 재정의합니다.

    섹션 참고
    scheduler scheduler_zombie_task_threshold 새 제한 시간(초) 기본값은 300입니다.

독약 문제 해결

독약은 Airflow에서 Airflow 태스크를 종료하는 데 사용하는 메커니즘입니다.

Airflow는 다음과 같은 상황에서 독약을 사용합니다.

  • 스케줄러가 제시간에 완료되지 않은 태스크를 종료하는 경우
  • 태스크가 시간 초과되거나 너무 오랫동안 실행되는 경우

Airflow가 독약을 사용하는 경우 태스크를 실행한 Airflow 작업자의 로그에서 다음 로그 항목을 볼 수 있습니다.

  INFO - Subtask ... WARNING - State of this instance has been externally set
  to success. Taking the poison pill.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

제공 가능한 솔루션:

  • 태스크 코드에 너무 오랫동안 실행될 수 있는 오류가 있는지 확인합니다.
  • (Cloud Composer 2) 태스크가 더 빨리 실행되도록 Airflow 작업자의 CPU 및 메모리를 늘립니다.
  • [celery_broker_transport_options]visibility-timeout Airflow 구성 옵션의 값을 늘립니다.

    그 결과 스케줄러는 태스크를 좀비 태스크로 간주하기 전에 태스크가 완료될 때까지 더 오래 기다립니다. 이 옵션은 몇 시간 동안 지속되는 시간 소모적인 태스크에 특히 유용합니다. 값이 너무 낮으면(예: 3시간) 스케줄러는 5시간 또는 6시간 동안 실행되는 태스크를 '중지'(좀비 태스크)로 간주합니다.

  • [core]killed_task_cleanup_time Airflow 구성 옵션의 값을 늘립니다.

    값이 길면 Airflow 작업자가 태스크를 정상적으로 완료하는 데 더 많은 시간을 제공합니다. 값이 너무 낮으면 작업을 정상적으로 완료할 충분한 시간 없이 Airflow 태스크가 갑자기 중단될 수 있습니다.

SIGTERM 신호 문제 해결

SIGTERM 신호는 Linux, Kubernetes, Airflow 스케줄러 및 Celery에서 Airflow 작업자 또는 Airflow 태스크 실행을 담당하는 프로세스를 종료하는 데 사용됩니다.

SIGTERM 신호가 환경에 전송되는 이유는 다음과 같습니다.

  • 좀비 태스크가 되어서 중지해야 합니다.

  • 스케줄러가 태스크의 중복을 발견하고 독약 및 SIGTERM 신호를 태스크로 전송하여 중지합니다.

  • 수평형 포드 자동 확장에서 GKE 제어 영역은 SIGTERM 신호를 전송하여 더 이상 필요하지 않은 포드를 삭제합니다.

  • 스케줄러는 SIGTERM 신호를 DagFileProcessorManager 프로세스에 보낼 수 있습니다. 이러한 SIGTERM 신호는 스케줄러에서 DagFileProcessorManager 프로세스 수명 주기를 관리하는 데 사용되며 이 신호를 무시해도 됩니다.

    예를 들면 다음과 같습니다.

    Launched DagFileProcessorManager with pid: 353002
    Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
    Sending the signal Signals.SIGTERM to group 353002
    Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
    
  • 태스크 실행을 모니터링하는 local_task_job의 하트비트 콜백과 종료 콜백 간의 경합 상태. 하트비트는 태스크가 성공으로 표시되었음을 감지하더라도 태스크 자체가 성공했는지 또는 Airflow에서 태스크를 성공으로 간주했는지를 구분할 수 없습니다. 그럼에도 불구하고 태스크 실행자가 종료되기를 기다리지 않고 태스크 실행자를 종료합니다.

    이러한 SIGTERM 신호를 무시해도 됩니다. 태스크는 이미 성공 상태이며 DAG는 영향을 받지 않고 실행됩니다.

    일반 종료와 성공 상태의 태스크 종료 간에 유일한 차이점은 Received SIGTERM. 로그 항목입니다.

    하트비트와 종료 콜백 간의 경합 상태
    그림 2. 하트비트와 종료 콜백 간의 경합 상태(확대하려면 클릭)
  • Airflow 구성요소가 클러스터 노드에서 허용하는 것보다 더 많은 리소스(CPU, 메모리)를 사용합니다.

  • GKE 서비스는 유지보수 작업을 수행하고 업그레이드하려는 노드에서 실행되는 포드에 SIGTERM 신호를 보냅니다. 태스크 인스턴스가 SIGTERM으로 종료되면 태스크를 실행한 Airflow 작업자의 로그에서 다음 로그 항목을 볼 수 있습니다.

{local_task_job.py:211} WARNING - State of this instance has been externally
set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
with exception

제공 가능한 솔루션:

이 문제는 태스크를 실행하는 VM의 메모리가 부족할 때 발생합니다. 이는 Airflow 구성과 관련이 없지만 VM에 제공되는 메모리 양과 관련이 있습니다.

메모리 증가는 사용하는 Cloud Composer 버전에 따라 다릅니다. 예를 들면 다음과 같습니다.

  • Cloud Composer 2에서는 Airflow 작업자에게 더 많은 CPU 및 메모리 리소스를 할당할 수 있습니다.

  • Cloud Composer 1의 경우 더 높은 성능을 제공하는 머신 유형을 사용하여 환경을 다시 만들 수 있습니다.

  • 두 버전의 Cloud Composer에서 [celery]worker_concurrency 동시 실행 Airflow 구성 옵션의 값을 낮출 수 있습니다. 이 옵션은 지정된 Airflow 작업자가 동시에 실행하는 작업 수를 결정합니다.

Cloud Composer 2 환경 최적화에 대한 자세한 내용은 환경 성능 및 비용 최적화를 참조하세요.

포드 재시작 또는 제거 이유를 찾기 위한 Cloud Logging 쿼리

Cloud Composer 환경은 GKE 클러스터를 컴퓨팅 인프라 레이어로 사용합니다. 이 섹션에서는 Airflow 작업자 또는 Airflow 스케줄러가 재시작되거나 제거된 이유를 찾는 데 도움이 되는 유용한 쿼리를 찾을 수 있습니다.

아래 표시된 쿼리를 다음 방식으로 조정할 수 있습니다.

  • 이전 6시간, 이전 3일과 같이 Cloud Logging에서 원하는 타임라인을 지정할 수 있습니다. 또는 커스텀 시간 범위를 정의할 수 있습니다.

  • Cloud Composer의 CLUSTER_NAME을 지정합니다.

  • 또한 POD_NAME을 추가하여 특정 포드로 검색을 제한할 수 있습니다.

다시 시작된 컨테이너 탐색

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
  

특정 포드로 결과를 제한하는 대체 쿼리:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
  

메모리 부족 이벤트의 결과로 종료된 컨테이너 탐색

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    

특정 포드로 결과를 제한하는 대체 쿼리:

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

실행이 중지된 컨테이너 탐색

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    

특정 포드로 결과를 제한하는 대체 쿼리:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

업데이트 또는 업그레이드 작업이 Airflow 태스크 실행에 미치는 영향

태스크가 지연 가능한 모드에서 실행되지 않는 한 업데이트 또는 업그레이드 작업은 현재 실행 중인 Airflow 태스크를 중단합니다.

Airflow 태스크 실행에 최소한의 영향이 예상되는 경우 이러한 작업을 수행하고 DAG 및 태스크에서 적절한 재시도 메커니즘을 설정하는 것이 좋습니다.

KubernetesExecutor 태스크 문제 해결

CeleryKubernetesExecutor는 CeleryExecutor와 KubernetesExecutor를 동시에 사용할 수 있는 Cloud Composer 3의 실행자 유형입니다.

KubernetesExecutor로 실행되는 태스크 문제 해결에 대한 자세한 내용은 CeleryKubernetesExecutor 사용 페이지를 참조하세요.

일반적인 문제

다음 섹션에서는 몇 가지 일반적인 DAG 문제의 증상 및 가능한 해결 방법을 설명합니다.

Negsignal.SIGKILL에 의해 Airflow 태스크 중단

태스크가 Airflow 작업자에 할당된 것보다 더 많은 메모리를 사용 중일 수 있습니다. 이러한 경우 Negsignal.SIGKILL에 의해 중단될 수 있습니다. 시스템은 다른 Airflow 태스크 실행에 영향을 줄 수 있는 추가 메모리 소비를 방지하기 위해 이 신호를 보냅니다. Airflow 작업자 로그에 다음 로그 항목이 표시될 수 있습니다.

{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL

Negsignal.SIGKILL은 코드 -9로 표시될 수도 있습니다.

제공 가능한 솔루션:

  • Airflow 작업자의 worker_concurrency 낮추기

  • Cloud Composer 2의 경우 Airflow 작업자의 메모리 증설

  • Cloud Composer 1의 경우 Cloud Composer 클러스터에 사용되는 더 큰 머신 유형으로 업그레이드

  • 메모리 절약을 위한 태스크 최적화

  • 태스크 격리 및 맞춤설정된 리소스 할당을 위해 KubernetesPodOperator 또는 GKEStartPodOperator를 사용하여 Cloud Composer에서 리소스 집약적인 태스크를 관리합니다.

DAG 파싱 오류로 인해 로그가 배출되지 않는 태스크 실패

경우에 따라 Airflow 스케줄러와 DAG 프로세서가 태스크 실행을 예약하고 DAG 파일을 각각 파싱할 수 있지만 Python DAG 파일에 프로그래밍 오류가 있어서 Airflow 작업자가 그러한 DAG에서 태스크를 수행할 수 없는 미묘한 DAG 오류가 발생할 수 있습니다. 그 결과 Airflow 태스크가 Failed로 표시되고 실행 로그가 없는 상황이 발생할 수 있습니다.

해결책:

  • Airflow 작업자 로그에서 DAG 누락 또는 DAG 파싱 오류와 관련하여 Airflow 작업자에서 발생한 오류가 없는지 확인합니다.

  • DAG 파싱과 관련된 매개변수를 늘립니다.

  • 또한 DAG 프로세서 로그 검사를 참조하세요.

리소스 압력으로 인해 로그가 배출되지 않는 태스크 실패

증상: 태스크 실행 중 Airflow 태스크 실행을 담당하는 Airflow 작업자의 하위 프로세스가 갑자기 중단됩니다. Airflow 작업자 로그에 아래 로그와 비슷한 오류가 표시될 수 있습니다.

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

솔루션:

포드 제거로 인해 로그가 배출되지 않는 태스크 실패

Google Kubernetes Engine 포드에는 Kubernetes 포드 수명 주기와 포드 제거가 적용됩니다. 태스크 급증 및 작업자 공동 예약은 Cloud Composer에서 포드 제거가 발생하는 가장 일반적인 두 가지 원인입니다.

포드 제거는 특정 포드가 노드에 대해 구성된 리소스 소비 기대치를 기준으로 노드의 리소스를 과도하게 사용할 때 발생할 수 있습니다. 예를 들어 높은 메모리 용량의 태스크가 포드에서 실행될 때 제거가 발생할 수 있으며, 이러한 통합 로드로 인해 포드가 실행되는 노드가 메모리 소비 한도를 초과합니다.

Airflow 작업자 포드가 제거되면 해당 포드에서 실행 중인 모든 태스크 인스턴스가 중단되고 나중에 Airflow에서 실패로 표시됩니다.

로그가 버퍼링됩니다. 버퍼가 삭제되기 전에 작업자 포드가 제거되면 로그를 내보내지 않습니다. 로그 없이 태스크가 실패한 경우 이는 메모리 부족(OOM)으로 인해 Airflow 작업자가 다시 시작된 것입니다. Airflow 로그를 내보내지 않았더라도 일부 로그가 Cloud Logging에 있을 수 있습니다.

로그를 보려면 다음 안내를 따르세요.

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 목록에서 환경 이름을 클릭합니다. 환경 세부정보 페이지가 열립니다.

  3. 로그 탭으로 이동합니다.

  4. 모든 로그 -> Airflow 로그 -> 작업자 -> (개별 작업자)에서 개별 작업자의 로그를 볼 수 있습니다.

DAG 실행은 메모리가 제한적입니다. 각 태스크 실행은 두 가지 Airflow 프로세스, 즉 작업 실행과 모니터링으로 시작됩니다. 각 노드는 동시 작업을 최대 6개까지 수행할 수 있습니다(Airflow 모듈에 프로세스가 약 12개 로드됨). DAG의 속성에 따라 더 많은 메모리를 사용할 수 있습니다.

증상

  1. Google Cloud 콘솔에서 워크로드 페이지로 이동합니다.

    워크로드로 이동

  2. Evicted를 표시하는 airflow-worker 포드가 있는 경우 삭제된 각 포드를 클릭하고 창 상단에서 The node was low on resource: memory 메시지를 찾습니다.

해결:

  • Cloud Composer 1에서 현재 머신 유형보다 큰 머신 유형을 사용하여 새 Cloud Composer 환경을 만듭니다.
  • Cloud Composer 2에서 Airflow 작업자의 메모리 한도를 늘립니다.
  • airflow-worker 포드 로그에서 가능한 제거 원인을 확인합니다. 개별 포드에서 로그 가져오기에 대한 자세한 내용은 배포된 워크로드 문제 해결을 참조하세요.
  • DAG의 태스크가 멱등적이고 재시도 가능한지 확인합니다.
  • Airflow 작업자의 로컬 파일 시스템에 불필요한 파일을 다운로드하지 마세요.

    Airflow 작업자는 로컬 파일 시스템 용량이 제한적입니다. 예를 들어 Cloud Composer 2에서 작업자는 스토리지 용량을 1GB~10GB까지 사용할 수 있습니다. 저장공간이 부족하면 GKE 제어 영역에 의해 Airflow 작업자 포드가 제거됩니다. 그러면 제거된 작업자가 실행하던 모든 태스크가 실패합니다.

    문제가 있는 작업의 예시:

    • 파일 또는 객체를 다운로드하고 Airflow 작업자에 로컬로 저장. 대신 이러한 객체를 Cloud Storage 버킷과 같은 적합한 서비스에 직접 저장합니다.
    • Airflow 작업자에서 /data 폴더의 큰 객체에 액세스. Airflow 작업자가 객체를 로컬 파일 시스템으로 다운로드합니다. 대신 큰 파일이 Airflow 작업자 포드 외부에서 처리되도록 DAG를 구현합니다.

DAG 로드 가져오기 제한 시간

증상

  • Airflow 웹 인터페이스의 DAG 목록 페이지 위에서 경고 알림 상자에 Broken DAG: [/path/to/dagfile] Timeout이 표시됩니다.
  • Cloud Monitoring: airflow-scheduler 로그에 다음과 비슷한 항목이 포함됩니다.

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

해결:

dag_file_processor_timeout Airflow 구성 옵션을 재정의하고 DAG 파싱 제한 시간을 늘립니다.

섹션
core dag_file_processor_timeout 새 제한 시간 값

DAG 실행이 예상 시간 내에 종료되지 않음

증상

Airflow 태스크가 중단되고 DAG 실행이 예상보다 오래 지속되어 DAG 실행이 종료되지 않는 경우가 있습니다. Airflow에는 이러한 상황을 방지하는 데 도움이 되는 제한 시간 및 정리 절차가 있으므로 정상 조건에서 Airflow 태스크는 큐에 추가되거나 실행 중인 상태로 무기한 유지되지 않습니다.

해결:

  • DAG에 dagrun_timeout 매개변수를 사용합니다. 예를 들면 dagrun_timeout=timedelta(minutes=120)입니다. 결과적으로 각 DAG 실행은 DAG 실행 제한 시간 내에 완료되어야 하며 완료되지 않은 태스크는 Failed 또는 Upstream Failed로 표시됩니다. Airflow 태스크 상태에 대한 자세한 내용은 Apache Airflow 문서를 참조하세요.

  • 태스크 실행 제한 시간 매개변수를 사용하여 Apache Airflow 연산자에 따라 실행되는 태스크의 기본 제한 시간을 정의합니다.

DAG 실행이 실행되지 않음

증상

DAG의 예약 날짜를 동적으로 설정하면 예기치 않은 여러 부작용이 발생할 수 있습니다. 예를 들면 다음과 같습니다.

  • DAG 실행이 항상 미래이며 DAG는 실행되지 않습니다.

  • 과거 DAG 실행이 실행되지 않았는데도 성공적으로 실행된 것으로 표시됩니다.

자세한 내용은 Apache Airflow 문서를 참조하세요.

해결:

  • Apache Airflow 문서의 권장사항을 따르세요.

  • DAG에 정적 start_date를 설정합니다. 또는 catchup=False를 사용하여 이전 날짜의 DAG 실행을 사용 중지할 수 있습니다.

  • 이 방법의 부작용을 인지하고 있지 않은 경우 datetime.now() 또는 days_ago(<number of days>)를 사용하지 마세요.

Airflow 데이터베이스에 대한 네트워크 트래픽 증가

해당 환경의 GKE 클러스터와 Airflow 데이터베이스 사이의 네트워크 트래픽 양은 DAG 수, DAG의 태스크 수, DAG가 Airflow 데이터베이스의 데이터에 액세스하는 방법에 따라 달라집니다. 네트워크 사용량에 영향을 줄 수 있는 요소는 다음과 같습니다.

  • Airflow 데이터베이스에 대한 쿼리. DAG가 많은 쿼리를 수행하는 경우 많은 트래픽을 발생시킵니다. 예를 들어 다른 태스크를 진행하기 전 태스크 상태를 확인하거나, XCom 테이블을 쿼리하거나, Airflow 데이터베이스 콘텐츠를 덤프할 수 있습니다.

  • 많은 수의 태스크. 예약할 태스크가 많을수록 발생하는 네트워크 트래픽이 많아집니다. 이러한 고려 사항은 DAG에 있는 총 태스크 수 그리고 예약 빈도 모두에 적용됩니다. Airflow Scheduler가 DAG 실행을 예약할 때는 Airflow 데이터베이스에 대해 쿼리를 만들고 트래픽을 발생시킵니다.

  • Airflow 웹 인터페이스는 Airflow 데이터베이스에 대해 쿼리를 만들기 때문에 네트워크 트래픽을 발생시킵니다. 그래프, 태스크, 다이어그램이 포함된 페이지를 집중적으로 사용하면 네트워크 트래픽이 대량으로 발생할 수 있습니다.

DAG가 Airflow 웹 서버를 다운시키거나 502 gateway timeout 오류를 반환하는 경우

웹 서버 오류는 여러 가지 이유로 발생할 수 있습니다. Cloud Logging에서 airflow-webserver 로그를 참조하여 502 gateway timeout 오류 원인을 확인하세요.

고강도의 컴퓨팅

이 섹션은 Cloud Composer 1에만 적용됩니다.

DAG 파싱 중에 고강도 컴퓨팅을 실행하지 마세요.

머신 유형을 맞춤설정하여 CPU 및 메모리 용량을 늘릴 수 있는 작업자 및 스케줄러 노드와 달리, 웹 서버는 고정 머신 유형을 사용하므로, 파싱 중에 고강도의 컴퓨팅이 수행되면 DAG 파싱 오류가 발생할 수 있습니다.

웹 서버에는 vCPU 2개와 메모리 2GB가 있습니다. core-dagbag_import_timeout의 기본값은 30초입니다. 이 제한 시간 값은 Airflow에서 Python 모듈을 dags/ 폴더에 로드하는 데 소요되는 시간의 상한을 정의합니다.

잘못된 권한

이 섹션은 Cloud Composer 1에만 적용됩니다.

웹 서버는 작업자 및 스케줄러와 동일한 서비스 계정으로 실행되지 않습니다. 따라서 작업자와 스케줄러는 웹 서버가 액세스할 수 없는 사용자 관리 리소스에 액세스할 수 있습니다.

DAG 파싱 중에는 비공개 리소스에 액세스하지 않는 것이 좋습니다. 불가피한 경우에는 웹 서버의 서비스 계정에 권한을 부여해야 합니다. 서비스 계정 이름은 웹 서버 도메인에서 파생됩니다. 예를 들어 도메인이 example-tp.appspot.com이면 서비스 계정은 example-tp@appspot.gserviceaccount.com입니다.

DAG 오류

이 섹션은 Cloud Composer 1에만 적용됩니다.

웹 서버는 App Engine에서 실행되며 사용자 환경의 GKE 클러스터와 별개입니다. 웹 서버는 DAG 정의 파일을 파싱하며 DAG에 오류가 있으면 502 gateway timeout이 발생할 수 있습니다. 문제가 있는 DAG가 GKE에서 실행 중인 프로세스를 손상시키지 않는다면 웹 서버가 작동하지 않아도 Airflow는 정상적으로 작동합니다. 이런 경우에 웹 서버를 사용할 수 없게 되면 임시 방편으로 gcloud composer environments run을 사용하여 환경에서 세부정보를 검색할 수 있습니다.

다른 경우 GKE에서 DAG 파싱을 실행하고, 치명적인 Python 예외를 유발하거나 타임아웃된(기본값 30초) DAG를 찾아볼 수 있습니다. 문제를 해결하려면 Airflow 작업자 컨테이너의 원격 셸에 연결하고 문법 오류를 테스트하세요. 자세한 내용은 DAG 테스트를 참조하세요.

dags 및 plugins 폴더에서 다수의 DAG 및 플러그인 처리

/dags/plugins 폴더의 콘텐츠가 해당 환경 버킷에서 Airflow 작업자 및 스케줄러의 로컬 파일 시스템으로 동기화됩니다.

이러한 폴더에 저장된 데이터가 많을수록 동기화를 수행하는 데 시간이 오래 걸립니다. 이러한 상황을 해결하려면 다음 안내를 따르세요.

  • /dags/plugins 폴더의 파일 수를 제한합니다. 필요한 최소 파일만 저장합니다.

  • 가능하면 Airflow 스케줄러 및 작업자에 사용 가능한 디스크 공간을 늘립니다.

  • 가능하면 동기화 작업이 더 빠르게 수행되도록 Airflow 스케줄러 및 작업자의 CPU와 메모리를 늘립니다.

  • DAG 수가 매우 많으면 DAG를 여러 배치로 나누고, 이를 ZIP 파일로 압축하고, 이러한 보관 파일을 /dags 폴더에 배포합니다. 이렇게 하면 DAG 동기화 프로세스가 빨라집니다. Airflow 구성요소는 DAG를 처리하기 전 ZIP 파일을 압축 해제합니다.

  • 프로그래매틱 방식으로 DAG를 생성하는 것도 /dags 폴더에 저장되는 DAG 수를 제한하기 위한 방법일 수 있습니다. 프로그래매틱 방식으로 생성된 DAG를 예약 및 실행할 때 문제가 발생하지 않도록 하려면 프로그래매틱 DAG 섹션을 참조하세요.

프로그래매틱 방식으로 생성된 DAG를 동시에 예약하지 않음

DAG 파일에서 프로그래매틱 방식으로 DAG 객체를 생성하는 것은 차이가 크지 않은 비슷한 여러 DAG를 작성하기 위한 효율적인 방법입니다.

이러한 DAG가 즉시 실행되도록 예약하지 않는 것이 중요합니다. Airflow 작업자는 예약된 모든 태스크를 동시에 실행하기에 충분한 CPU 및 메모리 리소스를 갖고 있지 않을 가능성이 매우 높습니다.

프로그래매틱 방식의 DAG 예약과 관련된 문제를 방지하려면 다음 안내를 따르세요.

  • 더 많은 작업을 동시에 수행할 수 있도록 작업자 동시성을 늘리고 환경을 확장합니다.
  • Airflow 작업자가 예약된 모든 태스크를 천천히 실행 수 있도록 수백 개의 태스크가 동시에 예약되는 것을 방지하려면 일정 시간에 따라 일정이 고르게 분산되도록 DAG를 생성합니다.

Airflow 웹 서버에 액세스할 때 오류 504 발생

Airflow UI에 액세스할 때 오류 504 발생을 참조하세요.

Lost connection to Postgres server during query 예외가 태스크 실행 중 또는 실행 직후에 발생합니다.

Lost connection to Postgres server during query 예외는 다음 조건이 충족될 때 자주 발생합니다.

  • DAG에 PythonOperator 또는 커스텀 연산자가 사용됩니다.
  • DAG가 Airflow 데이터베이스를 쿼리합니다.

호출 가능한 함수에서 여러 쿼리가 수행될 경우 역추적이 Airflow 코드에서 self.refresh_from_db(lock_for_update=True) 줄을 잘못 가리킬 수 있습니다. 이것은 태스크 실행 후 첫 번째 데이터베이스 쿼리입니다. SQLAlchemy 세션이 올바르게 닫히지 않으면 예외의 실제 원인이 이것 전에 발생합니다.

SQLAlchemy 세션은 범위가 스레드로 지정되고 호출 가능한 함수에 생성됩니다. 나중에 세션이 Airflow 코드 내에서 지속될 수 있습니다. 한 세션 내에서 쿼리 간에 지연이 크게 발생하면 Postgres 서버에서 이미 연결이 닫혔을 수 있습니다. Cloud Composer 환경의 연결 제한 시간은 약 10분으로 설정됩니다.

해결:

  • airflow.utils.db.provide_session 데코레이터를 사용합니다. 이 데코레이터는 session 매개변수에서 Airflow 데이터베이스에 유효한 세션을 제공하고, 함수가 끝날 때 세션을 올바르게 닫습니다.
  • 단일 장기 실행 함수는 사용하지 마세요. 대신 airflow.utils.db.provide_session 데코레이터를 사용하는 함수가 여러 개 있도록 모든 데이터베이스 쿼리를 개별 함수로 이동하세요. 이 경우 쿼리 결과를 검색한 후 세션이 자동으로 닫힙니다.

DAG 실행 시간, 태스크 및 동일한 DAG의 병렬 실행 제어

특정 DAG의 단일 DAG 실행이 지속되는 시간을 제어하려면 dagrun_timeout DAG 매개변수를 사용하여 이를 수행할 수 있습니다. 예를 들어 단일 DAG 실행(실행이 성공 또는 실패했는지 여부에는 무관)이 1시간 이상 지속되면 안 되는 경우 이 매개변수를 3600초로 설정해야 합니다.

단일 Airflow 태스크가 지속되는 시간을 제어할 수도 있습니다. 이렇게 하려면 execution_timeout을 사용하면 됩니다.

특정 DAG에 대해 지정할 DAG 실행 수를 제어하려면 [core]max-active-runs-per-dag Airflow 구성 옵션을 사용하여 제어합니다.

특정 시점에 DAG 인스턴스 하나만 실행되도록 하려면 max-active-runs-per-dag 매개변수를 1로 설정하세요.

DAG 및 플러그인을 스케줄러, 작업자, 웹 서버로 동기화에 영향을 미치는 문제

Cloud Composer는 /dags/plugins 폴더의 콘텐츠를 스케줄러와 작업자로 동기화합니다. /dags/plugins 폴더의 특정 객체로 인해 이 동기화가 올바르게 작동하지 않거나 속도가 느려질 수 있습니다.

  • /dags 폴더가 스케줄러와 작업자로 동기화됩니다. 이 폴더는 Cloud Composer 2의 웹 서버로 동기화되지 않거나 Cloud Composer 1에서 DAG Serialization을 사용 설정한 경우 동기화되지 않습니다.

  • /plugins 폴더가 스케줄러, 작업자, 웹 서버로 동기화됩니다.

다음 문제가 발생할 수 있습니다.

  • 압축 트랜스코딩을 사용하는 gzip 압축 파일을 /dags/plugins 폴더에 업로드했습니다. 일반적으로 gcloud storage cp 명령어에서 --gzip-local-all 플래그를 사용하여 데이터를 버킷에 업로드하는 경우에 발생합니다.

    솔루션: 압축 트랜스코딩을 사용한 객체를 삭제하고 버킷에 다시 업로드합니다.

  • 객체 중 하나는 '.'로 지정됩니다. 이러한 객체는 스케줄러와 작업자로 동기화되지 않으며 동기화가 중지될 수도 있습니다.

    솔루션: 문제가 있는 객체의 이름을 변경합니다.

  • 폴더와 DAG Python 파일의 이름이 동일합니다(예: a.py). 이 경우 DAG 파일이 Airflow 구성요소에 올바르게 동기화되지 않습니다.

    솔루션: DAG Python 파일과 동일한 이름의 폴더를 삭제합니다.

  • /dags 또는 /plugins 폴더의 객체 중 하나의 이름 끝에는 / 기호가 있습니다. / 기호는 객체가 파일이 아닌 폴더라는 의미이므로 이러한 객체가 동기화 프로세스에 혼란을 줄 수 있습니다.

    솔루션: 문제가 되는 객체 이름에서 / 기호를 삭제합니다.

  • /dags/plugins 폴더에 불필요한 파일을 저장하지 마세요.

    구현하는 DAG 및 플러그인은 이러한 구성요소 테스트를 저장하는 파일과 같은 추가 파일과 함께 제공되는 경우가 있습니다. 이러한 파일은 작업자 및 스케줄러로 동기화되며, 이러한 파일을 스케줄러, 작업자, 웹 서버에 복사하는 데 필요한 시간에 영향을 미칩니다.

    솔루션: 추가 파일과 불필요한 파일을 /dags/plugins 폴더에 저장하지 마세요.

스케줄러와 작업자에서 Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...' 오류 발생

이 문제는 Cloud Storage의 객체에 중첩 네임스페이스가 있으며 동시에 스케줄러와 작업자에서 기존 파일 시스템을 사용하기 때문에 발생합니다. 예를 들어 폴더와 이름이 동일한 객체 모두 환경의 버킷에 추가할 수 있습니다. 버킷이 환경의 스케줄러와 작업자로 동기화되면 이 오류가 발생하여 태스크가 실패할 수 있습니다.

이 문제를 해결하려면 환경의 버킷에 중첩되는 네임스페이스가 없는지 확인합니다. 예를 들어 /dags/misc(파일) 및 /dags/misc/example_file.txt(다른 파일) 모두 버킷에 있으면 스케줄러에서 오류가 발생합니다.

Airflow 메타데이터 DB에 연결할 때 일시적인 중단

Cloud Composer는 분산된 클라우드 인프라 위에서 실행됩니다. 즉, 경우에 따라 일부 일시적인 문제가 나타날 수 있고 Airflow 태스크 실행이 중단될 수 있습니다.

이러한 경우 Airflow 작업자 로그에서 다음 오류 메시지가 표시될 수 있습니다.

"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

또는

"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

이러한 간헐적인 문제는 Cloud Composer 환경에 수행되는 유지보수 작업에 의해 발생할 수도 있습니다.

일반적으로 이러한 오류는 간헐적으로 발생합니다. 그리고 Airflow 태스크가 멱등성이고 재시도를 구성한 경우에는 이러한 오류가 발생하지 않습니다. 또한 유지보수 기간을 정의할 수도 있습니다.

이러한 오류가 발생하는 한 가지 추가 이유는 환경 클러스터에 리소스가 부족하기 때문일 수 있습니다. 이러한 경우 환경 확장 또는 환경 최적화 안내에 설명된 대로 환경을 확장하거나 최적화할 수 있습니다.

DAG 실행이 성공으로 표시되지만 실행된 태스크 없음

DAG 실행 execution_date가 DAG의 start_date보다 이전인 경우 실행된 태스크가 없지만 성공으로 표시된 DAG 실행이 표시될 수 있습니다.

실행된 태스크 없이 성공한 DAG 실행
그림 3. 실행된 태스크 없이 성공한 DAG 실행(확대하려면 클릭)

원인

이러한 상황은 다음 사례 중 하나에서 발생할 수 있습니다.

  • DAG의 execution_datestart_date 간의 시간대 차이로 인해 불일치가 발생합니다. 예를 들어 pendulum.parse(...)를 사용하여 start_date를 설정할 때 이러한 문제가 발생할 수 있습니다.

  • DAG의 start_date가 동적 값으로 설정됩니다(예: airflow.utils.dates.days_ago(1)).

해결책

  • execution_datestart_date가 동일한 시간대를 사용하는지 확인합니다.

  • 정적 start_date를 지정하고 catchup=False와 결합하여 과거 시작 날짜가 있는 DAG를 실행하지 않도록 합니다.

DAG가 Airflow UI 또는 DAG UI에 표시되지 않으며 스케줄러가 예약하지 않음

DAG 프로세서는 DAG가 스케줄러를 통해 예약될 수 있고 Airflow UI 또는 DAG UI에 표시되기 전에 각 DAG를 파싱합니다.

다음 Airflow 구성 옵션은 DAG 파싱을 위한 제한 시간을 정의합니다.

DAG가 Airflow UI 또는 DAG UI에 표시되지 않는 경우:

  • DAG 프로세서 로그를 확인하여 DAG 프로세서가 DAG를 올바르게 처리할 수 있는지 확인합니다. 문제가 발생하면 DAG 프로세서 또는 스케줄러 로그에 다음 로그 항목이 표시될 수 있습니다.
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
/usr/local/airflow/dags/example_dag.py with PID 21903 started at
2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
  • 스케줄러 로그를 확인하여 스케줄러가 올바르게 작동하는지 확인합니다. 문제가 발생하면 스케줄러 로그에 다음 로그 항목이 표시될 수 있습니다.
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
Process timed out, PID: 68496

솔루션:

  • 모든 DAG 파싱 오류를 수정합니다. DAG 프로세서는 여러 DAG를 파싱하며 드물기는 하지만 한 DAG의 오류를 파싱하면 다른 DAG의 파싱에 부정적인 영향을 줄 수 있습니다.

  • DAG 파싱이 [core]dagrun_import_timeout에 정의된 시간(초)보다 오래 걸리면 이 제한 시간을 늘립니다.

  • 모든 DAG의 파싱이 [core]dag_file_processor_timeout에 정의된 시간(초)보다 오래 걸리면 이 제한 시간을 늘립니다.

  • DAG를 파싱하는 데 시간이 오래 걸린다면 최적의 방식으로 구현되지 않은 것일 수도 있습니다. 예를 들어 환경 변수를 여러 개 읽거나 외부 서비스 또는 Airflow 데이터베이스를 호출하는 경우입니다. 가능하면 DAG의 전역 섹션에서 이러한 작업을 수행하지 마세요.

  • 스케줄러의 CPU 및 메모리 리소스를 늘려서 더 빠르게 작업할 수 있습니다.

  • 스케줄러의 수를 조정합니다.

  • 파싱을 더 빠르게 수행할 수 있도록 DAG 프로세서 프로세스의 수를 늘립니다. 이렇게 하려면 [scheduler]parsing_process 값을 늘리면 됩니다.

  • DAG 파싱 실행 빈도를 줄입니다.

  • Airflow 데이터베이스의 부하를 줄입니다.

과부하 상태인 Airflow 데이터베이스의 증상

자세한 내용은 과부하 상태인 Airflow 데이터베이스의 증상을 참조하세요.

다음 단계