태스크 예약 문제 디버그

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

이 튜토리얼에서는 스케줄러 오작동, 파싱 오류 및 지연 시간, 태스크 실패를 유발하는 태스크 예약 및 파싱 문제를 진단하고 해결하는 방법을 설명합니다.

소개

Airflow 스케줄러는 주로 태스크 예약과 DAG 파싱이라는 두 가지 요소에 영향을 받습니다. 이러한 요소 중 하나에 문제가 있으면 환경 상태 및 성능에 부정적인 영향을 줄 수 있습니다.

너무 많은 태스크가 동시에 예약되는 경우가 있습니다. 이러한 경우 큐가 채워지고 태스크는 '예약된' 상태로 유지되거나 큐에 추가된 후 다시 예약되므로 태스크 실패와 성능 지연 시간이 발생할 수 있습니다.

또 다른 일반적인 문제는 DAG 코드의 복잡성으로 인한 파싱 지연 시간과 오류입니다. 예를 들어 코드 최상위에 Airflow 변수가 포함된 DAG 코드는 파싱 지연, 데이터베이스 과부하, 예약 실패, DAG 시간 초과를 유발할 수 있습니다.

이 튜토리얼에서는 예시 DAG를 진단하고, 예약 및 파싱 문제 해결, DAG 예약 개선, DAG 코드 및 환경 구성 최적화를 통한 성능 개선 방법을 알아봅니다.

목표

이 섹션에는 이 튜토리얼 예시의 목표가 나와 있습니다.

예: 다수의 태스크 동시 실행으로 인한 스케줄러 오작동 및 지연 시간

  • 동시에 여러 번 실행되는 샘플 DAG를 업로드하고 Cloud Monitoring으로 스케줄러 오작동 및 지연 시간 문제를 진단합니다.

  • 태스크를 통합하여 DAG 코드를 최적화하고 성능 영향을 평가합니다.

  • 시간 경과에 따라 태스크를 보다 균등하게 분배하고 성능 영향을 평가합니다.

  • Airflow 구성 및 환경 구성을 최적화하고 영향을 평가합니다.

예: 복잡한 코드로 인한 DAG 파싱 오류 및 지연 시간

  • Airflow 변수를 사용하는 샘플 DAG를 업로드하고 Cloud Monitoring으로 파싱 문제를 진단합니다.

  • 코드 최상위에서 Airflow 변수를 방지하여 DAG 코드를 최적화하고 파싱 시간에 미치는 영향을 평가합니다.

  • Airflow 구성 및 환경 구성을 최적화하고 파싱 시간에 미치는 영향을 평가합니다.

비용

이 튜토리얼에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

이 튜토리얼을 마치면 만든 리소스를 삭제하여 비용이 계속 청구되지 않도록 할 수 있습니다. 자세한 내용은 삭제를 참조하세요.

시작하기 전에

이 섹션에서는 튜토리얼을 시작하기 전에 필요한 작업을 설명합니다.

프로젝트 생성 및 구성

이 튜토리얼에는 Google Cloud 프로젝트가 필요합니다. 다음과 같은 방법으로 프로젝트를 구성합니다.

  1. Google Cloud 콘솔에서 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  2. 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.

  3. Google Cloud 프로젝트 사용자에게 필요한 리소스를 만들 수 있는 다음 역할이 있는지 확인합니다.

    • 환경 및 스토리지 객체 관리자(roles/composer.environmentAndStorageObjectAdmin)
    • Compute 관리자(roles/compute.admin)

프로젝트에 API 사용 설정

Cloud Composer API 사용 설정

API 사용 설정

Cloud Composer 환경 만들기

Cloud Composer 2 환경을 만듭니다.

환경을 만들 때 Composer 서비스 에이전트 계정에 Cloud Composer v2 API 서비스 에이전트 확장 프로그램(roles/composer.ServiceAgentV2Ext) 역할을 부여합니다. Cloud Composer는 이 계정을 사용하여 Google Cloud 프로젝트에서 작업을 수행합니다.

예: 태스크 예약 문제로 인한 스케줄러 오작동 및 태스크 실패

이 예시에서는 다수의 태스크 동시 실행으로 인한 스케줄러 오작동과 지연 시간을 디버그합니다.

사용자 환경에 샘플 DAG 업로드

다음 샘플 DAG를 이전 단계에서 만든 환경에 업로드합니다. 이 튜토리얼에서 이 DAG의 이름은 dag_10_tasks_200_seconds_1입니다.

이 DAG에는 200개의 태스크가 있습니다. 각 태스크는 1초 동안 기다렸다가 '완료'를 출력합니다. DAG는 업로드 후 자동으로 트리거됩니다. Cloud Composer는 이 DAG를 10번 실행하며, 모든 DAG 실행은 동시에 수행됩니다.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 200
seconds = 1
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2023, 11, 22, 20, 0),
    end_date=datetime(2023, 11, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

스케줄러 오작동 및 태스크 실패 문제 진단

DAG 실행이 완료되면 Airflow UI를 열고 dag_10_tasks_200_seconds_1 DAG를 클릭합니다. 총 10개의 DAG 실행이 성공했고 각 실행 시 200개의 태스크가 성공했음을 확인할 수 있습니다.

Airflow 태스크 로그를 검토합니다.

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 목록에서 환경 이름을 클릭합니다. 환경 세부정보 페이지가 열립니다.

  3. 로그 탭으로 이동한 후 모든 로그 > Airflow 로그 > 작업자 > 로그 탐색기에서 보기로 이동합니다.

로그 히스토그램에서 빨간색 및 주황색으로 표시된 오류 및 경고를 볼 수 있습니다.

빨간색 및 주황색으로 표시된 오류와 경고가 있는 Airflow 작업자 오류 및 경고 로그의 히스토그램
그림 1. Airflow 작업자 로그 히스토그램(확대하려면 클릭)

예시 DAG에서는 약 130개의 경고와 60개의 오류가 발생했습니다. 노란색 막대와 빨간색 막대가 포함된 열을 클릭합니다. 로그에서 다음과 같은 경고 및 오류를 볼 수 있습니다.

State of this instance has been externally set to success. Terminating
instance.

Received SIGTERM. Terminating subprocesses.

worker: Warm shutdown (MainProcess).

이러한 로그는 리소스 사용량이 한도를 초과했으며 작업자가 다시 시작되었음을 나타낼 수 있습니다.

Airflow 태스크가 큐에 너무 오래 보관되면 스케줄러에서 이 태스크를 failed 및 up_for_retry로 표시하고 실행을 위해 다시 한 번 예약합니다. 이 상황의 증상을 관측하는 한 가지 방법은 큐에 추가된 태스크 수가 있는 차트를 살펴보는 것입니다. 이 차트의 급상승 구간이 약 10분 내로 하락하지 않을 경우 태스크 실패가 발생할 가능성이 있습니다(로그 없음).

모니터링 정보를 검토합니다.

  1. Monitoring 탭으로 이동하고 개요를 선택합니다.

  2. Airflow 태스크 그래프를 검토합니다.

    큐에 추가된 태스크 수의 급상승 구간을 보여주는 시간별 Airflow 태스크 그래프
    그림 2. Airflow 태스크 그래프(확대하려면 클릭)

    Airflow 태스크 그래프에서 큐에 추가된 태스크의 급상승 구간이 10분 이상 지속되므로 환경의 리소스가 예약된 모든 태스크를 처리하기에 충분하지 않을 수 있습니다.

  3. 활성 작업자 그래프를 검토합니다.

    활성 작업자 수가 최대 한도까지 증가했음을 보여주는 시간별 활성 Airflow 작업자 그래프
    그림 3. 활성 작업자 그래프(확대하려면 클릭)

    활성 작업자 그래프는 DAG가 DAG 실행 중에 최대 허용 한도인 작업자 수 3개로 자동 확장을 트리거했음을 나타냅니다.

  4. 리소스 사용량 그래프는 Airflow 작업자에 큐에 추가된 태스크를 실행할 용량이 부족함을 나타낼 수 있습니다. Monitoring 탭에서 작업자를 선택하고 총 작업자 CPU 사용량총 작업자 메모리 사용량 그래프를 검토합니다.

    최대 한도까지 증가하는 CPU 사용량을 보여주는 Airflow 작업자별 CPU 사용량 그래프
    그림 4. 총 작업자 CPU 사용량 그래프(확대하려면 클릭)
    Airflow 작업자별 메모리 사용량 그래프는 메모리 사용량이 증가했지만 최대 한도에 도달하지 않았음을 보여줍니다.
    그림 5. 총 작업자 메모리 사용량 그래프(확대하려면 클릭)

    그래프는 너무 많은 태스크를 동시에 실행하여 CPU 한도에 도달했음을 나타냅니다. 리소스는 30분 넘게 사용되었으며, 이는 10개 DAG 실행에서 200개의 태스크가 하나씩 실행되는 데 걸리는 총 시간보다 훨씬 깁니다.

이는 큐가 가득 찼으며 예약된 모든 태스크를 처리하기 위한 리소스가 부족함을 나타냅니다.

태스크 통합

현재 코드는 모든 태스크를 동시에 처리할 리소스가 부족한 상태에서 많은 DAG 및 태스크를 만들어 큐를 가득 채웁니다. 태스크를 큐에 너무 오래 보관하면 태스크가 다시 예약되거나 실패할 수 있습니다. 이러한 경우에는 더 적은 수로 보다 통합된 태스크를 사용해야 합니다.

다음 샘플 DAG는 초기 예시의 태스크 수를 200에서 20으로 변경하고 대기 시간을 1초에서 10초로 늘려 동일한 양의 작업을 수행하는 더 통합된 태스크를 모방합니다.

다음 샘플 DAG를 생성된 환경에 업로드합니다. 이 튜토리얼에서 이 DAG의 이름은 dag_10_tasks_20_seconds_10입니다.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5

with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

보다 통합된 태스크가 예약 프로세스에 미치는 영향을 평가합니다.

  1. DAG 실행이 완료될 때까지 기다립니다.

  2. Airflow UI의 DAG 페이지에서 dag_10_tasks_20_seconds_10 DAG를 클릭합니다. 각각 20개의 태스크가 성공한 10개의 DAG 실행을 볼 수 있습니다.

  3. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  4. 환경 목록에서 환경 이름을 클릭합니다. 환경 세부정보 페이지가 열립니다.

  5. 로그 탭으로 이동한 후 모든 로그 > Airflow 로그 > 작업자 > 로그 탐색기에서 보기로 이동합니다.

    태스크가 더 통합된 두 번째 예시에서는 약 10개의 경고와 7개의 오류가 발생했습니다. 히스토그램에서 초기 예시(이전 값)의 오류 및 경고 수와 두 번째 예시(이후 값)를 비교할 수 있습니다.

    태스크가 통합된 후 Airflow 작업자 오류 및 경고 로그의 히스토그램에서 오류 및 경고의 양이 감소한 것을 보여줍니다.
    그림 6. 태스크 통합 후 Airflow 작업자 로그 히스토그램(확대하려면 클릭)

    첫 번째 예시와 더욱 통합된 예시를 비교하면 두 번째 예시의 오류와 경고가 훨씬 적은 것을 볼 수 있습니다. 그러나 리소스 과부하로 인해 웜 종료와 관련된 동일한 오류가 로그에 계속 표시됩니다.

  6. Monitoring 탭에서 작업자를 선택하고 그래프를 검토합니다.

    첫 번째 예시(이전 값)의 Airflow 태스크 그래프를 더 통합된 태스크가 있는 두 번째 예시의 그래프와 비교하면 태스크가 더 통합된 경우 큐에 추가된 태스크의 급상승 구간이 더 짧게 지속된 것을 확인할 수 있습니다. 하지만 10분 가까이 지속되었으며, 여전히 최적의 상태가 아닙니다.

    Airflow 태스크 급상승 구간이 이전보다 짧은 기간 동안 지속되었음을 보여주는 시간별 Airflow 태스크 그래프
    그림 7. 태스크 통합 후 Airflow 태스크 그래프(확대하려면 클릭)

    활성 작업자 그래프에서는 두 예시 모두 동일한 양을 모방하더라도 첫 번째 예시(그래프 왼쪽)가 두 번째 예시보다 훨씬 더 오랫동안 리소스를 사용한 것을 볼 수 있습니다.

    이전보다 짧은 기간 동안 활성 작업자 수가 증가했음을 보여주는 시간별 활성 Airflow 작업자 그래프
    그림 8. 태스크 통합 후 활성 작업자 그래프(확대하려면 클릭)

    작업자 리소스 소비 그래프를 검토합니다. 더 통합된 태스크를 포함하는 예시와 초기 예시에 사용된 리소스의 차이가 상당히 크더라도 CPU 사용량은 한도의 70%로 급상승합니다.

    최대 한도의 70%까지 증가하는 CPU 사용량을 보여주는 Airflow 작업자별 CPU 사용량 그래프
    그림 9. 태스크 통합 후 총 작업자 CPU 사용량 그래프(확대하려면 클릭)
    Airflow 작업자별 메모리 사용량 그래프는 메모리 사용량이 증가했지만 최대 한도에 도달하지 않았음을 보여줍니다.
    그림 10. 태스크 통합 후 총 작업자 메모리 사용량 그래프(확대하려면 클릭)

시간 경과에 따라 태스크를 더 균등하게 분배

동시 태스크가 너무 많으면 큐가 가득 차서 태스크가 큐에 묶여 있거나 다시 예약됩니다. 이전 단계에서는 이러한 태스크를 통합하여 태스크 수를 줄였지만 출력 로그 및 모니터링에 따르면 동시 태스크 수가 여전히 최적 상태가 아님을 알 수 있습니다.

일정을 구현하거나 동시에 실행할 수 있는 태스크 수에 대한 한도를 설정하여 동시 태스크 실행 수를 제어할 수 있습니다.

이 튜토리얼에서는 dag_10_tasks_20_seconds_10 DAG에 DAG 수준 매개변수를 추가하여 시간 경과에 따라 태스크를 보다 균등하게 분배합니다.

  1. DAG 컨텍스트 관리자에 max_active_runs=1 인수를 추가합니다. 이 인수는 특정 시점에 DAG 인스턴스를 하나만 실행하도록 한도를 설정합니다.

  2. DAG 컨텍스트 관리자에 max_active_tasks=5 인수를 추가합니다. 이 인수는 각 DAG에서 동시에 실행될 수 있는 최대 태스크 인스턴스 수를 제어합니다.

다음 샘플 DAG를 생성된 환경에 업로드합니다. 이 튜토리얼에서 이 DAG의 이름은 dag_10_tasks_20_seconds_10_scheduled.py입니다.

import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task


tasks_amount = 20
seconds = 10
minutes = 5
active_runs = 1
active_tasks = 5


with DAG(
    dag_id=f"dag_10_tasks_{tasks_amount}_sec_{seconds}_runs_{active_runs}_tasks_{active_tasks}",
    start_date=datetime(2021, 12, 22, 20, 0),
    end_date=datetime(2021, 12, 22, 20, 49),
    schedule_interval=timedelta(minutes=minutes),
    max_active_runs=active_runs,
    max_active_tasks=active_tasks,
    catchup=True,
) as dag:

    @task
    def create_subtasks(seconds: int) -> None:
        time.sleep(seconds)

    for i in range(tasks_amount):
        create_subtasks(seconds)

일정 예약 프로세스에 대한 태스크 분배의 시간별 영향을 평가합니다.

  1. DAG 실행이 완료될 때까지 기다립니다.

  2. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  3. 환경 목록에서 환경 이름을 클릭합니다. 환경 세부정보 페이지가 열립니다.

  4. 로그 탭으로 이동한 후 모든 로그 > Airflow 로그 > 작업자 > 로그 탐색기에서 보기로 이동합니다.

  5. 히스토그램에서 제한된 수의 활성 태스크와 실행을 포함하는 세 번째 DAG가 경고 또는 오류를 생성하지 않았고, 로그 분포가 이전 값보다 훨씬 더 균등해 보이는 것을 확인할 수 있습니다.

    태스크가 통합되고 점차 분배되면서 Airflow 작업자 오류 및 경고 로그의 히스토그램에 오류 또는 경고가 표시되지 않습니다.
    그림 11. 태스크가 통합되고 점차 분배된 후 Airflow 작업자 로그 히스토그램(확대하려면 클릭)

제한된 수의 활성 태스크와 실행을 포함하는 dag_10_tasks_20_seconds_10_scheduled 예시의 태스크는 태스크가 균등하게 큐에 추가되었기 때문에 리소스 압박을 일으키지 않습니다.

설명된 단계를 수행한 후 소규모 태스크를 통합하고 시간 경과에 따라 더 균일하게 분배하여 리소스 사용량을 최적화했습니다.

환경 구성 최적화

큐에 추가된 태스크를 실행할 용량이 Airflow 작업자에 항상 남아 있도록 환경 구성을 조정할 수 있습니다.

작업자 수 및 동시 실행 작업자

Cloud Composer가 설정된 한도 내에서 환경을 자동으로 확장하도록 최대 작업자 수를 조정할 수 있습니다.

[celery]worker_concurrency 매개변수는 단일 작업자가 태스크 큐에서 선택할 수 있는 최대 태스크 수를 정의합니다. 이 매개변수를 변경하면 단일 작업자가 동시에 실행할 수 있는 태스크 수가 조정됩니다. 이 Airflow 구성 옵션을 재정의하여 변경할 수 있습니다. 기본적으로 동시 실행 작업자는 32, 12 * worker_CPU, 8 * worker_memory 중 최소값으로 설정되어 있습니다. 즉, 작업자 리소스 한도에 따라 달라집니다. 기본 동시 실행 작업자 값에 대한 자세한 내용은 환경 최적화를 참조하세요.

작업자 수와 동시 실행 작업자는 서로 함께 작동하며 두 매개변수에 따라 환경 성능이 크게 달라집니다. 다음 고려사항을 통해 올바른 조합을 선택할 수 있습니다.

  • 짧은 태스크 여러 개를 동시에 실행. 큐에서 대기 중인 태스크가 있고 작업자가 동시에 CPU와 메모리를 적게 사용하면 동시 실행 작업자를 늘릴 수 있습니다. 하지만 일부 경우에는 큐가 가득 차지 않고 자동 확장이 트리거되지 않을 수 있습니다. 새 작업자가 준비되었을 때 소규모 태스크가 완료되는 경우 기존 작업자가 나머지 태스크를 선택할 수 있으며 새로 생성된 작업자에 대한 태스크는 없습니다.

    이러한 상황에서는 과도한 확장을 방지하기 위해 최소 작업자 수를 늘리고 동시 실행 작업자를 늘리는 것이 좋습니다.

  • 긴 태스크 여러 개를 동시에 실행. 동시 실행 작업자가 많으면 시스템이 작업자 수를 늘리지 못하게 됩니다. 여러 작업이 리소스 집약적이고 완료하는 데 시간이 오래 걸리는 경우 동시 실행 작업자가 많으면 큐가 가득 차지 않고 한 작업자가 모든 작업을 선택해서 성능 문제가 발생할 수 있습니다. 이러한 경우 최대 작업자 수를 늘리고 동시 실행 작업자를 줄이는 것이 좋습니다.

동시 로드의 중요성

Airflow 스케줄러는 DAG 실행 및 DAG의 개별 태스크 일정을 제어합니다. [core]parallelism Airflow 구성 옵션은 이러한 태스크의 모든 종속 항목이 충족된 후 Airflow 스케줄러가 Executor 큐에서 큐에 추가할 수 있는 태스크 수를 제어합니다.

동시 로드는 작업자 수에 관계없이 각 스케줄러별로 동시에 실행될 수 있는 태스크 수를 결정하는 Airflow의 보호 메커니즘입니다. 동시 로드 값에 클러스터의 스케줄러 수를 곱한 값이 환경에서 큐에 추가할 수 있는 최대 태스크 인스턴스 수입니다.

일반적으로 [core]parallelism은 최대 작업자 수와 [celery]worker_concurrency를 곱한 값으로 설정됩니다. 의 영향도 받습니다. 이 Airflow 구성 옵션을 재정의하여 변경할 수 있습니다. 확장과 관련된 Airflow 구성 조정에 대한 자세한 내용은 Airflow 구성 확장을 참조하세요.

최적의 환경 구성 찾기

예약 문제를 해결하는 데 권장되는 방법은 작은 태스크를 더 큰 태스크에 통합하고 시간 경과에 따라 태스크를 보다 균등하게 분배하는 것입니다. DAG 코드 최적화 외에도 환경 구성 최적화를 통해 여러 태스크를 동시에 실행하기에 충분한 용량을 확보할 수 있습니다.

예를 들어 최대한 많이 DAG의 태스크를 통합하되 시간 경과에 따라 활성 태스크를 보다 균등하게 분배하도록 제한하는 것은 특정 사용 사례에 권장되는 솔루션이 아닙니다.

활성 태스크를 제한하지 않고 동시 로드, 작업자 수, 동시 실행 작업자 매개변수를 조정하여 dag_10_tasks_20_seconds_10 DAG를 실행할 수 있습니다. 이 예시에서 DAG는 10번 실행되고 각 실행에는 20개의 작은 태스크가 포함됩니다. 모두 동시에 실행하려면 다음 안내를 따르세요.

  • 환경의 관리형 Cloud Composer 인프라 성능 매개변수를 제어하므로 환경 크기가 더 커야 합니다.

  • Airflow 작업자가 20개의 태스크를 동시에 실행할 수 있어야 합니다. 즉, 동시 실행 작업자를 20개로 설정해야 합니다.

  • 작업자가 모든 태스크를 처리하기에 충분한 CPU와 메모리가 있어야 합니다. 동시 실행 작업자는 작업자 CPU 및 메모리의 영향을 받으므로 worker_concurrency / 12 이상의 CPU와 least worker_concurrency / 8의 메모리가 필요합니다.

  • 더 많은 동시 실행 작업자에 맞게 동시 로드를 늘려야 합니다. 작업자가 큐에서 20개의 태스크를 선택하려면 스케줄러가 먼저 20개의 태스크를 예약해야 합니다.

다음과 같은 방법으로 환경 구성을 조정합니다.

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 목록에서 환경 이름을 클릭합니다. 환경 세부정보 페이지가 열립니다.

  3. 환경 구성 탭으로 이동합니다.

  4. 리소스 > 워크로드 구성을 찾고 수정을 클릭합니다.

  5. 작업자 섹션의 메모리 필드에서 Airflow 작업자의 새 메모리 한도를 지정합니다. 이 튜토리얼에서는 4GB를 사용합니다.

  6. CPU 필드에 Airflow 작업자의 새 CPU 한도를 지정합니다. 이 튜토리얼에서는 2개의 vCPU를 사용합니다.

  7. 변경사항을 저장하고 Airflow 작업자가 다시 시작될 때까지 몇 분 정도 기다립니다.

그런 다음 동시 로드 및 동시 실행 작업자 Airflow 구성 옵션을 재정의합니다.

  1. Airflow 구성 재정의 탭으로 이동합니다.

  2. 편집을 클릭한 후 Airflow 구성 재정의 추가를 클릭합니다.

  3. 동시 로드 구성을 재정의합니다.

    섹션
    core parallelism 20
  4. Airflow 구성 재정의 추가를 클릭하고 동시 실행 작업자 구성을 재정의합니다.

    섹션
    celery worker_concurrency 20
  5. 저장을 클릭하고 환경에서 구성이 업데이트될 때까지 기다립니다.

조정된 구성을 사용하여 동일한 예시 DAG를 다시 트리거합니다.

  1. Airflow UI에서 DAG 페이지로 이동합니다.

  2. dag_10_tasks_20_seconds_10 DAG를 찾아서 삭제합니다.

    DAG가 삭제되면 Airflow가 환경 버킷의 DAG 폴더를 확인하고 DAG를 자동으로 다시 실행합니다.

DAG 실행이 완료되면 로그 히스토그램을 다시 검토합니다. 다이어그램에서 더 통합된 태스크가 있는 dag_10_tasks_20_seconds_10 예시는 조정된 환경 구성으로 실행할 때 오류 및 경고가 생성되지 않은 것을 확인할 수 있습니다. 이 결과를 다이어그램의 이전 데이터와 비교합니다. 기본 환경 구성으로 실행할 경우 동일한 예시에서 오류 및 경고가 생성되었습니다.

환경 구성이 조정된 후 Airflow 작업자 오류 및 경고 로그의 히스토그램에 오류 및 경고가 표시되지 않습니다.
그림 12. 환경 구성이 조정된 후 Airflow 작업자 로그 히스토그램(확대하려면 클릭)

환경 구성과 Airflow 구성은 태스크 예약에 중요한 역할을 하지만 특정 한도를 초과하여 구성을 늘릴 수는 없습니다.

DAG 코드를 최적화하고, 태스크를 통합하고, 예약을 사용하여 성능 및 효율을 최적화하는 것이 좋습니다.

예: 복잡한 DAG 코드로 인한 DAG 파싱 오류 및 지연 시간

이 예시에서는 과도한 Airflow 변수를 모방하는 샘플 DAG의 파싱 지연 시간을 살펴봅니다.

새 Airflow 변수 만들기

샘플 코드를 업로드하기 전에 새 Airflow 변수를 만듭니다.

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. Airflow 웹 서버 열에서 해당 환경의 Airflow 링크를 따릅니다.

  3. 관리 > 변수 > 새 레코드 추가로 이동합니다.

  4. 다음 값을 설정합니다.

    • key: example_var
    • val: test_airflow_variable

사용자 환경에 샘플 DAG 업로드

다음 샘플 DAG를 이전 단계에서 만든 환경에 업로드합니다. 이 튜토리얼에서 이 DAG의 이름은 dag_for_loop_airflow_variable입니다.

이 DAG에는 1,000번 실행되고 과도한 Airflow 변수를 모방하는 for 루프가 포함되어 있습니다. 반복할 때마다 example_var 변수가 읽히고 태스크가 생성됩니다. 각 태스크에는 변수 값을 출력하는 명령어 1개가 포함되어 있습니다.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable',
    default_args=default_args,
    catchup=False,
    schedule_interval="@daily"
)

for i in range(1000):
    a = Variable.get('example_var', 'N/A')
    task = BashOperator(
        task_id=f'task_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': a}
    )

파싱 문제 진단

DAG 파싱 시간은 Airflow Scheduler가 DAG 파일을 읽고 이를 파싱하는 데 걸리는 시간입니다. Airflow Scheduler가 DAG에서 태스크를 예약할 수 있도록 Scheduler가 DAG 파일을 파싱하여 DAG 및 정의된 태스크의 구조를 확인해야 합니다.

DAG가 파싱하는 데 시간이 오래 걸리면 스케줄러의 용량이 소비되고 DAG 실행 성능이 저하될 수 있습니다.

DAG 파싱 시간을 모니터링하려면 다음 안내를 따르세요.

  1. gcloud CLI에서 dags report Airflow CLI 명령어를 실행하여 모든 DAG의 파싱 시간을 확인합니다.

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        dags report
    

    다음을 바꿉니다.

    • ENVIRONMENT_NAME: 환경의 이름
    • LOCATION: 환경이 위치한 리전
  2. 명령어 출력에서 dag_for_loop_airflow_variables DAG의 기간 값을 찾습니다. 큰 값은 이 DAG가 최적의 방식으로 구현되지 않았음을 나타낼 수 있습니다. 출력 테이블에서 DAG가 여러 개이면 파싱 시간이 긴 DAG를 식별할 수 있습니다.

    예:

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:14.773594 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /airflow_monitoring | 0:00:00.003035 | 1       | 1        | airflow_monitoring
    .py
    
    
  3. Google Cloud 콘솔에서 DAG 파싱 시간을 살펴봅니다.

    1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  4. 환경 목록에서 환경 이름을 클릭합니다. 환경 세부정보 페이지가 열립니다.

  5. 로그 탭으로 이동한 후 모든 로그 > DAG 프로세서 관리자로 이동합니다.

  6. dag-processor-manager 로그를 검토하고 발생 가능한 문제를 식별합니다.

    DAG 파싱 시간이 46.3초임을 보여주는 샘플 DAG의 로그 항목
    그림 13. DAG 파싱 시간을 보여주는 DAG 프로세서 관리자 로그(확대하려면 클릭)

총 DAG 파싱 시간이 약 10초를 초과하면 스케줄러가 DAG 파싱으로 과부하되어 DAG를 효과적으로 실행할 수 없습니다.

DAG 코드 최적화

DAG에서 불필요한 '최상위' Python 코드는 피하는 것이 좋습니다. DAG 외부의 가져오기, 변수, 함수가 많은 DAG는 Airflow 스케줄러의 파싱 시간을 늘립니다. 따라서 Cloud Composer 및 Airflow의 성능과 확장성이 줄어듭니다. 과도한 Airflow 변수를 읽으면 파싱 시간이 길어지고 데이터베이스 부하가 커집니다. 이 코드가 DAG 파일에 있을 경우 모든 스케줄러 하트비트에서 이러한 함수가 실행되어 속도가 느려질 수 있습니다.

Airflow의 템플릿 필드를 사용하여 Airflow 변수 및 Jinja 템플릿의 값을 DAG에 통합할 수 있습니다. 이렇게 하면 스케줄러 하트비트 중에 불필요한 함수 실행이 방지됩니다.

DAG 예시를 보다 나은 방식으로 구현하려면 DAG의 최상위 Python 코드에서 Airflow 변수를 사용하지 마세요. 대신 Jinja 템플릿을 통해 Airflow 변수를 기존 연산자에 전달합니다. 그러면 태스크가 실행될 때까지 값 읽기가 지연됩니다.

샘플 DAG의 새 버전을 환경에 업로드합니다. 이 튜토리얼에서 이 DAG의 이름은 dag_for_loop_airflow_variable_optimized입니다.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 2, 17),
    'retries': 0
}

dag = DAG(
    'dag_for_loop_airflow_variable_optimized',
    default_args=default_args,
    catchup=False,
    schedule_interval='@daily'
)

for i in range(1000):
    task = BashOperator(
        task_id=f'bash_use_variable_good_{i}',
        bash_command='echo variable foo=${foo_env}',
        dag=dag,
        env={'foo_env': '{{ var.value.get("example_var") }}'},
    )

새 DAG 파싱 시간을 살펴봅니다.

  1. DAG 실행이 완료될 때까지 기다립니다.

  2. dags report 명령어를 다시 실행하여 모든 DAG의 파싱 시간을 확인합니다.

    file                | duration       | dag_num | task_num | dags
    ====================+================+=========+==========+=====================
    /dag_for_loop_airfl | 0:00:37.000369 | 1       | 1000     | dag_for_loop_airflow
    ow_variable.py      |                |         |          | _variable
    /dag_for_loop_airfl | 0:00:01.109457 | 1       | 1000     | dag_for_loop_airflow
    ow_variable_optimiz |                |         |          | _variable_optimized
    ed.py               |                |         |          |
    /airflow_monitoring | 0:00:00.040510 | 1       | 1        | airflow_monitoring
    .py                 |                |         |          |
    
  3. dag-processor-manager 로그를 다시 검토하고 파싱 기간을 분석합니다.

    DAG 파싱 시간이 4.21초임을 보여주는 샘플 DAG의 로그 항목
    그림 14. DAG 코드가 최적화된 후 DAG 파싱 시간을 보여주는 DAG 프로세서 관리자 로그(확대하려면 클릭)

환경 변수를 Airflow 템플릿으로 바꿔 DAG 코드를 단순화하고 파싱 지연 시간을 약 10배 줄였습니다.

Airflow 환경 구성 최적화

Airflow 스케줄러는 지속적으로 새 태스크를 트리거하고 환경 버킷의 모든 DAG를 파싱합니다. DAG의 파싱 시간이 길고 스케줄러의 리소스 소비량이 많은 경우, 스케줄러가 리소스를 보다 효율적으로 사용할 수 있도록 Airflow 스케줄러 구성을 최적화할 수 있습니다.

이 튜토리얼에서는 DAG 파일을 파싱하는 데 시간이 오래 걸리고 파싱 주기가 겹치기 시작하므로 스케줄러의 용량이 소진됩니다. 이 예시의 첫 번째 예시 DAG는 파싱하는 데 5초 넘게 걸리므로 스케줄러가 리소스를 보다 효율적으로 사용하도록 실행 빈도를 더 낮게 구성합니다. scheduler_heartbeat_sec Airflow 구성 옵션을 재정의합니다. 이 구성은 스케줄러가 실행되는 빈도(초)를 정의합니다. 기본적으로 이 값은 5초로 설정됩니다. 이 Airflow 구성 옵션을 재정의하여 변경할 수 있습니다.

scheduler_heartbeat_sec Airflow 구성 옵션을 재정의합니다.

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 목록에서 환경 이름을 클릭합니다. 환경 세부정보 페이지가 열립니다.

  3. Airflow 구성 재정의 탭으로 이동합니다.

  4. 편집을 클릭한 후 Airflow 구성 재정의 추가를 클릭합니다.

  5. Airflow 구성 옵션을 재정의합니다.

    섹션
    scheduler scheduler_heartbeat_sec 10
  6. 저장을 클릭하고 환경에서 구성이 업데이트될 때까지 기다립니다.

스케줄러 측정항목을 확인합니다.

  1. Monitoring 탭으로 이동하고 스케줄러를 선택합니다.

  2. 스케줄러 하트비트 그래프에서 옵션 더보기 버튼(점 3개)을 클릭한 후 측정항목 탐색기에서 보기를 클릭합니다.

하트비트가 덜 자주 발생함을 보여주는 스케줄러 하트비트 그래프
그림 15. 스케줄러 하트비트 그래프(확대하려면 클릭)

그래프에서 기본 구성을 5초에서 10초로 변경하면 스케줄러가 실행 빈도가 두 배 줄어듭니다. 하트비트 빈도를 줄이면 이전 파싱 주기가 진행되는 동안 스케줄러가 실행되지 않고 스케줄러의 리소스 용량이 소진되지 않습니다.

스케줄러에 리소스 추가 할당

Cloud Composer 2에서는 스케줄러에 더 많은 CPU 및 메모리 리소스를 할당할 수 있습니다. 이러한 방식으로 스케줄러의 성능을 높이고 DAG의 파싱 시간을 단축할 수 있습니다.

스케줄러에 추가 CPU 및 메모리를 할당합니다.

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 목록에서 환경 이름을 클릭합니다. 환경 세부정보 페이지가 열립니다.

  3. 환경 구성 탭으로 이동합니다.

  4. 리소스 > 워크로드 구성을 찾고 수정을 클릭합니다.

  5. 스케줄러 섹션의 메모리 필드에 새 메모리 한도를 지정합니다. 이 튜토리얼에서는 4GB를 사용합니다.

  6. CPU 필드에 새 CPU 한도를 지정합니다. 이 튜토리얼에서는 2개의 vCPU를 사용합니다.

  7. 변경사항을 저장하고 Airflow 스케줄러가 다시 시작될 때까지 몇 분 정도 기다립니다.

  8. 로그 탭으로 이동한 후 모든 로그 > DAG 프로세서 관리자로 이동합니다.

  9. dag-processor-manager 로그를 검토하고 예시 DAG의 파싱 기간을 비교합니다.

    최적화된 DAG의 DAG 파싱 시간이 1.5초임을 보여주는 샘플 DAG의 로그 항목 최적화되지 않은 DAG의 파싱 시간은 28.71초입니다.
    그림 16. 스케줄러에 더 많은 리소스가 할당된 후 DAG 파싱 시간을 보여주는 DAG 프로세서 관리자 로그(확대하려면 클릭)

스케줄러에 리소스를 더 많이 할당하여 스케줄러의 용량을 늘리고 파싱 지연 시간을 기본 환경 구성에 비해 크게 단축했습니다. 더 많은 리소스를 사용하면 스케줄러가 DAG를 더 빠르게 파싱할 수 있지만 Cloud Composer 리소스와 관련된 비용도 증가합니다. 또한 특정 한도를 초과하여 리소스를 늘릴 수 없습니다.

가능한 DAG 코드 및 Airflow 구성 최적화가 구현된 후에만 리소스를 할당하는 것이 좋습니다.

삭제

이 튜토리얼에서 사용된 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 리소스가 포함된 프로젝트를 삭제하거나 프로젝트는 유지하되 개별 리소스를 삭제하세요.

프로젝트 삭제

  1. Google Cloud 콘솔에서 리소스 관리 페이지로 이동합니다.

    리소스 관리로 이동

  2. 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제를 클릭합니다.
  3. 대화상자에서 프로젝트 ID를 입력한 후 종료를 클릭하여 프로젝트를 삭제합니다.

개별 리소스 삭제

여러 튜토리얼과 빠른 시작을 살펴보려는 경우 프로젝트를 재사용하면 프로젝트 할당량 한도 초과를 방지할 수 있습니다.

Cloud Composer 환경을 삭제합니다. 이 절차 동안의 환경 버킷도 삭제합니다.

다음 단계