Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
이 페이지에서는 DAG, 데이터, 구성을 기존 Airflow 1.10.* 환경에서 Airflow 버전 2 이상 환경으로 전송하는 방법을 설명합니다.
기타 마이그레이션 가이드
마이그레이션 이전 | 마이그레이션 이후 | 메서드 | 가이드 |
---|---|---|---|
Cloud Composer 1, Airflow 2 | Cloud Composer 2, Airflow 2 | 스냅샷을 사용하여 단계별 마이그레이션 | 마이그레이션 가이드(스냅샷) |
Cloud Composer 1, Airflow 1 | Cloud Composer 2, Airflow 2 | 스냅샷을 사용하여 단계별 마이그레이션 | 마이그레이션 가이드(스냅샷) |
Cloud Composer 1, Airflow 2 | Cloud Composer 2, Airflow 2 | 단계별 수동 전송 | 수동 마이그레이션 가이드 |
Cloud Composer 1, Airflow 1 | Cloud Composer 2, Airflow 2 | 단계별 수동 전송 | 수동 마이그레이션 가이드 |
Airflow 1 | Airflow 2 | 단계별 수동 전송 | 이 가이드(수동 마이그레이션) |
단계별 업그레이드
Cloud Composer는 Cloud Composer 데이터베이스 전송 스크립트를 제공하여 메타데이터 데이터베이스, DAG, 데이터, 플러그인을 Airflow 1.10.14 및 Airflow 1.10.15가 있는 Cloud Composer 환경에서 Airflow 2.0.1 이상의 Airflow 버전을 사용하는 기존 Cloud Composer 환경으로 마이그레이션합니다.
이 방법은 이 가이드에서 설명하는 방법의 대체 방법입니다. 이 가이드의 일부 내용은 제공된 스크립트를 사용할 때 적용됩니다. 예를 들어 마이그레이션하기 전 DAG에서 Airflow 2와의 호환성을 확인하거나 동시 DAG 실행이 발생하지 않는지 그리고 추가 또는 누락된 DAG 실행이 없는지 확인해야 할 수 있습니다.
시작하기 전에
Airflow 2로 Cloud Composer 환경을 시작하기 전 Airflow 2로 Cloud Composer 환경에 적용되는 변경사항을 고려합니다.
Scheduler HA
환경에서 Airflow 스케줄러를 2개 이상 사용할 수 있습니다. 환경을 만들 때 또는 기존 환경을 업데이트할 때 스케줄러 수를 설정할 수 있습니다.
Celery+Kubernetes Executor
Airflow 2 Celery+Kubernetes Executor는 Cloud Composer 3에서 지원됩니다.
브레이킹 체인지
Airflow 2에는 다음과 같은 주요 변경사항이 도입되었습니다.
- Airflow 1.10.* 의 기존 DAG가 Airflow 2에서 작동된다는 보장이 없습니다. 테스트가 필요하며 조정해야 할 수 있습니다.
- 연산자, 전송, 후크가 공급자 패키지로 이전됩니다. DAG의 가져오기 문이 새 공급자 패키지를 사용해야 합니다. Airflow 2에서 이전의 가져오기 문이 더 이상 작동하지 않을 수 있습니다.
- Airflow 2에서 더 이상 지원되지 않는 구성 옵션이 몇 가지 있기 때문에 일부 Airflow 1.10.* 구성이 지원되지 않을 수 있습니다.
- 일부 커스텀 PyPI 패키지는 새 버전의 Airflow 또는 Python과 호환되지 않을 수 있습니다.
- 액세스 제어가 포함된 Airflow UI는 기본 Airflow 2 UI입니다. Airflow 2가 다른 Airflow UI 유형을 지원하지 않습니다.
- 실험용 REST API가 안정적인 Airflow API로 대체되었습니다. 실험용 REST API가 Airflow 2에서 기본적으로 사용 중지되었습니다.
- Airflow 2.0.0의 기타 주요 변경사항
- Airflow 2.0.1의 기타 주요 변경사항
Airflow 2 및 Airflow 1.10.* 환경의 차이점
Airflow 1.10.* 을 사용하는 Cloud Composer 환경과 Airflow 2 환경의 주요 차이점:
- Airflow 2 환경에 Python 3.8이 사용됩니다. 이 버전은 Airflow 1.10.* 환경에 사용된 것보다 최신 버전입니다. Python 2, Python 3.6, Python 3.7은 지원되지 않습니다.
- Airflow 2에는 다른 CLI 형식이 사용됩니다. Cloud Composer는
gcloud composer environments run
명령어를 통해 Airflow 2 사용 환경에서 새 형식을 지원합니다. - Airflow 2 환경에서는 사전 설치된 PyPI 패키지가 다릅니다. 사전 설치된 PyPI 패키지 목록은 Cloud Composer 버전 목록을 참조하세요.
- Airflow 2에서는 DAG 직렬화가 항상 사용 설정됩니다. 따라서 비동기 DAG 로드가 더 이상 필요하지 않으며 Airflow 2에서 지원되지 않습니다. 따라서 Airflow 2에 대해
[core]store_serialized_dags
및[core]store_dag_code
매개변수 구성이 지원되지 않으며 이를 설정하려고 시도하면 오류가 보고됩니다. - Airflow 웹 서버 플러그인이 지원되지 않습니다. 이는 Airflow 연산자와 센서를 포함하여 스케줄러 또는 작업자 플러그인에 영향을 주지 않습니다.
- Airflow 2 환경에서 기본 Airflow 사용자 역할은
Op
입니다. Airflow 1.10.* 사용 환경에서 기본 역할은Admin
입니다.
1단계: Airflow 2 호환성 확인
Airflow 2와의 잠재적 충돌을 확인하려면 Airflow 2.0 이상으로 업그레이드 가이드의 DAG 업그레이드 섹션을 참조하세요.
발생할 수 있는 일반적인 문제 중에 호환되지 않는 가져오기 경로와 관련된 문제가 있습니다. 이 호환성 문제를 해결하는 방법에 대한 자세한 내용은 Airflow 2.0 이상으로 업그레이드 가이드의 백포트 제공업체 섹션을 참조하세요.
2단계: Airflow 2 환경 만들기와 구성 재정의 및 환경 변수 전송
Airflow 2 환경을 만들고 구성 재정의 및 환경 변수를 전송합니다.
환경 만들기 단계를 따릅니다. 또한 환경을 만들려면 먼저 아래 설명에 따라 구성 재정의 및 환경 변수를 지정합니다.
이미지를 선택할 때 Airflow 2가 사용된 이미지를 선택합니다.
Airflow 1.10.* 환경에서 새로운 Airflow 2 환경으로 구성 매개변수를 수동으로 전송합니다.
Console
환경을 만들 때 네트워킹, Airflow 구성 재정의, 추가 기능 섹션을 확장합니다.
Airflow 구성 재정의에서 Airflow 구성 재정의 추가를 클릭합니다.
Airflow 1.10.* 환경에서 모든 구성 재정의를 복사합니다.
일부 구성 옵션에는 Airflow 2의 다른 이름 및 섹션이 사용됩니다. 자세한 내용은 구성 변경사항을 참조하세요.
환경 변수 아래에서 변수 추가를 클릭합니다.
Airflow 1.10.* 환경의 모든 환경 변수를 복사합니다.
만들기를 클릭하여 환경을 만듭니다.
3단계: Airflow 2 환경에 PyPI 패키지 설치
Airflow 2 환경이 생성되면 여기에 PyPI 패키지를 설치합니다.
콘솔
Google Cloud 콘솔에서 환경 페이지로 이동합니다.
Airflow 2 환경을 선택합니다.
PyPI 패키지 탭으로 이동하여 편집을 클릭합니다.
Airflow 1.10.* 환경에서 PyPI 패키지 요구사항을 복사합니다. 저장을 클릭하고 환경이 업데이트될 때까지 기다립니다.
Airflow 2 환경에 다른 사전 설치된 패키지 집합과 다른 Python 버전이 사용되기 때문에 해결하기 어려운 PyPI 패키지 충돌이 발생할 수 있습니다. 패키지 종속성 문제를 진단하기 위한 한 가지 방법은 Airflow 작업자 포드에 패키지를 설치하여 PyPI 패키지 오류를 확인하는 것입니다.
4단계: Airflow 2로 변수 및 풀 전송
Airflow 1.10.*에서는 변수 및 풀을 JSON 파일로 내보낼 수 있습니다. 그런 다음 이 파일을 Airflow 2 환경으로 가져오면 됩니다.
default_pool
이외의 커스텀 풀이 있다면 풀만 전송하면 됩니다. 그 밖의 경우에는 풀을 내보내고 가져오는 명령어를 건너뜁니다.
gcloud
Airflow 1.10.* 환경에서 변수를 내보냅니다.
gcloud composer environments run AIRFLOW_1_ENV \ --location AIRFLOW_1_LOCATION \ variables -- -e /home/airflow/gcs/data/variables.json
다음과 같이 바꿉니다.
AIRFLOW_1_ENV
: Airflow 1.10.* 환경의 이름입니다.AIRFLOW_1_LOCATION
을 환경이 위치한 리전으로 바꿉니다.
Airflow 1.10.* 환경에서 풀을 내보냅니다.
gcloud composer environments run AIRFLOW_1_ENV \ --location AIRFLOW_1_LOCATION \ pool -- -e /home/airflow/gcs/data/pools.json
Airflow 2 환경 버킷 URI를 가져옵니다.
다음 명령어를 실행합니다.
gcloud composer environments describe AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ --format="value(config.dagGcsPrefix)"
다음과 같이 바꿉니다.
AIRFLOW_2_ENV
: Airflow 2 환경의 이름입니다.AIRFLOW_2_LOCATION
을 환경이 위치한 리전으로 바꿉니다.
출력에서
/dags
폴더를 삭제합니다. 결과는 Airflow 2 환경의 버킷의 URI입니다.예를 들어
gs://us-central1-example-916807e1-bucket/dags
를gs://us-central1-example-916807e1-bucket
으로 변경합니다.
변수와 풀을 포함한 JSON 파일을 Airflow 2 환경으로 전송합니다.
gcloud composer environments storage data export \ --destination=AIRFLOW_2_BUCKET/data \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION \ --source=variables.json gcloud composer environments storage data export \ --destination=AIRFLOW_2_BUCKET/data \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION \ --source=pools.json
AIRFLOW_2_BUCKET
을 이전 단계에서 가져온 Airflow 2 환경 버킷의 URI로 바꿉니다.변수 및 풀을 Airflow 2로 가져옵니다.
gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ variables import \ -- /home/airflow/gcs/data/variables.json gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ pools import \ -- /home/airflow/gcs/data/pools.json
변수 및 풀을 가져왔는지 확인합니다.
gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ variables list gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ pools list
버킷에서 JSON 파일을 삭제합니다.
gcloud composer environments storage data delete \ variables.json \ --environment=AIRFLOW_2_ENV \ --location=AIRFLOW_2_LOCATION gcloud composer environments storage data delete \ pools.json \ --environment=AIRFLOW_2_ENV \ --location=AIRFLOW_2_LOCATION gcloud composer environments storage data delete \ variables.json \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION gcloud composer environments storage data delete \ pools.json \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION
5단계: Airflow 1.10.* 환경 버킷에서 다른 데이터 전송
gcloud
플러그인을 Airflow 2 환경으로 전송합니다. 이를 위해 Airflow 1.10.* 환경 버킷에서 Airflow 2 환경 버킷의
/plugins
폴더로 플러그인을 내보냅니다.gcloud composer environments storage plugins export \ --destination=AIRFLOW_2_BUCKET/plugins \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION
/plugins
폴더를 성공적으로 가져왔는지 확인합니다.gcloud composer environments storage plugins list \ --environment=AIRFLOW_2_ENV \ --location=AIRFLOW_2_LOCATION
/data
폴더를 Airflow 1.10.* 환경에서 Airflow 2 환경으로 내보냅니다.gcloud composer environments storage data export \ --destination=AIRFLOW_2_BUCKET/data \ --environment=AIRFLOW_1_ENV \ --location=AIRFLOW_1_LOCATION
/data
폴더를 성공적으로 가져왔는지 확인합니다.gcloud composer environments storage data list \ --environment=AIRFLOW_2_ENV \ --location=AIRFLOW_2_LOCATION
6단계: 연결 및 사용자 전송
Airflow 1.10.*은 사용자 및 연결 내보내기를 지원하지 않습니다. 사용자 및 연결을 전송하려면 Airflow 2 환경에서 새 사용자 계정 및 연결을 수동으로 만듭니다.
gcloud
Airflow 1.10.* 환경에서 연결 목록을 가져오려면 다음을 실행합니다.
gcloud composer environments run AIRFLOW_1_ENV \ --location AIRFLOW_1_LOCATION \ connections -- --list
Airflow 2 환경에서 새 연결을 만들려면 gcloud를 통해
connections
Airflow CLI 명령어를 실행합니다. 예를 들면 다음과 같습니다.gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ connections add \ -- --conn-host postgres.example.com \ --conn-port 5432 \ --conn-type postgres \ --conn-login example_user \ --conn-password example_password \ --conn-description "Example connection" \ example_connection
Airflow 1.10.* 환경에서 사용자 목록을 확인하려면 다음 안내를 따르세요.
Airflow 1.10.* 환경에 대해 Airflow 웹 인터페이스를 엽니다.
관리자 > 사용자로 이동합니다.
Airflow 2 환경에서 새 사용자 계정을 만들려면 gcloud를 통해
users create
Airflow CLI 명령어를 실행합니다. 예를 들면 다음과 같습니다.gcloud composer environments run \ AIRFLOW_2_ENV \ --location AIRFLOW_2_LOCATION \ users create \ -- --username example_username \ --firstname Example-Name \ --lastname Example-Surname \ --email example-user@example.com \ --use-random-password \ --role Admin
7단계: DAG가 Airflow 2에 맞게 준비되었는지 확인
DAG를 Airflow 2 환경으로 전송하기 전 다음을 확인합니다.
DAG가 성공적으로 실행되고 남아 있는 호환성 문제가 없습니다.
DAG에서 올바른 가져오기 문을 사용합니다.
예를 들어
BigQueryCreateDataTransferOperator
의 새 가져오기 문은 다음과 같습니다.from airflow.providers.google.cloud.operators.bigquery_dts \ import BigQueryCreateDataTransferOperator
DAG가 Airflow 2에 맞게 업그레이드되었습니다. 이 변경사항은 Airflow 1.10.14 이상 버전과 호환됩니다.
8단계: DAG를 Airflow 2 환경으로 전송
환경 간에 DAG를 전송할 때는 다음과 같은 잠재적 문제가 발생할 수 있습니다.
두 환경 모두에 DAG가 사용 설정(일시중지되지 않음)된 경우 각 환경은 예약된 일정에 따라 자체 DAG 사본을 실행합니다. 그로 인해 동일한 데이터 및 실행 시간에 동시 DAG 실행이 발생할 수 있습니다.
DAG 포착 때문에 Airflow에서 DAG에 지정된 시작 날짜부터 시작되는 추가 DAG 실행을 예약합니다. 새 Airflow 인스턴스가 1.10.* 환경의 DAG 실행 기록을 고려하지 않기 때문입니다. 이로 인해 지정된 시작 날짜부터 시작되는 다수의 DAG 실행 작업이 예약될 수 있습니다.
동시 DAG 실행 방지
Airflow 2 환경에서 dags_are_paused_at_creation
Airflow 구성 옵션을 재정의합니다. 이 변경 후에는 모든 새로운 DAG가 기본적으로 일시중지됩니다.
섹션 | 키 | 값 |
---|---|---|
core |
dags_are_paused_at_creation |
True |
추가 또는 누락 DAG 실행 방지
Airflow 2 환경으로 전송하는 DAG에 새 정적 시작 날짜를 지정합니다.
실행 날짜의 차이 및 중복을 막으려면 Airflow 2 환경에서 첫 번째 DAG 실행이 일정 간격의 다음 어커런스에 이루어져야 합니다. 이를 위해서는 DAG의 새 시작 날짜를 Airflow 1.10.* 환경의 마지막 실행 날짜 이전으로 설정합니다.
예를 들어 DAG가 Airflow 1.10.* 환경에서 매일 15:00, 17:00, 21:00에 실행하면 마지막 DAG 실행이 15:00에 수행되며 DAG를 15:15에 전송하면 Airflow 2 환경의 시작 날짜는 오늘 14:45일 수 있습니다. Airflow 2 환경에서 DAG를 사용 설정하면 Airflow는 DAG 실행을 17:00으로 예약합니다.
또 다른 예시로, DAG가 Airflow 1.10.* 환경에서 매일 00:00에 실행되는 경우, DAG 실행이 2021년 4월 26일 00:00에 수행되었고, 2021년 4월 26일 13:00에 DAG 전송을 계획한 경우 Airflow 2 환경의 시작 날짜는 2021년 4월 25일 23:45일 수 있습니다. Airflow 2 환경에서 DAG를 사용 설정한 다음 Airflow는 2021년 4월 27일 00:00에 DAG 실행을 예약합니다.
DAG를 하나씩 Airflow 2 환경으로 전송
각 DAG를 다음 절차에 따라 전송합니다.
DAG의 새 시작 날짜가 이전 섹션에서 설명한 대로 설정되었는지 확인합니다.
Airflow 2 환경으로 업데이트된 DAG를 업로드합니다. 구성 재정의로 인해 이 DAG가 Airflow 2 환경에서 일시 중지되므로, DAG 실행이 아직 예약되지 않았습니다.
Airflow 웹 인터페이스에서 DAG로 이동하여 보고된 DAG 구문 오류를 확인합니다.
DAG를 전송할 계획인 경우:
Airflow 1.10.* 환경에서 DAG를 일시중지합니다.
Airflow 2 환경에서 DAG의 일시중지를 해제합니다.
새 DAG 실행이 올바른 시간에 예약되었는지 확인합니다.
Airflow 2 환경에서 DAG 실행이 이루어질 때까지 기다린 후 실행이 성공했는지 확인합니다.
DAG 실행 성공 여부에 따라 다음을 수행합니다.
DAG 실행이 성공하면 계속 진행하여 Airflow 2 환경에서 DAG를 사용할 수 있습니다. Airflow 1.10.* 버전 DAG는 삭제하는 것이 좋습니다.
DAG 실행이 실패하면 Airflow 2에서 성공적으로 실행될 때까지 DAG 문제 해결을 시도합니다.
필요한 경우 언제든지 Airflow 1.10.* 버전의 DAG로 돌아갈 수 있습니다.
Airflow 2 환경에서 DAG를 일시중지합니다.
Airflow 1.10.* 환경에서 DAG 일시중지를 취소합니다. 그러면 실패한 DAG 실행과 동일한 날짜 및 시간에 새 DAG 실행이 예약됩니다.
Airflow 2 버전의 DAG를 계속 진행할 준비가 되면 시작 날짜를 조정하고 새 버전의 DAG를 Airflow 2 환경에 업로드한 후 절차를 반복합니다.
9단계: Airflow 2 환경 모니터링
모든 DAG 및 구성을 Airflow 2 환경으로 전송한 후 잠재적 문제, 실패한 DAG 실행, 전반적인 환경 상태를 모니터링합니다. Airflow 2 환경이 충분한 시간 동안 문제없이 실행되면 Airflow 1.10.* 환경을 삭제해도 됩니다.