import datetime
from airflow import models
default_dag_args = {
# The start_date describes when a DAG is valid / can be run. Set this to a
# fixed point in time rather than dynamically, since it is evaluated every
# time a DAG is parsed. See:
# https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
'start_date': datetime.datetime(2018, 1, 1),
}
# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
'composer_sample_simple_greeting',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
from airflow.operators import bash_operator
from airflow.operators import python_operator
def greeting():
import logging
logging.info('Hello World!')
# An instance of an operator is called a task. In this case, the
# hello_python task calls the "greeting" Python function.
hello_python = python_operator.PythonOperator(
task_id='hello',
python_callable=greeting)
# Likewise, the goodbye_bash task calls a Bash script.
goodbye_bash = bash_operator.BashOperator(
task_id='bye',
bash_command='echo Goodbye.')
# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash
다음 워크플로는 완전히 작동하는 예시이며 hello_python 태스크와 goodbye_bash 태스크라는 태스크 두 개로 구성됩니다.
from __future__ import print_function
import datetime
from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator
default_dag_args = {
# The start_date describes when a DAG is valid / can be run. Set this to a
# fixed point in time rather than dynamically, since it is evaluated every
# time a DAG is parsed. See:
# https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
'start_date': datetime.datetime(2018, 1, 1),
}
# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
'composer_sample_simple_greeting',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
def greeting():
import logging
logging.info('Hello World!')
# An instance of an operator is called a task. In this case, the
# hello_python task calls the "greeting" Python function.
hello_python = python_operator.PythonOperator(
task_id='hello',
python_callable=greeting)
# Likewise, the goodbye_bash task calls a Bash script.
goodbye_bash = bash_operator.BashOperator(
task_id='bye',
bash_command='echo Goodbye.')
# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash
from airflow.operators import email_operator
# Send email confirmation
email_summary = email_operator.EmailOperator(
task_id='email_summary',
to=models.Variable.get('email'),
subject='Sample BigQuery notify data ready',
html_content="""
Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
12AM. The most popular question was '{question_title}' with
{view_count} views. Top 100 questions asked are now available at:
{export_location}.
""".format(
min_date=min_query_date,
max_date=max_query_date,
question_title=(
'{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
'key=\'return_value\')[0][0] }}'
),
view_count=(
'{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
'key=\'return_value\')[0][1] }}'
),
export_location=output_file))
알림
DAG의 연산자가 실패할 때 이메일 알림을 보내려면 email_on_failure를 True로 설정합니다. Cloud Composer 환경에서 이메일 알림을 보내려면 SendGrid를 사용하도록 환경을 구성해야 합니다.
from airflow import models
default_dag_args = {
'start_date': yesterday,
# Email whenever an Operator in the DAG fails.
'email': models.Variable.get('email'),
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
with models.DAG(
'composer_sample_bq_notify',
schedule_interval=datetime.timedelta(weeks=4),
default_args=default_dag_args) as dag:
가이드라인
중첩 디렉토리에 있는 DAG의 ZIP 보관 파일에 커스텀 Python 라이브러리를 배치합니다. DAG 디렉터리의 최상위 수준에 라이브러리를 배치하지 마세요.
Airflow가 dags/ 폴더를 검색할 때는 DAG 폴더의 최상위 수준에 있는 Python 모듈과 최상위 dags/ 폴더에도 있는 ZIP 보관 파일의 최상위 수준에 있는 Python 모듈에서만 DAG를 확인합니다. Airflow는 airflow 및 DAG 하위 문자열을 모두 포함하지 않는 ZIP 보관 파일에서 Python 모듈을 발견하면 ZIP 보관 파일 처리를 중지합니다. Airflow는 해당 지점까지 발견된 DAG만 반환합니다.
내결함성을 위해 동일한 Python 모듈에 여러 DAG 객체를 정의하지 않습니다.
subDAG를 최상위 객체로 정의하지 마세요.
일반적으로 Airflow는 dags/ 디렉터리에서 모듈의 전역 네임스페이스에 있는 DAG 객체를 최상위 DAG로 선택합니다. 최상위 객체로 정의된 모든 하위 DAG는 하위 DAG를 포함하는 다른 DAG의 일정은 물론 자체 일정에도 실행됩니다.
DAG에서 시간을 파싱하는 데 필요한 파일을 data/ 디렉터리가 아닌 dags/ 디렉터리에 배치합니다. data/ 디렉터리는 웹 서버에 마운트되지 않습니다.
FAQ
DAG 만들기
여러 DAG에서 동일하거나 유사한 작업을 실행하려고 하는데 코드 반복을 최소화하려면 어떻게 해야 하나요?
예를 들어 원시 데이터가 있는 여러 테이블이 있고 각 테이블에 대한 일일 집계를 만들려고 합니다. 태스크는 서로 종속되지 않습니다. 테이블마다 태스크와 DAG를 하나씩 만들어야 하나요? 아니면 전체 DAG 하나를 만들어야 하나요?
각 태스크가 동일한 DAG 수준 속성(예: schedule_interval)을 공유해도 괜찮다면 단일 DAG에 여러 태스크를 정의하는 것이 적합합니다. 그렇지 않을 경우 코드 반복이 최소화되도록 단일 Python 모듈에서 여러 DAG를 모듈 전역에 배치하여 생성할 수 있습니다
DAG에서 실행되는 동시 태스크 수를 제한하려면 어떻게 해야 하나요?
예를 들어 API 사용량 한도 또는 할당량을 초과하지 않거나 동시 프로세스를 너무 많이 실행하고 싶지 않습니다.
Airflow 웹 UI에서 Airflow 풀을 정의하고 태스크를 DAG의 기존 풀과 연결할 수 있습니다.
DockerOperator는 원격 Docker 설치에서 컨테이너를 실행하는 데 사용하는 경우 외에는 환경의 GKE 클러스터 어디서도 사용되지 않는 것이 좋습니다. Composer는 각 Airflow 작업자 내에 있는 각 GKE 노드의 Docker 데몬을 마운트하지 않으므로 사용자가 수동으로 설치하지 않으면 연산자가 Docker 데몬에 액세스할 수 없습니다(또한 Pod가 다시 시작될 때 지속되지 않음).
해결 방법으로 KubernetesPodOperator 또는 GKEPodOperator를 대신 사용하여 Kubernetes 클러스터 또는 GKE 클러스터에서 Kubernetes Pod를 각각 실행하는 것이 좋습니다. 환경의 GKE 클러스터에서 Pod를 실행하면 리소스 경쟁이 발생할 수 있으므로 이는 권장되지 않습니다.
SubDagOperator를 사용하지 않는 것이 좋습니다.
SubDagOperator는 캡슐화를 제공할 수 있지만 SubDag 태스크에는 태스크 슬롯이 필요합니다. SubDag 태스크를 실행 중인 작업자가 종료되면 SubDag 내의 모든 태스크가 실패하므로 신뢰할 수 없는 워크플로가 발생합니다.
Python 연산자를 완전히 구분하려면 DockerOperators에서만 Python 코드를 실행해야 하나요?