Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3
本指南介绍如何编写在 Cloud Composer 环境中运行的 Apache Airflow 有向无环图 (DAG)。
Apache Airflow 不具备强大的 DAG 和任务隔离机制, 我们建议您使用单独的生产环境和测试环境 以防止 DAG 干扰如需了解详情,请参阅测试 DAG。
构建 Airflow DAG
Airflow DAG 在 Python 文件中定义,由以下部分组成 组件:
- DAG 定义
- Airflow 运算符
- 运算符关系
以下代码段将脱离具体情境分别显示每个组成部分的示例。
DAG 定义
以下示例演示了 Airflow DAG 定义:
Airflow 2
Airflow 1
运算符和任务
Airflow 操作器用于描述要完成的任务。任务是运算符的特定实例。
Airflow 2
Airflow 1
任务关系
任务关系描述了任务中的 必须完成的工作
Airflow 2
Airflow 1
Python 中的完整 DAG 工作流示例
以下工作流是一个完整的有效 DAG 模板,其中包含
两个任务:一个 hello_python
任务和一个 goodbye_bash
任务:
Airflow 2
Airflow 1
如需详细了解如何定义 Airflow DAG,请参阅 Airflow 教程和 Airflow 概念。
Airflow 运算符
以下示例演示了几个常用的 Airflow 运算符。如需查看 Airflow 运算符的权威参考,请参阅运算符和钩子参考文档以及提供程序索引。
BashOperator
使用 BashOperator 运行命令行程序。
Airflow 2
Airflow 1
Cloud Composer 在 Airflow 工作器的 Bash 脚本中运行提供的命令。工作器是一个基于 Debian 的 Docker 容器,其中包含多个软件包。
gcloud
命令,包括用于处理 Cloud Storage 存储桶的gcloud storage
子命令。bq
命令kubectl
命令
PythonOperator
使用 PythonOperator 运行任意 Python 代码。
Cloud Composer 在一个容器中运行 Python 代码,其中包含您的环境中使用的 Cloud Composer 映像版本的软件包。
如需安装其他 Python 软件包,请参阅安装 Python 依赖项。
Google Cloud 运算符
如需运行使用 Google Cloud 产品的任务,请使用 Google Cloud Airflow 运算符。例如,BigQuery 运算符用于查询和处理 BigQuery 中的数据。
还有许多适用于 Google Cloud 和 Google Cloud 提供的各项服务的 Airflow 运算符。请参阅 如需查看完整列表,请参阅 Google Cloud Operators。
Airflow 2
Airflow 1
EmailOperator
使用 EmailOperator 用于从 DAG 发送电子邮件。发送 从 Cloud Composer 环境中发送的电子邮件 将您的环境配置为使用 SendGrid。
Airflow 2
Airflow 1
运营商故障通知
如果您希望在 DAG 中的运算符发生失败时发送电子邮件通知,可以将 email_on_failure
设置为 True
。如需从 Cloud Composer 环境发送电子邮件通知,您必须将您的环境配置为使用 SendGrid。
Airflow 2
Airflow 1
DAG 工作流指南
将任何自定义 Python 库放入嵌套目录下一个 DAG 的 ZIP 归档文件中。不要将这些库放入 DAG 目录顶层。
Airflow 扫描
dags/
文件夹时,只会检查以下文件夹中的 DAG 位于 DAGs 文件夹顶层和顶层的 Python 模块 (也位于顶级dags/
文件夹中)。如果 Airflow 在某一 ZIP 归档文件中遇到既不包含airflow
又不包含DAG
子字符串的 Python 模块,则 Airflow 会停止处理该 ZIP 归档文件,Airflow 仅返回在此之前找到的 DAG。使用 Airflow 2 而不是 Airflow 1。
Airflow 社区不再发布 Airflow 1 的新次要版本或补丁版本。
为实现容错功能,请不要在同一个 Python 模块中定义多个 DAG 对象。
请勿使用子 DAG。相反, 在 DAG 中对任务进行分组。
将 DAG 解析时所需的文件放入
dags/
文件夹,而不是 位于data/
文件夹中。按照 测试 DAG 的说明。
确认开发的 DAG 不会过多增加 DAG 解析时间。
Airflow 任务可能会因多种原因而失败。为避免整个 DAG 运行失败,我们建议您启用任务重试。将重试次数上限设置为
0
意味着系统不会执行重试。我们建议您使用
0
以外的任务重试值替换default_task_retries
选项。此外,您还可以在任务级别设置retries
参数。如果您想在 Airflow 任务中使用 GPU,请根据使用带有 GPU 的机器的节点创建单独的 GKE 集群。使用 GKEStartPodOperator 运行任务。
避免在运行其他 Airflow 组件(调度器、工作器、Web 服务器)的集群节点池中运行 CPU 和内存密集型任务。请改用 KubernetesPodOperator 或 GKEStartPodOperator。
将 DAG 部署到环境中时,请仅上传 是解释和执行 DAG 所必需的, 放入
/dags
文件夹中。限制
/dags
文件夹中的 DAG 文件数量。Airflow 会持续解析
/dags
文件夹中的 DAG。解析是循环遍历 DAG 文件夹的过程,需要加载的文件数量(及其依赖项)会影响 DAG 解析和任务调度的性能。使用 100 个文件(每个文件包含 100 个 DAG)比使用 10,000 个文件(每个文件包含 1 个 DAG)的效率要高得多,因此建议进行此类优化。这种优化可平衡 DAG 编写和管理的解析时间和效率。例如,您还可以考虑部署 10,000 个 DAG 文件 创建 100 个 ZIP 文件,每个文件包含 100 个 DAG 文件。
除了上述提示之外,如果您有超过 10,000 个 DAG 文件,则通过编程方式生成 DAG 可能是一个不错的选择。例如: 您可以实现单个 Python DAG 文件 DAG 对象(例如 20、100 个 DAG 对象)。
避免使用已废弃的 Airflow 运算符
下表中列出的运算符已弃用。部分运算符 在 Cloud Composer 1 的早期版本中受支持。请避免在 DAG 中使用它们。请改为使用 Google 提供的最新替代方案。
已废弃的运算符 | 要使用的运算符 |
---|---|
BigQueryExecuteQueryOperator | BigQueryInsertJobOperator |
BigQueryPatchDatasetOperator | BigQueryUpdateTableOperator |
DataflowCreateJavaJobOperator | BeamRunJavaPipelineOperator |
DataflowCreatePythonJobOperator | BeamRunPythonPipelineOperator |
DataprocScaleClusterOperator | DataprocUpdateClusterOperator |
DataprocSubmitPigJobOperator | DataprocSubmitJobOperator |
DataprocSubmitSparkSqlJobOperator | DataprocSubmitJobOperator |
DataprocSubmitSparkJobOperator | DataprocSubmitJobOperator |
DataprocSubmitHadoopJobOperator | DataprocSubmitJobOperator |
DataprocSubmitPySparkJobOperator | DataprocSubmitJobOperator |
MLEngineManageModelOperator | MLEngineCreateModelOperator、MLEngineGetModelOperator |
MLEngineManageVersionOperator | MLEngineCreateVersion、MLEngineSetDefaultVersion、MLEngineListVersions、MLEngineDeleteVersion |
GCSObjectsWtihPrefixExistenceSensor | GCSObjectsWithPrefixExistenceSensor |
有关编写 DAG 的常见问题解答
如果我想在多个 DAG 中运行相同或类似的任务,如何尽量减少重复代码?
为了最大限度地减少重复代码,我们建议定义库和封装容器。
如何在 DAG 文件之间重复使用代码?
将您的实用函数放入一个本地 Python 库中并导入这些函数。您可以在环境存储桶内 dags/
文件夹中的任何 DAG 中引用这些函数。
如何尽量降低出现不同定义的风险?
例如,您有两个团队希望将原始数据汇总成收入指标。为了完成相同的任务,这两个团队各自编写了两项略微不同的任务 这个词。建议针对收入数据来定义库,这样,DAG 实现者就必须阐明要汇总的收入的定义。
如何设置 DAG 之间的依赖项?
这取决于您想要如何定义依赖项。
如果您有两个 DAG(即 DAG A 和 DAG B),并且希望 DAG B 在 DAG A 之后触发,则可以在 DAG A 末尾添加一个 TriggerDagRunOperator
。
如果 DAG B 仅依赖于 DAG A 生成的工件(例如 Pub/Sub 消息),那么可能更适合使用传感器。
如果 DAG B 与 DAG A 紧密集成,则或许可以将两个 DAG 合并为一个 DAG。
如何将唯一运行 ID 传递给某一 DAG 及其任务?
例如,您想传递 Dataproc 集群名称和文件路径。
您可以通过在 PythonOperator
中返回 str(uuid.uuid4())
来随机生成一个唯一 ID。这会将 ID
XComs
,以便在其他运算符中引用该 ID
通过模板化字段。
在生成 uuid
之前,请考虑 DagRun 专用的 ID 是否更加有用。您还可以使用宏在 Jinja 替代变量中引用这些 ID。
如何在 DAG 中分离任务?
每项任务都应该是一个具有幂等性的工作单元。因此,应避免将一个涉及多个步骤的工作流封装到单项任务中,例如,在 PythonOperator
中运行一个复杂程序。
如果我需要汇总多个来源中的数据,那么是否应该在一个 DAG 中定义多项任务?
例如,您有多个包含原始数据的表格,并且希望针对每个表格创建每日汇总数据。这些任务并不相互依赖。在这种情况下,您是应该为每个表格分别创建一项任务和一个 DAG,还是应该创建一个通用 DAG?
如果您能接受各项任务共用相同的 DAG 级属性(例如 schedule_interval
),那么最好在一个 DAG 中定义多项任务。否则,可以通过一个 Python 模块生成多个 DAG(将这些 DAG 放入该模块的 globals()
中即可),以尽量减少重复代码。
如何限制在一个 DAG 中运行的并发任务数量?
例如,您想避免超出 API 用量限额或配额,或避免同时运行过多进程。
你可以定义 Airflow 网页界面中的 Airflow 池以及关联任务 现有池共享
有关使用运算符的常见问题解答
我是否应该使用 DockerOperator
?
我们不建议使用
DockerOperator
(除非将其用于启动
容器(而不是在环境的
)。在 Cloud Composer 环境中,运维者无权访问 Docker 守护程序。
请改用 KubernetesPodOperator
或
GKEStartPodOperator
。这些运算符可将 Kubernetes Pod
Kubernetes 或 GKE 集群。请注意,
建议将 Pod 启动到环境的集群中,因为这可能会导致
再到资源竞争
我是否应该使用 SubDagOperator
?
我们不建议您使用 SubDagOperator
。
按照任务分组中的建议,使用替代方法。
如果我想将 Python 运算符完全隔离,是否应该仅在 PythonOperators
中运行 Python 代码?
有几种方案可供您选择,具体取决于您的目标。
如果您只想维护单独的 Python 依赖项,可以使用 PythonVirtualenvOperator
。
请考虑使用 KubernetesPodOperator
。通过此运算符,可定义 Kubernetes pod 并在其他集群中运行这些 pod。
如何添加自定义二进制文件或非 PyPI 软件包?
如何将参数统一传递给某一 DAG 及其任务?
您可以使用 Airflow 的内置支持 Jinja 模板,用于传递可以使用的参数 。
何时会发生模板替换?
在开始调用运算符的 pre_execute
函数之前,系统会在 Airflow 工作器上进行模板替换。实际上,这意味着模板
才会被替换。
如何确定哪些运算符参数支持模板替换?
支持 Jinja2 模板替换的运算符参数均有此类明确标注。
查找运算符定义中的 template_fields
字段,该字段包含将接受模板替换的参数名称列表。
例如,请参阅 BashOperator
,它支持 bash_command
和 env
参数模板化。