Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
이 페이지에서는 Cloud Composer 환경의 DAG를 관리하는 방법을 설명합니다.
Cloud Composer는 Cloud Storage 버킷을 사용하여 Cloud Composer 환경의 DAG를 저장합니다. 사용자 환경이 이 버킷의 DAG를 Airflow 작업자 및 스케줄러와 같은 Airflow 구성요소에 동기화합니다.
시작하기 전에
- Apache Airflow는 강력한 DAG 분리를 제공하지 않으므로 프로덕션 환경과 테스트 환경을 별도로 유지하여 DAG 방해를 방지하는 것이 좋습니다. 자세한 내용은 DAG 테스트를 참조하세요.
- 계정에 DAG를 관리할 수 있는 권한이 충분한지 확인합니다.
- DAG 변경사항은 3-5분 이내에 Airflow로 전파됩니다. Airflow 웹 인터페이스에서 태스크 상태를 볼 수 있습니다.
사용자 환경의 버킷 액세스
사용자 환경과 연관된 버킷에 액세스하려면 다음 안내를 따르세요.
콘솔
- 콘솔에서 Google Cloud 환경 페이지로 이동합니다. 
- 환경 목록에서 사용자 환경의 이름이 있는 행을 찾고 DAG 폴더 열에서 DAG 링크를 클릭합니다. 버킷 세부정보 페이지가 열립니다. 사용자 환경의 버킷에 있는 - /dags폴더의 콘텐츠를 보여줍니다.
gcloud
gcloud CLI에는 사용자 환경의 버킷에서 DAG를 추가 및 삭제하기 위한 명령이 별도로 있습니다.
환경의 버킷과 상호작용하기 위해 Google Cloud CLI를 사용할 수도 있습니다. 사용자 환경의 버킷 주소를 가져오려면 다음 gcloud CLI 명령어를 실행합니다.
gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"
다음과 같이 바꿉니다.
- ENVIRONMENT_NAME을 환경 이름으로 바꿉니다.
- LOCATION을 환경이 위치한 리전으로 바꿉니다.
예를 들면 다음과 같습니다.
gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"
API
environments.get API 요청을 생성합니다. Environment 리소스, EnvironmentConfig 리소스, dagGcsPrefix 리소스에 환경 버킷의 주소가 있습니다.
예를 들면 다음과 같습니다.
GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment
Python
google-auth 라이브러리를 사용하여 사용자 인증 정보를 가져오고 requests 라이브러리를 사용하여 REST API를 호출합니다.
DAG 추가 또는 업데이트
DAG를 추가하거나 업데이트하려면 DAG의 Python .py 파일을 환경 버킷의 /dags 폴더로 이동합니다.
콘솔
- 콘솔에서 Google Cloud 환경 페이지로 이동합니다. 
- 환경 목록에서 사용자 환경의 이름이 있는 행을 찾고 DAG 폴더 열에서 DAG 링크를 클릭합니다. 버킷 세부정보 페이지가 열립니다. 사용자 환경의 버킷에 있는 - /dags폴더의 콘텐츠를 보여줍니다.
- 파일 업로드를 클릭합니다. 그런 후 브라우저 대화상자를 사용해서 DAG에 대해 Python - .py파일을 선택하고 확인합니다.
gcloud
gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"
다음과 같이 바꿉니다.
- ENVIRONMENT_NAME을 환경 이름으로 바꿉니다.
- LOCATION: 환경이 위치한 리전
- LOCAL_FILE_TO_UPLOAD: DAG에 대한 Python- .py파일
예를 들면 다음과 같습니다.
gcloud composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"
활성 DAG 실행이 포함된 DAG 업데이트
활성 DAG 실행이 포함된 DAG를 업데이트하려면 다음 안내를 따르세요.
- 현재 실행 중인 태스크는 원래 DAG 파일을 사용하여 완료됩니다.
- 예약되었지만 아직 실행되지 않은 모든 태스크는 업데이트된 DAG 파일을 사용합니다.
- 업데이트된 DAG 파일에 더 이상 포함되지 않는 모든 태스크는 삭제된 것으로 표시됩니다.
빈번한 일정으로 실행되는 DAG 업데이트
DAG 파일을 업로드한 후 Airflow가 이 파일을 로드하고 DAG를 업데이트하려면 시간이 걸립니다. DAG가 빈번한 일정으로 실행될 경우, DAG에 업데이트된 DAG 파일 버전이 사용되는지 확인해야 할 수 있습니다. 방법은 다음과 같습니다.
- Airflow UI에서 DAG를 일시중지합니다. 
- 업데이트된 DAG 파일을 업로드합니다. 
- Airflow UI에 업데이트가 표시될 때까지 기다립니다. 즉, DAG가 스케줄러에서 올바르게 파싱되었고 Airflow 데이터베이스에서 업데이트되었는지 확인합니다. - Airflow UI에 업데이트된 DAG가 표시되어도 Airflow 작업자에 업데이트된 DAG 파일 버전이 포함되었다고 보장할 수 없습니다. 이것은 DAG 파일이 스케줄러 및 작업자에 대해 개별적으로 동기화되기 때문입니다. 
- DAG 파일이 환경 내 모든 작업자와 동기화되었는지 확인하기 위해 대기 시간을 늘려야 할 수 있습니다. 동기화는 분당 여러 번 수행됩니다. 정상 환경에서 모든 작업자가 동기화되기 위해서는 약 20~30초 정도의 대기 시간이면 충분합니다. 
- (선택사항) 모든 작업자에게 새 버전의 DAG 파일이 사용되도록 완전히 보장하기 위해서는 개별 작업자의 로그를 조사하세요. 방법은 다음과 같습니다. - Google Cloud 콘솔에서 사용자 환경에 대해 로그 탭을 엽니다. 
- Composer 로그 > 인프라 > Cloud Storage 동기화 항목으로 이동하고 해당 환경의 모든 작업자 로그를 조사합니다. 새 DAG 파일을 업로드한 후 타임스탬프가 있는 최신 - Syncing dags directory로그를 찾습니다. 뒤에- Finished syncing항목이 표시되면 DAG가 이 작업자에서 성공적으로 동기화된 것입니다.
 
- DAG 일시중지를 취소합니다. 
DAG 다시 파싱
DAG는 환경의 버킷에 저장되므로 각 DAG는 먼저 DAG 프로세서에 동기화된 후 DAG 프로세서가 약간의 지연 시간을 두고 파싱합니다. Airflow UI를 통해 DAG를 수동으로 다시 파싱하면 DAG 프로세서가 사용 가능한 현재 버전의 DAG를 다시 파싱합니다. 이 버전은 환경 버킷에 업로드한 최신 버전의 DAG가 아닐 수 있습니다.
파싱 시간이 긴 경우에만 주문형 재파싱을 사용하는 것이 좋습니다. 예를 들어 환경에 파일이 많거나 Airflow 구성 옵션에 긴 DAG 파싱 간격이 구성된 경우에 발생할 수 있습니다.
환경에서 DAG 삭제
DAG를 삭제하려면 사용자 환경 버킷의 환경 /dags 폴더에서 DAG에 대해 Python .py 파일을 삭제합니다.
콘솔
- 콘솔에서 Google Cloud 환경 페이지로 이동합니다. 
- 환경 목록에서 사용자 환경의 이름이 있는 행을 찾고 DAG 폴더 열에서 DAG 링크를 클릭합니다. 버킷 세부정보 페이지가 열립니다. 사용자 환경의 버킷에 있는 - /dags폴더의 콘텐츠를 보여줍니다.
- DAG 파일을 선택하고 삭제를 클릭하고 작업을 확인합니다. 
gcloud
gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE
다음과 같이 바꿉니다.
- ENVIRONMENT_NAME을 환경 이름으로 바꿉니다.
- LOCATION: 환경이 위치한 리전
- DAG_FILE: DAG의 Python- .py파일
예를 들면 다음과 같습니다.
gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py
Airflow UI에서 DAG 삭제
Airflow 웹 인터페이스에서 DAG의 메타데이터를 제거하려면 다음을 입력합니다.
Airflow UI
- 사용자 환경의 Airflow UI로 이동합니다.
- DAG에 대해 DAG 삭제를 클릭합니다.
gcloud
gcloud CLI에서 다음 명령어를 실행합니다.
  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME
다음과 같이 바꿉니다.
- ENVIRONMENT_NAME: 환경 이름
- LOCATION: 환경이 위치한 리전
- DAG_NAME: 삭제할 DAG의 이름