DAG 트리거

Cloud Composer 1 | Cloud Composer 2

이 페이지에서는 Cloud Composer 환경에서 DAG를 트리거하는 다양한 방법을 설명합니다.

Airflow는 두 가지 DAG 트리거 방법을 제공합니다.

  • 일정에 따라 트리거. DAG를 만들 때 일정을 지정합니다. Airflow는 지정된 예약 매개변수를 기반으로 DAG를 자동으로 트리거합니다.
  • 수동으로 트리거. Airflow CLI 명령어를 사용하거나 Airflow 웹 인터페이스 또는 Cloud Console에서 DAG를 수동으로 트리거할 수 있습니다.
  • 이벤트에 대한 응답으로 트리거. 이벤트에 대한 응답으로 DAG를 트리거하는 표준화된 방법은 센서 사용입니다. 또한 Cloud Functions 및 Airflow REST API를 사용하여 이벤트에 대한 응답으로 DAG를 트리거할 수 있습니다. 자세한 내용은 Cloud Functions로 DAG 트리거를 참조하세요.

일정에 따라 DAG 트리거

일정에 따라 DAG를 트리거하려면 다음 안내를 따르세요.

  1. 이 섹션의 뒷부분에 설명된 대로 DAG 파일에서 start_dateschedule_interval 매개변수를 지정합니다.
  2. 환경에 DAG 파일을 업로드합니다.

예약 매개변수 지정

DAG를 정의할 때 schedule_interval 매개변수에서 DAG를 실행할 빈도를 지정합니다. start_date 매개변수에 Airflow가 DAG 예약을 시작할 시간을 지정합니다. DAG의 태스크에 개별 시작 날짜를 지정하거나 모든 태스크에 대해 단일 시작 날짜를 지정할 수 있습니다. DAG의 태스크에 대한 최소 시작 날짜 및 예약 간격을 기준으로 Airflow가 DAG 실행을 예약합니다.

예약은 다음 방식으로 작동합니다. start_date가 전달된 다음 Airflow는 다음 schedule_interval이 될 때까지 기다립니다. 그런 후 이 예약 간격이 끝날 때 첫 번째 DAG가 수행되도록 예약합니다. 예를 들어 DAG가 매시간(schedule_interval은 1시간) 실행되도록 예약되었고 시작 날짜가 오늘 12:00이면 첫 번째 DAG 실행이 오늘 13:00에 수행됩니다.

다음 예시는 2021년 4월 5일 15:00부터 시작하여 매시간 시행되는 DAG를 보여줍니다. 이 예시에 사용된 매개변수를 사용하여 Airflow는 2021년 4월 5일 16:00에 첫 번째 DAG 실행이 수행되도록 예약합니다.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG(
    dag_id='example_dag_schedule',
    # At 15:00 on 5 April, 2021
    start_date=datetime(2021, 4, 5, 15, 0),
    # At minute 0 of every hour
    schedule_interval='0 * * * *') as dag:

    # Output the current date and time
    t1 = BashOperator(
        task_id='date',
        bash_command='date',
        dag=dag)

    t1

예약 매개변수에 대한 자세한 내용은 Airflow 문서에서 DAG 실행을 참조하세요.

추가 예약 매개변수 예시

다음 예약 매개변수 예시는 여러 매개변수 조합에서 예약이 어떻게 작동하는지를 보여줍니다.

  • start_datedatetime(2021, 4, 4, 16, 25)이고 schedule_interval30 16 * * *이면 첫 번째 DAG 실행이 2021년 4월 5일 16:30에 수행됩니다.
  • start_datedatetime(2021, 4, 4, 16, 35)이고 schedule_interval30 16 * * *이면 첫 번째 DAG 실행이 2021년 4월 6일 16:30에 수행됩니다. 시작 날짜가 2021년 4월 4일의 예약 간격 이후이기 때문에 DAG 실행이 2021년 4월 5일에 수행되지 않습니다. 대신 일정 간격이 2021년 4월 5일 16:35에 종료되므로, 다음 DAG 실행이 그 다음 날 16:30으로 예약됩니다.
  • start_datedatetime(2021, 4, 4)이고 schedule_interval@daily이면 첫 번째 DAG 실행이 2021년 4월 5일 00:00로 예약됩니다.
  • start_datedatetime(2021, 4, 4, 16, 30)이고 schedule_interval0 * * * *이면 첫 번째 DAG 실행이 2021년 4월 4일 18:00로 예약됩니다. 지정된 날짜 및 시간이 지나면 Airflow가 매시간 0분에 DAG 실행이 수행되도록 예약합니다. 이렇게 수행될 때 가장 가까운 시점은 17:00입니다. 이때 Airflow는 예약 간격의 끝, 즉 18:00에 DAG 실행이 수행되도록 예약합니다.

수동으로 DAG 트리거

DAG를 수동으로 트리거하면 Airflow가 DAG 실행을 수행합니다. 예를 들어 이미 일정에 따라 실행되는 DAG가 있고 이 DAG를 수동으로 트리거하면 Airflow는 DAG에 지정된 실제 일정과 관계없이 DAG를 한 번 실행합니다.

Airflow UI

Airflow 웹 인터페이스에서 DAG를 트리거하려면 다음 안내를 따르세요.

  1. Google Cloud Console에서 환경 페이지로 이동합니다.

환경으로 이동

  1. Airflow 웹 서버 열에서 해당 환경의 Airflow 링크를 따릅니다.

  2. 적절한 권한을 가진 Google 계정으로 로그인합니다.

  3. Airflow 웹 인터페이스의 DAG 페이지에 있는 해당 DAG의 링크 열에서 DAG 트리거 버튼을 클릭합니다.

  4. (선택사항) DAG 실행 구성을 지정합니다.

  5. 트리거를 클릭합니다.

gcloud

Airflow 1.10.*에서 trigger_dag Airflow CLI 명령어를 실행합니다.

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    trigger_dag -- DAG_ID

Airflow 2에서 dags trigger Airflow CLI 명령어를 실행합니다.

  gcloud beta composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

다음과 같이 바꿉니다.

  • ENVIRONMENT_NAME을 환경 이름으로 바꿉니다.
  • LOCATION: 환경이 위치한 Compute Engine 리전입니다.
  • DAG_ID를 DAG의 이름으로 바꿉니다.

Cloud Composer 환경에서 Airflow CLI 실행에 대한 자세한 내용은 Airflow CLI 명령어 실행을 참조하세요.

사용 가능한 Airflow CLI 명령어에 대한 자세한 내용은 gcloud composer environments run 명령어 참조를 확인하세요.

다음 단계