Airflow DAG 쓰기

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

이 가이드에서는 Cloud Composer 환경에서 실행되는 Apache Airflow 방향성 비순환 그래프(DAG)를 작성하는 방법을 보여줍니다.

Apache Airflow는 강력한 DAG 및 태스크 격리를 제공하지 않으므로 DAG 간섭이 방지되도록 프로덕션 환경과 테스트 환경을 별도로 사용하는 것이 좋습니다. 자세한 내용은 DAG 테스트를 참조하세요.

Airflow DAG 구성

Airflow DAG는 Python 파일에 정의되며 다음 요소로 구성되어 있습니다.

  • DAG 정의
  • Airflow 연산자
  • 연산자 관계

다음 코드 스니펫은 컨텍스트 외부에 있는 각 구성요소의 예시를 보여줍니다.

DAG 정의

다음 예시는 Airflow DAG 정의를 보여줍니다.

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

연산자 및 태스크

Airflow 연산자는 수행할 작업을 설명합니다. 태스크는 연산자의 특정 인스턴스입니다.

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = PythonOperator(task_id="hello", python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

태스크 관계

태스크 관계는 작업을 완료해야 하는 순서를 설명합니다.

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Python의 전체 DAG 워크플로 예시

다음 워크플로는 hello_python 태스크 및 goodbye_bash 태스크의 2개 태스크로 구성된 완전히 작동하는 DAG 템플릿입니다.


import datetime

from airflow import models

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator



default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = PythonOperator(task_id="hello", python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = BashOperator(task_id="bye", bash_command="echo Goodbye.")

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Airflow DAG를 정의하는 방법에 대한 자세한 내용은 Airflow 튜토리얼Airflow 개념을 참조하세요.

Airflow 연산자

다음 예에서는 많이 사용되는 몇 가지 Airflow 연산자를 보여줍니다. Airflow 연산자에 대한 신뢰할 수 있는 참고 자료가 필요하면 연산자 및 후크 참조제공업체 색인을 참조하세요.

BashOperator

명령줄 프로그램을 실행하려면 BashOperator를 사용합니다.

from airflow.operators import bash

    # Create BigQuery output dataset.
    make_bq_dataset = bash.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Cloud Composer는 Airflow 작업자의 Bash 스크립트에서 제공한 명령어를 실행합니다. 작업자는 Debian 기반 Docker 컨테이너이며 여러 패키지를 포함합니다.

PythonOperator

임의의 Python 코드를 실행하려면 PythonOperator를 사용합니다.

Cloud Composer는 컨테이너에서 해당 환경에 사용되는 Cloud Composer 이미지 버전의 패키지를 포함하는 Python 코드를 실행합니다.

추가 Python 패키지를 설치하려면 Python 종속 항목 설치를 참조하세요.

Google Cloud 연산자

Google Cloud 제품을 사용하는 태스크를 실행하려면 Google Cloud Airflow 연산자를 사용하세요. 예를 들어 BigQuery 연산자BigQuery의 데이터를 쿼리하고 처리합니다.

Google Cloud 및 Google Cloud에서 제공하는 개별 서비스에 대한 Airflow 연산자가 더 많이 있습니다. 전체 목록은 Google Cloud 연산자를 참조하세요.

from airflow.providers.google.cloud.operators import bigquery
from airflow.providers.google.cloud.transfers import bigquery_to_gcs

    bq_recent_questions_query = bigquery.BigQueryInsertJobOperator(
        task_id="bq_recent_questions_query",
        configuration={
            "query": {
                "query": RECENT_QUESTIONS_QUERY,
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": project_id,
                    "datasetId": bq_dataset_name,
                    "tableId": bq_recent_questions_table_id,
                },
            }
        },
        location=location,
    )

EmailOperator

DAG에서 이메일을 보내려면 EmailOperator를 사용합니다. Cloud Composer 환경에서 이메일을 보내려면 SendGrid를 사용하도록 환경을 구성해야 합니다.

from airflow.operators import email

    # Send email confirmation (you will need to set up the email operator
    # See https://cloud.google.com/composer/docs/how-to/managing/creating#notification
    # for more info on configuring the email operator in Cloud Composer)
    email_summary = email.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

연산자 오류 알림

DAG의 연산자가 실패할 때 이메일 알림을 보내려면 email_on_failureTrue로 설정합니다. Cloud Composer 환경에서 이메일 알림을 보내려면 SendGrid를 사용하도록 환경을 구성해야 합니다.

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": project_id,
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

DAG 워크플로 가이드라인

  • 중첩 디렉터리에 있는 DAG의 ZIP 보관 파일에 커스텀 Python 라이브러리를 배치합니다. DAG 디렉터리의 최상위 수준에 라이브러리를 배치하지 마세요.

    Airflow가 dags/ 폴더를 검색할 때는 DAG 폴더의 최상위 수준에 있는 Python 모듈과 최상위 dags/ 폴더에도 있는 ZIP 파일의 최상위 수준에 있는 Python 모듈에서만 DAG를 확인합니다. Airflow는 airflowDAG 하위 문자열을 모두 포함하지 않는 ZIP 파일에서 Python 모듈을 발견하면 ZIP 파일 처리를 중지합니다. Airflow는 해당 지점까지 발견된 DAG만 반환합니다.

  • 내결함성을 위해 동일한 Python 모듈에 여러 DAG 객체를 정의하지 않습니다.

  • SubDAG를 사용하지 마세요. 대신 DAG 내에서 태스크를 그룹화하세요.

  • DAG에서 시간을 파싱하는 데 필요한 파일을 data/ 폴더가 아닌 dags/ 폴더에 배치합니다.

  • DAG 단위 테스트를 구현합니다.

  • DAG 테스트 안내에서 권장한 대로 개발되거나 수정된 DAG 테스트

  • Composer 로컬 개발 CLI 도구는 Airflow 환경을 로컬에서 실행하여 Cloud Composer 2의 Apache Airflow DAG 개발을 간소화합니다. 이 로컬 Airflow 환경은 특정 Cloud Composer 2 버전의 이미지를 사용합니다.

  • 개발된 DAG에서 DAG 파싱 시간을 너무 많이 늘리지 않는지 확인합니다.

  • Airflow 태스크는 여러 가지 이유로 인해 실패할 수 있습니다. 전체 DAG 실행이 실패하지 않도록 태스크 재시도를 사용 설정하는 것이 좋습니다. 최대 재시도 횟수를 0으로 설정하면 재시도가 수행되지 않습니다.

    default_task_retries 옵션을 0 이외의 태스크 재시도 값으로 재정의하는 것이 좋습니다. 또한 태스크 수준에서 retries 매개변수를 설정할 수 있습니다.

  • Airflow 태스크에 GPU를 사용하려면 GPU가 있는 머신을 사용하는 노드를 기반으로 별도의 GKE 클러스터를 만듭니다. GKEStartPodOperator를 사용하여 태스크를 실행합니다.

  • 다른 Airflow 구성요소(스케줄러, 작업자, 웹 서버)가 실행 중인 클러스터의 노드 풀에서 CPU 및 메모리를 많이 사용하는 태스크를 실행하지 마세요. 대신 KubernetesPodOperator 또는 GKEStartPodOperator를 사용하세요.

  • DAG를 환경에 배포할 때 DAG를 해석하고 실행하는 데 반드시 필요한 파일만 /dags 폴더에 업로드합니다.

  • /dags 폴더의 DAG 파일 수를 제한합니다.

    Airflow는 /dags 폴더에서 DAG를 지속적으로 파싱합니다. 파싱은 DAG 폴더를 통해 반복되는 프로세스입니다. 종속 항목과 함께 로드해야 하는 파일 수는 DAG 파싱 및 태스크 예약 성능에 영향을 줍니다. 1개 DAG에 10,000개 파일을 사용하는 대신 각 100개 DAG에 100개 파일을 사용하는 것이 더 효율적이고, 이러한 방식으로 최적화하는 것이 좋습니다. 이 최적화는 파싱 시간과 DAG 작성 및 관리 효율 사이의 균형을 위한 것입니다.

    예를 들어 10,000개의 DAG 파일을 배포하려면 각각 100개의 DAG 파일이 포함된 100개의 ZIP 파일을 만들 수 있습니다.

    위의 힌트 외에도 DAG 파일이 10,000개 이상 있으면 프로그래매틱 방식으로 DAG를 생성하는 것이 좋은 방법일 수 있습니다. 예를 들어 일정 개수의 DAG 객체를 생성하는 단일 Python DAG 파일을 구현할 수 있습니다(예: 20, 100개 DAG 객체).

DAG 작성 FAQ

여러 DAG에서 동일하거나 유사한 태스크를 실행하려 할 경우 코드 반복을 최소화하는 방법은 무엇인가요?

코드 반복을 최소화하려면 래퍼와 라이브러리를 정의하는 것이 좋습니다.

DAG 파일 간에 코드를 재사용하려면 어떻게 해야 하나요?

유틸리티 함수를 로컬 Python 라이브러리에 배치하고 함수를 가져옵니다. 환경 버킷의 dags/ 폴더에 있는 모든 DAG에서 함수를 참조할 수 있습니다.

다른 정의가 발생할 위험을 최소화하려면 어떻게 해야 하나요?

예를 들어 원시 데이터를 수익 측정항목으로 집계하려는 두 팀이 있습니다. 두 팀은 목표는 동일하지만 약간 다른 2가지 작업을 작성합니다. DAG 구현자가 집계되는 수익 정의를 명확하게 설명하도록 수익 데이터와 함께 사용할 라이브러리를 정의하세요.

DAG 간의 종속 항목은 어떻게 설정하나요?

이는 종속 항목을 정의하고자 하는 방식에 따라 달라집니다.

DAG 두 개(DAG A 및 DAG B)가 있고 DAG B가 DAG A 다음에 트리거되도록 하려면 DAG A 끝 부분에 TriggerDagRunOperator를 배치하면 됩니다.

DAG B가 DAG A에서 생성하는 아티팩트(예: Pub/Sub 메시지)에만 의존할 경우 센서가 보다 효율적으로 작동할 수 있습니다.

DAG B가 DAG A와 긴밀하게 통합되면 두 DAG를 하나의 DAG로 병합할 수 있습니다.

고유한 실행 ID를 DAG 및 해당 작업에 전달하려면 어떻게 해야 하나요?

예를 들어 Dataproc 클러스터 이름과 파일 경로를 전달하려고 합니다.

PythonOperator에서 str(uuid.uuid4())을 반환하면 임의의 고유 ID를 생성할 수 있습니다. 그러면 ID가 XComs에 배치되므로 템플릿 필드를 통해 다른 연산자에서 ID를 참조할 수 있습니다.

uuid를 생성하기 전에 DagRun 관련 ID가 더 중요한지 고려합니다. 또한 매크로를 사용하여 Jinja 대체에서 이러한 ID를 참조할 수 있습니다.

DAG에서 태스크를 분리하려면 어떻게 해야 하나요?

각 태스크는 멱등한 작업 단위여야 합니다. 따라서 다단계 워크플로를 단일 태스크 내에 캡슐화해서는 안 됩니다(예: PythonOperator에서 복잡한 프로그램 실행).

여러 소스의 데이터를 집계하려면 단일 DAG에 여러 태스크를 정의해야 하나요?

예를 들어 원시 데이터가 있는 여러 테이블이 있고 각 테이블에 대한 일일 집계를 만들려고 합니다. 태스크는 서로 종속되지 않습니다. 테이블마다 태스크와 DAG를 하나씩 만들어야 하나요? 아니면 전체 DAG 하나를 만들어야 하나요?

각 태스크가 동일한 DAG 수준 속성(예: schedule_interval)을 공유해도 괜찮다면 단일 DAG에 여러 태스크를 정의하는 것이 적합합니다. 그렇지 않을 경우 코드 반복이 최소화되도록 단일 Python 모듈에서 여러 DAG를 모듈 globals()에 배치하여 생성할 수 있습니다

DAG에서 실행되는 동시 태스크 수를 제한하려면 어떻게 해야 하나요?

예를 들어 API 사용량 한도 또는 할당량을 초과하지 않거나 동시 프로세스를 너무 많이 실행하고 싶지 않습니다.

Airflow 웹 UI에서 Airflow 풀을 정의하고 태스크를 DAG의 기존 풀과 연결할 수 있습니다.

연산자 사용 FAQ

DockerOperator를 사용해야 하나요?

DockerOperator는 환경의 클러스터 내에서가 아닌 원격 Docker 설치에서 컨테이너를 실행하는 데 사용되는 경우가 아니면 사용하지 않는 것이 좋습니다. Cloud Composer 환경에서는 연산자가 Docker 데몬에 액세스할 수 없습니다.

대신 KubernetesPodOperator 또는 GKEStartPodOperator를 사용합니다. 이러한 연산자는 각각 Kubernetes 또는 GKE 클러스터에서 Kubernetes 포드를 시작합니다. 환경의 클러스터에서 포드를 실행하면 리소스 경쟁이 발생할 수 있으므로 권장되지 않습니다.

SubDagOperator를 사용해야 하나요?

SubDagOperator를 사용하지 않는 것이 좋습니다.

그룹화 태스크 안내의 제안대로 대안을 사용합니다.

Python 연산자를 완전히 구분하려면 PythonOperators에서만 Python 코드를 실행해야 하나요?

목표에 따라 몇 가지 옵션이 있습니다.

별도의 Python 종속 항목을 유지하는 것이 유일한 목적이면 PythonVirtualenvOperator를 사용하면 됩니다.

KubernetesPodOperator를 사용하는 것이 좋습니다. 이 연산자를 사용하면 Kubernetes 포드를 정의하고 다른 클러스터에서 포드를 실행할 수 있습니다.

커스텀 바이너리 또는 PyPI 이외 패키지를 추가하려면 어떻게 해야 하나요?

비공개 패키지 저장소에 호스팅된 패키지를 설치할 수 있습니다.

DAG 및 해당 태스크에 동일하게 인수를 전달하려면 어떻게 해야 하나요?

Airflow의 기본 제공 Jinja 템플릿을 사용하여 템플릿 필드에서 사용 가능한 인수를 전달할 수 있습니다.

템플릿 대체는 언제 발생하나요?

템플릿 대체는 연산자의 pre_execute 함수가 호출되기 바로 전에 Airflow 작업자에서 발생합니다. 실제로 이는 태스크가 실행되기 직전까지 템플릿이 대체되지 않는다는 것을 의미합니다.

템플릿 대체를 지원하는 연산자 인수를 어떻게 알 수 있나요?

Jinja2 템플릿 대체를 지원하는 연산자 인수는 명시적으로 표시됩니다.

연산자 정의에서 템플릿 대체를 수행할 인수 이름 목록이 포함된 template_fields 필드를 찾습니다.

예를 들어 bash_commandenv 인수의 템플릿을 지원하는 BashOperator를 확인하세요.

다음 단계