DAG(ワークフロー)の作成

このガイドでは、Cloud Composer 環境で実行する Apache Airflow 有向非巡回グラフ(DAG)の作成方法を説明します。

DAG を構造化する

Airflow DAG は Python ファイルで定義されており、DAG 定義、演算子、演算子の関係といったコンポーネントで構成されています。次のコード スニペットは、コンテキスト外の各コンポーネントの例を示しています。

  1. DAG の定義

    import datetime
    
    from airflow import models
    default_dag_args = {
        # The start_date describes when a DAG is valid / can be run. Set this to a
        # fixed point in time rather than dynamically, since it is evaluated every
        # time a DAG is parsed. See:
        # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
        'start_date': datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
            'composer_sample_simple_greeting',
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:
  2. 行う処理を記述する演算子。演算子のインスタンス化はタスクと呼ばれます。

    from airflow.operators import bash_operator
    from airflow.operators import python_operator
        def greeting():
            import logging
            logging.info('Hello World!')
    
        # An instance of an operator is called a task. In this case, the
        # hello_python task calls the "greeting" Python function.
        hello_python = python_operator.PythonOperator(
            task_id='hello',
            python_callable=greeting)
    
        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id='bye',
            bash_command='echo Goodbye.')
  3. 処理を完了する順序を記述する演算子の関係

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

次のワークフローは、完全な処理の例です。hello_python タスクと goodbye_bash タスクという 2 つのタスクで構成されています。

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Airflow DAG の定義に関する詳細については、Airflow チュートリアルAirflow のコンセプトをご覧ください。

演算子

次の例は、いくつかの一般的な Airflow 演算子を示しています。Airflow 演算子の信頼できるリファレンスについては、Apache Airflow API リファレンスか、core および contrib 演算子のソースコードをご覧ください。

BashOperator

BashOperator を使用してコマンドライン プログラムを実行します。

from airflow.operators import bash_operator
    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id='make_bq_dataset',
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command='bq ls {} || bq mk {}'.format(
            bq_dataset_name, bq_dataset_name))

Cloud Composer では、ワーカーの bash スクリプトで、指定されたコマンドが実行されます。ワーカーは Debian ベースの Docker コンテナであり、いくつかのパッケージが含まれています。

PythonOperator

PythonOperator を使用して任意の Python コードを実行します。Cloud Composer により、次のパッケージを含むコンテナ内で Python コードが実行されます。

追加の Python パッケージをインストールするには、Python 依存関係のインストールをご覧ください。

Google Cloud Platform の演算子

Google Cloud Platform Airflow の演算子を使用して、Google Cloud Platform プロダクトを使用するタスクを実行します。Cloud Composer によって環境のプロジェクトへの Airflow 接続が自動的に構成されます。

EmailOperator

EmailOperator を使用して DAG からメールを送信します。Cloud Composer 環境からメールを送信するには、SendGrid を使用するように環境を構成する必要があります。

from airflow.operators import email_operator
    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id='email_summary',
        to=models.Variable.get('email'),
        subject='Sample BigQuery notify data ready',
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][0] }}'
            ),
            view_count=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][1] }}'
            ),
            export_location=output_file))

通知

DAG 内の演算子が失敗したときにメール通知を送信するには、email_on_failureTrue に設定します。Cloud Composer 環境からメール通知を送信するには、SendGrid を使用するように環境を構成する必要があります。

from airflow import models
default_dag_args = {
    'start_date': yesterday,
    # Email whenever an Operator in the DAG fails.
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=4),
        default_args=default_dag_args) as dag:

よくある質問

DAG の作成

演算子の使用

次のステップ

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...