DAG(ワークフロー)の作成

このガイドでは、Cloud Composer 環境で実行する Apache Airflow 有向非巡回グラフ(DAG)の作成方法を説明します。

DAG を構造化する

Airflow DAG は Python ファイルで定義されており、DAG 定義、オペレーター、オペレーターの関係といったコンポーネントで構成されています。次のコード スニペットは、コンテキスト外の各コンポーネントの例を示しています。

  1. DAG の定義です。

    Airflow 2

    import datetime
    
    from airflow import models
    default_dag_args = {
        # The start_date describes when a DAG is valid / can be run. Set this to a
        # fixed point in time rather than dynamically, since it is evaluated every
        # time a DAG is parsed. See:
        # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
        'start_date': datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
            'composer_sample_simple_greeting',
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:

    Airflow 1

    import datetime
    
    from airflow import models
    default_dag_args = {
        # The start_date describes when a DAG is valid / can be run. Set this to a
        # fixed point in time rather than dynamically, since it is evaluated every
        # time a DAG is parsed. See:
        # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
        'start_date': datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
            'composer_sample_simple_greeting',
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:

  2. 行う処理を記述するオペレーター。オペレーターをインスタンス化したものは、タスクと呼ばれます。

    Airflow 2

    from airflow.operators import bash_operator
    from airflow.operators import python_operator
        def greeting():
            import logging
            logging.info('Hello World!')
    
        # An instance of an operator is called a task. In this case, the
        # hello_python task calls the "greeting" Python function.
        hello_python = python_operator.PythonOperator(
            task_id='hello',
            python_callable=greeting)
    
        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id='bye',
            bash_command='echo Goodbye.')

    Airflow 1

    from airflow.operators import bash_operator
    from airflow.operators import python_operator
        def greeting():
            import logging
            logging.info('Hello World!')
    
        # An instance of an operator is called a task. In this case, the
        # hello_python task calls the "greeting" Python function.
        hello_python = python_operator.PythonOperator(
            task_id='hello',
            python_callable=greeting)
    
        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id='bye',
            bash_command='echo Goodbye.')

  3. 処理を完了する順序を記述するタスクの関係

    Airflow 2

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

    Airflow 1

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

次のワークフローは、完全な処理の例です。hello_python タスクと goodbye_bash タスクという 2 つのタスクで構成されています。

Airflow 2

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Airflow 1

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Airflow DAG の定義に関する詳細については、Airflow チュートリアルAirflow のコンセプトをご覧ください。

オペレーター

次の例は、いくつかの一般的な Airflow オペレーターを示しています。Airflow オペレーターの信頼できるリファレンスについては、Apache Airflow API リファレンスをご覧になるか、corecontribproviders のオペレーターのソースコードをご覧ください。

BashOperator

BashOperator を使用してコマンドライン プログラムを実行します。

Airflow 2

from airflow.operators import bash
    # Create BigQuery output dataset.
    make_bq_dataset = bash.BashOperator(
        task_id='make_bq_dataset',
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f'bq ls {bq_dataset_name} || bq mk {bq_dataset_name}')

Airflow 1

from airflow.operators import bash_operator
    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id='make_bq_dataset',
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command='bq ls {} || bq mk {}'.format(
            bq_dataset_name, bq_dataset_name))

Cloud Composer では、ワーカーの bash スクリプトで、指定されたコマンドが実行されます。 ワーカーは Debian ベースの Docker コンテナであり、いくつかのパッケージが含まれています。

PythonOperator

PythonOperator を使用して任意の Python コードを実行します。

環境で使用される Cloud Composer イメージのバージョンのパッケージを含むコンテナ内で、Cloud Composer によって Python コードが実行されます。

追加の Python パッケージをインストールするには、Python 依存関係のインストールをご覧ください。

Google Cloud のオペレーター

Google Cloud プロダクトを使用するタスクを実行するには、Google Cloud Airflow のオペレーターを使用します。Cloud Composer によって環境のプロジェクトへの Airflow 接続が自動的に構成されます。

EmailOperator

DAG からメールを送信するには、EmailOperator を使用します。Cloud Composer 環境からメールを送信するには、SendGrid を使用するように環境を構成する必要があります。

Airflow 2

from airflow.operators import email
    # Send email confirmation (you will need to set up the email operator
    # See https://cloud.google.com/composer/docs/how-to/managing/creating#notification
    # for more info on configuring the email operator in Cloud Composer)
    email_summary = email.EmailOperator(
        task_id='email_summary',
        to=models.Variable.get('email'),
        subject='Sample BigQuery notify data ready',
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][0] }}'
            ),
            view_count=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][1] }}'
            ),
            export_location=output_file))

Airflow 1

from airflow.operators import email_operator
    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id='email_summary',
        to=models.Variable.get('email'),
        subject='Sample BigQuery notify data ready',
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][0] }}'
            ),
            view_count=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][1] }}'
            ),
            export_location=output_file))

通知

DAG 内のオペレーターが失敗したときにメール通知を送信するには、email_on_failureTrue に設定します。Cloud Composer 環境からメール通知を送信するには、SendGrid を使用するように環境を構成する必要があります。

Airflow 2

from airflow import models
default_dag_args = {
    'start_date': yesterday,
    # Email whenever an Operator in the DAG fails.
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': project_id
}

with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=4),
        default_args=default_dag_args) as dag:

Airflow 1

from airflow import models
default_dag_args = {
    'start_date': yesterday,
    # Email whenever an Operator in the DAG fails.
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=4),
        default_args=default_dag_args) as dag:

ガイドライン

  1. ネストされたディレクトリの DAG の ZIP アーカイブにカスタム Python ライブラリを配置します。ライブラリは DAG ディレクトリの最上位には配置しないでください。

    Airflow が dags/ フォルダをスキャンする際、DAG フォルダの最上位にある Python モジュールの DAG、および最上位の dags/ フォルダに配置された ZIP アーカイブの最上位にある Python モジュールの DAG のみがチェックされます。部分文字列 airflowDAG の両方を含まない ZIP アーカイブ内の Python モジュールが存在する場合、Airflow は ZIP アーカイブの処理を停止します。Airflow はその時点までに見つかった DAG のみを返します。

  2. フォールト トラレンスのために、同じ Python モジュール内に複数の DAG オブジェクトを定義しないでください。

  3. subDAGs を最上位オブジェクトとして定義しないでください。

    一般に、最上位 DAG としては、dags/ ディレクトリ内のモジュールのグローバル名前空間にある DAG オブジェクトが選択されます。最上位オブジェクトとして定義されているすべての subDag は、subDag を埋め込む他の DAG のスケジュールに加え、独自のスケジュールで実行されます。

  4. DAG 解析に必要なファイルは、data/ ディレクトリではなく、dags/ ディレクトリに配置します。data/ ディレクトリはウェブサーバーでマウントされません。

DAG の作成に関するよくある質問

複数の DAG で同じまたは類似したタスクを実行する場合、コードの繰り返しを最小限に抑えるにはどうすればよいですか?

コードの繰り返しを最小限に抑えるには、ライブラリを定義することと、ラッパーを定義することをおすすめします。

DAG ファイル間でコードを再利用するにはどうすればよいですか?

ユーティリティ関数をローカルの Python ライブラリに入れて、その関数をインポートしてください。環境のバケット内の dags/ フォルダに置かれた任意の DAG の関数を参照できます。

異なる定義が作成されるリスクを最小限に抑えるにはどうすればよいですか?

たとえば、元データを集計して収益の指標を作成しようとしているチームが 2 つあるとします。これらのチームが同じ目的を達成するために、2 つのわずかに異なったタスクを作成する場合を考えてください。その場合、収益データの処理に対するライブラリを定義すると、DAG の実装者が、集計対象の収益の定義を明確にできます。

DAG 間の依存関係を設定するにはどうすればよいですか?

これは依存関係をどのように定義するかによって異なります。

DAG A と DAG B の 2 つの DAG があり、DAG A の後に DAG B をトリガーする場合は、DAG A の末尾に TriggerDagRunOperator を配置します。

DAG B が Pub/Sub メッセージなどの DAG A が生成するアーティファクトのみに依存している場合は、センサーを使用するほうが有効な場合があります。

DAG B が DAG A と密に統合されている場合は、2 つの DAG を 1 つの DAG にマージできる場合があります。

DAG とそのタスクに一意の実行 ID を渡すにはどうすればよいですか?

たとえば、Dataproc のクラスタ名とファイルパスを渡すとします。

ランダムな一意の ID は、PythonOperatorstr(uuid.uuid4()) を返すことにより生成できます。これにより ID が XComs に配置され、テンプレート フィールドを介して他のオペレーターで ID を参照できるようになります。

uuid を生成する前に、DagRun 固有の ID のほうが有用かどうかを検討してください。マクロを使用して Jinja 置換でこれらの ID を参照することもできます。

DAG 内のタスクを分離するにはどうすればよいですか?

各タスクは、べき等の作業ユニットである必要があります。そのため、PythonOperator 内で実行される複雑なプログラムのように、単一タスク内でマルチステップ ワークフローをカプセル化するのを避ける必要があります。

SubDagOperator は、ワークフロー定義コードを再利用するための Airflow ネイティブのメカニズムです。ただし、Cloud Composer でこのオペレーターを使用する場合については、注意点があります。

複数のソースからのデータを集計する場合、単一の DAG に複数のタスクを定義するべきですか?

たとえば、元データを含む複数のテーブルがあり、各テーブルの日次集計を作成するとします。タスクは互いに依存していません。この場合、各テーブルにタスクと DAG を 1 つずつ作成するべきでしょうか、それとも、1つの大きな DAG を作成しますか。

各タスクで、複数の同じ DAG レベルのプロパティ(schedule_interval など)を共有しても差し支えない場合は、単一の DAG に複数のタスクを定義するのが有効な選択肢です。一方、コードの繰り返しを最小限に抑える必要がある場合は、モジュールの globals() に複数の DAG を配置することで、これらを単一の Python モジュールから生成できます。

DAG で実行中の同時タスクの数を制限するにはどうすればよいですか?

たとえば、API の使用量上限や割り当ての超過を回避する、または過剰な同時プロセスの実行を回避する必要がある場合が該当します。

Airflow ウェブ UI で Airflow Pools を定義し、DAG 内のタスクを既存のプールに関連付けることができます。

オペレーターの使用に関するよくある質問

DockerOperator を使用するべきですか?

リモート Docker インストール(環境のクラスタ内ではない)でコンテナの起動に使用されていない場合、DockerOperator の使用はおすすめしません。Cloud Composer 環境では、オペレータは Docker デーモンにアクセスできません。

代わりに、KubernetesPodOperator または GKEPodOperator を使用します。これらのオペレーターは、Kubernetes Pod をそれぞれ Kubernetes クラスタまたは GKE クラスタに起動できます。環境の GKE クラスタに Pod を起動することは、リソースが競合する可能性があるためおすすめしません。

SubDagOperator を使用するべきですか?

SubDagOperator の使用はおすすめしません。

SubDagOperator によってカプセル化は実現できますが、SubDag タスクでタスクスロットが必要になります。SubDag タスクを実行している Airflow ワーカーがクラッシュすると、SubDag 内のすべてのタスクが失敗し、ワークフローの信頼性が低下します。

Python オペレーターを完全に分離するには、PythonOperators 内でのみ Python コードを実行するべきですか?

目的に応じていくつかの選択肢があります。

Python 依存関係の分離を維持することが唯一の目的である場合は、PythonVirtualenvOperator を使用できます。

KubernetesPodOperator の使用を検討してください。このオペレーターを使用すると、Kubernetes Pod を定義して、他のクラスタでその Pod を実行できます。

Google Cloud 以外で KubernetesPodOperator を使用するにはどうすればよいですか?

GKE クラスタでの認証方法を指定する構成ファイルをマウントし、このファイルを環境のバケット内の /data フォルダに配置します。

このフォルダは Cloud Composer 環境全体にわたってマウントされます。

カスタムのバイナリ パッケージや非 PyPI パッケージを追加するにはどうすればよいですか?

プライベート パッケージ リポジトリでホストされているパッケージをインストールできます。

また、KubernetesPodOperator を使用して、カスタム パッケージで構築された独自のイメージで Kubernetes Pod を実行することもできます。

DAG とそのタスクに対して一律に引数を渡すにはどうすればよいですか?

Jinja テンプレートに対する Airflow の組み込みサポートを使用して、テンプレート フィールドで使用できる引数を渡すことができます。

テンプレート置換はいつ行われますか?

テンプレート置換は、オペレーターの pre_execute 関数が呼び出される直前の Airflow ワーカーで行われます。これは、実際にはタスクが実行される直前までテンプレート置換が行われないことを意味します。

どのオペレーター引数がテンプレート置換をサポートするのかを知るにはどうすればよいですか?

Jinja2 のテンプレート置換をサポートするオペレーターの引数は、そのように明示的にマークされます。

オペレーターの定義内の template_fields フィールドを確認してください。このフィールドにテンプレート置換の対象となる引数名のリストが示されています。

たとえば、BashOperator では、引数 bash_command および env のテンプレートがサポートされています。

次のステップ