Airflow DAG を書き込む

Cloud Composer 1 | Cloud Composer 2

このガイドでは、Cloud Composer 環境で実行する Apache Airflow 有向非巡回グラフ(DAG)の作成方法を説明します。

Apache Airflow では強固な DAG とタスクの分離が実現されないため、DAG の干渉を回避するために分離された状態で本番環境とテスト環境を使用することをおすすめします。詳細については、DAG のテストをご覧ください。

Airflow DAG の構造化

Airflow DAG は Python ファイルで定義されており、次のコンポーネントで構成されています。

  • DAG 定義
  • Airflow オペレーター
  • オペレーターの関係

次のコード スニペットは、コンテキスト外の各コンポーネントの例を示しています。

DAG の定義

次の例は、DAG の定義を示しています。

Airflow 2

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Airflow 1

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

オペレーターとタスク

オペレーターは行う処理を記述します。タスクタスクは、オペレーターの特定のインスタンスです。

Airflow 2

from airflow.operators import bash_operator
from airflow.operators import python_operator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

Airflow 1

from airflow.operators import bash_operator
from airflow.operators import python_operator

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

タスクの関係

タスクの関係には、処理を完了する順序を記述します。

Airflow 2

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Airflow 1

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Python による完全な DAG ワークフローの例

次のワークフローは、hello_python タスクと goodbye_bash タスクという 2 つのタスクで構成される、完全な作業用 DAG テンプレートです。

Airflow 2


import datetime

from airflow import models

from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Airflow 1


import datetime

from airflow import models

from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Airflow DAG の定義の詳細については、Airflow のチュートリアルAirflow のコンセプトをご覧ください。

Airflow オペレーター

次の例は、いくつかの一般的な Airflow オペレーターを示しています。Airflow オペレーターの信頼できるリファレンスについては、オペレーターとフックのリファレンスプロバイダ インデックスをご覧ください。

BashOperator

BashOperator を使用してコマンドライン プログラムを実行します。

Airflow 2

from airflow.operators import bash

    # Create BigQuery output dataset.
    make_bq_dataset = bash.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Airflow 1

from airflow.operators import bash_operator

    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id="make_bq_dataset",
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
    )

Cloud Composer では、Airflow ワーカーの bash スクリプトで、指定されたコマンドが実行されます。 ワーカーは Debian ベースの Docker コンテナであり、いくつかのパッケージが含まれています。

PythonOperator

PythonOperator を使用して任意の Python コードを実行します。

環境で使用される Cloud Composer イメージのバージョンのパッケージを含むコンテナ内で、Cloud Composer によって Python コードが実行されます。

追加の Python パッケージをインストールするには、Python 依存関係のインストールをご覧ください。

Google Cloud のオペレーター

Google Cloud プロダクトを使用するタスクを実行するには、Google Cloud Airflow オペレーターを使用します。たとえば、BigQuery のオペレーターは、BigQuery でデータのクエリ、処理を行います。

Google Cloud には、他にも多くの Airflow オペレーターと Google Cloud が提供する個々のサービスがあります。完全なリストについては、Google Cloud のオペレーターをご覧ください。

Airflow 2

from airflow.providers.google.cloud.operators import bigquery
from airflow.providers.google.cloud.transfers import bigquery_to_gcs

    bq_recent_questions_query = bigquery.BigQueryInsertJobOperator(
        task_id="bq_recent_questions_query",
        configuration={
            "query": {
                "query": RECENT_QUESTIONS_QUERY,
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": project_id,
                    "datasetId": bq_dataset_name,
                    "tableId": bq_recent_questions_table_id,
                },
            }
        },
        location=location,
    )

Airflow 1

from airflow.contrib.operators import bigquery_operator

    # Query recent StackOverflow questions.
    bq_recent_questions_query = bigquery_operator.BigQueryOperator(
        task_id="bq_recent_questions_query",
        sql="""
        SELECT owner_display_name, title, view_count
        FROM `bigquery-public-data.stackoverflow.posts_questions`
        WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
            AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
        ORDER BY view_count DESC
        LIMIT 100
        """.format(
            max_date=max_query_date, min_date=min_query_date
        ),
        use_legacy_sql=False,
        destination_dataset_table=bq_recent_questions_table_id,
    )

EmailOperator

DAG からメールを送信するには、EmailOperator を使用します。Cloud Composer 環境からメールを送信するには、SendGrid を使用するように環境を構成する必要があります。

Airflow 2

from airflow.operators import email

    # Send email confirmation (you will need to set up the email operator
    # See https://cloud.google.com/composer/docs/how-to/managing/creating#notification
    # for more info on configuring the email operator in Cloud Composer)
    email_summary = email.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

Airflow 1

from airflow.operators import email_operator

    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id="email_summary",
        to="{{var.value.email}}",
        subject="Sample BigQuery notify data ready",
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][0] }}"
            ),
            view_count=(
                "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
                "key='return_value')[0][1] }}"
            ),
            export_location=output_file,
        ),
    )

オペレーターの失敗時の通知

DAG 内のオペレーターが失敗したときにメール通知を送信するには、email_on_failureTrue に設定します。Cloud Composer 環境からメール通知を送信するには、SendGrid を使用するように環境を構成する必要があります。

Airflow 2

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": project_id,
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

Airflow 1

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{var.value.gcp_project}}",
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

DAG ワークフローのガイドライン

  1. ネストされたディレクトリの DAG の ZIP アーカイブにカスタム Python ライブラリを配置します。ライブラリは DAG ディレクトリの最上位には配置しないでください。

    Airflow が dags/ フォルダをスキャンするとき、DAG フォルダの最上位にある Python モジュールの DAG、および最上位の dags/ フォルダに配置された ZIP アーカイブの最上位にある Python モジュールの DAG のみがチェックされます。部分文字列 airflowDAG の両方を含まない ZIP アーカイブ内の Python モジュールが存在する場合、Airflow は ZIP アーカイブの処理を停止します。Airflow はその時点までに見つかった DAG のみを返します。

  2. Airflow 1 ではなく Airflow 2 を使用します。

    Airflow コミュニティでは、Airflow 1 の新しいマイナー リリースやパッチリリースが公開されなくなりました。

  3. フォールト トラレンスのために、同じ Python モジュール内に複数の DAG オブジェクトを定義しないでください。

  4. SubDAG を使用しないでください。代わりに、DAG 内のタスクをグループ化します。

  5. DAG 解析に必要なファイルは、data/ ディレクトリではなく、dags/ フォルダに配置します。

  6. DAG の単体テストを実装します。

  7. DAG のテスト手順で推奨されているように、開発または変更された DAG をテストします。

  8. 開発された DAG によって DAG 解析時間が過剰に増加していないことを確認します。

  9. Airflow タスクは、いくつかの理由で失敗することがあります。DAG の実行全体のエラーを回避するには、タスクの再試行を有効にすることをおすすめします。 最大再試行回数を 0 に設定すると、再試行は行われません。

    default_task_retries オプションをオーバーライドし、タスク再試行の値を 0 以外に設定することをおすすめします。さらに、タスクレベルで retries パラメータを設定できます。

  10. Airflow タスクで GPU を使用する場合は、GPU を備えたマシンを使用して、ノードに基づいて個別の GKE クラスタを作成します。タスクを実行するには GKEStartPodOperator を使用します。

  11. CPU とメモリを大量に消費するタスクを、他の Airflow コンポーネント(スケジューラ、ワーカー、ウェブサーバー)が実行されているクラスタのノードプールで実行しないでください。 代わりに、KubernetesPodOperator または GKEStartPodOperator を使用してください。

  12. DAG を環境にデプロイする場合は、DAG の解釈と実行に不可欠なファイルのみを /dags フォルダにアップロードします。

  13. /dags フォルダ内の DAG ファイルの数を制限します。

    Airflow は、/dags フォルダ内の DAG を継続的に解析します。解析は DAG フォルダをループ処理し、読み込む必要があるファイルの数(依存関係とともに)が、DAG の解析とタスクのスケジューリングのパフォーマンスに影響します。DAG が 100 個あるファイルを 100 個使用するほうが、1 個の DAG を 10,000 個使用するよりはるかに効率的であるため、このような最適化をおすすめします。この最適化は、解析時間と DAG の作成と管理の効率のバランスを取るものです。

    また、たとえば、10,000 個の DAG ファイルをデプロイするには、それぞれに 100 個の DAG ファイルを含む 100 個の ZIP ファイルを作成することができます。

    前述のヒントに加えて、10,000 個を超える DAG ファイルがある場合は、プログラムによる方法で DAG を生成するのが適切な選択肢である可能性があります。たとえば、いくつかの DAG オブジェクト(例: 20,100 個の DAG オブジェクト)を生成する単一の Python DAG ファイルを実装できます。

サポートが終了した Airflow オペレーターの使用を回避する

次の表に示すオペレーターはサポートが終了しました。DAG では使用しないでください。代わりに、提供されている最新の代替手法を使用してください。

非推奨の演算子 使用する演算子
BigQueryExecuteQueryOperator BigQueryInsertJobOperator
BigQueryPatchDatasetOperator BigQueryUpdateTableOperator
DataflowCreateJavaJobOperator BeamRunJavaPipelineOperator
DataflowCreatePythonJobOperator BeamRunPythonPipelineOperator
DataprocScaleClusterOperator DataprocUpdateClusterOperator
DataprocSubmitPigJobOperator DataprocSubmitJobOperator
DataprocSubmitSparkSqlJobOperator DataprocSubmitJobOperator
DataprocSubmitSparkJobOperator DataprocSubmitJobOperator
DataprocSubmitHadoopJobOperator DataprocSubmitJobOperator
DataprocSubmitPySparkJobOperator DataprocSubmitJobOperator
MLEngineManageModelOperator MLEngineCreateModelOperator、MLEngineGetModelOperator
MLEngineManageVersionOperator MLEngineCreateVersion、MLEngineSetDefaultVersion、MLEngineListVersions、MLEngineDeleteVersion
GCSObjectsWtihPrefixExistenceSensor GCSObjectsWithPrefixExistenceSensor

DAG の作成に関するよくある質問

複数の DAG で同じまたは類似したタスクを実行する場合、コードの繰り返しを最小限に抑えるにはどうすればよいですか?

コードの繰り返しを最小限に抑えるには、ライブラリを定義したりラッパーを定義したりすることをおすすめします。

DAG ファイル間でコードを再利用するにはどうすればよいですか?

ユーティリティ関数をローカルの Python ライブラリに入れて、その関数をインポートしてください。環境のバケット内の dags/ フォルダに置かれた任意の DAG の関数を参照できます。

異なる定義が作成されるリスクを最小限に抑えるにはどうすればよいですか?

たとえば、元データを集計して収益の指標を作成しようとしているチームが 2 つあるとします。これらのチームが同じ目的を達成するために、2 つのわずかに異なったタスクを作成する場合を考えてください。 その場合、収益データの処理に対するライブラリを定義すると、DAG の実装者が、集計対象の収益の定義を明確にできます。

DAG 間の依存関係を設定するにはどうすればよいですか?

これは依存関係をどのように定義するかによって異なります。

DAG A と DAG B の 2 つの DAG があり、DAG A の後に DAG B をトリガーする場合は、DAG A の末尾に TriggerDagRunOperator を配置します。

DAG B が Pub/Sub メッセージなどの DAG A が生成するアーティファクトのみに依存している場合は、センサーを使用するほうが有効な場合があります。

DAG B が DAG A と密に統合されている場合は、2 つの DAG を 1 つの DAG にマージできる場合があります。

DAG とそのタスクに一意の実行 ID を渡すにはどうすればよいですか?

たとえば、Dataproc のクラスタ名とファイルパスを渡すとします。

ランダムな一意の ID は、PythonOperatorstr(uuid.uuid4()) を返すことにより生成できます。これにより ID が XComs に配置され、テンプレート フィールドを介して他のオペレーターで ID を参照できるようになります。

DagRun 固有の ID のほうが有用な場合もあるので、uuid を生成する前にその可能性を検討してください。マクロを使用して、Jinja 置換でこれらの ID を参照することもできます。

DAG のタスクを分けるにはどうすればよいですか?

各タスクは、べき等の作業ユニットである必要があります。そのため、PythonOperator 内で実行される複雑なプログラムのように、単一タスク内でマルチステップ ワークフローをカプセル化するのを避ける必要があります。

複数のソースからのデータを集計する場合、単一の DAG に複数のタスクを定義するべきですか?

たとえば、元データを含む複数のテーブルがあり、各テーブルの日次集計を作成するとします。タスクは互いに依存していません。この場合、各テーブルにタスクと DAG を 1 つずつ作成するべきでしょうか、それとも、1つの大きな DAG を作成しますか。

各タスクで、複数の同じ DAG レベルのプロパティ(schedule_interval など)を共有しても差し支えない場合は、単一の DAG に複数のタスクを定義するのが有効な選択肢です。一方、コードの繰り返しを最小限に抑える必要がある場合は、モジュールの globals() に複数の DAG を配置することで、これらを単一の Python モジュールから生成できます。

DAG で実行中の同時タスクの数を制限するにはどうすればよいですか?

たとえば、API の使用量上限や割り当ての超過を回避する、または過剰な同時プロセスの実行を回避する必要がある場合が該当します。

Airflow ウェブ UI で Airflow Pools を定義し、DAG 内のタスクを既存のプールに関連付けることができます。

オペレーターの使用に関するよくある質問

DockerOperator を使用するべきですか?

リモートでの Docker のインストール(環境のクラスタ内ではありません)でコンテナの起動に使用されていない場合は、DockerOperator の使用はおすすめしません。Cloud Composer 環境では、オペレーターは Docker デーモンにアクセスできません。

代わりに、KubernetesPodOperator または GKEStartPodOperator を使用します。これらのオペレーターは、Kubernetes Pod をそれぞれ Kubernetes クラスタまたは GKE クラスタに起動します。環境のクラスタに Pod を起動することは、リソースが競合する可能性があるためおすすめしません。

SubDagOperator を使用するべきですか?

SubDagOperator の使用はおすすめしません。

タスクのグループ化の手順に記載されている代替手法を使用します

Python オペレーターを完全に分離するには、PythonOperators 内でのみ Python コードを実行するべきですか?

目的に応じていくつかの選択肢があります。

Python 依存関係の分離を維持することが唯一の目的である場合は、PythonVirtualenvOperator を使用できます。

KubernetesPodOperator の使用を検討してください。このオペレーターを使用すると、Kubernetes Pod を定義して、他のクラスタで Pod を実行できます。

カスタムのバイナリ パッケージや非 PyPI パッケージを追加するにはどうすればよいですか?

プライベート パッケージ リポジトリでホストされているパッケージをインストールできます。

また、KubernetesPodOperator を使用して、カスタム パッケージで構築された独自のイメージで Kubernetes Pod を実行することもできます。

DAG とそのタスクに対して一律に引数を渡すにはどうすればよいですか?

Jinja テンプレートに対する Airflow の組み込みサポートを使用して、テンプレート フィールドで使用できる引数を渡すことができます。

テンプレート置換はいつ行われますか?

テンプレート置換は、オペレーターの pre_execute 関数が呼び出される直前の Airflow ワーカーで行われます。これは、実際にはタスクが実行される直前までテンプレート置換が行われないことを意味します。

どの演算子引数がテンプレート置換をサポートするのかを知るにはどうすればよいですか?

Jinja2 のテンプレート置換をサポートするオペレーターの引数は、そのように明示的にマークされます。

オペレーターの定義内の template_fields フィールドを確認してください。このフィールドにテンプレート置換の対象となる引数名のリストが示されています。

たとえば、BashOperator では、引数 bash_command および env のテンプレートがサポートされています。

次のステップ