Cloud Composer 1에서 Apache Airflow DAG 실행

Cloud Composer 1 | Cloud Composer 2

이 페이지에서는 Cloud Composer 환경을 만들고 Cloud Composer에서 Apache Airflow DAG를 실행하는 방법을 설명합니다.

Airflow를 처음 사용할 경우 Airflow 개념, 객체, 사용법에 대한 상세 설명은 이 튜토리얼을 참조하세요.

시작하기 전에

  1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  3. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  4. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  5. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  6. Cloud Composer API 사용 설정

    API 사용 설정

환경 만들기

콘솔

  1. Google Cloud 콘솔에서 환경 만들기 페이지로 이동합니다.

    환경 만들기로 이동

  2. 이름 필드에 example-environment을 입력합니다.

  3. 위치 드롭다운 목록에서 Cloud Composer 환경의 리전을 선택합니다. 리전 선택에 대한 상세 설명은 사용 가능한 리전을 참조하세요.

  4. 다른 환경 구성 옵션에 제공된 기본값을 사용합니다.

  5. 환경을 만들려면 만들기를 클릭합니다.

  6. 환경이 만들어질 때까지 기다립니다. 완료되면 환경 이름 옆에 녹색 체크표시가 나타납니다.

gcloud

Cloud Composer 서비스 에이전트 계정을 환경 서비스 계정의 새 주 구성원으로 추가하고 Cloud Composer v2 API 서비스 에이전트 확장(roles/composer.ServiceAgentV2Ext) 역할을 부여합니다.

기본적으로 환경은 기본 Compute Engine 서비스 계정을 사용합니다.

# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
  --filter="$(gcloud config get-value project)" \
  --format="value(PROJECT_NUMBER)" \
  --limit=1)

# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
    $PROJECT_NUMBER-compute@developer.gserviceaccount.com \
    --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
    --role roles/composer.ServiceAgentV2Ext

새 환경을 만듭니다.

gcloud composer environments create ENVIRONMENT_NAME \
  --location LOCATION \
  --image-version IMAGE_VERSION

다음과 같이 바꿉니다.

  • ENVIRONMENT_NAME을 환경 이름으로 바꿉니다. 이 빠른 시작에서는 example-environment가 사용됩니다.
  • LOCATION을 Cloud Composer 환경의 리전으로 바꿉니다. 리전 선택에 대한 상세 설명은 사용 가능한 리전을 참조하세요.
  • IMAGE_VERSIONCloud Composer 이미지 이름으로 바꿉니다. 이 가이드에서는 composer-1.20.12-airflow-1.10.15를 사용하여 최신 Cloud Composer 이미지가 포함된 환경을 만듭니다.

예:

gcloud composer environments create example-environment \
  --location us-central1 \
  --image-version composer-1.20.12-airflow-1.10.15

Terraform

Terraform을 사용하여 이 환경을 구성하려면 Terraform 구성에 다음 리소스 블록을 추가하고 terraform apply를 실행합니다.

이 리소스 블록을 사용하려면 Terraform이 사용하는 서비스 계정에 composer.environments.create 권한이 사용 설정된 역할이 있어야 합니다. Terraform의 서비스 계정에 대한 자세한 내용은 Google 제공업체 구성 참조를 확인하세요.

Terraform을 사용하여 Cloud Composer 환경을 만드는 방법에 대한 상세 설명은 Terraform 문서를 참조하세요.

resource "google_composer_environment" "example" {
  name = "ENVIRONMENT_NAME"
  region = "LOCATION"

  config {
    software_config {
      image_version = "IMAGE_VERSION"
    }
  }
}
  • ENVIRONMENT_NAME을 환경 이름으로 바꿉니다. 이 빠른 시작에서는 example-environment가 사용됩니다.

  • LOCATION을 Cloud Composer 환경의 리전으로 바꿉니다. 리전 선택에 대한 상세 설명은 사용 가능한 리전을 참조하세요.

  • IMAGE_VERSIONCloud Composer 이미지 이름으로 바꿉니다. 이 가이드에서는 composer-1.20.12-airflow-1.10.15를 사용하여 최신 Cloud Composer 이미지가 포함된 환경을 만듭니다.

예:

resource "google_composer_environment" "example" {
  name = "example-environment"
  region = "us-central1"

  config {
    software_config {
      image_version = "composer-1.20.12-airflow-1.10.15"
    }
  }

}

환경 세부정보 보기

환경 생성이 완료되면 Cloud Composer 버전, Airflow 웹 인터페이스의 URL, Cloud Storage의 DAG 폴더와 같은 환경 정보를 볼 수 있습니다.

환경 정보를 보려면 다음 안내를 따르세요.

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 세부정보 페이지를 보려면 해당 환경 이름 example-environment를 클릭합니다.

DAG 만들기

Airflow DAG는 예약 및 실행하려는 태스크가 구성된 모음입니다. DAG는 표준 Python 파일에서 정의됩니다.

quickstart.py의 Python 코드:

  1. DAG, composer_sample_dag를 만듭니다. DAG는 하루에 한 번 실행됩니다.
  2. 태스크 하나, print_dag_run_conf를 실행합니다. 태스크는 bash 연산자를 사용하여 DAG 실행 구성을 출력합니다.

DAG를 만들려면 로컬 머신에 quickstart.py 파일의 복사본을 만듭니다.

Airflow 1

import datetime

import airflow
from airflow.operators import bash_operator

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with airflow.DAG(
    "composer_sample_dag",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash_operator.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Airflow 2

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Cloud Storage에 DAG 업로드

Cloud Composer는 환경의 Cloud Storage 버킷의 /dags 폴더에 있는 DAG만 예약합니다.

DAG를 예약하려면 로컬 머신에서 환경의 /dags 폴더로 quickstart.py를 업로드합니다.

콘솔

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. /dags 폴더를 열려면 example-environmentDAG 폴더 링크를 팔로우합니다.

  3. 버킷 세부정보 페이지에서 파일 업로드를 클릭한 후 quickstart.py의 로컬 복사본을 선택합니다.

  4. 파일을 업로드하려면 열기를 클릭합니다.

    DAG를 업로드하면 Cloud Composer가 DAG를 Airflow에 추가하고 즉시 DAG 실행을 예약합니다. DAG가 Airflow 웹 인터페이스에 표시되는 데 몇 분 정도 걸릴 수 있습니다.

gcloud

quickstart.pygcloud로 업로드하려면 다음 명령어를 실행합니다.

gcloud composer environments storage dags import \
--environment example-environment  --location us-central1 \
--source quickstart.py

Airflow UI에서 DAG 보기

각 Cloud Composer 환경에는 Airflow 웹 인터페이스를 실행하는 웹 서버가 있습니다. Airflow 웹 인터페이스에서 DAG를 관리할 수 있습니다.

Airflow 웹 인터페이스에서 DAG를 보려면 다음 안내를 따르세요.

Airflow 1

  1. Google Cloud Console에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. Airflow 웹 인터페이스를 열려면 example-environmentAirflow 링크를 클릭합니다. Airflow UI가 새 브라우저 창에서 열립니다.

  3. Airflow 툴바에서 DAG 페이지로 이동합니다.

  4. DAG 세부정보 페이지를 열려면 composer_sample_dag을 클릭합니다.

    Airflow UI의 DAG 페이지
    그림 1. Airflow UI의 DAG 페이지(확대하려면 클릭)

    DAG 페이지에 워크플로의 태스크와 종속 항목을 그래픽으로 나타낸 트리 보기가 표시됩니다.

    composer_sample_dags DAG의 트리 보기
    그림 2. composer_sample_dags DAG의 트리 보기

Airflow 2

  1. Google Cloud Console에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. Airflow 웹 인터페이스를 열려면 example-environmentAirflow 링크를 클릭합니다. Airflow UI가 새 브라우저 창에서 열립니다.

  3. Airflow 툴바에서 DAG 페이지로 이동합니다.

  4. DAG 세부정보 페이지를 열려면 composer_sample_dag을 클릭합니다.

    Airflow UI의 DAG 페이지
    그림 1. Airflow UI의 DAG 페이지(확대하려면 클릭)

    DAG 페이지에 워크플로의 태스크와 종속 항목을 그래픽으로 나타낸 트리 보기가 표시됩니다.

    composer_sample_dags DAG의 트리 보기
    그림 2. composer_sample_dags DAG의 트리 보기

Airflow 로그에서 태스크 인스턴스 세부정보 보기

예약한 DAG에는 print_dag_run_conf 태스크가 포함됩니다. 이 태스크는 태스크 인스턴스의 Airflow 로그에 표시되는 DAG 실행 구성을 출력합니다.

태스크 인스턴스 세부정보를 보려면 다음 안내를 따르세요.

Airflow 1

  1. Airflow 웹 인터페이스의 DAG 트리 뷰에서 그래프 뷰를 클릭합니다.

    print_dag_run_conf 태스크 위에 마우스 포인터를 놓으면 해당 태스크의 상태가 표시됩니다.

    composer_sample_dags DAG의 트리 보기
    그림 3. print_dag_run_conf 태스크 상태
  2. print_dag_run_conf 태스크를 클릭합니다.

    태스크 인스턴스 컨텍스트 메뉴에서 메타데이터를 가져와 일부 작업을 수행할 수 있습니다.

    composer_sample_dags 태스크의 태스크 인스턴스 컨텍스트 메뉴
    그림 4. composer_sample_dags 태스크의 태스크 인스턴스 컨텍스트 메뉴
  3. 태스크 인스턴스 컨텍스트 메뉴에서 로그 보기를 클릭합니다.

  4. 로그에서 Running: ['bash'를 찾아 bash 연산자의 출력을 확인합니다.

    Bash 연산자 로그 출력
    그림 5. Bash 연산자 로그 출력

Airflow 2

  1. Airflow 웹 인터페이스의 DAG 트리 뷰에서 그래프 뷰를 클릭합니다.

    print_dag_run_conf 태스크 위에 마우스 포인터를 놓으면 해당 태스크의 상태가 표시됩니다.

    composer_sample_dags DAG의 트리 보기
    그림 3. print_dag_run_conf 태스크 상태
  2. print_dag_run_conf 태스크를 클릭합니다.

    태스크 인스턴스 컨텍스트 메뉴에서 메타데이터를 가져와 일부 작업을 수행할 수 있습니다.

    composer_sample_dags 태스크의 태스크 인스턴스 컨텍스트 메뉴
    그림 4. composer_sample_dags 태스크의 태스크 인스턴스 컨텍스트 메뉴
  3. 태스크 인스턴스 컨텍스트 메뉴에서 로그를 클릭합니다.

  4. 로그에서 Running command: ['bash'를 찾아 bash 연산자의 출력을 확인합니다.

    [2021-10-04 15:27:21,029] {subprocess.py:63} INFO - Running command:
    ['bash', '-c', 'echo 735']
    [2021-10-04 15:27:21,167] {subprocess.py:74} INFO - Output:
    [2021-10-04 15:27:21,168] {subprocess.py:78} INFO - 735
    [2021-10-04 15:27:21,168] {subprocess.py:82} INFO - Command exited with
    return code 0
    

삭제

이 페이지에서 사용한 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 다음 단계를 수행합니다.

이 튜토리얼에서 사용된 리소스를 삭제합니다.

  1. Cloud Composer 환경을 삭제합니다.

    1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

      환경으로 이동

    2. example-environment를 선택하고 삭제를 클릭합니다.

    3. 환경이 삭제될 때까지 기다립니다.

  2. 환경의 버킷을 삭제합니다. Cloud Composer 환경을 삭제하면 버킷이 삭제되지 않습니다.

    1. Google Cloud 콘솔에서 스토리지 > 브라우저 페이지로 이동합니다.

      스토리지 > 브라우저로 이동

    2. 해당 환경의 버킷을 선택하고 삭제를 클릭합니다. 예를 들어 이 버킷의 이름을 us-central1-example-environ-c1616fe8-bucket으로 지정할 수 있습니다.

  3. 해당 환경의 Redis 큐의 영구 디스크를 삭제합니다. Cloud Composer 환경을 삭제해도 영구 디스크는 삭제되지 않습니다.

    1. Google Cloud 콘솔에서 Compute Engine > 디스크로 이동합니다.

      디스크로 이동

    2. 환경의 Redis 큐 영구 디스크를 선택하고 삭제를 클릭합니다.

      예를 들어 이 디스크의 이름을 gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee로 지정할 수 있습니다. Cloud Composer 1의 디스크 크기는 항상 Standard persistent disk 유형이며 크기는 2GB입니다.

다음 단계