Cloud Composer 1에서 Apache Airflow DAG 실행(Google Cloud CLI)

Cloud Composer 1 | Cloud Composer 2

이 빠른 시작 가이드에서는 Cloud Composer 환경을 만들고 Cloud Composer 1에서 Apache Airflow DAG를 실행하는 방법을 보여줍니다.

시작하기 전에

  1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. Google Cloud CLI를 설치합니다.
  3. gcloud CLI를 초기화하려면 다음 명령어를 실행합니다.

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  6. Google Cloud CLI를 설치합니다.
  7. gcloud CLI를 초기화하려면 다음 명령어를 실행합니다.

    gcloud init
  8. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  9. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  10. Cloud Composer API를 사용 설정합니다.

    gcloud services enable composer.googleapis.com
  11. 빠른 시작을 완료하는 데 필요한 권한을 얻으려면 관리자에게 프로젝트에 대한 다음 IAM 역할을 부여해 달라고 요청하세요.

    역할 부여에 대한 자세한 내용은 액세스 관리를 참조하세요.

    커스텀 역할이나 다른 사전 정의된 역할을 통해 필요한 권한을 얻을 수도 있습니다.

환경 만들기

최신 Cloud Composer 1 버전으로 us-central1 리전에 example-environment라는 새 환경을 만듭니다.

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-1.20.12-airflow-1.10.15

DAG 파일 만들기

Airflow DAG는 예약 및 실행하려는 태스크가 구성된 모음입니다. DAG는 표준 Python 파일에서 정의됩니다.

이 가이드에서는 quickstart.py 파일에 정의된 Airflow DAG 예시를 사용합니다. 이 파일의 Python 코드는 다음을 수행합니다.

  1. DAG, composer_sample_dag를 만듭니다. 이 DAG는 매일 실행됩니다.
  2. 태스크 하나, print_dag_run_conf를 실행합니다. 태스크는 bash 연산자를 사용하여 DAG 실행 구성을 출력합니다.

quickstart.py 파일의 사본을 로컬 머신에 저장합니다.

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

환경의 버킷에 DAG 파일 업로드

모든 Cloud Composer 환경에는 Cloud Storage 버킷이 연결되어 있습니다. Cloud Composer의 Airflow는 이 버킷의 /dags 폴더에 있는 DAG만 예약합니다.

DAG를 예약하려면 로컬 머신에서 환경의 /dags 폴더로 quickstart.py를 업로드합니다.

Google Cloud CLI로 quickstart.py를 업로드하려면 quickstart.py 파일이 있는 폴더에서 다음 명령어를 실행합니다.

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

DAG 보기

DAG 파일을 업로드하면 Airflow는 다음을 수행합니다.

  1. 업로드한 DAG 파일을 파싱합니다. DAG가 Airflow에서 사용 가능해지기까지 몇 분 정도 걸릴 수 있습니다.
  2. 사용 가능한 DAG 목록에 DAG를 추가합니다.
  3. DAG 파일에 제공한 일정에 따라 DAG를 실행합니다.

DAG UI에서 확인하여 DAG가 오류 없이 처리되고 Airflow에서 사용할 수 있는지 확인합니다. DAG UI는 Google Cloud 콘솔에서 DAG 정보를 볼 수 있는 Cloud Composer 인터페이스입니다. 또한 Cloud Composer는 기본 Airflow 웹 인터페이스인 Airflow UI에 대한 액세스를 제공합니다.

  1. Airflow가 이전에 업로드한 DAG 파일을 처리하고 첫 번째 DAG 실행을 완료할 때까지 약 5분 정도 기다립니다(뒷부분에서 설명).

  2. Google Cloud CLI에서 다음 명령어를 실행합니다. 이 명령어는 환경의 DAG를 나열하는 dags list Airflow CLI 명령어를 실행합니다.

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. 명령어 출력에 composer_quickstart DAG가 나열되어 있는지 확인합니다.

    출력 예시:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    

DAG 실행 세부정보 보기

DAG를 한 번 실행하는 것을 DAG 실행이라고 합니다. DAG 파일의 시작 날짜가 어제로 설정되어 있으므로 Airflow는 예시 DAG의 DAG 실행을 즉시 실행합니다. Airflow는 이러한 방식으로 지정된 DAG의 일정을 따라잡습니다.

예시 DAG에는 콘솔에서 echo 명령어를 실행하는 print_dag_run_conf 태스크 하나가 포함되어 있습니다. 이 명령어는 DAG(DAG 실행의 숫자 식별자)에 대한 메타 정보를 출력합니다.

Google Cloud CLI에서 다음 명령어를 실행합니다. 이 명령어는 composer_quickstart DAG의 DAG 실행을 나열합니다.

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

출력 예시:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00

Airflow CLI는 태스크 로그를 보는 명령어를 제공하지 않습니다. Cloud Composer DAG UI, Airflow UI 또는 Cloud Logging 등 다른 방법을 사용하여 Airflow 태스크 로그를 확인할 수 있습니다. 이 가이드에서는 특정 DAG 실행의 로그에 대한 Cloud Logging을 쿼리하는 방법을 보여줍니다.

Google Cloud CLI에서 다음 명령어를 실행합니다. 이 명령어는 composer_quickstart DAG의 특정 DAG 실행에 대한 Cloud Logging의 로그를 읽습니다. --format 인수는 로그 메시지의 텍스트만 표시하도록 출력 형식을 지정합니다.

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

다음과 같이 바꿉니다.

  • RUN_ID를 이전에 실행한 tasks states-for-dag-run 명령어 출력의 run_id 값으로 바꿉니다. 예를 들면 2024-02-17T15:38:38.969307+00:00입니다.

출력 예시:

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

삭제

이 페이지에서 사용한 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 리소스가 포함된 Google Cloud 프로젝트를 삭제하면 됩니다.

이 튜토리얼에서 사용된 리소스를 삭제합니다.

  1. Cloud Composer 환경을 삭제합니다.

    1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

      환경으로 이동

    2. example-environment를 선택하고 삭제를 클릭합니다.

    3. 환경이 삭제될 때까지 기다립니다.

  2. 환경의 버킷을 삭제합니다. Cloud Composer 환경을 삭제하면 버킷이 삭제되지 않습니다.

    1. Google Cloud 콘솔에서 스토리지 > 브라우저 페이지로 이동합니다.

      스토리지 > 브라우저로 이동

    2. 해당 환경의 버킷을 선택하고 삭제를 클릭합니다. 예를 들어 이 버킷의 이름을 us-central1-example-environ-c1616fe8-bucket으로 지정할 수 있습니다.

  3. 해당 환경의 Redis 큐의 영구 디스크를 삭제합니다. Cloud Composer 환경을 삭제해도 영구 디스크는 삭제되지 않습니다.

    1. Google Cloud 콘솔에서 Compute Engine > 디스크로 이동합니다.

      디스크로 이동

    2. 환경의 Redis 큐 영구 디스크를 선택하고 삭제를 클릭합니다.

      예를 들어 이 디스크의 이름을 gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee로 지정할 수 있습니다. Cloud Composer 1의 디스크 크기는 항상 Standard persistent disk 유형이며 크기는 2GB입니다.

다음 단계