DAG 问题排查

Cloud Composer 1 | Cloud Composer 2

本页面提供了常见工作流问题的问题排查步骤和信息。

许多 DAG 执行问题是由非最佳环境性能引起的。您可以按照优化环境性能和费用指南来优化 Cloud Composer 2 环境。

某些 DAG 执行问题可能是由 Airflow 调度器无法正常运行或最佳原因所致。请按照调度器问题排查说明解决此问题。

排查工作流问题

要开始排查问题,请按如下所述操作:

  1. 检查 Airflow 日志
  2. 查看 Monitoring 信息中心
  3. 查看 Cloud Monitoring
  4. 在 Cloud Console 中,请检查页面上是否存在环境组件的错误。
  5. Airflow 网页界面中,检查 DAG 视图的图表视图中是否有失败的任务实例。

    提示:如需浏览大型 DAG 以查找失败的任务实例,请替换以下 Airflow 配置选项,将图视图方向从 LR 更改为 RL:

    价值
    webserver dag_orientation LRTBRLBT

调试运算符故障

要调试某个操作器的故障,请执行以下操作:

  1. 检查任务特定的错误
  2. 检查 Airflow 日志
  3. 查看 Cloud Monitoring
  4. 查看特定于运营商的日志。
  5. 修正错误。
  6. 将 DAG 上传dags/ 文件夹。
  7. 在 Airflow 网页界面中,清除该 DAG 的过往状态
  8. 恢复或运行该 DAG。

常见问题

以下部分介绍了一些常见 DAG 问题的症状和可行修复措施。

由于 DAG 解析错误,任务失败且未发出日志

有时,可能会出现细微的 DAG 错误,从而导致 Airflow 调度器和 DAG 处理器能够调度任务(分别)解析 DAG 文件,但 Airflow 工作器无法执行来自此类 DAG 的任务,因为 Python DAG 文件中存在编程错误。这可能会导致 Airflow 任务标记为 Failed,且其执行期间没有日志。

解决方案:在 Airflow 工作器日志中验证 Airflow 工作器是否未出现因缺少 DAG 或 DAG 解析错误而导致的错误。

由于资源压力,任务失败且未发出日志

症状:在执行任务期间,负责执行 Airflow 任务的 Airflow 工作器的子进程突然中断。Airflow 工作器日志中显示的错误可能类似于以下错误:

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

解决方案:

由于 Pod 逐出,任务失败且未发出日志

Google Kubernetes Engine Pod 受限于 Kubernetes Pod 生命周期和 pod 逐出。任务激增和工作器共同调度是 Cloud Composer 中逐出 pod 的两个最常见原因。

当特定 pod 过度使用某个节点的资源(相对于该节点配置的资源消耗预期)时,可能会发生 pod 逐出。例如,如果一个 pod 中运行多个内存密集型任务,并且这些任务的合并负载导致此 pod 运行的节点超出内存消耗限制,可能会发生逐出。

如果 Airflow 工作器 pod 被逐出,则该 pod 上运行的所有任务实例都会中断,之后被 Airflow 标记为失败。

日志被缓冲。如果工作器 pod 在缓冲区刷新之前被逐出,则不会发出日志。如果任务失败且没有日志,则表示 Airflow 工作器已因内存不足 (OOM) 重启。即使未发出 Airflow 日志,Cloud Logging 中仍可能存在部分日志。

要查看日志,请执行以下操作:

  1. 在 Google Cloud Console 中,转到环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到日志标签页。

  4. 查看所有日志 ->Airflow 日志 ->工作器 ->(单个工作器)下各个工作器的日志。

DAG 的执行存在内存限制。每个任务的执行都以两个 Airflow 进程开始,即任务执行进程与任务监控进程。每个节点最多可以执行 6 项并发任务(也就是说,Airflow 模块大约需要加载 12 个进程)。任务的执行可能会使用更多内存,具体取决于 DAG 的性质。

具体情况:

  1. 在 Google Cloud Console 中,转到工作负载页面。

    转到“工作负载”

  2. 如果存在显示 Evictedairflow-worker pod,请点击每个被逐出的 pod 并查看窗口顶部的 The node was low on resource: memory 消息。

修复:

DAG 加载导入超时

具体情况:

  • 在 Airflow 网页界面中,DAG 列表页面顶部会显示一个红色提醒框,其中显示 Broken DAG: [/path/to/dagfile] Timeout
  • 在 Cloud Monitoring 中:airflow-scheduler 日志包含类似于以下内容的条目:

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

修复:

替换 dag_file_processor_timeout Airflow 配置选项,并留出更多时间来解析 DAG:

价值
core dag_file_processor_timeout 新超时值

提升进出 Airflow 数据库的网络流量

您的环境的 GKE 集群和 Airflow 数据库之间的流量网络量取决于 DAG 数量、DAG 中的任务数量以及 DAG 访问 Airflow 数据库中的数据的方式。以下因素可能会影响网络用量:

  • 对 Airflow 数据库的查询。如果您的 DAG 执行了大量查询,则会生成大量流量。示例:在继续其他任务、查询 XCom 表、转储 Airflow 数据库内容之前检查任务的状态。

  • 任务数量过多。要安排的任务越多,产生的网络流量就越多。此考虑因素适用于 DAG 中的任务总数和调度频率。在 Airflow 调度器安排 DAG 运行时,它会向 Airflow 数据库发出查询并产生流量。

  • Airflow 网页界面会向 Airflow 数据库发出查询,因此会产生网络流量。大量使用包含图表、任务和图示的页面可能会产生大量网络流量。

DAG 导致 Airflow Web 服务器崩溃或导致其返回 502 gateway timeout 错误

有多种不同原因可能会导致 Web 服务器故障。检查 Cloud Logging 中的 airflow-webserver 日志以确定 502 gateway timeout 错误的原因。

重量级计算负载

本部分仅适用于 Cloud Composer 1。

避免在 DAG 解析时运行重量级计算任务。

与工作器和调度器节点不同(它们可以进行自定义以具有更高的 CPU 和内存容量),Web 服务器使用固定的机器类型,这可能会导致解析时间计算量过大的 DAG 解析失败。

请注意,Web 服务器具有 2 个 vCPU 和 2 GB 内存core-dagbag_import_timeout 的默认值为 30 秒。此超时值决定了 Airflow 在将 Python 模块载入 dags/ 文件夹时所花费的时长上限。

权限不正确

本部分仅适用于 Cloud Composer 1。

Web 服务器不是使用与工作器和调度器相同的服务帐号运行。因此,工作器和调度器可能可以访问 Web 服务器无法访问的用户管理资源。

我们建议您避免在 DAG 解析期间访问非公开资源。在某些情况下,如果需要访问此类资源,您需要向 Web 服务器的服务帐号授予相应权限。该服务帐号名称来源于您的 Web 服务器网域。例如,如果网域为 example-tp.appspot.com,则服务帐号为 example-tp@appspot.gserviceaccount.com

DAG 错误

本部分仅适用于 Cloud Composer 1。

Web 服务器在 App Engine 上运行,并且独立于环境的 GKE 集群。Web 服务器会解析 DAG 定义文件;如果 DAG 中存在错误,则可能发生 502 gateway timeout。如果有问题的 DAG 没有破坏 GKE 中运行的任何进程,则 Airflow 会在没有正常运行的网络服务器的情况下正常运行。在这种情况下,您可以使用 gcloud composer environments run 从环境中检索详细信息,并在网络服务器不可用时作为临时解决方法。

在其他情况下,您可以在 GKE 中运行 DAG 解析,并查找抛出致命 Python 异常或发生超时(默认为 30 秒)的 DAG。 要进行问题排查,请连接到 Airflow 工作器容器中的远程 shell,并测试是否存在语法错误。 如需了解详情,请参阅测试 DAG

访问 Airflow Web 服务器时出现 504 错误

请参阅访问 Airflow 界面时出现错误 504

在任务执行期间或紧接其后抛出 Lost connection to MySQL server during query 异常

当满足以下条件时,通常会出现 Lost connection to MySQL / PostgreSQL server during query 异常:

  • DAG 使用 PythonOperator 或自定义运算符。
  • DAG 向 Airflow 数据库发出查询。

如果通过可调用的函数进行了多次查询,则回溯可能会错误地指向 Airflow 代码中的 self.refresh_from_db(lock_for_update=True) 行;它是任务执行后的第一个数据库查询。导致出现异常的实际原因发生在这之前,即 SQLAlchemy 会话未正确关闭时。

SQLAlchemy 会话限定在线程范围内,并且在稍后可在 Airflow 代码中继续的可调用函数会话中创建。如果一次会话中的查询之间存在严重延迟,则表示 MySQL 或 PostgreSQL 服务器可能已经关闭了连接。Cloud Composer 环境中的连接超时设置为大约 10 分钟。

修复:

  • 使用 airflow.utils.db.provide_session 修饰器。此修饰器在 session 参数中提供到 Airflow 数据库的有效会话,并在函数结束时正确关闭会话。
  • 请勿使用单个长时间运行的函数。请改为将所有数据库查询移动到单独的函数中,以使多个函数具有 airflow.utils.db.provide_session 修饰器。在这种情况下,会话会在检索查询结果后自动关闭。

控制同一 DAG 的 DAG、任务和并行执行时间

如果您要控制特定 DAG 的单次 DAG 执行时长,可以使用 dagrun_timeout DAG 参数来实现。例如,如果您预计单个 DAG 运行(无论执行是成功还是失败)不得超过 1 小时,则将此参数设置为 3600 秒。

您还可以控制单个 Airflow 任务的持续时间。为此,您可以使用 execution_timeout

如果要控制要用于特定 DAG 的活跃 DAG 数量,可以使用 [core]max-active-runs-per-dag Airflow 配置选项执行此操作。

如果您希望在给定时刻只运行一个 DAG 实例,请将 max-active-runs-per-dag 参数设置为 1

影响 DAG 和插件同步到调度程序、工作器和网络服务器的问题

Cloud Composer 会将 /dags/plugins 文件夹的内容同步到调度程序和工作器。/dags/plugins 文件夹中的某些对象可能会阻止此同步正常运行或至少减慢同步速度。

  • /dags”文件夹已同步到调度程序和工作器。该文件夹不会同步到 Cloud Composer 2 中的网络服务器,也不会在 Cloud Composer 1 中开启 DAG Serialization

  • /plugins 文件夹会同步到调度器、工作器和网络服务器。

您可能会遇到以下问题:

  • 您将使用压缩转码的 gzip 压缩文件上传到了 /dags/plugins 文件夹。通常,如果您使用 gsutil cp -Z 命令将数据上传到存储分区,则会出现这种情况。

    解决方案:删除使用了压缩转码的对象,并将其重新上传到存储分区。

  • 其中一个对象被命名为 ''。此类对象不会同步到调度程序和工作器,并且可能完全停止同步。

    解决方案:重命名有问题的对象。

  • /dags/plugins 文件夹中的其中一个对象的对象名称末尾包含 / 符号。这些对象可能会误导同步过程,因为 / 符号表示对象是文件夹,而不是文件。

    解决方案:从有问题的对象名称中移除 / 符号。

  • 请勿在 /dags/plugins 文件夹中存储不必要的文件。

    有时,您实现的 DAG 和插件会附带其他文件,例如存储这些组件的测试的文件。这些文件会同步到工作器和调度程序,并影响将这些文件复制到调度程序、工作器和网络服务器所需的时间。

    解决方案:不要在 /dags/plugins 文件夹中存储任何其他和不必要的文件。

由调度程序和工作器生成 Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...' 错误

之所以出现此问题,是因为 Cloud Storage 中的对象具有重叠的命名空间,而调度程序和工作器同时使用传统文件系统。例如,可以将环境名称相同的文件夹和对象添加到环境的存储分区中。当存储分区同步到环境的调度程序和工作器时,会生成此错误,这可能会导致任务失败。

如需解决此问题,请确保环境的存储分区中没有重叠的命名空间。例如,如果 /dags/misc(一个文件)和 /dags/misc/example_file.txt(另一个文件)都在存储分区中,则调度程序会生成错误。

连接到 Airflow 元数据数据库时出现暂时性中断

Cloud Composer 基于分布式云基础架构运行。 这意味着,有时可能会出现一些暂时性问题,并且这些问题可能会中断 Airflow 任务的执行。

在这种情况下,您可能会在 Airflow 工作器日志中看到以下错误消息:

"Can't connect to MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

"Can't connect to MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

此类间歇性问题也可能是由为您的 Cloud Composer 环境执行的维护操作引起的。

通常,此类错误是间歇性的,如果您的 Airflow 任务遵循幂等原则,并且您已配置重试,则应对其加以豁免。您还可以考虑定义维护期

导致此类错误的另一个原因可能是您的环境中的集群缺少资源。在这种情况下,您可以按照调节环境优化环境说明中所述的方式对环境进行扩容或优化。

DAG 在 Airflow 界面或 DAGS 界面中不可见,且调度程序无法对其进行调度

DAG 处理器会先解析每个 DAG,然后调度程序才能调度该 DAG,并在 DAG 显示在 Airflow 界面或 DAGS 界面中。此外,[core]dag_file_processor_timeout 参数用于控制 DAG 处理器在解析单个 DAG 时所花费的时间。

如果您的 DAG 在 Airflow 界面或 DAGS 界面中不可见,则

  • 如果 DAG 处理器能够正确处理您的 DAG,请检查 DAG 处理器日志。如有任何问题,您可能会在 DAG 处理器或调度程序日志中看到以下日志条目
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for /usr/local/airflow/dags/example_dag.py with PID 21903 started at 2020-12-03T03:05:55.442709+00:00 has timed out, killing it.

此错误可能是由于以下某个原因造成的:

  • 您的 DAG 未正确实现,并且 DAG 处理器无法解析它。在这种情况下,请更正您的 DAG。

  • 解析 DAG 所需的时间超过 [core]dag_file_processor_timeout 秒。在这种情况下,您可以延长超时时间以满足自己的需求。注意:解析 DAG 所需的时间较长也意味着您的 DAG 没有以最佳方式实现(例如,您读取大量环境变量,执行 DAG 中的外部服务调用)。如果出现这种情况,请优化您的 DAG,以便 DAG 处理器可以快速对其进行处理。

Airflow 数据库负载过重的症状

有时,在 Airflow 工作器日志中,您可能会看到以下警告日志条目。

对于 MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

对于 PostgreSQL:

psycopg2.OperationalError: connection to server at ... failed

此类错误或警告可能表示 Airflow 数据库因必须处理的查询数量而不堪重负。

可能的解决方案:

后续步骤