Airflow 2로 환경 마이그레이션

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

이 페이지에서는 DAG, 데이터, 구성을 기존 Airflow 1.10.* 환경에서 Airflow 버전 2 이상 환경으로 전송하는 방법을 설명합니다.

기타 마이그레이션 가이드

마이그레이션 이전 마이그레이션 이후 메서드 가이드
Cloud Composer 1, Airflow 2 Cloud Composer 2, Airflow 2 스냅샷을 사용하여 단계별 마이그레이션 마이그레이션 가이드(스냅샷)
Cloud Composer 1, Airflow 1 Cloud Composer 2, Airflow 2 스냅샷을 사용하여 단계별 마이그레이션 마이그레이션 가이드(스냅샷)
Cloud Composer 1, Airflow 2 Cloud Composer 2, Airflow 2 단계별 수동 전송 수동 마이그레이션 가이드
Cloud Composer 1, Airflow 1 Cloud Composer 2, Airflow 2 단계별 수동 전송 수동 마이그레이션 가이드
Airflow 1 Airflow 2 단계별 수동 전송 이 가이드(수동 마이그레이션)

단계별 업그레이드

Cloud Composer는 Cloud Composer 데이터베이스 전송 스크립트를 제공하여 메타데이터 데이터베이스, DAG, 데이터, 플러그인을 Airflow 1.10.14 및 Airflow 1.10.15가 있는 Cloud Composer 환경에서 Airflow 2.0.1 이상의 Airflow 버전을 사용하는 기존 Cloud Composer 환경으로 마이그레이션합니다.

이 방법은 이 가이드에서 설명하는 방법의 대체 방법입니다. 이 가이드의 일부 내용은 제공된 스크립트를 사용할 때 적용됩니다. 예를 들어 마이그레이션하기 전 DAG에서 Airflow 2와의 호환성을 확인하거나 동시 DAG 실행이 발생하지 않는지 그리고 추가 또는 누락된 DAG 실행이 없는지 확인해야 할 수 있습니다.

시작하기 전에

Airflow 2로 Cloud Composer 환경을 시작하기 전 Airflow 2로 Cloud Composer 환경에 적용되는 변경사항을 고려합니다.

Scheduler HA

환경에서 Airflow 스케줄러를 2개 이상 사용할 수 있습니다. 환경을 만들 때 또는 기존 환경을 업데이트할 때 스케줄러 수를 설정할 수 있습니다.

Celery+Kubernetes Executor

Airflow 2 Celery+Kubernetes Executor는 Cloud Composer 3에서 지원됩니다.

브레이킹 체인지

Airflow 2에는 다음과 같은 주요 변경사항이 도입되었습니다.

  • Airflow 1.10.* 의 기존 DAG가 Airflow 2에서 작동된다는 보장이 없습니다. 테스트가 필요하며 조정해야 할 수 있습니다.
  • 연산자, 전송, 후크가 공급자 패키지로 이전됩니다. DAG의 가져오기 문이 새 공급자 패키지를 사용해야 합니다. Airflow 2에서 이전의 가져오기 문이 더 이상 작동하지 않을 수 있습니다.
  • Airflow 2에서 더 이상 지원되지 않는 구성 옵션이 몇 가지 있기 때문에 일부 Airflow 1.10.* 구성이 지원되지 않을 수 있습니다.
  • 일부 커스텀 PyPI 패키지는 새 버전의 Airflow 또는 Python과 호환되지 않을 수 있습니다.
  • 액세스 제어가 포함된 Airflow UI는 기본 Airflow 2 UI입니다. Airflow 2가 다른 Airflow UI 유형을 지원하지 않습니다.
  • 실험용 REST API가 안정적인 Airflow API로 대체되었습니다. 실험용 REST API가 Airflow 2에서 기본적으로 사용 중지되었습니다.
  • Airflow 2.0.0의 기타 주요 변경사항
  • Airflow 2.0.1의 기타 주요 변경사항

Airflow 2 및 Airflow 1.10.* 환경의 차이점

Airflow 1.10.* 을 사용하는 Cloud Composer 환경과 Airflow 2 환경의 주요 차이점:

  • Airflow 2 환경에 Python 3.8이 사용됩니다. 이 버전은 Airflow 1.10.* 환경에 사용된 것보다 최신 버전입니다. Python 2, Python 3.6, Python 3.7은 지원되지 않습니다.
  • Airflow 2에는 다른 CLI 형식이 사용됩니다. Cloud Composer는 gcloud composer environments run 명령어를 통해 Airflow 2 사용 환경에서 새 형식을 지원합니다.
  • Airflow 2 환경에서는 사전 설치된 PyPI 패키지가 다릅니다. 사전 설치된 PyPI 패키지 목록은 Cloud Composer 버전 목록을 참조하세요.
  • Airflow 2에서는 DAG 직렬화가 항상 사용 설정됩니다. 따라서 비동기 DAG 로드가 더 이상 필요하지 않으며 Airflow 2에서 지원되지 않습니다. 따라서 Airflow 2에 대해 [core]store_serialized_dags[core]store_dag_code 매개변수 구성이 지원되지 않으며 이를 설정하려고 시도하면 오류가 보고됩니다.
  • Airflow 웹 서버 플러그인이 지원되지 않습니다. 이는 Airflow 연산자와 센서를 포함하여 스케줄러 또는 작업자 플러그인에 영향을 주지 않습니다.
  • Airflow 2 환경에서 기본 Airflow 사용자 역할Op입니다. Airflow 1.10.* 사용 환경에서 기본 역할은 Admin입니다.

1단계: Airflow 2 호환성 확인

Airflow 2와의 잠재적 충돌을 확인하려면 Airflow 2.0 이상으로 업그레이드 가이드의 DAG 업그레이드 섹션을 참조하세요.

발생할 수 있는 일반적인 문제 중에 호환되지 않는 가져오기 경로와 관련된 문제가 있습니다. 이 호환성 문제를 해결하는 방법에 대한 자세한 내용은 Airflow 2.0 이상으로 업그레이드 가이드의 백포트 제공업체 섹션을 참조하세요.

2단계: Airflow 2 환경 만들기와 구성 재정의 및 환경 변수 전송

Airflow 2 환경을 만들고 구성 재정의 및 환경 변수를 전송합니다.

  1. 환경 만들기 단계를 따릅니다. 또한 환경을 만들려면 먼저 아래 설명에 따라 구성 재정의 및 환경 변수를 지정합니다.

  2. 이미지를 선택할 때 Airflow 2가 사용된 이미지를 선택합니다.

  3. Airflow 1.10.* 환경에서 새로운 Airflow 2 환경으로 구성 매개변수를 수동으로 전송합니다.

    Console

    1. 환경을 만들 때 네트워킹, Airflow 구성 재정의, 추가 기능 섹션을 확장합니다.

    2. Airflow 구성 재정의에서 Airflow 구성 재정의 추가를 클릭합니다.

    3. Airflow 1.10.* 환경에서 모든 구성 재정의를 복사합니다.

      일부 구성 옵션에는 Airflow 2의 다른 이름 및 섹션이 사용됩니다. 자세한 내용은 구성 변경사항을 참조하세요.

    4. 환경 변수 아래에서 변수 추가를 클릭합니다.

    5. Airflow 1.10.* 환경의 모든 환경 변수를 복사합니다.

    6. 만들기를 클릭하여 환경을 만듭니다.

3단계: Airflow 2 환경에 PyPI 패키지 설치

Airflow 2 환경이 생성되면 여기에 PyPI 패키지를 설치합니다.

콘솔

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. Airflow 2 환경을 선택합니다.

  3. PyPI 패키지 탭으로 이동하여 편집을 클릭합니다.

  4. Airflow 1.10.* 환경에서 PyPI 패키지 요구사항을 복사합니다. 저장을 클릭하고 환경이 업데이트될 때까지 기다립니다.

    Airflow 2 환경에 다른 사전 설치된 패키지 집합과 다른 Python 버전이 사용되기 때문에 해결하기 어려운 PyPI 패키지 충돌이 발생할 수 있습니다. 패키지 종속성 문제를 진단하기 위한 한 가지 방법은 Airflow 작업자 포드에 패키지를 설치하여 PyPI 패키지 오류를 확인하는 것입니다.

4단계: Airflow 2로 변수 및 풀 전송

Airflow 1.10.*에서는 변수 및 풀을 JSON 파일로 내보낼 수 있습니다. 그런 다음 이 파일을 Airflow 2 환경으로 가져오면 됩니다.

default_pool 이외의 커스텀 풀이 있다면 풀만 전송하면 됩니다. 그 밖의 경우에는 풀을 내보내고 가져오는 명령어를 건너뜁니다.

gcloud

  1. Airflow 1.10.* 환경에서 변수를 내보냅니다.

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         variables -- -e /home/airflow/gcs/data/variables.json
    

    다음과 같이 바꿉니다.

    • AIRFLOW_1_ENV: Airflow 1.10.* 환경의 이름입니다.
    • AIRFLOW_1_LOCATION을 환경이 위치한 리전으로 바꿉니다.
  2. Airflow 1.10.* 환경에서 풀을 내보냅니다.

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         pool -- -e /home/airflow/gcs/data/pools.json
    
  3. Airflow 2 환경 버킷 URI를 가져옵니다.

    1. 다음 명령어를 실행합니다.

      gcloud composer environments describe AIRFLOW_2_ENV \
          --location AIRFLOW_2_LOCATION \
           --format="value(config.dagGcsPrefix)"
      

      다음과 같이 바꿉니다.

      • AIRFLOW_2_ENV: Airflow 2 환경의 이름입니다.
      • AIRFLOW_2_LOCATION을 환경이 위치한 리전으로 바꿉니다.
    2. 출력에서 /dags 폴더를 삭제합니다. 결과는 Airflow 2 환경의 버킷의 URI입니다.

      예를 들어 gs://us-central1-example-916807e1-bucket/dagsgs://us-central1-example-916807e1-bucket으로 변경합니다.

  4. 변수와 풀을 포함한 JSON 파일을 Airflow 2 환경으로 전송합니다.

    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=variables.json
    
    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=pools.json
    

    AIRFLOW_2_BUCKET을 이전 단계에서 가져온 Airflow 2 환경 버킷의 URI로 바꿉니다.

  5. 변수 및 풀을 Airflow 2로 가져옵니다.

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables import \
        -- /home/airflow/gcs/data/variables.json
    
    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools import \
        -- /home/airflow/gcs/data/pools.json
    
  6. 변수 및 풀을 가져왔는지 확인합니다.

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables list
    
    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools list
    
  7. 버킷에서 JSON 파일을 삭제합니다.

    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    

5단계: Airflow 1.10.* 환경 버킷에서 다른 데이터 전송

gcloud

  1. 플러그인을 Airflow 2 환경으로 전송합니다. 이를 위해 Airflow 1.10.* 환경 버킷에서 Airflow 2 환경 버킷의 /plugins 폴더로 플러그인을 내보냅니다.

    gcloud composer environments storage plugins export \
        --destination=AIRFLOW_2_BUCKET/plugins \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
  2. /plugins 폴더를 성공적으로 가져왔는지 확인합니다.

    gcloud composer environments storage plugins list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
  3. /data 폴더를 Airflow 1.10.* 환경에서 Airflow 2 환경으로 내보냅니다.

        gcloud composer environments storage data export \
            --destination=AIRFLOW_2_BUCKET/data \
            --environment=AIRFLOW_1_ENV \
            --location=AIRFLOW_1_LOCATION
    
  4. /data 폴더를 성공적으로 가져왔는지 확인합니다.

    gcloud composer environments storage data list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    

6단계: 연결 및 사용자 전송

Airflow 1.10.*은 사용자 및 연결 내보내기를 지원하지 않습니다. 사용자 및 연결을 전송하려면 Airflow 2 환경에서 새 사용자 계정 및 연결을 수동으로 만듭니다.

gcloud

  1. Airflow 1.10.* 환경에서 연결 목록을 가져오려면 다음을 실행합니다.

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         connections -- --list
    
  2. Airflow 2 환경에서 새 연결을 만들려면 gcloud를 통해 connections Airflow CLI 명령어를 실행합니다. 예를 들면 다음과 같습니다.

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        connections add \
        -- --conn-host postgres.example.com \
        --conn-port 5432 \
        --conn-type postgres \
        --conn-login example_user \
        --conn-password example_password \
        --conn-description "Example connection" \
        example_connection
    
  3. Airflow 1.10.* 환경에서 사용자 목록을 확인하려면 다음 안내를 따르세요.

    1. Airflow 1.10.* 환경에 대해 Airflow 웹 인터페이스를 엽니다.

    2. 관리자 > 사용자로 이동합니다.

  4. Airflow 2 환경에서 새 사용자 계정을 만들려면 gcloud를 통해 users create Airflow CLI 명령어를 실행합니다. 예를 들면 다음과 같습니다.

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        users create \
        -- --username example_username \
        --firstname Example-Name \
        --lastname Example-Surname \
        --email example-user@example.com \
        --use-random-password \
        --role Admin
    

7단계: DAG가 Airflow 2에 맞게 준비되었는지 확인

DAG를 Airflow 2 환경으로 전송하기 전 다음을 확인합니다.

  1. DAG가 성공적으로 실행되고 남아 있는 호환성 문제가 없습니다.

  2. DAG에서 올바른 가져오기 문을 사용합니다.

    예를 들어 BigQueryCreateDataTransferOperator의 새 가져오기 문은 다음과 같습니다.

    from airflow.providers.google.cloud.operators.bigquery_dts \
        import BigQueryCreateDataTransferOperator
    
  3. DAG가 Airflow 2에 맞게 업그레이드되었습니다. 이 변경사항은 Airflow 1.10.14 이상 버전과 호환됩니다.

8단계: DAG를 Airflow 2 환경으로 전송

환경 간에 DAG를 전송할 때는 다음과 같은 잠재적 문제가 발생할 수 있습니다.

  • 두 환경 모두에 DAG가 사용 설정(일시중지되지 않음)된 경우 각 환경은 예약된 일정에 따라 자체 DAG 사본을 실행합니다. 그로 인해 동일한 데이터 및 실행 시간에 동시 DAG 실행이 발생할 수 있습니다.

  • DAG 포착 때문에 Airflow에서 DAG에 지정된 시작 날짜부터 시작되는 추가 DAG 실행을 예약합니다. 새 Airflow 인스턴스가 1.10.* 환경의 DAG 실행 기록을 고려하지 않기 때문입니다. 이로 인해 지정된 시작 날짜부터 시작되는 다수의 DAG 실행 작업이 예약될 수 있습니다.

동시 DAG 실행 방지

Airflow 2 환경에서 dags_are_paused_at_creation Airflow 구성 옵션을 재정의합니다. 이 변경 후에는 모든 새로운 DAG가 기본적으로 일시중지됩니다.

섹션
core dags_are_paused_at_creation True

추가 또는 누락 DAG 실행 방지

Airflow 2 환경으로 전송하는 DAG에 새 정적 시작 날짜를 지정합니다.

실행 날짜의 차이 및 중복을 막으려면 Airflow 2 환경에서 첫 번째 DAG 실행이 일정 간격의 다음 어커런스에 이루어져야 합니다. 이를 위해서는 DAG의 새 시작 날짜를 Airflow 1.10.* 환경의 마지막 실행 날짜 이전으로 설정합니다.

예를 들어 DAG가 Airflow 1.10.* 환경에서 매일 15:00, 17:00, 21:00에 실행하면 마지막 DAG 실행이 15:00에 수행되며 DAG를 15:15에 전송하면 Airflow 2 환경의 시작 날짜는 오늘 14:45일 수 있습니다. Airflow 2 환경에서 DAG를 사용 설정하면 Airflow는 DAG 실행을 17:00으로 예약합니다.

또 다른 예시로, DAG가 Airflow 1.10.* 환경에서 매일 00:00에 실행되는 경우, DAG 실행이 2021년 4월 26일 00:00에 수행되었고, 2021년 4월 26일 13:00에 DAG 전송을 계획한 경우 Airflow 2 환경의 시작 날짜는 2021년 4월 25일 23:45일 수 있습니다. Airflow 2 환경에서 DAG를 사용 설정한 다음 Airflow는 2021년 4월 27일 00:00에 DAG 실행을 예약합니다.

DAG를 하나씩 Airflow 2 환경으로 전송

각 DAG를 다음 절차에 따라 전송합니다.

  1. DAG의 새 시작 날짜가 이전 섹션에서 설명한 대로 설정되었는지 확인합니다.

  2. Airflow 2 환경으로 업데이트된 DAG를 업로드합니다. 구성 재정의로 인해 이 DAG가 Airflow 2 환경에서 일시 중지되므로, DAG 실행이 아직 예약되지 않았습니다.

  3. Airflow 웹 인터페이스에서 DAG로 이동하여 보고된 DAG 구문 오류를 확인합니다.

  4. DAG를 전송할 계획인 경우:

    1. Airflow 1.10.* 환경에서 DAG를 일시중지합니다.

    2. Airflow 2 환경에서 DAG의 일시중지를 해제합니다.

    3. 새 DAG 실행이 올바른 시간에 예약되었는지 확인합니다.

    4. Airflow 2 환경에서 DAG 실행이 이루어질 때까지 기다린 후 실행이 성공했는지 확인합니다.

  5. DAG 실행 성공 여부에 따라 다음을 수행합니다.

    • DAG 실행이 성공하면 계속 진행하여 Airflow 2 환경에서 DAG를 사용할 수 있습니다. Airflow 1.10.* 버전 DAG는 삭제하는 것이 좋습니다.

    • DAG 실행이 실패하면 Airflow 2에서 성공적으로 실행될 때까지 DAG 문제 해결을 시도합니다.

      필요한 경우 언제든지 Airflow 1.10.* 버전의 DAG로 돌아갈 수 있습니다.

      1. Airflow 2 환경에서 DAG를 일시중지합니다.

      2. Airflow 1.10.* 환경에서 DAG 일시중지를 취소합니다. 그러면 실패한 DAG 실행과 동일한 날짜 및 시간에 새 DAG 실행이 예약됩니다.

      3. Airflow 2 버전의 DAG를 계속 진행할 준비가 되면 시작 날짜를 조정하고 새 버전의 DAG를 Airflow 2 환경에 업로드한 후 절차를 반복합니다.

9단계: Airflow 2 환경 모니터링

모든 DAG 및 구성을 Airflow 2 환경으로 전송한 후 잠재적 문제, 실패한 DAG 실행, 전반적인 환경 상태를 모니터링합니다. Airflow 2 환경이 충분한 시간 동안 문제없이 실행되면 Airflow 1.10.* 환경을 삭제해도 됩니다.

다음 단계