Writing DAGs (workflows)

This guide shows you how to write an Apache Airflow directed acyclic graph (DAG) that runs in a Cloud Composer environment.

Structuring a DAG

An Airflow DAG is defined in a Python file and is composed of the following parts: a DAG definition, operators, and operator relationships. The following snippets show each part in isolation, without the surrounding context:

  1. A DAG definition.

    import datetime
    
    from airflow import models
    default_dag_args = {
        # The start_date describes when a DAG is valid / can be run. Set this to a
        # fixed point in time rather than dynamically, since it is evaluated every
        # time a DAG is parsed. See:
        # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
        'start_date': datetime.datetime(2018, 1, 1),
    }
    
    # Define a DAG (directed acyclic graph) of tasks.
    # Any task you create within the context manager is automatically added to the
    # DAG object.
    with models.DAG(
            'composer_sample_simple_greeting',
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:
  2. Operators, which describe the work to be done. An instance of an operator is called a task.

    from airflow.operators import bash_operator
    from airflow.operators import python_operator
        def greeting():
            import logging
            logging.info('Hello World!')
    
        # An instance of an operator is called a task. In this case, the
        # hello_python task calls the "greeting" Python function.
        hello_python = python_operator.PythonOperator(
            task_id='hello',
            python_callable=greeting)
    
        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id='bye',
            bash_command='echo Goodbye.')
  3. Operator relationships, which describe the order in which tasks should be completed. (An equivalent method-call form is sketched just after this list.)

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash
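
The >> and << operators are shorthand for Airflow's set_downstream and set_upstream task methods. Using the tasks defined in the snippets above, the same relationship can be written as a method call:

    # Equivalent to hello_python >> goodbye_bash: goodbye_bash runs only
    # after hello_python completes.
    hello_python.set_downstream(goodbye_bash)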

The following workflow is a complete working example and is composed of two tasks, a hello_python task and a goodbye_bash task:

from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_sample_simple_greeting',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    def greeting():
        import logging
        logging.info('Hello World!')

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

For more information about defining Airflow DAGs, see the Airflow tutorial and Airflow concepts documentation.

Operators

The following examples show a few popular Airflow operators. For an authoritative reference of Airflow operators, see the Apache Airflow API Reference, or browse the source code of the core and contrib operators.

BashOperator

Use the BashOperator to run command-line programs.

from airflow.operators import bash_operator
    # Create BigQuery output dataset. bq_dataset_name is defined earlier in
    # the full example from which this snippet is excerpted.
    make_bq_dataset = bash_operator.BashOperator(
        task_id='make_bq_dataset',
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command='bq ls {} || bq mk {}'.format(
            bq_dataset_name, bq_dataset_name))

Cloud Composer runs the provided commands in a Bash script on a worker. The worker is a Debian-based Docker container and includes several additional packages.
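
The bash_command string is also rendered with Airflow's Jinja templating. As a minimal sketch (the task_id and command are illustrative, and the task is assumed to be defined inside a DAG context manager like the examples above):

from airflow.operators import bash_operator
    # {{ ds }} is a built-in Airflow template variable that expands to the
    # task's execution date (YYYY-MM-DD) at run time.
    print_date = bash_operator.BashOperator(
        task_id='print_execution_date',
        bash_command='echo {{ ds }}')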

PythonOperator

Use the PythonOperator to run arbitrary Python code. Cloud Composer runs the Python code in a container that includes a set of preinstalled Python packages.

To install additional Python packages, see Installing Python Dependencies.
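
In addition to the greeting example shown earlier, the PythonOperator can pass arguments to its callable through the op_args and op_kwargs parameters. A minimal sketch (the function, message, and task_id are illustrative, and the task is assumed to be defined inside a DAG context manager):

from airflow.operators import python_operator
    def log_message(message):
        import logging
        logging.info(message)

    # op_args supplies positional arguments to the callable at run time.
    log_hello = python_operator.PythonOperator(
        task_id='log_hello',
        python_callable=log_message,
        op_args=['Hello from PythonOperator!'])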

Google Cloud Platform operators

Use the Google Cloud Platform Airflow operators to run tasks that use Google Cloud Platform products. Cloud Composer automatically configures an Airflow connection to the environment's project.
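
For example, a task can run a query in BigQuery with the contrib BigQueryOperator. The following is a hedged sketch, not the official sample: the query, task_id, and the sql parameter (available in Airflow 1.10; earlier versions use bql) are assumptions, and the task is assumed to be defined inside a DAG context manager:

from airflow.contrib.operators import bigquery_operator
    # Query a public dataset. Cloud Composer's automatically configured
    # connection supplies the project and credentials, so no explicit
    # connection setup is needed here.
    bq_recent_questions = bigquery_operator.BigQueryOperator(
        task_id='bq_recent_questions',
        sql="""
            SELECT title, view_count
            FROM `bigquery-public-data.stackoverflow.posts_questions`
            ORDER BY view_count DESC
            LIMIT 10
        """,
        use_legacy_sql=False)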

EmailOperator

Use the EmailOperator to send email from a DAG. To send email from a Cloud Composer environment, you must configure your environment to use SendGrid.

from airflow.operators import email_operator
    # Send email confirmation. min_query_date, max_query_date, and output_file
    # are defined earlier in the full example from which this snippet is
    # excerpted.
    email_summary = email_operator.EmailOperator(
        task_id='email_summary',
        to=models.Variable.get('email'),
        subject='Sample BigQuery notify data ready',
        html_content="""
        Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
        12AM. The most popular question was '{question_title}' with
        {view_count} views. Top 100 questions asked are now available at:
        {export_location}.
        """.format(
            min_date=min_query_date,
            max_date=max_query_date,
            question_title=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][0] }}'
            ),
            view_count=(
                '{{ ti.xcom_pull(task_ids=\'bq_read_most_popular\', '
                'key=\'return_value\')[0][1] }}'
            ),
            export_location=output_file))

Notifications

Set email_on_failure to True to send an email notification when an operator in the DAG fails. To send email notifications from a Cloud Composer environment, you must configure your environment to use SendGrid.

import datetime

from airflow import models

# 'yesterday' is computed when the DAG is parsed so that the example is
# immediately eligible to run after deployment.
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    'start_date': yesterday,
    # Email whenever an Operator in the DAG fails.
    'email': models.Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

with models.DAG(
        'composer_sample_bq_notify',
        schedule_interval=datetime.timedelta(weeks=4),
        default_args=default_dag_args) as dag:

