编写 DAG(工作流)

本指南介绍如何编写在 Cloud Composer 环境中运行的 Apache Airflow 有向无环图 (DAG)。

构建 DAG

Airflow DAG 在 Python 文件中定义,包含以下几个组成部分:DAG 定义、操作器和操作器关系。以下代码段仅显示了每个组成部分的示例,而未提供上下文:

  1. DAG 定义。

    Airflow 2

    import datetime
    
    from airflow import models
    default_dag_args = {
        # The start_date describes when a DAG is valid / can be run. Set this to a
        # fixed point in time rather than dynamically, since it is evaluated every
        # time a DAG is parsed. See:
        # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
        'start_date': datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
            'composer_sample_simple_greeting',
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:

    Airflow 1

    import datetime
    
    from airflow import models
    default_dag_args = {
        # The start_date describes when a DAG is valid / can be run. Set this to a
        # fixed point in time rather than dynamically, since it is evaluated every
        # time a DAG is parsed. See:
        # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
        'start_date': datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
            'composer_sample_simple_greeting',
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:

  2. 操作器(用于描述要完成的任务)。一个运算符实例称为一项任务

    Airflow 2

    from airflow.operators import bash_operator
    from airflow.operators import python_operator
        def greeting():
            import logging
            logging.info('Hello World!')
    
        # An instance of an operator is called a task. In this case, the
        # hello_python task calls the "greeting" Python function.
        hello_python = python_operator.PythonOperator(
            task_id='hello',
            python_callable=greeting)
    
        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id='bye',
            bash_command='echo Goodbye.')

    Airflow 1

    from airflow.operators import bash_operator
    from airflow.operators import python_operator
        def greeting():
            import logging
            logging.info('Hello World!')
    
        # An instance of an operator is called a task. In this case, the
        # hello_python task calls the "greeting" Python function.
        hello_python = python_operator.PythonOperator(
            task_id='hello',
            python_callable=greeting)
    
        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id='bye',
            bash_command='echo Goodbye.')

  3. 任务关系,用于描述任务的完成顺序。

    Airflow 2

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

    Airflow 1

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

以下示例演示了一个完整的工作流,该工作流包含 hello_pythongoodbye_bash 两项任务:

Airflow 2

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Airflow 1

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

如需详细了解如何定义 Airflow DAG,请参阅 Airflow 教程Airflow 概念

操作器

以下示例演示了几个常用的 Airflow 运算符。如需查看 Airflow 运算符的权威参考信息,请参阅 Apache Airflow API 参考文档,或浏览 corecontribproviders 运算符的源代码。

BashOperator

BashOperator 用于运行命令行程序。

Airflow 2

from airflow.operators import bash
    # Create BigQuery output dataset.
    make_bq_dataset = bash.BashOperator(
        task_id='make_bq_dataset',
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command=f'bq ls {bq_dataset_name} || bq mk {bq_dataset_name}')

Airflow 1

from airflow.operators import bash_operator
    # Create BigQuery output dataset.
    make_bq_dataset = bash_operator.BashOperator(
        task_id='make_bq_dataset',
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command='bq ls {} || bq mk {}'.format(
            bq_dataset_name, bq_dataset_name))

Cloud Composer 在工作器的 Bash 脚本中运行提供的命令。 工作器是一个基于 Debian 的 Docker 容器,其中包含多个软件包。

PythonOperator

PythonOperator 用于运行任意 Python 代码。

Cloud Composer 在一个容器中运行 Python 代码,其中包含您的环境中使用的 Cloud Composer 映像版本的软件包。

如需安装其他 Python 软件包,请参阅安装 Python 依赖项

Google Cloud 运算符

Google Cloud Airflow 运算符用于运行使用 Google Cloud 产品的任务。Cloud Composer 会自动配置与环境项目之间的 Airflow 连接

EmailOperator

EmailOperator 用于从 DAG 发送电子邮件。如需从 Cloud Composer 环境发送电子邮件,您必须将您的环境配置为使用 SendGrid

Airflow 2

from airflow.operators import email
    # Send email confirmation (you will need to set up the email operator
    # See https://cloud.google.com/composer/docs/how-to/managing/creating#notification
    # for more info on configuring the email operator in Cloud Composer)
    email_summary = email.EmailOperator(
        task_id='email_summary',
        to=models.Variable.get('email'),
        subject='Sample BigQuery notify data ready',
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][0] }}'
            ),
            view_count=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][1] }}'
            ),
            export_location=output_file))

Airflow 1

from airflow.operators import email_operator
    # Send email confirmation
    email_summary = email_operator.EmailOperator(
        task_id='email_summary',
        to=models.Variable.get('email'),
        subject='Sample BigQuery notify data ready',
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][0] }}'
            ),
            view_count=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][1] }}'
            ),
            export_location=output_file))

通知

如果您希望在 DAG 中的运算符发生失败时发送电子邮件通知,可以将 email_on_failure 设置为 True。如需从 Cloud Composer 环境发送电子邮件通知,您必须将您的环境配置为使用 SendGrid

Airflow 2

from airflow import models
default_dag_args = {
    'start_date': yesterday,
    # Email whenever an Operator in the DAG fails.
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': project_id
}

with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=4),
        default_args=default_dag_args) as dag:

Airflow 1

from airflow import models
default_dag_args = {
    'start_date': yesterday,
    # Email whenever an Operator in the DAG fails.
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=4),
        default_args=default_dag_args) as dag:

指南

  1. 将任何自定义 Python 库放入嵌套目录下一个 DAG 的 ZIP 归档文件中。不要将这些库放入 DAG 目录顶层。

    Airflow 在扫描 dags/ 文件夹时,只会检查以下 Python 模块中的 DAG:位于 DAGs 文件夹顶层的 Python 模块,以及位于某一 ZIP 归档文件(该文件也位于顶层 dags/ 文件夹)顶层的 Python 模块。如果 Airflow 在某一 ZIP 归档文件中遇到既不包含 airflow 又不包含 DAG 子字符串的 Python 模块,则 Airflow 会停止处理该 ZIP 归档文件,并仅返回在此之前发现的 DAG。

  2. 为实现容错功能,请不要在同一个 Python 模块中定义多个 DAG 对象。

  3. 不要将 subDAG 定义为顶层对象。

    一般情况下,Airflow 将拾取 dags/ 目录下某一模块的全局命名空间中的 DAG 对象作为顶层 DAG。被定义为顶层对象的任何 subDag 及其嵌入到的其他 DAG 均按其自己的调度表执行。

  4. 将 DAG 解析时所需的文件放入 dags/(而非 data/)目录中。data/ 目录不会装载到 Web 服务器中。

有关编写 DAG 的常见问题解答

如果我想在多个 DAG 中运行相同或类似的任务,如何尽量减少重复代码?

为了最大限度地减少重复代码,我们建议定义库和封装容器。

如何在 DAG 文件之间重复使用代码?

将您的工具函数放入一个本地 Python 库中并导入这些函数。您可以在环境存储桶内 dags/ 文件夹中的任何 DAG 中引用这些函数。

如何尽量降低出现不同定义的风险?

例如,您有两个团队希望将原始数据汇总成收入指标。为实现此目的,这两个团队都各自编写了一个略微不同的任务。建议针对收入数据来定义库,这样,DAG 实现者就必须阐明要汇总的收入的定义。

如何设置 DAG 之间的依赖项?

这取决于您想要如何定义依赖项。

如果您有两个 DAG(即 DAG A 和 DAG B),并且希望 DAG B 在 DAG A 之后触发,则可以在 DAG A 末尾添加一个 TriggerDagRunOperator

如果 DAG B 仅依赖于 DAG A 生成的工件(例如 Pub/Sub 消息),那么可能更适合使用传感器。

如果 DAG B 与 DAG A 紧密集成,则或许可以将两个 DAG 合并为一个 DAG。

如何将唯一运行 ID 传递给某一 DAG 及其任务?

例如,您想传递 Dataproc 集群名称和文件路径。

您可以通过在 PythonOperator 中返回 str(uuid.uuid4()) 来随机生成一个唯一 ID。这会将 ID 放入 XComs 中,以便您通过模板化字段在其他运算符中引用该 ID。

在生成 uuid 之前,请考虑 DagRun 专用的 ID 是否更加有用。您还可以使用在 Jinja 替代变量中引用这些 ID。

如何在 DAG 中分离任务?

每项任务都应该是一个具有幂等性的工作单元。因此,应避免将一个涉及多个步骤的工作流封装到单项任务中,例如,在 PythonOperator 中运行一个复杂程序。

Airflow 本身具有重复使用工作流定义代码的机制,即 SubDagOperator。不过,在 Cloud Composer 中使用此运算符时需要注意一些事项

如果我需要汇总多个来源中的数据,那么是否应该在一个 DAG 中定义多项任务?

例如,您有多个包含原始数据的表格,并且希望针对每个表格创建每日汇总数据。这些任务并不相互依赖。在这种情况下,您是应该为每个表格分别创建一项任务和一个 DAG,还是应该创建一个通用 DAG?

如果您能接受各项任务共用相同的 DAG 级属性(例如 schedule_interval),那么最好在一个 DAG 中定义多项任务。否则,可以通过一个 Python 模块生成多个 DAG(将这些 DAG 放入该模块的 globals() 中即可),以尽量减少重复代码。

如何限制在一个 DAG 中运行的并发任务数量?

例如,您想避免超出 API 用量限额或配额,或避免同时运行过多进程。

您可以在 Airflow 网页界面中定义 Airflow 池,并将任务与 DAG 中的现有池相关联。

有关使用运算符的常见问题解答

我是否应该使用 DockerOperator

我们不推荐使用 DockerOperator,除非用于在远程 Docker 安装(而非环境集群)中启动容器。在 Cloud Composer 环境中,运算符无权访问 Docker 守护程序。

请改用 KubernetesPodOperatorGKEPodOperator。这些运算符可以将 Kubernetes pod 分别发布到 Kubernetes 或 GKE 集群。请注意,不建议将 pod 发布到环境的集群中,因为这会导致资源争用情况。

我是否应该使用 SubDagOperator

我们不建议您使用 SubDagOperator

虽然 SubDagOperator 可以提供封装处理,但 SubDag 任务需要一个任务槽。如果运行 SubDag 任务的 Airflow 工作器崩溃,则该 SubDag 中的所有任务都将失败,从而导致工作流不可靠。

如果我想将 Python 运算符完全隔离,是否应该仅在 PythonOperators 中运行 Python 代码?

有几种方案可供您选择,具体取决于您的目标。

如果您只想维护单独的 Python 依赖项,可以使用 PythonVirtualenvOperator

请考虑使用 KubernetesPodOperator。通过此运算符,可定义 Kubernetes pod 并在其他集群中运行这些 pod。

如何在 Google Cloud 外部使用 KubernetesPodOperator

您可以装载一个配置文件,在其中指定如何向 GKE 集群进行身份验证,并将该文件放在环境存储桶的 /data 文件夹中。

此文件夹会装载到整个 Cloud Composer 环境中。

如何添加自定义二进制文件或非 PyPI 软件包?

您可以安装托管在私有软件包代码库中的软件包

您还可以使用 KubernetesPodOperator 通过自己用自定义软件包构建的映像来运行 Kubernetes pod。

如何将参数统一传递给某一 DAG 及其任务?

您可以使用 Airflow 对 Jinja 模板化的内置支持传递可在模板化字段中使用的参数。

何时会发生模板替换?

在开始调用运算符的 pre_execute 函数之前,系统会在 Airflow 工作器上进行模板替换。实际上,这意味着模板只有在任务快临近运行时才会被替换。

如何确定哪些运算符参数支持模板替换?

支持 Jinja2 模板替换的运算符参数均有此类明确标注。

查找运算符定义中的 template_fields 字段,该字段包含将接受模板替换的参数名称列表。

例如,请参阅 BashOperator,它支持 bash_commandenv 参数模板化。

后续步骤