Airflow 2로 환경 마이그레이션

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

이 페이지에서는 DAG, 데이터, 구성을 기존 Airflow 1.10.* 환경에서 Airflow 버전 2 이상 환경으로 전송하는 방법을 설명합니다.

기타 마이그레이션 가이드

보낸사람 받는사람 메서드 가이드
Cloud Composer 1, Airflow 2 Cloud Composer 2, Airflow 2 스냅샷을 사용하여 단계별 마이그레이션 마이그레이션 가이드(스냅샷)
Cloud Composer 1, Airflow 1 Cloud Composer 2, Airflow 2 스냅샷을 사용하여 단계별 마이그레이션 마이그레이션 가이드(스냅샷)
Cloud Composer 1, Airflow 2 Cloud Composer 2, Airflow 2 단계별 수동 전송 수동 마이그레이션 가이드
Cloud Composer 1, Airflow 1 Cloud Composer 2, Airflow 2 단계별 수동 전송 수동 마이그레이션 가이드
Airflow 1 Airflow 2 단계별 수동 전송 이 가이드(수동 마이그레이션)

단계별 업그레이드

Cloud Composer는 Cloud Composer 데이터베이스 전송 스크립트를 제공하여 메타데이터 데이터베이스, DAG, 데이터, 플러그인을 Airflow 1.10.14 및 Airflow 1.10.15가 있는 Cloud Composer 환경에서 Airflow 2.0.1 이상의 Airflow 버전을 사용하는 기존 Cloud Composer 환경으로 마이그레이션합니다.

이 방법은 이 가이드에서 설명하는 방법의 대체 방법입니다. 이 가이드의 일부 내용은 제공된 스크립트를 사용할 때 적용됩니다. 예를 들어 마이그레이션하기 전 DAG에서 Airflow 2와의 호환성을 확인하거나 동시 DAG 실행이 발생하지 않는지 그리고 추가 또는 누락된 DAG 실행이 없는지 확인해야 할 수 있습니다.

시작하기 전에

Airflow 2로 Cloud Composer 환경을 시작하기 전 Airflow 2로 Cloud Composer 환경에 적용되는 변경사항을 고려합니다.

Scheduler HA

환경에서 Airflow 스케줄러를 2개 이상 사용할 수 있습니다. 환경을 만들 때 또는 기존 환경을 업데이트할 때 스케줄러 수를 설정할 수 있습니다.

Celery+Kubernetes Executor

Airflow 2 Celery+Kubernetes Executor는 Cloud Composer 3에서 지원됩니다.

브레이킹 체인지

Airflow 2에는 다음과 같은 주요 변경사항이 도입되었습니다.

  • Airflow 1.10.* 의 기존 DAG가 Airflow 2에서 작동된다는 보장이 없습니다. 테스트가 필요하며 조정해야 할 수 있습니다.
  • 연산자, 전송, 후크가 공급자 패키지로 이전됩니다. DAG의 가져오기 문이 새 공급자 패키지를 사용해야 합니다. Airflow 2에서 이전의 가져오기 문이 더 이상 작동하지 않을 수 있습니다.
  • Airflow 2에서 더 이상 지원되지 않는 구성 옵션이 몇 가지 있기 때문에 일부 Airflow 1.10.* 구성이 지원되지 않을 수 있습니다.
  • 일부 커스텀 PyPI 패키지는 새 버전의 Airflow 또는 Python과 호환되지 않을 수 있습니다.
  • 액세스 제어가 포함된 Airflow UI는 기본 Airflow 2 UI입니다. Airflow 2가 다른 Airflow UI 유형을 지원하지 않습니다.
  • 실험용 REST API가 안정적인 Airflow API로 대체되었습니다. 실험용 REST API가 Airflow 2에서 기본적으로 사용 중지되었습니다.
  • Airflow 2.0.0의 기타 주요 변경사항
  • Airflow 2.0.1의 기타 주요 변경사항

Airflow 2 및 Airflow 1.10.* 환경의 차이점

Airflow 1.10.* 을 사용하는 Cloud Composer 환경과 Airflow 2 환경의 주요 차이점:

  • Airflow 2 환경에 Python 3.8이 사용됩니다. 이 버전은 Airflow 1.10.* 환경에 사용된 것보다 최신 버전입니다. Python 2, Python 3.6, Python 3.7은 지원되지 않습니다.
  • Airflow 2에는 다른 CLI 형식이 사용됩니다. Cloud Composer는 gcloud composer environments run 명령어를 통해 Airflow 2 사용 환경에서 새 형식을 지원합니다.
  • Airflow 2 환경에서는 사전 설치된 PyPI 패키지가 다릅니다. 사전 설치된 PyPI 패키지 목록은 Cloud Composer 버전 목록을 참조하세요.
  • Airflow 2에서는 DAG 직렬화가 항상 사용 설정됩니다. 따라서 비동기 DAG 로드가 더 이상 필요하지 않으며 Airflow 2에서 지원되지 않습니다. 따라서 Airflow 2에 대해 [core]store_serialized_dags[core]store_dag_code 매개변수 구성이 지원되지 않으며 이를 설정하려고 시도하면 오류가 보고됩니다.
  • Airflow 웹 서버 플러그인이 지원되지 않습니다. 이는 Airflow 연산자와 센서를 포함하여 스케줄러 또는 작업자 플러그인에 영향을 주지 않습니다.
  • Airflow 2 환경에서 기본 Airflow 사용자 역할Op입니다. Airflow 1.10.* 사용 환경에서 기본 역할은 Admin입니다.

1단계: Airflow 2 호환성 확인

Airflow 2와의 잠재적 충돌을 확인하려면 기존 Airflow 1.10.* 환경에서 Airflow가 제공하는 업그레이드 확인 스크립트를 사용합니다.

gcloud

  1. 환경에 Airflow 1.10.14 이하 버전이 사용되는 경우 Airflow 1.10.15 이상을 사용하는 Cloud Composer 버전으로 환경을 업그레이드합니다. Cloud Composer는 Airflow 1.10.15부터 시작하여 업그레이드 확인 명령어를 지원합니다.

  2. gcloud composer environments run 명령어로 업그레이드 확인을 실행합니다. 독립형 Airflow 1.10.15와 관련된 일부 업그레이드 확인은 Cloud Composer와 관련이 없습니다. 다음 명령어는 이러한 확인 작업을 제외합니다.

    gcloud composer environments run \
        AIRFLOW_1_ENV  \
        --location=AIRFLOW_1_LOCATION \
        upgrade_check \
        -- --ignore VersionCheckRule --ignore LoggingConfigurationRule \
        --ignore PodTemplateFileRule --ignore SendGridEmailerMovedRule
    

    다음과 같이 바꿉니다.

    • AIRFLOW_1_ENV: Airflow 1.10.* 환경의 이름입니다.
    • AIRFLOW_1_LOCATION을 환경이 위치한 리전으로 바꿉니다.
  3. 명령어 결과를 확인합니다. 업데이트 확인 스크립트는 기존 환경에서 발생 가능한 호환성 문제를 보고합니다.

  4. Airflow 2.0 이상으로 업그레이드 가이드의 DAG 업그레이드 섹션에 설명된 대로 DAG에 다른 변경사항을 구현합니다.

2단계: Airflow 2 환경 만들기와 구성 재정의 및 환경 변수 전송

Airflow 2 환경을 만들고 구성 재정의 및 환경 변수를 전송합니다.

  1. 환경 만들기 단계를 따릅니다. 또한 환경을 만들려면 먼저 아래 설명에 따라 구성 재정의 및 환경 변수를 지정합니다.

  2. 이미지를 선택할 때 Airflow 2가 사용된 이미지를 선택합니다.

  3. Airflow 1.10.* 환경에서 새로운 Airflow 2 환경으로 구성 매개변수를 수동으로 전송합니다.

    Console

    1. 환경을 만들 때 네트워킹, Airflow 구성 재정의, 추가 기능 섹션을 확장합니다.

    2. Airflow 구성 재정의에서 Airflow 구성 재정의 추가를 클릭합니다.

    3. Airflow 1.10.* 환경에서 모든 구성 재정의를 복사합니다.

      일부 구성 옵션에는 Airflow 2의 다른 이름 및 섹션이 사용됩니다. 자세한 내용은 구성 변경사항을 참조하세요.

    4. 환경 변수 아래에서 변수 추가를 클릭합니다.

    5. Airflow 1.10.* 환경의 모든 환경 변수를 복사합니다.

    6. 만들기를 클릭하여 환경을 만듭니다.

3단계: Airflow 2 환경에 PyPI 패키지 설치

Airflow 2 환경이 생성되면 여기에 PyPI 패키지를 설치합니다.

콘솔

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. Airflow 2 환경을 선택합니다.

  3. PyPI 패키지 탭으로 이동하여 편집을 클릭합니다.

  4. Airflow 1.10.* 환경에서 PyPI 패키지 요구사항을 복사합니다. 저장을 클릭하고 환경이 업데이트될 때까지 기다립니다.

    Airflow 2 환경에 다른 사전 설치된 패키지 집합과 다른 Python 버전이 사용되기 때문에 해결하기 어려운 PyPI 패키지 충돌이 발생할 수 있습니다. 패키지 종속성 문제를 진단하기 위한 한 가지 방법은 Airflow 작업자 포드에 패키지를 설치하여 PyPI 패키지 오류를 확인하는 것입니다.

4단계: Airflow 2로 변수 및 풀 전송

Airflow 1.10.*에서는 변수 및 풀을 JSON 파일로 내보낼 수 있습니다. 그런 다음 이 파일을 Airflow 2 환경으로 가져오면 됩니다.

default_pool 이외의 커스텀 풀이 있다면 풀만 전송하면 됩니다. 그 밖의 경우에는 풀을 내보내고 가져오는 명령어를 건너뜁니다.

gcloud

  1. Airflow 1.10.* 환경에서 변수를 내보냅니다.

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         variables -- -e /home/airflow/gcs/data/variables.json
    

    다음과 같이 바꿉니다.

    • AIRFLOW_1_ENV: Airflow 1.10.* 환경의 이름입니다.
    • AIRFLOW_1_LOCATION을 환경이 위치한 리전으로 바꿉니다.
  2. Airflow 1.10.* 환경에서 풀을 내보냅니다.

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         pool -- -e /home/airflow/gcs/data/pools.json
    
  3. Airflow 2 환경 버킷 URI를 가져옵니다.

    1. 다음 명령어를 실행합니다.

      gcloud composer environments describe AIRFLOW_2_ENV \
          --location AIRFLOW_2_LOCATION \
           --format="value(config.dagGcsPrefix)"
      

      다음과 같이 바꿉니다.

      • AIRFLOW_2_ENV: Airflow 2 환경의 이름입니다.
      • AIRFLOW_2_LOCATION을 환경이 위치한 리전으로 바꿉니다.
    2. 출력에서 /dags 폴더를 삭제합니다. 결과는 Airflow 2 환경의 버킷의 URI입니다.

      예를 들어 gs://us-central1-example-916807e1-bucket/dagsgs://us-central1-example-916807e1-bucket으로 변경합니다.

  4. 변수와 풀을 포함한 JSON 파일을 Airflow 2 환경으로 전송합니다.

    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=variables.json
    
    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=pools.json
    

    AIRFLOW_2_BUCKET을 이전 단계에서 가져온 Airflow 2 환경 버킷의 URI로 바꿉니다.

  5. 변수 및 풀을 Airflow 2로 가져옵니다.

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables import \
        -- /home/airflow/gcs/data/variables.json
    
    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools import \
        -- /home/airflow/gcs/data/pools.json
    
  6. 변수 및 풀을 가져왔는지 확인합니다.

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables list
    
    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools list
    
  7. 버킷에서 JSON 파일을 삭제합니다.

    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    

5단계: Airflow 1.10.* 환경 버킷에서 다른 데이터 전송

gcloud

  1. 플러그인을 Airflow 2 환경으로 전송합니다. 이를 위해 Airflow 1.10.* 환경 버킷에서 Airflow 2 환경 버킷의 /plugins 폴더로 플러그인을 내보냅니다.

    gcloud composer environments storage plugins export \
        --destination=AIRFLOW_2_BUCKET/plugins \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
  2. /plugins 폴더를 성공적으로 가져왔는지 확인합니다.

    gcloud composer environments storage plugins list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
  3. /data 폴더를 Airflow 1.10.* 환경에서 Airflow 2 환경으로 내보냅니다.

        gcloud composer environments storage data export \
            --destination=AIRFLOW_2_BUCKET/data \
            --environment=AIRFLOW_1_ENV \
            --location=AIRFLOW_1_LOCATION
    
  4. /data 폴더를 성공적으로 가져왔는지 확인합니다.

    gcloud composer environments storage data list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    

6단계: 연결 및 사용자 전송

Airflow 1.10.*은 사용자 및 연결 내보내기를 지원하지 않습니다. 사용자 및 연결을 전송하려면 Airflow 2 환경에서 새 사용자 계정 및 연결을 수동으로 만듭니다.

gcloud

  1. Airflow 1.10.* 환경에서 연결 목록을 가져오려면 다음을 실행합니다.

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         connections -- --list
    
  2. Airflow 2 환경에서 새 연결을 만들려면 gcloud를 통해 connections Airflow CLI 명령어를 실행합니다. 예를 들면 다음과 같습니다.

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        connections add \
        -- --conn-host postgres.example.com \
        --conn-port 5432 \
        --conn-type postgres \
        --conn-login example_user \
        --conn-password example_password \
        --conn-description "Example connection" \
        example_connection
    
  3. Airflow 1.10.* 환경에서 사용자 목록을 확인하려면 다음 안내를 따르세요.

    1. Airflow 1.10.* 환경에 대해 Airflow 웹 인터페이스를 엽니다.

    2. 관리자 > 사용자로 이동합니다.

  4. Airflow 2 환경에서 새 사용자 계정을 만들려면 gcloud를 통해 users create Airflow CLI 명령어를 실행합니다. 예를 들면 다음과 같습니다.

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        users create \
        -- --username example_username \
        --firstname Example-Name \
        --lastname Example-Surname \
        --email example-user@example.com \
        --use-random-password \
        --role Admin
    

7단계: DAG가 Airflow 2에 맞게 준비되었는지 확인

DAG를 Airflow 2 환경으로 전송하기 전 다음을 확인합니다.

  1. DAG의 업그레이드 확인 스크립트가 성공적으로 실행되었고 남아 있는 호환성 문제가 없습니다.

  2. DAG에서 올바른 가져오기 문을 사용합니다.

    예를 들어 BigQueryCreateDataTransferOperator의 새 가져오기 문은 다음과 같습니다.

    from airflow.providers.google.cloud.operators.bigquery_dts \
        import BigQueryCreateDataTransferOperator
    
  3. DAG가 Airflow 2에 맞게 업그레이드되었습니다. 이 변경사항은 Airflow 1.10.14 이상 버전과 호환됩니다.

8단계: DAG를 Airflow 2 환경으로 전송

환경 간에 DAG를 전송할 때는 다음과 같은 잠재적 문제가 발생할 수 있습니다.

  • 두 환경 모두에 DAG가 사용 설정(일시중지되지 않음)된 경우 각 환경은 예약된 일정에 따라 자체 DAG 사본을 실행합니다. 그로 인해 동일한 데이터 및 실행 시간에 동시 DAG 실행이 발생할 수 있습니다.

  • DAG 포착 때문에 Airflow에서 DAG에 지정된 시작 날짜부터 시작되는 추가 DAG 실행을 예약합니다. 새 Airflow 인스턴스가 1.10.* 환경의 DAG 실행 기록을 고려하지 않기 때문입니다. 이로 인해 지정된 시작 날짜부터 시작되는 다수의 DAG 실행 작업이 예약될 수 있습니다.

동시 DAG 실행 방지

Airflow 2 환경에서 dags_are_paused_at_creation Airflow 구성 옵션을 재정의합니다. 이 변경 후에는 모든 새로운 DAG가 기본적으로 일시중지됩니다.

섹션
core dags_are_paused_at_creation True

추가 또는 누락 DAG 실행 방지

Airflow 2 환경으로 전송하는 DAG에 새 정적 시작 날짜를 지정합니다.

실행 날짜의 차이 및 중복을 막으려면 Airflow 2 환경에서 첫 번째 DAG 실행이 일정 간격의 다음 어커런스에 이루어져야 합니다. 이를 위해서는 DAG의 새 시작 날짜를 Airflow 1.10.* 환경의 마지막 실행 날짜 이전으로 설정합니다.

예를 들어 DAG가 Airflow 1.10.* 환경에서 매일 15:00, 17:00, 21:00에 실행하면 마지막 DAG 실행이 15:00에 수행되며 DAG를 15:15에 전송하면 Airflow 2 환경의 시작 날짜는 오늘 14:45일 수 있습니다. Airflow 2 환경에서 DAG를 사용 설정하면 Airflow는 DAG 실행을 17:00으로 예약합니다.

또 다른 예시로, DAG가 Airflow 1.10.* 환경에서 매일 00:00에 실행되는 경우, DAG 실행이 2021년 4월 26일 00:00에 수행되었고, 2021년 4월 26일 13:00에 DAG 전송을 계획한 경우 Airflow 2 환경의 시작 날짜는 2021년 4월 25일 23:45일 수 있습니다. Airflow 2 환경에서 DAG를 사용 설정한 다음 Airflow는 2021년 4월 27일 00:00에 DAG 실행을 예약합니다.

DAG를 하나씩 Airflow 2 환경으로 전송

각 DAG를 다음 절차에 따라 전송합니다.

  1. DAG의 새 시작 날짜가 이전 섹션에서 설명한 대로 설정되었는지 확인합니다.

  2. Airflow 2 환경으로 업데이트된 DAG를 업로드합니다. 구성 재정의로 인해 이 DAG가 Airflow 2 환경에서 일시 중지되므로, DAG 실행이 아직 예약되지 않았습니다.

  3. Airflow 웹 인터페이스에서 DAG로 이동하여 보고된 DAG 구문 오류를 확인합니다.

  4. DAG를 전송할 계획인 경우:

    1. Airflow 1.10.* 환경에서 DAG를 일시중지합니다.

    2. Airflow 2 환경에서 DAG의 일시중지를 해제합니다.

    3. 새 DAG 실행이 올바른 시간에 예약되었는지 확인합니다.

    4. Airflow 2 환경에서 DAG 실행이 이루어질 때까지 기다린 후 실행이 성공했는지 확인합니다.

  5. DAG 실행 성공 여부에 따라 다음을 수행합니다.

    • DAG 실행이 성공하면 계속 진행하여 Airflow 2 환경에서 DAG를 사용할 수 있습니다. Airflow 1.10.* 버전 DAG는 삭제하는 것이 좋습니다.

    • DAG 실행이 실패하면 Airflow 2에서 성공적으로 실행될 때까지 DAG 문제 해결을 시도합니다.

      필요한 경우 언제든지 Airflow 1.10.* 버전의 DAG로 돌아갈 수 있습니다.

      1. Airflow 2 환경에서 DAG를 일시중지합니다.

      2. Airflow 1.10.* 환경에서 DAG 일시중지를 취소합니다. 그러면 실패한 DAG 실행과 동일한 날짜 및 시간에 새 DAG 실행이 예약됩니다.

      3. Airflow 2 버전의 DAG를 계속 진행할 준비가 되면 시작 날짜를 조정하고 새 버전의 DAG를 Airflow 2 환경에 업로드한 후 절차를 반복합니다.

9단계: Airflow 2 환경 모니터링

모든 DAG 및 구성을 Airflow 2 환경으로 전송한 후 잠재적 문제, 실패한 DAG 실행, 전반적인 환경 상태를 모니터링합니다. Airflow 2 환경이 충분한 시간 동안 문제없이 실행되면 Airflow 1.10.* 환경을 삭제해도 됩니다.

다음 단계