Writing DAGs (workflows)

This guide shows you how to write an Apache Airflow directed acyclic graph (DAG) that runs in a Cloud Composer environment.

Structuring a DAG

An Airflow DAG is defined in a Python file and is composed of the following components: a DAG definition, operators, and operator relationships. The following code snippets show examples of each component out of context:

  1. A DAG definition.

    import datetime
    
    from airflow import models
    default_dag_args = {
        # The start_date describes when a DAG is valid / can be run. Set this to a
        # fixed point in time rather than dynamically, since it is evaluated every
        # time a DAG is parsed. See:
        # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
        'start_date': datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
            'composer_sample_simple_greeting',
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:
  2. Operators to describe the work to be done. An instantiation of an operator is called a task.

    from airflow.operators import bash_operator
    from airflow.operators import python_operator
        def greeting():
            import logging
            logging.info('Hello World!')
    
        # An instance of an operator is called a task. In this case, the
        # hello_python task calls the "greeting" Python function.
        hello_python = python_operator.PythonOperator(
            task_id='hello',
            python_callable=greeting)
    
        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id='bye',
            bash_command='echo Goodbye.')
  3. Operator relationships to describe the order in which the work should be completed.

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

The following example shows a complete workflow made up of two tasks, hello_python and goodbye_bash:

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

To learn more about defining Airflow DAGs, see the Airflow tutorial and Airflow concepts documentation.

Operators

The following examples show a few popular Airflow operators. For an authoritative reference of Airflow operators, see the Apache Airflow API Reference or browse the source code of the core and contrib operators.

BashOperator

Use the BashOperator to run command-line programs.

from airflow.operators import bash_operator
    # Note: bq_dataset_name is defined elsewhere in the full example DAG.
    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id='make_bq_dataset',
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command='bq ls {} || bq mk {}'.format(
            bq_dataset_name, bq_dataset_name))

Cloud Composer runs the provided commands in a Bash script on a worker. The worker is a Debian-based Docker container and includes several packages.
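Because bash_command is a templated field, you can use Airflow's Jinja macros in the command and pass environment variables to the Bash process. The following is a minimal sketch under those assumptions; the task name and the REPORT_NAME variable are hypothetical:

from airflow.operators import bash_operator
    # A minimal sketch: {{ ds }} is a Jinja macro that renders to the
    # execution date, and env supplies environment variables to the Bash
    # process. The task and variable names are hypothetical.
    print_run_info = bash_operator.BashOperator(
        task_id='print_run_info',
        bash_command='echo "Run date: {{ ds }}, report: $REPORT_NAME"',
        env={'REPORT_NAME': 'daily_report'})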

PythonOperator

Use the PythonOperator to run arbitrary Python code. Cloud Composer runs the Python code in a container that includes a number of preinstalled packages.

To install additional Python packages, see Installing Python Dependencies.
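If your callable needs arguments, you can pass them through op_args or op_kwargs. The following is a minimal sketch; the greet_user function and its argument values are hypothetical:

from airflow.operators import python_operator
    def greet_user(username, greeting='Hello'):
        import logging
        logging.info('%s, %s!', greeting, username)

    # A minimal sketch: op_kwargs supplies keyword arguments to the callable
    # when the task runs. The function and argument values are hypothetical.
    greet_task = python_operator.PythonOperator(
        task_id='greet_user',
        python_callable=greet_user,
        op_kwargs={'username': 'composer', 'greeting': 'Hi'})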

Google Cloud operators

Use the Google Cloud Airflow operators to run tasks that use Google Cloud products. Cloud Composer automatically configures an Airflow connection to the environment's project.
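For example, a BigQuery query can be run as a task with the BigQuery operator from the contrib package. The following is a minimal sketch, assuming Airflow 1.10's airflow.contrib.operators.bigquery_operator module (older versions pass the query through bql instead of sql); the destination dataset and table names are hypothetical placeholders:

from airflow.contrib.operators import bigquery_operator
    # A minimal sketch: run a standard SQL query against a public dataset and
    # write the result to a destination table. The destination dataset and
    # table names are hypothetical placeholders.
    bq_example_query = bigquery_operator.BigQueryOperator(
        task_id='bq_example_query',
        sql="""
        SELECT name, SUM(number) AS total
        FROM `bigquery-public-data.usa_names.usa_1910_current`
        GROUP BY name
        ORDER BY total DESC
        LIMIT 10
        """,
        use_legacy_sql=False,
        destination_dataset_table='example_dataset.example_table')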

EmailOperator

Use the EmailOperator to send email from a DAG. To send email from a Cloud Composer environment, you must configure your environment to use SendGrid.

from airflow import models
from airflow.operators import email_operator
    # Note: min_query_date, max_query_date, and output_file are defined
    # elsewhere in the full example DAG.
    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id='email_summary',
        to=models.Variable.get('email'),
        subject='Sample BigQuery notify data ready',
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][0] }}'
            ),
            view_count=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][1] }}'
            ),
            export_location=output_file))

Notifications

Set email_on_failure to True to send an email notification when an operator in the DAG fails. To send email notifications from a Cloud Composer environment, you must configure your environment to use SendGrid.

import datetime

from airflow import models

# Note: yesterday is a datetime computed elsewhere in the full example DAG.
default_dag_args = {
    'start_date': yesterday,
    # Email whenever an Operator in the DAG fails.
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=4),
        default_args=default_dag_args) as dag:

Guidelines

  1. Place any custom Python libraries in a DAG's ZIP archive in a nested directory. Do not place libraries at the top level of the DAGs directory.

    When Airflow scans the dags/ folder, it only checks for DAGs in Python modules that are in the top level of the DAGs folder and in the top level of a ZIP archive that is also located in the top-level dags/ folder. If Airflow encounters a Python module in a ZIP archive that does not contain both airflow and DAG substrings, Airflow stops processing the ZIP archive and returns only the DAGs found up to that point.

  2. For fault tolerance, do not define multiple DAG objects in the same Python module.

  3. Do not define subDAGs as top-level objects.

    In general, Airflow picks up any DAG object in the global namespace of a module in the dags/ directory as a top-level DAG. Any subDAGs defined as top-level objects execute on their own schedules, in addition to the schedules of the DAGs that embed them.

  4. Place files that are required at DAG parse time into the dags/ directory, not into the data/ directory. The data/ directory is not mounted in the web server. See the sketch after this list for an example of reading such a file at parse time.
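For example, if a DAG builds its task list from a configuration file, that file must sit next to the DAG file in dags/ so that the scheduler can read it while parsing. The following is a minimal sketch; the table_list.txt file name is a hypothetical example:

import os

# A minimal sketch: read a configuration file that sits next to this DAG file
# in the dags/ folder at parse time. The file name is a hypothetical example.
dag_dir = os.path.dirname(os.path.abspath(__file__))
with open(os.path.join(dag_dir, 'table_list.txt')) as config_file:
    tables = [line.strip() for line in config_file if line.strip()]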

FAQs

Creating DAGs

Using operators

What's next