Airflow DAG を書き込む

このガイドでは、Cloud Composer 環境で実行する Apache Airflow 有向非巡回グラフ(DAG)の作成方法を説明します。

Airflow DAG の構造化

Airflow DAG は Python ファイルで定義されており、DAG 定義、オペレーター、オペレーターの関係といったコンポーネントで構成されています。次のコード スニペットは、コンテキスト外の各コンポーネントの例を示しています。

  1. DAG の定義です。

    Airflow 2

    import datetime
    
    from airflow import models
    
    default_dag_args = {
        # The start_date describes when a DAG is valid / can be run. Set this to a
        # fixed point in time rather than dynamically, since it is evaluated every
        # time a DAG is parsed. See:
        # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
        "start_date": datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        "composer_sample_simple_greeting",
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args,
    ) as dag:

    Airflow 1

    import datetime
    
    from airflow import models
    
    default_dag_args = {
        # The start_date describes when a DAG is valid / can be run. Set this to a
        # fixed point in time rather than dynamically, since it is evaluated every
        # time a DAG is parsed. See:
        # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
        "start_date": datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
        "composer_sample_simple_greeting",
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args,
    ) as dag:

  2. 行う処理を記述するオペレーター。オペレーターをインスタンス化したものは、タスクと呼ばれます。

    Airflow 2

    from airflow.operators import bash_operator
    from airflow.operators import python_operator
    
        def greeting():
            import logging
    
            logging.info("Hello World!")
    
        # An instance of an operator is called a task. In this case, the
        # hello_python task calls the "greeting" Python function.
        hello_python = python_operator.PythonOperator(
            task_id="hello", python_callable=greeting
        )
    
        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id="bye", bash_command="echo Goodbye."
        )

    Airflow 1

    from airflow.operators import bash_operator
    from airflow.operators import python_operator
    
        def greeting():
            import logging
    
            logging.info("Hello World!")
    
        # An instance of an operator is called a task. In this case, the
        # hello_python task calls the "greeting" Python function.
        hello_python = python_operator.PythonOperator(
            task_id="hello", python_callable=greeting
        )
    
        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id="bye", bash_command="echo Goodbye."
        )

  3. 処理を完了する順序を記述するタスクの関係

    Airflow 2

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

    Airflow 1

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Python での完全な DAG ワークフローの例

次のワークフローは、hello_python タスクと goodbye_bash タスクの 2 つのタスクで構成される完全な作業用 DAG テンプレートです。

Airflow 2


import datetime

from airflow import models

from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Airflow 1


import datetime

from airflow import models

from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Airflow のチュートリアルとコンセプト

Airflow DAG の定義に関する詳細については、Airflow チュートリアルAirflow のコンセプトをご覧ください。

Airflow オペレータ

次の例は、いくつかの一般的な Airflow オペレーターを示しています。Airflow オペレーターの信頼できるリファレンスについては、Apache Airflow API リファレンスをご覧になるか、corecontribproviders のオペレーターのソースコードをご覧ください。

BashOperator

BashOperator を使用してコマンドライン プログラムを実行します。

Airflow 2

from airflow.operators import bash

    # Create BigQuery output dataset.
    make_bq_dataset = bash.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Airflow 1

from airflow.operators import bash_operator

    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Cloud Composer では、ワーカーの bash スクリプトで、指定されたコマンドが実行されます。 ワーカーは Debian ベースの Docker コンテナであり、いくつかのパッケージが含まれています。

PythonOperator

PythonOperator を使用して任意の Python コードを実行します。

環境で使用される Cloud Composer イメージのバージョンのパッケージを含むコンテナ内で、Cloud Composer によって Python コードが実行されます。

追加の Python パッケージをインストールするには、Python 依存関係のインストールをご覧ください。

Google Cloud のオペレーター

Google Cloud プロダクトを使用するタスクを実行するには、Google Cloud Airflow のオペレーターを使用します。Cloud Composer によって環境のプロジェクトへの Airflow 接続が自動的に構成されます。

EmailOperator

DAG からメールを送信するには、EmailOperator を使用します。Cloud Composer 環境からメールを送信するには、SendGrid を使用するように環境を構成する必要があります。

Airflow 2

from airflow.operators import email

    # Send email confirmation (you will need to set up the email operator
    # See https://cloud.google.com/composer/docs/how-to/managing/creating#notification
    # for more info on configuring the email operator in Cloud Composer)
    email_summary = email.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

Airflow 1

from airflow.operators import email_operator

    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

通知

DAG 内のオペレーターが失敗したときにメール通知を送信するには、email_on_failureTrue に設定します。Cloud Composer 環境からメール通知を送信するには、SendGrid を使用するように環境を構成する必要があります。

Airflow 2

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": project_id,
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

Airflow 1

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{var.value.gcp_project}}",
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

DAG ワークフロー ガイドライン

  1. Airflow 1 ではなく Airflow 2 を使用します。

    Airflow コミュニティは Airflow 1 の新しいマイナー リリースやパッチリリースを公開しなくなりました。

  2. ネストされたディレクトリの DAG の ZIP アーカイブにカスタム Python ライブラリを配置します。ライブラリは DAG ディレクトリの最上位には配置しないでください。

    Airflow が dags/ フォルダをスキャンする際、DAG フォルダの最上位にある Python モジュールの DAG、および最上位の dags/ フォルダに配置された ZIP アーカイブの最上位にある Python モジュールの DAG のみがチェックされます。部分文字列 airflowDAG の両方を含まない ZIP アーカイブ内の Python モジュールが存在する場合、Airflow は ZIP アーカイブの処理を停止します。Airflow はその時点までに見つかった DAG のみを返します。

  3. フォールト トラレンスのために、同じ Python モジュール内に複数の DAG オブジェクトを定義しないでください。

  4. SubDAG を使用しないでください。代わりに、代替方法を使用してください。

  5. DAG 解析に必要なファイルは、data/ ディレクトリではなく、dags/ フォルダに配置します。

  6. DAG の単体テストを実装します。

  7. DAG のテスト手順で推奨されているように、開発または変更された DAG をテストします。

  8. 開発された DAG によって DAG 解析時間が過剰に増加していないことを確認します。

  9. Airflow タスクは複数の理由で失敗することがあります。DAG 全体の実行の失敗を回避するために、タスクの再試行を有効にすることをおすすめします。最大再試行回数を 0 に設定すると、再試行は行われません。

    default_task_retries オプションをオーバーライドして、タスクの廃止に 0 以外の値を設定することをおすすめします。さらに、再試行パラメータをタスクレベルで設定することもできます(必要な場合)。

  10. Airflow タスクで GPU を使用する場合は、GPU を備えたマシンを使用するノードに基づいて個別の GKE クラスタを作成します。GKEStartPodOperator を使用してタスクを実行します。

  11. CPU とメモリを大量に消費するタスクを、他の Airflow コンポーネント(スケジューラ、ワーカー、ウェブサーバー)が実行されているクラスタのノードプールで実行しないでください。代わりに KubernetesPodOperator または GKEStartPodOperator を使用してください。

  12. DAG を環境にデプロイする場合は、DAG の解釈と実行に不可欠なファイルのみを /dags フォルダにアップロードします。

  13. /dags フォルダ内の DAG ファイルの数を制限します。

    Airflow は /dags フォルダ内の DAG を継続的に解析しています。解析は DAG フォルダをループ処理するプロセスで、(必要な依存関係とともに)読み込む必要があるファイルの数が、DAG 解析とタスク スケジューリングのパフォーマンスに影響を与えます。それぞれ 1 つの DAG を持つ 10,000 ファイルよりも 100 個の DAG を持つ 100 ファイルを使用する方がはるかに効率的であるため、そのような最適化をおすすめします。この最適化は、解析時間と DAG の作成と管理の効率のバランスを取るものです。

    たとえば、10,000 個の DAG ファイルをデプロイする場合は、100 個の ZIP ファイルを作成し、それぞれに 100 個の DAG ファイルを含めることができます。

    前述のヒントに加えて、10,000 個を超える DAG ファイルがある場合は、プログラムによる方法で DAG を生成するのが適切な選択肢である可能性があります。たとえば、いくつかの DAG オブジェクト(例: 20,100 個の DAG オブジェクト)を生成する単一の Python DAG ファイルを実装できます。

DAG の作成に関するよくある質問

複数の DAG で同じまたは類似したタスクを実行する場合、コードの繰り返しを最小限に抑えるにはどうすればよいですか?

コードの繰り返しを最小限に抑えるには、ライブラリを定義したりラッパーを定義したりすることをおすすめします。

DAG ファイル間でコードを再利用するにはどうすればよいですか?

ユーティリティ関数をローカルの Python ライブラリに入れて、その関数をインポートしてください。環境のバケット内の dags/ フォルダに置かれた任意の DAG の関数を参照できます。

異なる定義が作成されるリスクを最小限に抑えるにはどうすればよいですか?

たとえば、元データを集計して収益の指標を作成しようとしているチームが 2 つあるとします。これらのチームが同じ目的を達成するために、2 つのわずかに異なったタスクを作成する場合を考えてください。その場合、収益データの処理に対するライブラリを定義すると、DAG の実装者が、集計対象の収益の定義を明確にできます。

DAG 間の依存関係を設定するにはどうすればよいですか?

これは依存関係をどのように定義するかによって異なります。

DAG A と DAG B の 2 つの DAG があり、DAG A の後に DAG B をトリガーする場合は、DAG A の末尾に TriggerDagRunOperator を配置します。

DAG B が Pub/Sub メッセージなどの DAG A が生成するアーティファクトのみに依存している場合は、センサーを使用するほうが有効な場合があります。

DAG B が DAG A と密に統合されている場合は、2 つの DAG を 1 つの DAG にマージできる場合があります。

DAG とそのタスクに一意の実行 ID を渡すにはどうすればよいですか?

たとえば、Dataproc のクラスタ名とファイルパスを渡すとします。

ランダムな一意の ID は、PythonOperatorstr(uuid.uuid4()) を返すことにより生成できます。これにより ID が XComs に配置され、テンプレート フィールドを介して他のオペレーターで ID を参照できるようになります。

DagRun 固有の ID のほうが有用な場合もあるので、uuid を生成する前にその可能性を検討してください。マクロを使用して Jinja 置換でこれらの ID を参照することもできます。

DAG のタスクを分けるにはどうすればよいですか?

各タスクは、べき等の作業ユニットである必要があります。そのため、PythonOperator 内で実行される複雑なプログラムのように、単一タスク内でマルチステップ ワークフローをカプセル化するのを避ける必要があります。

複数のソースからのデータを集計する場合、単一の DAG に複数のタスクを定義するべきですか?

たとえば、元データを含む複数のテーブルがあり、各テーブルの日次集計を作成するとします。タスクは互いに依存していません。この場合、各テーブルにタスクと DAG を 1 つずつ作成するべきでしょうか、それとも、1つの大きな DAG を作成しますか。

各タスクで、複数の同じ DAG レベルのプロパティ(schedule_interval など)を共有しても差し支えない場合は、単一の DAG に複数のタスクを定義するのが有効な選択肢です。一方、コードの繰り返しを最小限に抑える必要がある場合は、モジュールの globals() に複数の DAG を配置することで、これらを単一の Python モジュールから生成できます。

DAG で実行中の同時タスクの数を制限するにはどうすればよいですか?

たとえば、API の使用量上限や割り当ての超過を回避する、または過剰な同時プロセスの実行を回避する必要がある場合が該当します。

Airflow ウェブ UI で Airflow Pools を定義し、DAG 内のタスクを既存のプールに関連付けることができます。

オペレーターの使用に関するよくある質問

DockerOperator を使用するべきですか?

リモートでの Docker のインストール(環境のクラスタ内ではありません)でコンテナの起動に使用されていない場合は、DockerOperator の使用はおすすめしません。Cloud Composer 環境では、オペレーターは Docker デーモンにアクセスできません。

代わりに、KubernetesPodOperator または GKEStartPodOperator を使用します。これらのオペレーターは、Kubernetes Pod をそれぞれ Kubernetes クラスタまたは GKE クラスタに起動します。環境のクラスタに Pod を起動することは、リソースが競合する可能性があるためおすすめしません。

SubDagOperator を使用するべきですか?

SubDagOperator の使用はおすすめしません。

タスクのグループ化の手順に記載されている代替手法を使用します

Python オペレーターを完全に分離するには、PythonOperators 内でのみ Python コードを実行するべきですか?

目的に応じていくつかの選択肢があります。

Python 依存関係の分離を維持することが唯一の目的である場合は、PythonVirtualenvOperator を使用できます。

KubernetesPodOperator の使用を検討してください。この演算子を使用すると、Kubernetes Pod を定義して、他のクラスタで Pod を実行できます。

カスタムのバイナリ パッケージや非 PyPI パッケージを追加するにはどうすればよいですか?

プライベート パッケージ リポジトリでホストされているパッケージをインストールできます。

また、KubernetesPodOperator を使用して、カスタム パッケージで構築された独自のイメージで Kubernetes Pod を実行することもできます。

DAG とそのタスクに対して一律に引数を渡すにはどうすればよいですか?

Jinja テンプレートに対する Airflow の組み込みサポートを使用して、テンプレート フィールドで使用できる引数を渡すことができます。

テンプレート置換はいつ行われますか?

テンプレート置換は、オペレーターの pre_execute 関数が呼び出される直前の Airflow ワーカーで行われます。これは、実際にはタスクが実行される直前までテンプレート置換が行われないことを意味します。

どのオペレーター引数がテンプレート置換をサポートするのかを知るにはどうすればよいですか?

Jinja2 のテンプレート置換をサポートするオペレーターの引数は、そのように明示的にマークされます。

オペレーターの定義内の template_fields フィールドを確認してください。このフィールドにテンプレート置換の対象となる引数名のリストが示されています。

たとえば、BashOperator では、引数 bash_command および env のテンプレートがサポートされています。

サポートが終了した Airflow オペレーターの使用を回避する

次の表に示す演算子は非推奨です。DAG では使用しないでください。代わりに、提供されている最新の方法を使用してください。

非推奨の演算子 使用する演算子
BigQueryExecuteQueryOperator BigQueryInsertJobOperator
BigQueryPatchDatasetOperator BigQueryUpdateTableOperator
DataflowCreateJavaJobOperator BeamRunJavaPipelineOperator
DataflowCreatePythonJobOperator BeamRunPythonPipelineOperator
DataprocScaleClusterOperator DataprocUpdateClusterOperator
DataprocSubmitPigJobOperator DataprocSubmitJobOperator
DataprocSubmitSparkSqlJobOperator DataprocSubmitJobOperator
DataprocSubmitSparkJobOperator DataprocSubmitJobOperator
DataprocSubmitHadoopJobOperator DataprocSubmitJobOperator
DataprocSubmitPySparkJobOperator DataprocSubmitJobOperator
MLEngineManageModelOperator MLEngineCreateModelOperator、MLEngineGetModelOperator
MLEngineManageVersionOperator MLEngineCreateVersion、MLEngineSetDefaultVersion、MLEngineListVersions、MLEngineDeleteVersion
GCSObjectsWtihPrefixExistenceSensor GCSObjectsWithPrefixExistenceSensor

次のステップ