DAG(워크플로) 작성

이 가이드에서는 Cloud Composer 환경에서 실행되는 Apache Airflow Directed Acyclic Graph(DAG)를 작성하는 방법을 보여줍니다.

DAG 구성

Airflow DAG는 Python 파일에 정의되며 DAG 정의, 연산자, 연산자 관계와 같은 구성요소로 이루어져 있습니다. 다음 코드 스니펫은 컨텍스트 외부에 있는 각 구성요소의 예를 보여줍니다.

  1. DAG 정의

    import datetime
    
    from airflow import models
    default_dag_args = {
        # The start_date describes when a DAG is valid / can be run. Set this to a
        # fixed point in time rather than dynamically, since it is evaluated every
        # time a DAG is parsed. See:
        # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
        'start_date': datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
            'composer_sample_simple_greeting',
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:
  2. 수행할 작업을 설명하는 연산자. 연산자의 인스턴스화를 태스크라고 합니다.

    from airflow.operators import bash_operator
    from airflow.operators import python_operator
        def greeting():
            import logging
            logging.info('Hello World!')
    
        # An instance of an operator is called a task. In this case, the
        # hello_python task calls the "greeting" Python function.
        hello_python = python_operator.PythonOperator(
            task_id='hello',
            python_callable=greeting)
    
        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id='bye',
            bash_command='echo Goodbye.')
  3. 작업을 완료해야 하는 순서를 설명하는 연산자 관계

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

다음 워크플로는 완전히 작동하는 예시이며 hello_python 태스크와 goodbye_bash 태스크라는 태스크 두 개로 구성됩니다.

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Airflow DAG를 정의하는 방법에 대한 자세한 내용은 Airflow 가이드Airflow 개념을 참조하세요.

연산자

다음 예에서는 많이 사용되는 몇 가지 Airflow 연산자를 보여줍니다. Airflow 연산자에 대한 신뢰할 수 있는 참고 자료가 필요하면 Apache Airflow API 참조를 확인하거나 corecontrib 연산자의 소스 코드를 찾아보세요.

BashOperator

명령줄 프로그램을 실행하려면 BashOperator를 사용합니다.

from airflow.operators import bash_operator
    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id='make_bq_dataset',
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command='bq ls {} || bq mk {}'.format(
            bq_dataset_name, bq_dataset_name))

Cloud Composer는 작업자의 Bash 스크립트에서 제공한 명령어를 실행합니다. 작업자는 Debian 기반 Docker 컨테이너이며 여러 패키지를 포함합니다.

PythonOperator

임의의 Python 코드를 실행하려면 PythonOperator를 사용합니다. Cloud Composer는 다음과 같은 패키지가 포함된 컨테이너에서 Python 코드를 실행합니다.

추가 Python 패키지를 설치하려면 Python 종속 항목 설치를 참조하세요.

Google Cloud 연산자

Google Cloud 제품을 사용하는 태스크를 실행하려면 Google Cloud Airflow 연산자를 사용합니다. Cloud Composer는 환경의 프로젝트에 대한 Airflow 연결을 자동으로 구성합니다.

EmailOperator

DAG에서 이메일을 보내려면 EmailOperator를 사용합니다. Cloud Composer 환경에서 이메일을 보내려면 SendGrid를 사용하도록 환경을 구성해야 합니다.

from airflow.operators import email_operator
    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id='email_summary',
        to=models.Variable.get('email'),
        subject='Sample BigQuery notify data ready',
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][0] }}'
            ),
            view_count=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][1] }}'
            ),
            export_location=output_file))

알림

DAG의 연산자가 실패할 때 이메일 알림을 보내려면 email_on_failureTrue로 설정합니다. Cloud Composer 환경에서 이메일 알림을 보내려면 SendGrid를 사용하도록 환경을 구성해야 합니다.

from airflow import models
default_dag_args = {
    'start_date': yesterday,
    # Email whenever an Operator in the DAG fails.
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=4),
        default_args=default_dag_args) as dag:

가이드라인

  1. 중첩 디렉토리에 있는 DAG의 ZIP 보관 파일에 커스텀 Python 라이브러리를 배치합니다. DAG 디렉터리의 최상위 수준에 라이브러리를 배치하지 마세요.

    Airflow가 dags/ 폴더를 검색할 때는 DAG 폴더의 최상위 수준에 있는 Python 모듈과 최상위 dags/ 폴더에도 있는 ZIP 보관 파일의 최상위 수준에 있는 Python 모듈에서만 DAG를 확인합니다. Airflow는 airflowDAG 하위 문자열을 모두 포함하지 않는 ZIP 보관 파일에서 Python 모듈을 발견하면 ZIP 보관 파일 처리를 중지합니다. Airflow는 해당 지점까지 발견된 DAG만 반환합니다.

  2. 내결함성을 위해 동일한 Python 모듈에 여러 DAG 객체를 정의하지 않습니다.

  3. subDAG를 최상위 객체로 정의하지 마세요.

    일반적으로 Airflow는 dags/ 디렉터리에서 모듈의 전역 네임스페이스에 있는 DAG 객체를 최상위 DAG로 선택합니다. 최상위 객체로 정의된 모든 하위 DAG는 하위 DAG를 포함하는 다른 DAG의 일정은 물론 자체 일정에도 실행됩니다.

  4. DAG에서 시간을 파싱하는 데 필요한 파일을 data/ 디렉터리가 아닌 dags/ 디렉터리에 배치합니다. data/ 디렉터리는 웹 서버에 마운트되지 않습니다.

FAQ

DAG 만들기

연산자 사용

다음 단계