DAG 추가 및 업데이트(워크플로)

이 페이지에서는 사용자 환경의 스토리지 버킷을 확인하는 방법과 사용자 환경에서 DAG를 추가, 업데이트, 삭제하는 방법을 설명합니다.

Cloud Composer는 Cloud Storage를 사용하여 워크플로라고도 하는 Apache Airflow DAG를 저장합니다. 각 환경에는 연결된 Cloud Storage 버킷이 있습니다. Cloud Composer는 Cloud Storage 버킷의 DAG만 예약합니다.

시작하기 전에

  • Apache Airflow는 강력한 DAG 분리를 제공하지 않으므로 프로덕션 환경과 테스트 환경을 별도로 유지하여 DAG 방해를 방지하는 것이 좋습니다. 자세한 내용은 DAG 테스트를 참조하세요.
  • Cloud Composer 환경의 Cloud Storage 버킷에 플러그인을 추가 및 업데이트하려면 다음 권한이 필요합니다.
    • storage.objectAdmin을 사용하여 파일을 업로드합니다.
    • composer.environments.get을 사용하여 DAG 대상 버킷을 찾습니다. 이 권한이 없어도 Cloud Storage API 또는 gsutil을 사용할 수 있습니다.
  • DAG 변경사항은 3~5분 이내에 적용됩니다. Airflow 웹 인터페이스에서 태스크 상태를 볼 수 있습니다.

스토리지 버킷 이름 결정

환경과 연결된 스토리지 버킷의 이름을 지정하려면 다음 안내를 따르세요.

Console

  1. Cloud Console에서 환경 페이지를 엽니다.
    환경 페이지 열기
  2. 이름 열에서 환경 이름을 클릭하여 환경 세부정보 페이지를 엽니다.

    구성 탭에서 Cloud Storage 버킷의 이름이 DAG 폴더의 오른쪽에 표시됩니다.

  3. (선택사항) Cloud Storage에서 버킷을 보려면 버킷 이름을 클릭합니다.

gcloud

다음 명령어를 입력합니다.

gcloud composer environments describe ENVIRONMENT_NAME \
  --location LOCATION \
  --format="get(config.dagGcsPrefix)"

각 항목의 의미는 다음과 같습니다.

  • ENVIRONMENT_NAME은 환경 이름입니다.
  • LOCATION은 환경이 위치한 Compute Engine 리전입니다.
  • --format은 모든 환경 세부정보가 아닌 dagGcsPrefix 속성만을 지정하는 옵션입니다.

dagGcsPrefix 속성은 버킷 이름을 표시합니다.

gs://region-environment_name-random_id-bucket/

rest

사용자 인증 정보에 대한 자세한 내용은 Cloud Composer 인증을 참조하세요.

  1. projects.locations.environments.get메서드를 호출합니다.
  2. 환경 응답에서 config.dagGcsPrefix를 읽어보세요.

RPC

사용자 인증 정보에 대한 자세한 내용은 Cloud Composer 인증을 참조하세요.

  1. Environments.GetEnvironment 메서드를 호출합니다.
  2. 환경 응답에서 config.dag_gcs_prefix를 읽어보세요.

Python

google-auth 라이브러리를 사용하여 사용자 인증 정보를 가져오고 requests 라이브러리를 사용하여 REST API를 호출합니다.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
authed_session = google.auth.transport.requests.AuthorizedSession(
    credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
    '/environments/{}').format(project_id, location, composer_environment)
response = authed_session.request('GET', environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data['config']['dagGcsPrefix'])

DAG 추가 또는 업데이트

DAG를 추가하거나 업데이트하려면 DAG의 Python .py 파일을 Cloud Storage 내 환경의 dags 폴더로 옮깁니다.

Console

  1. Cloud Console에서 환경 페이지를 엽니다.
    환경 페이지 열기
  2. 이름 열에서 환경 이름을 클릭하여 환경 세부정보 페이지를 엽니다.

    구성 탭에서 Cloud Storage 버킷의 이름이 DAG 폴더의 오른쪽에 표시됩니다.

  3. Cloud Storage에서 버킷을 보려면 버킷 이름을 클릭합니다.

    기본적으로 dags 폴더가 열립니다.

  4. 파일 업로드를 클릭하고 업로드할 DAG의 로컬 사본을 선택합니다.

  5. 파일을 dags 폴더에 업로드하려면 열기를 클릭합니다.

gcloud

DAG를 추가하거나 업데이트하려면 다음을 수행하세요.

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source LOCAL_FILE_TO_UPLOAD

각 항목의 의미는 다음과 같습니다.

  • ENVIRONMENT_NAME은 환경 이름입니다.
  • LOCATION은 환경이 위치한 Compute Engine 리전입니다.
  • LOCAL_FILE_TO_UPLOAD는 업로드할 DAG입니다.

활성 DAG 실행이 포함된 DAG 업데이트

활성 DAG 실행이 포함된 DAG를 업데이트하려면 다음 안내를 따르세요.

  • 현재 실행 중인 태스크는 원래 DAG 파일을 사용하여 완료됩니다.
  • 예약되었지만 아직 실행되지 않은 모든 태스크는 업데이트된 DAG 파일을 사용합니다.
  • 업데이트된 DAG 파일에 더 이상 포함되지 않는 모든 태스크는 삭제된 것으로 표시됩니다.

빈번한 일정으로 실행되는 DAG 업데이트

DAG 파일을 업로드한 후 Airflow가 이 파일을 로드하고 DAG를 업데이트하려면 시간이 걸립니다. DAG가 빈번한 일정으로 실행될 경우, DAG에 업데이트된 DAG 파일 버전이 사용되는지 확인해야 할 수 있습니다. 방법은 다음과 같습니다.

  1. Airflow UI에서 DAG를 일시중지합니다.
  2. 업데이트된 DAG 파일을 업로드합니다.
  3. Airflow UI에 업데이트가 표시될 때까지 기다립니다. 즉, DAG가 스케줄러에서 올바르게 파싱되었고 Airflow 데이터베이스에서 업데이트되었는지 확인합니다.

    Airflow UI에 업데이트된 DAG가 표시되어도 Airflow 작업자에 업데이트된 DAG 파일 버전이 포함되었다고 보장할 수 없습니다. 이것은 DAG 파일이 스케줄러 및 작업자에 대해 개별적으로 동기화되기 때문입니다.

  4. DAG 파일이 환경 내 모든 작업자와 동기화되었는지 확인하기 위해 대기 시간을 늘려야 할 수 있습니다. 동기화는 분당 여러 번 수행됩니다. 정상 환경에서 모든 작업자가 동기화되기 위해서는 약 20~30초 정도의 대기 시간이면 충분합니다.

  5. (선택사항) 모든 작업자에게 새 버전의 DAG 파일이 사용되도록 완전히 보장하기 위해서는 개별 작업자의 로그를 조사하세요. 방법은 다음과 같습니다.

    1. Console에서 해당 환경에 대한 로그 탭을 엽니다.
    2. Composer 로그 > 인프라 > Cloud Storage 동기화 항목으로 이동하고 해당 환경의 모든 작업자 로그를 조사합니다. 새 DAG 파일을 업로드한 후 타임스탬프가 있는 최신 Syncing dags directory 로그를 찾습니다. 뒤에 Finished syncing 항목이 표시되면 DAG가 이 작업자에서 성공적으로 동기화된 것입니다.
  6. DAG 일시중지를 취소합니다.

DAG 삭제

이 섹션에서는 DAG를 삭제하는 방법을 설명합니다.

환경에서 DAG 삭제

DAG를 삭제하려면 Cloud Storage 내 환경의 dags 폴더에서 Python .py 파일을 삭제합니다. DAG를 삭제해도 Airflow 웹 인터페이스에서 DAG 메타데이터는 삭제되지 않습니다.

Console

  1. Cloud Console의 환경 페이지로 이동합니다.
    환경 페이지 열기
  2. 이름 열에서 환경 이름을 클릭하여 환경 세부정보 페이지를 엽니다.

    구성 탭에서 Cloud Storage 버킷의 이름이 DAGs 폴더 오른쪽에 표시됩니다.

  3. Cloud Storage에서 버킷을 보려면 버킷 이름을 클릭합니다.

    기본적으로 dags 폴더가 열립니다.

  4. 삭제하려는 DAG 옆의 체크박스를 클릭합니다.

  5. Cloud Console 상단에서 삭제를 클릭합니다.

  6. 대화상자가 나타나면 확인을 클릭합니다.

gcloud

gcloud composer environments storage dags delete 
--environment ENVIRONMENT_NAME
--location LOCATION
DAG_NAME.py

각 항목의 의미는 다음과 같습니다.

  • ENVIRONMENT_NAME은 환경 이름입니다.
  • LOCATION은 환경이 위치한 Compute Engine 리전입니다.
  • DAG_NAME.py은 삭제할 DAG입니다.
  • Airflow 1.9.0: 삭제된 DAG의 메타데이터는 Airflow 웹 인터페이스에 계속 표시됩니다.

  • Airflow 1.10.0 이상: gcloud 도구를 사용하여 DAG 메타데이터를 삭제할 수 있습니다.

Airflow 웹 인터페이스에서 DAG 제거

Airflow 웹 인터페이스에서 DAG의 메타데이터를 제거하려면 다음을 입력합니다.

Airflow 1.10 CLI

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    delete_dag -- DAG_NAME

Airflow 2.0 CLI

  gcloud beta composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

각 항목의 의미는 다음과 같습니다.

  • ENVIRONMENT_NAME은 환경 이름입니다.
  • LOCATION은 환경이 위치한 Compute Engine 리전입니다.
  • DAG_NAME은 삭제할 DAG의 이름입니다.

다음 단계