DAG 问题排查(工作流)

本页面介绍常见工作流问题的问题排查步骤和信息。

排查工作流问题

要开始排查问题,请按如下所述操作:

  1. 检查 Airflow 日志
  2. 查看 Google Cloud 的操作套件
  3. 在 Cloud Console 中,检查运行环境的 Google Cloud 组件 对应的页面上是否存在错误。
  4. Airflow 网页界面中,检查相应 DAG 的 Graph View 中是否显示失败的任务实例。

    提示:如需浏览大型 DAG 以查找失败的任务实例,请替换 Web 服务器的默认 dag_orientation 配置,以将图表视图方向从 LR(从左到右)更改为 RL(从右到左)。

    部分
    webserver dag_orientation LRTBRLBT

调试运算符故障

要调试某个操作器的故障,请执行以下操作:

  1. 检查任务特定的错误
  2. 检查 Airflow 日志
  3. 查看 Google Cloud 的操作套件
  4. 检查该运算符特定的日志。
  5. 修复错误。
  6. 将 DAG 上传dags/ 文件夹。
  7. 在 Airflow 网页界面中,清除该 DAG 的过往状态
  8. 恢复或运行该 DAG。

常见问题

以下部分介绍了一些常见 DAG 问题的症状和可行修复措施。

任务失败且没有发出日志

Google Kubernetes Engine pod 受到 Kubernetes pod 生命周期和 pod 逐出的约束。在 Cloud Composer 中,任务高峰和工作器的协同调度是 pod 逐出的两个最常见原因。

当特定 pod 过度使用某个节点的资源(相对于该节点配置的资源消耗预期)时,可能会发生 pod 逐出。例如,当一个 pod 中运行多个内存密集型任务时,系统可能会执行逐出,并且这些任务的组合负载会导致该 pod 运行的节点超出内存消耗限制。

如果 Airflow 工作器 pod 被逐出,则该 pod 上运行的所有任务实例都会中断,之后被 Airflow 标记为失败。

日志被缓冲。如果工作器 pod 在缓冲区刷新之前被逐出,则不会发出日志。如果任务失败且没有日志,则表示 Airflow 工作器已因内存不足 (OOM) 重启。即使未发出 Airflow 日志,Cloud Logging 中也可能存在一些日志。您可以查看日志,例如,在 Google Cloud Console 中选择环境,导航到日志标签页,然后查看所有日志 -> Airflow 日志 -> 工作器 -> (单个工作器) 下单个工作器的日志。

DAG 的执行受 RAM 限制。每个任务的执行都以两个 Airflow 进程开始,即任务执行进程与任务监控进程。每个节点最多可以执行 6 项并发任务(也就是说,Airflow 模块大约需要加载 12 个进程)。任务的执行可能会使用更多内存,具体取决于 DAG 的性质。

症状

  1. 在 Cloud Console 中,转到 Kubernetes Engine -> 工作负载面板。

    打开“工作负载”面板

  2. 如果存在显示 Evictedairflow-worker pod,请点击每个被逐出的 pod 并查看窗口顶部的 The node was low on resource: memory 消息。

修复

DAG 加载导入超时

具体情况:

  • Airflow 网页界面:DAG 列表页面顶部出现一个红色提醒框,其中显示 Broken DAG: [/path/to/dagfile] Timeout
  • Google Cloud 的运维套件:airflow-scheduler 日志包含类似如下的条目:
    • “ERROR - Process timed out”
    • “ERROR - Failed to import: /path/to/dagfile”
    • “AirflowTaskTimeout: Timeout”

修复:

替换 dagbag_import_timeout Airflow 配置选项,并留出更多时间来进行 DAG 解析:

部分
core dagbag_import_timeout 新超时值

提升进出 Airflow 数据库的网络流量

您的环境的 GKE 集群和 Airflow 数据库之间的流量网络量取决于 DAG 数量、DAG 中的任务数量以及 DAG 访问 Airflow 数据库中的数据的方式。以下因素可能会影响网络用量:

  • 对 Airflow 数据库的查询。如果您的 DAG 执行了大量查询,则会生成大量流量。示例:在继续其他任务、查询 XCom 表、转储 Airflow 数据库内容之前检查任务的状态。

  • 任务数量过多。要安排的任务越多,产生的网络流量就越多。此考虑因素适用于 DAG 中的任务总数和调度频率。在 Airflow 调度器安排 DAG 运行时,它会向 Airflow 数据库发出查询并产生流量。

  • Airflow 网页界面会向 Airflow 数据库发出查询,因此会产生网络流量。大量使用包含图表、任务和图示的页面可能会产生大量网络流量。

DAG 导致 Airflow Web 服务器崩溃或导致其返回 502 gateway timeout 错误

有多种不同原因可能会导致 Web 服务器故障。检查 Cloud Logging 中的 airflow-webserver 日志,确定 502 gateway timeout 错误的原因。

重量级计算负载

避免在 DAG 解析时运行重量级计算负载。与工作器和调度器节点不同(它们可以通过自定义机器类型来提高 CPU 和内存容量),Web 服务器使用的是固定机器类型,因此,如果解析时的计算量过于庞大,可能会导致 DAG 解析失败。

请注意,Web 服务器具有 2 个 vCPU 和 2 GB 内存core-dagbag_import_timeout 的默认值为 30 秒。此超时值决定了 Airflow 在将 Python 模块载入 dags/ 文件夹时所花费的时长上限。

权限不正确

Web 服务器不是使用与工作器和调度器相同的服务帐号运行。因此,工作器和调度器可能可以访问 Web 服务器无法访问的用户管理资源。

我们建议您避免在 DAG 解析期间访问非公开资源。在某些情况下,如果需要访问此类资源,您需要向 Web 服务器的服务帐号授予相应权限。该服务帐号名称来源于您的 Web 服务器网域。例如,如果网域为 foo-tp.appspot.com,则服务帐号为 foo-tp@appspot.gserviceaccount.com

DAG 错误

Web 服务器在 App Engine 上运行,并且独立于环境的 GKE 集群。Web 服务器会解析 DAG 定义文件;如果 DAG 中存在错误,则可能发生 502 gateway timeout。如果有问题的 DAG 没有导致 GKE 中正在运行的任何进程中断,则 Airflow 会在 Web 服务器未正常发挥作用的情况下正常工作。在这种情况下,您可以使用 gcloud composer environments run 来从您的环境中检索详细信息,并可在 Web 服务器变得不可用时使用它来解决问题。

在其他情况下,您可以在 GKE 中运行 DAG 解析,并查找抛出致命 Python 异常或发生超时(默认为 30 秒)的 DAG。 要进行问题排查,请连接到 Airflow 工作器容器中的远程 shell,并测试是否存在语法错误。 如需了解详情,请参阅测试 DAG

在任务执行期间或紧接其后抛出 Lost connection to MySQL server during query 异常

满足以下条件时,通常会发生 Lost connection to MySQL server during query 异常:

  • DAG 使用 PythonOperator 或自定义运算符。
  • DAG 向 Airflow 数据库发出查询。

如果通过可调用的函数进行了多次查询,则回溯可能会错误地指向 Airflow 代码中的 self.refresh_from_db(lock_for_update=True) 行;它是任务执行后的第一个数据库查询。导致出现异常的实际原因发生在这之前,即 SQLAlchemy 会话未正确关闭时。

SQLAlchemy 会话限定在线程范围内,并且在稍后可在 Airflow 代码中继续的可调用函数会话中创建。如果一次会话中的查询之间存在严重延迟,则表示 MySQL 或 PostgreSQL 服务器可能已经关闭了连接。Cloud Composer 环境中的连接超时设置为大约 10 分钟。

修复:

  • 使用 airflow.utils.db.provide_session 修饰器。此修饰器在 session 参数中提供到 Airflow 数据库的有效会话,并在函数结束时正确关闭会话。
  • 请勿使用单个长时间运行的函数。请改为将所有数据库查询移动到单独的函数中,以使多个函数具有 airflow.utils.db.provide_session 修饰器。在这种情况下,会话会在检索查询结果后自动关闭。