DAG(워크플로) 작성

이 가이드에서는 Cloud Composer 환경에서 실행되는 Apache Airflow Directed Acyclic Graph(DAG)를 작성하는 방법을 보여줍니다.

DAG 구성

Airflow DAG는 Python 파일에 정의되며 DAG 정의, 연산자, 연산자 관계와 같은 구성요소로 이루어져 있습니다. 다음 코드 스니펫은 컨텍스트 외부에 있는 각 구성요소의 예를 보여줍니다.

  1. DAG 정의

    Airflow 2

    import datetime
    
    from airflow import models
    default_dag_args = {
        # The start_date describes when a DAG is valid / can be run. Set this to a
        # fixed point in time rather than dynamically, since it is evaluated every
        # time a DAG is parsed. See:
        # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
        'start_date': datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
            'composer_sample_simple_greeting',
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:

    Airflow 1

    import datetime
    
    from airflow import models
    default_dag_args = {
        # The start_date describes when a DAG is valid / can be run. Set this to a
        # fixed point in time rather than dynamically, since it is evaluated every
        # time a DAG is parsed. See:
        # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
        'start_date': datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
            'composer_sample_simple_greeting',
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:

  2. 수행할 작업을 설명하는 연산자. 연산자의 인스턴스화를 태스크라고 합니다.

    Airflow 2

    from airflow.operators import bash_operator
    from airflow.operators import python_operator
        def greeting():
            import logging
            logging.info('Hello World!')
    
        # An instance of an operator is called a task. In this case, the
        # hello_python task calls the "greeting" Python function.
        hello_python = python_operator.PythonOperator(
            task_id='hello',
            python_callable=greeting)
    
        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id='bye',
            bash_command='echo Goodbye.')

    Airflow 1

    from airflow.operators import bash_operator
    from airflow.operators import python_operator
        def greeting():
            import logging
            logging.info('Hello World!')
    
        # An instance of an operator is called a task. In this case, the
        # hello_python task calls the "greeting" Python function.
        hello_python = python_operator.PythonOperator(
            task_id='hello',
            python_callable=greeting)
    
        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id='bye',
            bash_command='echo Goodbye.')

  3. 작업을 완료해야 하는 순서를 설명하는 태스크 관계

    Airflow 2

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

    Airflow 1

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

다음 워크플로는 완전히 작동하는 예시이며 hello_python 태스크와 goodbye_bash 태스크라는 태스크 두 개로 구성됩니다.

Airflow 2

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Airflow 1

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Airflow DAG를 정의하는 방법에 대한 자세한 내용은 Airflow 가이드Airflow 개념을 참조하세요.

연산자

다음 예에서는 많이 사용되는 몇 가지 Airflow 연산자를 보여줍니다. Airflow 연산자에 대한 신뢰할 수 있는 참고 자료가 필요하면 Apache Airflow API 참조를 확인하거나 core, contrib, providers 연산자의 소스 코드를 찾아보세요.

BashOperator

명령줄 프로그램을 실행하려면 BashOperator를 사용합니다.

Airflow 2

from airflow.operators import bash
    # Create BigQuery output dataset.
    make_bq_dataset = bash.BashOperator(
        task_id='make_bq_dataset',
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f'bq ls {bq_dataset_name} || bq mk {bq_dataset_name}')

Airflow 1

from airflow.operators import bash_operator
    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id='make_bq_dataset',
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command='bq ls {} || bq mk {}'.format(
            bq_dataset_name, bq_dataset_name))

Cloud Composer는 작업자의 Bash 스크립트에서 제공한 명령어를 실행합니다. 작업자는 Debian 기반 Docker 컨테이너이며 여러 패키지를 포함합니다.

PythonOperator

임의의 Python 코드를 실행하려면 PythonOperator를 사용합니다.

Cloud Composer는 컨테이너에서 해당 환경에 사용되는 Cloud Composer 이미지 버전을 포함하는 Python 코드를 실행합니다.

추가 Python 패키지를 설치하려면 Python 종속 항목 설치를 참조하세요.

Google Cloud 연산자

Google Cloud 제품을 사용하는 태스크를 실행하려면 Google Cloud Airflow 연산자를 사용합니다. Cloud Composer는 환경의 프로젝트에 대한 Airflow 연결을 자동으로 구성합니다.

EmailOperator

DAG에서 이메일을 보내려면 EmailOperator를 사용합니다. Cloud Composer 환경에서 이메일을 보내려면 SendGrid를 사용하도록 환경을 구성해야 합니다.

Airflow 2

from airflow.operators import email
    # Send email confirmation (you will need to set up the email operator
    # See https://cloud.google.com/composer/docs/how-to/managing/creating#notification
    # for more info on configuring the email operator in Cloud Composer)
    email_summary = email.EmailOperator(
        task_id='email_summary',
        to=models.Variable.get('email'),
        subject='Sample BigQuery notify data ready',
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][0] }}'
            ),
            view_count=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][1] }}'
            ),
            export_location=output_file))

Airflow 1

from airflow.operators import email_operator
    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id='email_summary',
        to=models.Variable.get('email'),
        subject='Sample BigQuery notify data ready',
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][0] }}'
            ),
            view_count=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][1] }}'
            ),
            export_location=output_file))

알림

DAG의 연산자가 실패할 때 이메일 알림을 보내려면 email_on_failureTrue로 설정합니다. Cloud Composer 환경에서 이메일 알림을 보내려면 SendGrid를 사용하도록 환경을 구성해야 합니다.

Airflow 2

from airflow import models
default_dag_args = {
    'start_date': yesterday,
    # Email whenever an Operator in the DAG fails.
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': project_id
}

with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=4),
        default_args=default_dag_args) as dag:

Airflow 1

from airflow import models
default_dag_args = {
    'start_date': yesterday,
    # Email whenever an Operator in the DAG fails.
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=4),
        default_args=default_dag_args) as dag:

가이드라인

  1. 중첩 디렉토리에 있는 DAG의 ZIP 보관 파일에 커스텀 Python 라이브러리를 배치합니다. DAG 디렉터리의 최상위 수준에 라이브러리를 배치하지 마세요.

    Airflow가 dags/ 폴더를 검색할 때는 DAG 폴더의 최상위 수준에 있는 Python 모듈과 최상위 dags/ 폴더에도 있는 ZIP 보관 파일의 최상위 수준에 있는 Python 모듈에서만 DAG를 확인합니다. Airflow는 airflowDAG 하위 문자열을 모두 포함하지 않는 ZIP 보관 파일에서 Python 모듈을 발견하면 ZIP 보관 파일 처리를 중지합니다. Airflow는 해당 지점까지 발견된 DAG만 반환합니다.

  2. 내결함성을 위해 동일한 Python 모듈에 여러 DAG 객체를 정의하지 않습니다.

  3. subDAG를 최상위 객체로 정의하지 마세요.

    일반적으로 Airflow는 dags/ 디렉터리에서 모듈의 전역 네임스페이스에 있는 DAG 객체를 최상위 DAG로 선택합니다. 최상위 객체로 정의된 모든 하위 DAG는 하위 DAG를 포함하는 다른 DAG의 일정은 물론 자체 일정에도 실행됩니다.

  4. DAG에서 시간을 파싱하는 데 필요한 파일을 data/ 디렉터리가 아닌 dags/ 디렉터리에 배치합니다. data/ 디렉터리는 웹 서버에 마운트되지 않습니다.

DAG 작성 FAQ

여러 DAG에서 동일하거나 유사한 태스크를 실행하려 할 경우 코드 반복을 최소화하는 방법은 무엇인가요?

코드 반복을 최소화하려면 래퍼와 라이브러리를 정의하는 것이 좋습니다.

DAG 파일 간에 코드를 재사용하려면 어떻게 해야 하나요?

유틸리티 함수를 로컬 Python 라이브러리에 배치하고 함수를 가져옵니다. 환경 버킷의 dags/ 폴더에 있는 모든 DAG에서 함수를 참조할 수 있습니다.

다른 정의가 발생할 위험을 최소화하려면 어떻게 해야 하나요?

예를 들어 원시 데이터를 수익 측정항목으로 집계하려는 두 팀이 있습니다. 두 팀은 목표는 동일하지만 약간 다른 2가지 작업을 작성합니다. DAG 구현자가 집계되는 수익 정의를 명확하게 설명하도록 수익 데이터와 함께 사용할 라이브러리를 정의하세요.

DAG 간의 종속성은 어떻게 설정하나요?

이는 종속성을 정의하고자 하는 방식에 따라 달라집니다.

DAG 두 개(DAG A 및 DAG B)가 있고 DAG B가 DAG A 다음에 트리거되도록 하려면 DAG A 끝 부분에 TriggerDagRunOperator를 배치하면 됩니다.

DAG B가 DAG A에서 생성하는 아티팩트(예: Pub/Sub 메시지)에만 의존할 경우 센서가 보다 효율적으로 작동할 수 있습니다.

DAG B가 DAG A와 긴밀하게 통합되면 두 DAG를 하나의 DAG로 병합할 수 있습니다.

고유한 실행 ID를 DAG 및 해당 작업에 전달하려면 어떻게 해야 하나요?

예를 들어 Dataproc 클러스터 이름과 파일 경로를 전달하려고 합니다.

PythonOperatorstr(uuid.uuid4())를 반환하면 임의의 고유 ID를 생성할 수 있습니다. 그러면 ID가 XComs에 배치되므로 템플릿 필드를 통해 다른 연산자에서 이 ID를 참조할 수 있습니다.

uuid를 생성하기 전에 DagRun 관련 ID가 더 중요한지 고려합니다. 또한 매크로를 사용하여 Jinja 대체에서 이러한 ID를 참조할 수도 있습니다.

DAG에서 태스크를 분리하려면 어떻게 해야 하나요?

각 태스크는 멱등한 작업 단위여야 합니다. 따라서 다단계 워크플로를 단일 태스크 내에 캡슐화해서는 안 됩니다(예: PythonOperator에서 복잡한 프로그램 실행).

워크플로 정의 코드를 재사용하기 위한 Airflow 전용 메커니즘은 SubDagOperator입니다. 하지만 Cloud Composer에서 이 연산자를 사용할 때는 주의사항이 있습니다.

여러 소스의 데이터를 집계하려면 단일 DAG에 여러 태스크를 정의해야 하나요?

예를 들어 원시 데이터가 있는 여러 테이블이 있고 각 테이블에 대한 일일 집계를 만들려고 합니다. 태스크는 서로 종속되지 않습니다. 테이블마다 태스크와 DAG를 하나씩 만들어야 하나요? 아니면 전체 DAG 하나를 만들어야 하나요?

각 태스크가 동일한 DAG 수준 속성(예: schedule_interval)을 공유해도 괜찮다면 단일 DAG에 여러 태스크를 정의하는 것이 적합합니다. 그렇지 않을 경우 코드 반복이 최소화되도록 단일 Python 모듈에서 여러 DAG를 모듈 globals()에 배치하여 생성할 수 있습니다

DAG에서 실행되는 동시 태스크 수를 제한하려면 어떻게 해야 하나요?

예를 들어 API 사용량 한도 또는 할당량을 초과하지 않거나 동시 프로세스를 너무 많이 실행하고 싶지 않습니다.

Airflow 웹 UI에서 Airflow 풀을 정의하고 태스크를 DAG의 기존 풀과 연결할 수 있습니다.

연산자 사용 FAQ

DockerOperator를 사용해야 하나요?

DockerOperator는 원격 Docker 설치에서 컨테이너를 실행하는 데 사용하는 경우 외에는 환경의 클러스터 어디서도 사용하지 않는 것이 좋습니다. Cloud Composer 환경에서는 연산자가 Docker 데몬에 액세스할 수 없습니다.

대신 KubernetesPodOperator 또는 GKEPodOperator를 사용합니다. 이러한 운영자는 각각 Kubernetes 또는 GKE 클러스터에서 Kubernetes Pod를 시작할 수 있습니다. 환경의 클러스터에서 Pod를 실행하면 리소스 경쟁이 발생할 수 있으므로 이는 권장되지 않습니다.

SubDagOperator를 사용해야 하나요?

SubDagOperator를 사용하지 않는 것이 좋습니다.

SubDagOperator는 캡슐화를 제공할 수 있지만 SubDag 태스크에는 태스크 슬롯이 필요합니다. SubDag 태스크를 실행 중인 Airflow 작업자가 종료되면 SubDag 내의 모든 태스크가 실패하므로 신뢰할 수 없는 워크플로가 발생합니다.

Python 연산자를 완전히 구분하려면 PythonOperators에서만 Python 코드를 실행해야 하나요?

목표에 따라 몇 가지 옵션이 있습니다.

별도의 Python 종속 항목을 유지하는 것이 유일한 목적이라면 PythonVirtualenvOperator를 사용할 수 있습니다.

KubernetesPodOperator 사용을 고려하세요. 이 연산자를 사용하면 Kubernetes 포드를 정의하고 다른 클러스터에서 이 포드를 실행할 수 있습니다.

Google Cloud 외부에서 KubernetesPodOperator를 사용하려면 어떻게 해야 하나요?

GKE 클러스터로 인증하는 방법을 지정하는 구성 파일을 마운트하고 환경 버킷의 /data 폴더에 배치할 수 있습니다.

이 폴더는 Cloud Composer 환경 전체에 마운트됩니다.

커스텀 바이너리 또는 PyPI 이외 패키지를 추가하려면 어떻게 해야 하나요?

비공개 패키지 저장소에 호스팅된 패키지를 설치할 수 있습니다.

KubernetesPodOperator를 사용하여 커스텀 패키지로 빌드된 자체 이미지로 Kubernetes 포드를 실행할 수도 있습니다.

DAG 및 해당 태스크에 동일하게 인수를 전달하려면 어떻게 해야 하나요?

Airflow의 기본 제공 Jinja 템플릿을 사용하여 템플릿 필드에서 사용 가능한 인수를 전달할 수 있습니다.

템플릿 대체는 언제 발생하나요?

템플릿 대체는 연산자의 pre_execute 함수가 호출되기 바로 전에 Airflow 작업자에서 발생합니다. 실제로 이는 태스크가 실행되기 직전까지 템플릿이 대체되지 않는다는 것을 의미합니다.

템플릿 대체를 지원하는 연산자 인수를 어떻게 알 수 있나요?

Jinja2 템플릿 대체를 지원하는 연산자 인수는 명시적으로 표시됩니다.

연산자 정의에서 템플릿 대체를 수행할 인수 이름 목록이 포함된 template_fields 필드를 찾습니다.

예를 들어 bash_commandenv 인수의 템플릿을 지원하는 BashOperator를 확인하세요.

다음 단계