DAG 问题排查

Cloud Composer 1 | Cloud Composer 2

本页面介绍了常见工作流问题的问题排查步骤和相关信息。

许多 DAG 执行问题都是由非最佳环境性能引起的。您可以按照优化环境性能和费用指南来优化 Cloud Composer 2 环境。

某些 DAG 执行问题可能是由于 Airflow 调度器无法正常运行或无法正常运行所致。请按照调度器问题排查说明解决这些问题。

排查工作流问题

要开始排查问题,请按如下所述操作:

  1. 检查 Airflow 日志

    您可以通过替换以下 Airflow 配置选项来提高 Airflow 的日志记录级别。

    Airflow 2

    logging logging_level 默认值为 INFO。 设置为 DEBUG 可提高日志消息的详细程度。

    Airflow 1

    core logging_level 默认值为 INFO。 设置为 DEBUG 可提高日志消息的详细程度。
  2. 检查 Monitoring 信息中心

  3. 查看 Cloud Monitoring

  4. 在 Google Cloud 控制台中,检查环境组件对应的页面上是否存在错误。

  5. Airflow 网页界面中,检查 DAG 的图表视图,了解失败的任务实例。

    webserver dag_orientation LRTBRLBT

调试运算符故障

要调试某个操作器的故障,请执行以下操作:

  1. 检查任务特定的错误
  2. 检查 Airflow 日志
  3. 查看 Cloud Monitoring
  4. 查看运营商专属日志。
  5. 修正错误。
  6. 将 DAG 上传dags/ 文件夹。
  7. 在 Airflow 网页界面中,清除该 DAG 的过往状态
  8. 恢复或运行该 DAG。

排查任务执行问题

Airflow 是一个分布式系统,其中包含许多实体,例如调度器、执行器和工作器,这些实体通过任务队列和 Airflow 数据库相互通信并发送信号(如 SIGTERM)。下图简要展示了 Airflow 组件之间的互连。

Airflow 组件之间的交互
图 1. Airflow 组件之间的交互(点击可放大)

在 Airflow 等分布式系统中,可能存在一些网络连接问题,或者底层基础架构可能会出现间歇性问题;这可能会导致任务失败并被重新安排执行,或者任务可能无法成功完成(例如,僵尸任务或执行卡住的任务)。Airflow 具有可处理此类情况的机制并自动恢复正常运行。以下部分介绍了在 Airflow 执行任务期间发生的常见问题:僵尸任务、毒药和 SIGTERM 信号。

对僵尸任务进行问题排查

Airflow 会检测任务与执行该任务的进程之间的两种不匹配情况:

  • “僵尸任务”是指本应运行但并未运行的任务。如果任务进程已终止或无响应,Airflow 工作器因过载而未及时报告任务状态,或执行任务的虚拟机已关闭,则可能会发生这种情况。Airflow 会定期查找此类任务,然后根据任务的设置失败或重试任务。

  • 取消任务是指不应运行的任务。Airflow 会定期查找此类任务并终止它们。

执行僵尸任务的最常见原因是环境集群中的 CPU 和内存资源不足。因此,Airflow 工作器可能无法报告任务状态,或者传感器可能会突然中断。在这种情况下,调度器会将给定任务标记为僵尸任务。为避免出现僵尸任务,请为您的环境分配更多资源。

如需详细了解如何伸缩 Cloud Composer 环境,请参阅伸缩环境指南。如果您遇到僵尸任务,一种可能的解决方案是增加僵尸任务的超时时间。因此,调度程序需要等待更长时间才会将任务视为僵尸。这样,Airflow 工作器就有更多的时间来报告任务的状态。

如需增加僵尸任务的超时时间,请替换 [scheduler]scheduler_zombie_task_threshold Airflow 配置选项的值:

Notes
scheduler scheduler_zombie_task_threshold 新的超时设置(以秒为单位) 默认值为 300

毒药问题排查

毒药丸是 Airflow 用于关停 Airflow 任务的一种机制。

Airflow 会在以下情况下使用毒药:

  • 当调度器终止未按时完成的任务时。
  • 当任务超时或执行时间过长时。

当 Airflow 使用毒药丸时,您可以在执行该任务的 Airflow 工作器日志中看到以下日志条目:

  INFO - Subtask ... WARNING - State of this instance has been externally set
  to success. Taking the poison pill.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

可能的解决方案:

  • 检查任务代码是否存在可能导致任务运行时间过长的错误。
  • (Cloud Composer 2) 为 Airflow 工作器增加 CPU 和内存,可加快任务的执行速度。
  • 增加 [celery_broker_transport_options]visibility-timeout Airflow 配置选项的值。

    因此,调度程序会等待更长时间的任务完成,然后再将任务视为僵尸任务。对于耗时数小时的耗时任务,此选项特别有用。如果值过低(例如 3 小时),调度程序会将运行 5 或 6 小时的任务视为“挂起”(僵尸任务)。

  • 增加 [core]killed_task_cleanup_time Airflow 配置选项的值。

    值越长,Airflow 工作器就能有更多时间妥善完成任务。如果值过低,Airflow 任务可能会突然中断,没有足够的时间来妥善完成工作。

排查 SIGTERM 信号问题

SIGTERM 信号用于 Linux、Kubernetes、Airflow 调度器和 Celery 来终止负责运行 Airflow 工作器或 Airflow 任务的进程。

在环境中发送 SIGTERM 信号的原因可能有以下几种:

  • 一项任务变成了僵尸任务,必须停止。

  • 调度器发现了任务的重复项,并向任务发送毒药丸和 SIGTERM 信号以停止该任务。

  • Pod 横向自动扩缩中,GKE 控制平面会发送 SIGTERM 信号以移除不再需要的 Pod。

  • 调度器可以向 DagFileProcessorManager 进程发送 SIGTERM 信号。此类 SIGTERM 信号由调度程序用于管理 DagFileProcessorManager 进程生命周期,可以放心地忽略。

    示例:

    Launched DagFileProcessorManager with pid: 353002
    Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
    Sending the signal Signals.SIGTERM to group 353002
    Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
    
  • local_task_job 中的检测信号回调与退出回调之间的竞态条件,用于监控任务的执行情况。如果检测信号检测到某个任务被标记为成功,则无法区分任务本身是成功,还是 Airflow 已被告知将任务视为成功。不过,它会终止任务运行程序,而不会等待其退出。

    您可以放心地忽略此类 SIGTERM 信号。该任务已处于成功状态,整个 DAG 运行的执行不会受到影响。

    日志条目 Received SIGTERM. 是常规退出与处于成功状态的任务终止之间的唯一区别。

    检测信号和退出回调之间的竞态条件
    图 2.检测信号和退出回调之间的竞态条件(点击可放大)
  • Airflow 组件使用的资源(CPU、内存)超出了集群节点允许的数量。

  • GKE 服务执行维护操作,并向即将升级的节点上运行的 Pod 发送 SIGTERM 信号。使用 SIGTERM 终止任务实例时,您可以在执行该任务的 Airflow 工作器日志中看到以下日志条目:

{local_task_job.py:211} WARNING - State of this instance has been externally
set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
with exception

可能的解决方案:

如果运行任务的虚拟机内存不足,就会出现此问题。这与 Airflow 配置无关,但与虚拟机可用内存量相关。

增加的内存取决于您使用的 Cloud Composer 版本。例如:

  • 在 Cloud Composer 2 中,您可以为 Airflow 工作器分配更多 CPU 和内存资源。

  • 对于 Cloud Composer 1,您可以使用性能更高的机器类型重新创建您的环境。

  • 在这两个版本的 Cloud Composer 中,您可以降低 [celery]worker_concurrency 并发 Airflow 配置选项的值。此选项决定了给定 Airflow 工作器并发执行的任务数量。

如需详细了解如何优化 Cloud Composer 2 环境,请参阅优化环境性能和费用

Cloud Logging 查询以发现 Pod 重启或逐出的原因

Cloud Composer 的环境使用 GKE 集群作为计算基础架构层。在本部分中,您将找到一些有用的查询,这些查询有助于找出 Airflow 工作器或 Airflow 调度器重启或逐出的原因。

可通过以下方式调整下列查询:

  • 您可在 Cloud Logging 中指定您感兴趣的时间轴;例如过去 6 小时或 3 天;您也可以自行指定时间范围

  • 您应指定 Cloud Composer 的 CLUSTER_NAME

  • 您还可以通过添加 POD_NAME

发现重启的容器

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
  

用于将结果限制到特定 Pod 的替代查询:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
  

发现因内存不足事件而关停的容器

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    

用于将结果限制到特定 Pod 的替代查询:

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

发现已停止执行的容器

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    

用于将结果限制到特定 Pod 的替代查询:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

更新或升级操作对 Airflow 任务执行的影响

更新或升级操作会中断当前正在执行的 Airflow 任务,除非任务在可延迟模式下执行。

我们建议您预计对 Airflow 任务执行的影响最小时执行这些操作,并在 DAG 和任务中设置适当的重试机制。

常见问题

以下部分介绍了一些常见 DAG 问题的症状和可行修复措施。

Airflow 任务被“Negsignal.SIGKILL”中断

有时,您的任务使用的内存可能超过为 Airflow 工作器分配的内存。在这种情况下,它可能会被 Negsignal.SIGKILL 中断。系统会发送此信号以避免进一步的内存消耗,因为内存消耗可能会影响其他 Airflow 任务的执行。在 Airflow 工作器日志中,您可能会看到以下日志条目:

{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL

可能的解决方案:

  • 降低了 Airflow 工作器的 worker_concurrency

  • 对于 Cloud Composer 2,请增加 Airflow 工作器的内存

  • 如果您使用的是 Cloud Composer 1,请升级到 Composer 集群中使用的容量更大的机器类型

  • 优化任务以减少内存用量

由于 DAG 解析错误,任务失败且未发出日志

有时,可能存在细微的 DAG 错误,从而导致 Airflow 调度器和 DAG 处理器分别安排执行任务和解析 DAG 文件,但 Airflow 工作器无法从此类 DAG 执行任务,因为 Python DAG 文件中存在编程错误。这可能会导致 Airflow 任务被标记为 Failed 且该任务执行时没有任何日志。

解决方案:

  • 在 Airflow 工作器日志中验证 Airflow 工作器是否未引发与 DAG 缺失或 DAG 解析错误相关的错误。

  • 增加与 DAG 解析相关的参数:

  • 另请参阅检查 DAG 处理器日志

由于资源压力,任务失败,且不发出日志

症状:在任务执行期间,负责执行 Airflow 任务的 Airflow 工作器子进程突然中断。Airflow 工作器日志中显示的错误可能类似于以下内容:

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

解决方案:

由于 Pod 逐出,任务失败且未发出日志

Google Kubernetes Engine Pod 受 Kubernetes Pod 生命周期和 Pod 逐出的约束。任务激增和工作器协同调度是 Cloud Composer 中 Pod 逐出的两个最常见的原因。

当特定 pod 过度使用某个节点的资源(相对于该节点配置的资源消耗预期)时,可能会发生 pod 逐出。例如,当一个 Pod 中运行多个内存占用量较大的任务时,可能会发生逐出,这些任务的合并负载会导致运行此 Pod 的节点超出内存消耗限制。

如果 Airflow 工作器 pod 被逐出,则该 pod 上运行的所有任务实例都会中断,之后被 Airflow 标记为失败。

日志被缓冲。如果工作器 pod 在缓冲区刷新之前被逐出,则不会发出日志。如果任务失败且没有日志,则表示 Airflow 工作器已因内存不足 (OOM) 重启。即使 Airflow 日志未发出,Cloud Logging 中也可能存在一些日志。

要查看日志,请执行以下操作:

  1. 在 Google Cloud 控制台中,转到环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 前往日志标签页。

  4. 所有日志 -> Airflow 日志 -> 工作器 -> (单个工作器)下查看各个工作器的日志。

DAG 的执行受内存限制。每个任务的执行都以两个 Airflow 进程开始,即任务执行进程与任务监控进程。每个节点最多可以执行 6 项并发任务(也就是说,Airflow 模块大约需要加载 12 个进程)。任务的执行可能会使用更多内存,具体取决于 DAG 的性质。

具体情况:

  1. 在 Google Cloud 控制台中,前往工作负载页面。

    进入“工作负载”

  2. 如果存在显示 Evictedairflow-worker pod,请点击每个被逐出的 pod 并查看窗口顶部的 The node was low on resource: memory 消息。

修复:

  • 在 Cloud Composer 1 中,创建一个新的 Cloud Composer 环境,其机器类型大于当前机器类型。
  • 在 Cloud Composer 2 中,提高 Airflow 工作器的内存限制
  • 检查 airflow-worker pod 中的日志,了解可能的逐出原因。如需详细了解如何从各个 pod 提取日志,请参阅排查已部署的工作负载的问题
  • 请确保 DAG 中的任务具有幂等性且可重试。
  • 避免将不必要的文件下载到 Airflow 工作器的本地文件系统。

    Airflow 工作器的本地文件系统容量有限。例如,在 Cloud Composer 2 中,一个工作器可以具有 1 GB 到 10 GB 的存储空间。当存储空间用尽时,GKE 控制平面会逐出 Airflow 工作器 Pod。这会使逐出的工作器正在执行的所有任务失败。

    存在问题的操作示例:

    • 下载文件或对象,并将其本地存储在 Airflow 工作器中。您可以改为将这些对象直接存储在合适的服务(例如 Cloud Storage 存储桶)中。
    • 通过 Airflow 工作器访问 /data 文件夹中的大型对象。Airflow 工作器将对象下载到其本地文件系统中。您可以改为实现 DAG,以便在 Airflow 工作器 Pod 之外处理大文件。

DAG 加载导入超时

具体情况:

  • 在 Airflow 网页界面中,DAG 列表页面顶部的红色提醒框会显示 Broken DAG: [/path/to/dagfile] Timeout
  • 在 Cloud Monitoring 中:airflow-scheduler 日志包含类似于以下内容的条目:

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

修复:

替换 dag_file_processor_timeout Airflow 配置选项,并留出更多时间进行 DAG 解析:

core dag_file_processor_timeout 新超时值

DAG 执行未在预期时间内结束

具体情况:

有时,DAG 运行不会结束,因为 Airflow 任务卡住了,并且 DAG 运行的时长超出了预期。在正常条件下,Airflow 任务不会无限期地保持在队列或正在运行状态,因为 Airflow 提供了超时和清理过程来帮助避免这种情况。

修复:

  • 针对 DAG 使用 dagrun_timeout 参数。例如:dagrun_timeout=timedelta(minutes=120)。因此,每个 DAG 运行都必须在 DAG 运行超时时间内完成,并且未完成的任务会被标记为 FailedUpstream Failed。如需详细了解 Airflow 任务状态,请参阅 Apache Airflow 文档

  • 您可以使用任务执行超时参数为基于 Apache Airflow 运算符运行的任务定义默认超时时间。

未执行 DAG 运行

具体情况:

如果动态设置 DAG 的时间表日期,可能会导致各种意外的副作用。例如:

  • DAG 执行始终在将来,该 DAG 永远不会执行。

  • 过往的 DAG 运行会被标记为已执行且成功,即使并未执行也是如此。

如需了解详情,请参阅 Apache Airflow 文档

修复:

  • 按照 Apache Airflow 文档中的建议操作。

  • 为 DAG 设置静态 start_date。您可以选择使用 catchup=False 停止为过去的日期运行 DAG。

  • 除非您了解此方法的副作用,否则请避免使用 datetime.now()days_ago(<number of days>)

提升进出 Airflow 数据库的网络流量

您的环境的 GKE 集群和 Airflow 数据库之间的流量网络量取决于 DAG 数量、DAG 中的任务数量以及 DAG 访问 Airflow 数据库中的数据的方式。以下因素可能会影响网络用量:

  • 对 Airflow 数据库的查询。如果您的 DAG 执行了大量查询,则会生成大量流量。示例:在继续其他任务、查询 XCom 表、转储 Airflow 数据库内容之前检查任务的状态。

  • 任务数量过多。要安排的任务越多,产生的网络流量就越多。此考虑因素适用于 DAG 中的任务总数和调度频率。在 Airflow 调度器安排 DAG 运行时,它会向 Airflow 数据库发出查询并产生流量。

  • Airflow 网页界面会向 Airflow 数据库发出查询,因此会产生网络流量。大量使用包含图表、任务和图示的页面可能会产生大量网络流量。

DAG 导致 Airflow Web 服务器崩溃或导致其返回 502 gateway timeout 错误

有多种不同原因可能会导致 Web 服务器故障。查看 Cloud Logging 中的 airflow-webserver 日志,确定导致 502 gateway timeout 错误的原因。

重量级计算负载

本部分仅适用于 Cloud Composer 1。

避免在 DAG 解析时运行重量级计算。

与工作器和调度器节点(它们的机器类型可进行自定义以具有更高的 CPU 和内存容量)不同,Web 服务器使用固定的机器类型,如果解析时计算负载过重,可能会导致 DAG 解析失败。

请注意,Web 服务器具有 2 个 vCPU 和 2 GB 内存core-dagbag_import_timeout 的默认值为 30 秒。此超时值决定了 Airflow 在将 Python 模块载入 dags/ 文件夹时所花费的时长上限。

权限不正确

本部分仅适用于 Cloud Composer 1。

Web 服务器不是使用与工作器和调度器相同的服务账号运行。因此,工作器和调度器可能可以访问 Web 服务器无法访问的用户管理资源。

我们建议您避免在 DAG 解析期间访问非公开资源。在某些情况下,如果需要访问此类资源,您需要向 Web 服务器的服务账号授予相应权限。该服务账号名称来源于您的 Web 服务器网域。例如,如果网域为 example-tp.appspot.com,则服务帐号为 example-tp@appspot.gserviceaccount.com

DAG 错误

本部分仅适用于 Cloud Composer 1。

Web 服务器在 App Engine 上运行,并且独立于环境的 GKE 集群。Web 服务器会解析 DAG 定义文件;如果 DAG 中存在错误,则可能发生 502 gateway timeout。如果有问题的 DAG 没有破坏 GKE 中运行的任何进程,则 Airflow 可以在没有正常运行的 Web 服务器的情况下正常运行。在这种情况下,您可以使用 gcloud composer environments run 从您的环境中检索详细信息,并在 Web 服务器不可用时使用它作为权宜解决方法。

在其他情况下,您可以在 GKE 中运行 DAG 解析,并查找抛出致命 Python 异常或发生超时(默认为 30 秒)的 DAG。 要进行问题排查,请连接到 Airflow 工作器容器中的远程 shell,并测试是否存在语法错误。 如需了解详情,请参阅测试 DAG

处理 DAG 和插件文件夹中的大量 DAG 和插件

/dags/plugins 文件夹的内容会从环境的存储桶同步到 Airflow 工作器和调度器的本地文件系统。

这些文件夹中存储的数据越多,执行同步所需的时间就越长。若要解决此类情况,请执行以下操作:

  • 限制 /dags/plugins 文件夹中的文件数量。请仅存储尽可能少的必需文件。

  • 如果可能,请增加 Airflow 调度器和工作器可用的磁盘空间。

  • 如果可能,请增加 Airflow 调度器和工作器的 CPU 和内存,以便更快地执行同步操作。

  • 如果 DAG 数量非常多,请将 DAG 分为几批,将其压缩为 zip 归档文件,然后将这些归档文件部署到 /dags 文件夹中。此方法可加快 DAG 同步过程。Airflow 组件会在处理 DAG 之前解压缩 zip 归档文件。

  • 通过程序化生成 DAG 也可以限制存储在 /dags 文件夹中的 DAG 文件数量。请参阅有关程序化 DAG 的部分,以避免调度和执行以编程方式生成的 DAG 时遇到问题。

请勿安排以编程方式同时生成的 DAG

您可以通过从 DAG 文件以编程方式生成 DAG 对象,这种方法可以高效地编写许多只有细微差别的类似 DAG。

切记不要立即安排所有此类 DAG 执行。Airflow 工作器可能没有足够的 CPU 和内存资源来执行同时安排的所有任务。

为避免安排程序化 DAG 时出现问题,请执行以下操作:

  • 提高工作器并发能力并扩大环境规模,以便它可以同时执行更多任务。
  • 生成 DAG 的方式使其时间表随时间均匀分布,避免同时安排数百项任务,让 Airflow 工作器有时间执行所有计划任务。

访问 Airflow Web 服务器时出错 504

请参阅访问 Airflow 界面时出现错误 504

在任务执行期间或紧接其后抛出 Lost connection to Postgres / MySQL server during query 异常

在满足以下条件时,通常会发生 Lost connection to Postgres / MySQL server during query 异常:

  • DAG 使用 PythonOperator 或自定义运算符。
  • DAG 向 Airflow 数据库发出查询。

如果通过可调用的函数进行了多次查询,则回溯可能会错误地指向 Airflow 代码中的 self.refresh_from_db(lock_for_update=True) 行;它是任务执行后的第一个数据库查询。导致出现异常的实际原因发生在这之前,即 SQLAlchemy 会话未正确关闭时。

SQLAlchemy 会话限定在线程范围内,并且在稍后可在 Airflow 代码中继续的可调用函数会话中创建。如果某个会话中的查询之间存在明显的延迟,则连接可能已被 Postgres 或 MySQL 服务器关闭。Cloud Composer 环境中的连接超时设置为大约 10 分钟。

修复:

  • 使用 airflow.utils.db.provide_session 修饰器。此修饰器在 session 参数中提供到 Airflow 数据库的有效会话,并在函数结束时正确关闭会话。
  • 请勿使用单个长时间运行的函数。请改为将所有数据库查询移动到单独的函数中,以使多个函数具有 airflow.utils.db.provide_session 修饰器。在这种情况下,会话会在检索查询结果后自动关闭。

控制多个 DAG、任务的执行时间以及同一 DAG 的并行执行

如果要控制特定 DAG 的单个 DAG 执行的时长,可以使用 dagrun_timeout DAG 参数来实现此目的。例如,如果您预计单次 DAG 运行(无论执行是否成功)不得超过 1 小时,则应将此参数设置为 3600 秒。

您还可以控制允许单个 Airflow 任务的持续时长。为此,您可以使用 execution_timeout

如果您想控制特定 DAG 的活跃 DAG 运行数量,可以使用 [core]max-active-runs-per-dag Airflow 配置选项来实现。

如果您希望在特定时刻仅运行一个 DAG 实例,请将 max-active-runs-per-dag 参数设置为 1

影响 DAG 和插件同步到调度器、工作器和 Web 服务器的问题

Cloud Composer 会将 /dags/plugins 文件夹的内容同步到调度器和工作器。/dags/plugins 文件夹中的某些对象可能会阻止此同步正常运行,或至少会减慢其速度。

  • /dags”文件夹已同步到调度器和工作器。在 Cloud Composer 2 中,此文件夹不会同步到 Web 服务器;如果您在 Cloud Composer 1 中启用了 DAG Serialization,则此文件夹不会同步到 Web 服务器。

  • /plugins”文件夹已同步到调度器、工作器和 Web 服务器。

您可能会遇到以下问题:

  • 您已将使用压缩转码的 gzip 压缩文件上传至 /dags/plugins 文件夹。通常,如果您使用 gsutil cp -Z 命令将数据上传到存储桶,就会发生这种情况。

    解决方案:删除使用压缩转码的对象,然后将其重新上传到存储桶。

  • 其中一个对象名为“.”,此类对象未同步到调度器和工作器,可能会完全停止同步。

    解决方案:重命名有问题的对象。

  • 文件夹和 DAG Python 文件具有相同的名称,例如 a.py。在这种情况下,DAG 文件未正确同步到 Airflow 组件。

    解决方案:移除与 DAG Python 文件同名的文件夹。

  • /dags/plugins 文件夹中的某个对象的名称末尾包含一个 / 符号。此类对象可能会误导同步过程,因为 / 符号表示对象是一个文件夹而不是文件。

    解决方案:从有问题的对象名称中移除 / 符号。

  • 请勿将不需要的文件存储在 /dags/plugins 文件夹中。

    有时,您实现的 DAG 和插件附带其他文件,例如用于存储这些组件测试的文件。这些文件会同步到工作器和调度器,并影响将这些文件复制到调度器、工作器和 Web 服务器所需的时间。

    解决方案:不要将任何额外和不必要的文件存储在 /dags/plugins 文件夹中。

调度器和工作器生成了 Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...' 个错误

之所以出现此问题,是因为对象在 Cloud Storage 中可以具有重叠的命名空间,而调度器和工作器同时使用传统文件系统。例如,您可以将文件夹和同名对象同时添加到环境的存储桶中。当存储桶同步到环境的调度器和工作器时,就会生成此错误,这可能会导致任务失败。

如需解决此问题,请确保环境的存储桶中没有重叠的命名空间。例如,如果 /dags/misc(一个文件)和 /dags/misc/example_file.txt(另一个文件)都在某个存储桶中,则调度器会生成一个错误。

连接到 Airflow 元数据数据库时出现暂时性中断

Cloud Composer 在分布式云基础架构上运行。 这意味着不时会出现一些暂时性问题,它们可能会中断 Airflow 任务的执行。

在这种情况下,您可能会在 Airflow 工作器日志中看到以下错误消息:

"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

对您的 Cloud Composer 环境执行维护操作也可能会导致这种间歇性问题。

此类错误通常是间歇性的,如果您的 Airflow 任务具有幂等性,并且您配置了重试,则应该不受此类错误的影响。您还可以考虑定义维护期

导致此类错误的另一个原因可能是环境的集群中缺少资源。在这种情况下,您可以按照扩缩环境优化环境中的说明来纵向扩容或优化环境。

DAG 运行被标记为成功,但未执行任何任务

如果 DAG 运行 execution_date 早于 DAG 的 start_date,则您可能会看到没有运行任何任务但仍被标记为成功的 DAG 运行。

未执行任务的成功 DAG 运行
图 3.未执行任务的成功 DAG 运行(点击可放大)

原因

在下列任一情况下,都可能会出现这种情况:

  • 不匹配是由 DAG 的 execution_datestart_date 之间的时区差异引起的。例如,在使用 pendulum.parse(...) 设置 start_date 时,可能会发生这种情况。

  • DAG 的 start_date 设置为动态值,例如 airflow.utils.dates.days_ago(1)

解决方案

  • 请确保 execution_datestart_date 使用的是同一时区。

  • 指定静态 start_date 并将其与 catchup=False 结合使用,以避免运行开始日期已过的 DAG。

某个 DAG 在 Airflow 界面或 DAG 界面中不可见,并且调度器未对其进行调度

DAG 处理器会在调度器可以调度每个 DAG 之前以及 DAG 显示在 Airflow 界面DAG 界面之前对其进行解析。

以下 Airflow 配置选项定义了解析 DAG 的超时时间:

如果 DAG 未显示在 Airflow 界面或 DAG 界面中:

  • 如果 DAG 处理器能够正确处理您的 DAG,请检查 DAG 处理器日志。如果出现问题,您可能会在 DAG 处理器或调度器日志中看到以下日志条目:
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
/usr/local/airflow/dags/example_dag.py with PID 21903 started at
2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
  • 检查调度器日志,查看调度器是否正常运行。如果出现问题,您可能会在调度器日志中看到以下日志条目:
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
Process timed out, PID: 68496

解决方案

  • 修复了所有 DAG 解析错误。DAG 处理器会解析多个 DAG;在极少数情况下,解析某个 DAG 时出错可能会对其他 DAG 解析产生负面影响。

  • 如果解析 DAG 花费的时间超过 [core]dagrun_import_timeout 中定义的秒数,请增加此超时时间。

  • 如果解析所有 DAG 花费的时间超过 [core]dag_file_processor_timeout 中定义的秒数,请增加此超时时间。

  • 如果您的 DAG 解析需要很长时间,这也可能意味着它未以最佳方式实现。例如,如果它读取多个环境变量,或执行对外部服务或 Airflow 数据库的调用。请尽可能避免在 DAG 的全局部分中执行此类操作。

  • 为调度器增加 CPU 和内存资源,以提高其运行速度。

  • 调整调度器的数量

  • 增加 DAG 处理器进程数量,以便更快地完成解析。您可以通过增加 [scheduler]parsing_process 的值来实现此目的。

  • 降低 DAG 解析频率

  • 降低 Airflow 数据库的负载

Airflow 数据库负载过重的症状

如需了解详情,请参阅 Airflow 数据库负载压力的症状

后续步骤