DAG 추가 및 업데이트

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

이 페이지에서는 Cloud Composer 환경의 DAG를 관리하는 방법을 설명합니다.

Cloud Composer는 Cloud Storage 버킷을 사용하여 Cloud Composer 환경의 DAG를 저장합니다. 사용자 환경이 이 버킷의 DAG를 Airflow 작업자 및 스케줄러와 같은 Airflow 구성요소에 동기화합니다.

시작하기 전에

  • Apache Airflow는 강력한 DAG 분리를 제공하지 않으므로 프로덕션 환경과 테스트 환경을 별도로 유지하여 DAG 방해를 방지하는 것이 좋습니다. 자세한 내용은 DAG 테스트를 참조하세요.
  • 계정에 DAG를 관리할 수 있는 권한이 충분한지 확인합니다.
  • DAG 변경사항은 3-5분 이내에 Airflow로 전파됩니다. Airflow 웹 인터페이스에서 태스크 상태를 볼 수 있습니다.

사용자 환경의 버킷 액세스

사용자 환경과 연관된 버킷에 액세스하려면 다음 안내를 따르세요.

콘솔

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 목록에서 사용자 환경의 이름이 있는 행을 찾고 DAG 폴더 열에서 DAG 링크를 클릭합니다. 버킷 세부정보 페이지가 열립니다. 사용자 환경의 버킷에 있는 /dags 폴더의 콘텐츠를 보여줍니다.

gcloud

gcloud CLI에는 사용자 환경의 버킷에서 DAG를 추가삭제하기 위한 명령이 별도로 있습니다.

또한 사용자 환경의 버킷과 상호작용하려면 gsutil 명령줄 도구를 사용할 수 있습니다. 사용자 환경의 버킷 주소를 가져오려면 다음 gcloud CLI 명령어를 실행합니다.

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

다음과 같이 바꿉니다.

  • ENVIRONMENT_NAME을 환경 이름으로 바꿉니다.
  • LOCATION을 환경이 위치한 리전으로 바꿉니다.

예를 들면 다음과 같습니다.

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

environments.get API 요청을 생성합니다. Environment 리소스, EnvironmentConfig 리소스, dagGcsPrefix 리소스에 환경 버킷의 주소가 있습니다.

예를 들면 다음과 같습니다.

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

google-auth 라이브러리를 사용하여 사용자 인증 정보를 가져오고 requests 라이브러리를 사용하여 REST API를 호출하세요.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
response = authed_session.request("GET", environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data["config"]["dagGcsPrefix"])

DAG 추가 또는 업데이트

DAG를 추가하거나 업데이트하려면 DAG의 Python .py 파일을 환경 버킷의 /dags 폴더로 이동합니다.

콘솔

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 목록에서 사용자 환경의 이름이 있는 행을 찾고 DAG 폴더 열에서 DAG 링크를 클릭합니다. 버킷 세부정보 페이지가 열립니다. 사용자 환경의 버킷에 있는 /dags 폴더의 콘텐츠를 보여줍니다.

  3. 파일 업로드를 클릭합니다. 그런 후 브라우저 대화상자를 사용해서 DAG에 대해 Python .py 파일을 선택하고 확인합니다.

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

다음과 같이 바꿉니다.

  • ENVIRONMENT_NAME을 환경 이름으로 바꿉니다.
  • LOCATION: 환경이 위치한 리전
  • LOCAL_FILE_TO_UPLOAD: DAG에 대한 Python .py 파일

예를 들면 다음과 같습니다.

gcloud composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

활성 DAG 실행이 포함된 DAG 업데이트

활성 DAG 실행이 포함된 DAG를 업데이트하려면 다음 안내를 따르세요.

  • 현재 실행 중인 태스크는 원래 DAG 파일을 사용하여 완료됩니다.
  • 예약되었지만 아직 실행되지 않은 모든 태스크는 업데이트된 DAG 파일을 사용합니다.
  • 업데이트된 DAG 파일에 더 이상 포함되지 않는 모든 태스크는 삭제된 것으로 표시됩니다.

빈번한 일정으로 실행되는 DAG 업데이트

DAG 파일을 업로드한 후 Airflow가 이 파일을 로드하고 DAG를 업데이트하려면 시간이 걸립니다. DAG가 빈번한 일정으로 실행될 경우, DAG에 업데이트된 DAG 파일 버전이 사용되는지 확인해야 할 수 있습니다. 방법은 다음과 같습니다.

  1. Airflow UI에서 DAG를 일시중지합니다.

  2. 업데이트된 DAG 파일을 업로드합니다.

  3. Airflow UI에 업데이트가 표시될 때까지 기다립니다. 즉, DAG가 스케줄러에서 올바르게 파싱되었고 Airflow 데이터베이스에서 업데이트되었는지 확인합니다.

    Airflow UI에 업데이트된 DAG가 표시되어도 Airflow 작업자에 업데이트된 DAG 파일 버전이 포함되었다고 보장할 수 없습니다. 이것은 DAG 파일이 스케줄러 및 작업자에 대해 개별적으로 동기화되기 때문입니다.

  4. DAG 파일이 환경 내 모든 작업자와 동기화되었는지 확인하기 위해 대기 시간을 늘려야 할 수 있습니다. 동기화는 분당 여러 번 수행됩니다. 정상 환경에서 모든 작업자가 동기화되기 위해서는 약 20~30초 정도의 대기 시간이면 충분합니다.

  5. (선택사항) 모든 작업자에게 새 버전의 DAG 파일이 사용되도록 완전히 보장하기 위해서는 개별 작업자의 로그를 조사하세요. 방법은 다음과 같습니다.

    1. Google Cloud 콘솔에서 사용자 환경에 대해 로그 탭을 엽니다.

    2. Composer 로그 > 인프라 > Cloud Storage 동기화 항목으로 이동하고 해당 환경의 모든 작업자 로그를 조사합니다. 새 DAG 파일을 업로드한 후 타임스탬프가 있는 최신 Syncing dags directory 로그를 찾습니다. 뒤에 Finished syncing 항목이 표시되면 DAG가 이 작업자에서 성공적으로 동기화된 것입니다.

  6. DAG 일시중지를 취소합니다.

환경에서 DAG 삭제

DAG를 삭제하려면 사용자 환경 버킷의 환경 /dags 폴더에서 DAG에 대해 Python .py 파일을 삭제합니다.

콘솔

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 목록에서 사용자 환경의 이름이 있는 행을 찾고 DAG 폴더 열에서 DAG 링크를 클릭합니다. 버킷 세부정보 페이지가 열립니다. 사용자 환경의 버킷에 있는 /dags 폴더의 콘텐츠를 보여줍니다.

  3. DAG 파일을 선택하고 삭제를 클릭하고 작업을 확인합니다.

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

다음과 같이 바꿉니다.

  • ENVIRONMENT_NAME을 환경 이름으로 바꿉니다.
  • LOCATION: 환경이 위치한 리전
  • DAG_FILE: DAG의 Python .py 파일

예:

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

Airflow UI에서 DAG 삭제

Airflow 웹 인터페이스에서 DAG의 메타데이터를 제거하려면 다음을 입력합니다.

Airflow UI

  1. 사용자 환경의 Airflow UI로 이동합니다.
  2. DAG에 대해 DAG 삭제를 클릭합니다.

gcloud

gcloud CLI에서 다음 명령어를 실행합니다.

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

다음과 같이 바꿉니다.

  • ENVIRONMENT_NAME을 환경 이름으로 바꿉니다.
  • LOCATION: 환경이 위치한 리전
  • DAG_NAME: 삭제할 DAG의 이름

다음 단계