Cloud Composer 3에서 Apache Airflow DAG 실행

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

이 빠른 시작 가이드에서는 Cloud Composer 환경을 만들고 Cloud Composer 3에서 Apache Airflow DAG를 실행하는 방법을 보여줍니다.

시작하기 전에

  1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  6. Enable the Cloud Composer API.

    Enable the API

  7. 빠른 시작을 완료하는 데 필요한 권한을 얻으려면 관리자에게 프로젝트에 대한 다음 IAM 역할을 부여해 달라고 요청하세요.

    역할 부여에 대한 자세한 내용은 액세스 관리를 참조하세요.

    커스텀 역할이나 다른 사전 정의된 역할을 통해 필요한 권한을 얻을 수도 있습니다.

환경 만들기

  1. Google Cloud 콘솔에서 환경 만들기 페이지로 이동합니다.

    환경 만들기로 이동

  2. 이름 필드에 example-environment을 입력합니다.

  3. 위치 드롭다운 목록에서 Cloud Composer 환경의 리전을 선택합니다. 이 가이드에서는 us-central1 리전을 사용합니다.

  4. 다른 환경 구성 옵션에 제공된 기본값을 사용합니다.

  5. 만들기를 클릭하고 환경이 생성될 때까지 기다립니다.

  6. 완료되면 환경 이름 옆에 녹색 체크표시가 나타납니다.

DAG 파일 만들기

Airflow DAG는 예약 및 실행하려는 태스크가 구성된 모음입니다. DAG는 표준 Python 파일에서 정의됩니다.

이 가이드에서는 quickstart.py 파일에 정의된 Airflow DAG 예시를 사용합니다. 이 파일의 Python 코드는 다음을 수행합니다.

  1. DAG, composer_sample_dag를 만듭니다. 이 DAG는 매일 실행됩니다.
  2. 태스크 하나, print_dag_run_conf를 실행합니다. 태스크는 bash 연산자를 사용하여 DAG 실행 구성을 출력합니다.

quickstart.py 파일의 사본을 로컬 머신에 저장합니다.

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

환경의 버킷에 DAG 파일 업로드

모든 Cloud Composer 환경에는 Cloud Storage 버킷이 연결되어 있습니다. Cloud Composer의 Airflow는 이 버킷의 /dags 폴더에 있는 DAG만 예약합니다.

DAG를 예약하려면 로컬 머신에서 환경의 /dags 폴더로 quickstart.py를 업로드합니다.

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 목록에서 환경 이름 example-environment를 클릭합니다. 환경 세부정보 페이지가 열립니다.

  3. DAG 폴더 열기를 클릭합니다. 버킷 세부정보 페이지가 열립니다.

  4. 파일 업로드를 클릭한 후 quickstart.py의 복사본을 선택합니다.

  5. 파일을 업로드하려면 열기를 클릭합니다.

DAG 보기

DAG 파일을 업로드하면 Airflow는 다음을 수행합니다.

  1. 업로드한 DAG 파일을 파싱합니다. DAG가 Airflow에서 사용 가능해지기까지 몇 분 정도 걸릴 수 있습니다.
  2. 사용 가능한 DAG 목록에 DAG를 추가합니다.
  3. DAG 파일에 제공한 일정에 따라 DAG를 실행합니다.

DAG UI에서 확인하여 DAG가 오류 없이 처리되고 Airflow에서 사용할 수 있는지 확인합니다. DAG UI는 Google Cloud 콘솔에서 DAG 정보를 볼 수 있는 Cloud Composer 인터페이스입니다. 또한 Cloud Composer는 기본 Airflow 웹 인터페이스인 Airflow UI에 대한 액세스를 제공합니다.

  1. Airflow가 이전에 업로드한 DAG 파일을 처리하고 첫 번째 DAG 실행을 완료할 때까지 약 5분 정도 기다립니다(뒷부분에서 설명).

  2. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  3. 환경 목록에서 환경 이름 example-environment를 클릭합니다. 환경 세부정보 페이지가 열립니다.

  4. DAG 탭으로 이동합니다.

  5. composer_quickstart DAG가 DAG 목록에 있는지 확인합니다.

    DAG 목록에는 상태 및 일정과 같은 추가 정보와 함께 composer_quickstart DAG가 표시됩니다.
    그림 1. DAG 목록에는 composer_quickstart DAG가 표시됩니다(확대하려면 클릭).

DAG 실행 세부정보 보기

DAG를 한 번 실행하는 것을 DAG 실행이라고 합니다. DAG 파일의 시작 날짜가 어제로 설정되어 있으므로 Airflow는 예시 DAG의 DAG 실행을 즉시 실행합니다. Airflow는 이러한 방식으로 지정된 DAG의 일정을 따라잡습니다.

예시 DAG에는 콘솔에서 echo 명령어를 실행하는 print_dag_run_conf 태스크 하나가 포함되어 있습니다. 이 명령어는 DAG(DAG 실행의 숫자 식별자)에 대한 메타 정보를 출력합니다.

  1. DAG 탭에서 composer_quickstart를 클릭합니다. DAG의 실행 탭이 열립니다.

  2. DAG 실행 목록에서 첫 번째 항목을 클릭합니다.

    DAG 실행 목록에 최근 DAG 실행(실행 날짜 및 상태)이 표시됩니다.
    그림 2. composer_quickstart DAG의 DAG 실행 목록(확대하려면 클릭)
  3. 예시 DAG의 개별 태스크에 대한 정보를 자세히 보여주는 DAG 실행 세부정보가 표시됩니다.

    print_dag_run_conf 항목, 시작 시간, 종료 시간, 기간이 있는 태스크 목록
    그림 3. DAG 실행에서 실행된 태스크 목록(확대하려면 클릭)
  4. DAG 실행 로그 섹션에는 DAG 실행의 모든 태스크 로그가 나열됩니다. 로그에서 echo 명령어의 출력을 볼 수 있습니다.

    태스크의 로그 항목입니다. 하나는 출력이고 다른 하나는 식별자를 나열합니다.
    그림 4. print_dag_run_conf 태스크 로그(확대하려면 클릭)

삭제

이 페이지에서 사용한 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 다음 단계를 수행합니다.

이 튜토리얼에서 사용된 리소스를 삭제합니다.

  1. Cloud Composer 환경을 삭제합니다.

    1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

      환경으로 이동

    2. example-environment를 선택하고 삭제를 클릭합니다.

    3. 환경이 삭제될 때까지 기다립니다.

  2. 환경의 버킷을 삭제합니다. Cloud Composer 환경을 삭제하면 버킷이 삭제되지 않습니다.

    1. Google Cloud 콘솔에서 스토리지 > 브라우저 페이지로 이동합니다.

      스토리지 > 브라우저로 이동

    2. 해당 환경의 버킷을 선택하고 삭제를 클릭합니다. 예를 들어 이 버킷의 이름을 us-central1-example-environ-c1616fe8-bucket으로 지정할 수 있습니다.

  3. 해당 환경의 Redis 큐의 영구 디스크를 삭제합니다. Cloud Composer 환경을 삭제해도 영구 디스크는 삭제되지 않습니다.

    1. Google Cloud 콘솔에서 Compute Engine > 디스크로 이동합니다.

      디스크로 이동

    2. 환경의 Redis 큐 영구 디스크를 선택하고 삭제를 클릭합니다.

      예를 들어 이 디스크의 이름을 gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee로 지정할 수 있습니다. Cloud Composer 1의 디스크 크기는 항상 Standard persistent disk 유형이며 크기는 2GB입니다.

다음 단계