DAG 问题排查

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本页面提供常见工作流的问题排查步骤和信息 问题。

许多 DAG 执行问题是由非最佳环境性能导致的。 您可以按照优化 环境性能和费用指南。

某些 DAG 执行问题可能是由 Airflow 调度器引起的 无法正常工作或无法达到最佳效果。请遵循 调度器问题排查说明 来解决这些问题。

排查工作流问题

要开始排查问题,请按如下所述操作:

  1. 检查 Airflow 日志

    您可以通过替换

    Airflow 2

    logging logging_level 默认值为 INFO。 设置为 DEBUG 可获取更详尽的日志消息。

    Airflow 1

    core logging_level 默认值为 INFO。 设置为 DEBUG 可获取更详尽的日志消息。
  2. 检查 Monitoring 信息中心

  3. 查看 Cloud Monitoring

  4. 在 Google Cloud 控制台中,检查 环境的组件

  5. Airflow 网页界面中,执行以下操作: 请在 DAG 的 Graph View 中检查 失败的任务实例。

    webserver dag_orientation LRTBRLBT

调试运算符故障

要调试某个操作器的故障,请执行以下操作:

  1. 检查任务特定的错误
  2. 检查 Airflow 日志
  3. 查看 Cloud Monitoring
  4. 查看特定于运营商的日志。
  5. 修正错误。
  6. 将该 DAG 上传dags/ 文件夹。
  7. 在 Airflow 网页界面中 清除过去的状态 DAG
  8. 恢复或运行该 DAG。

对任务执行进行问题排查

Airflow 是一个分布式系统,包含许多实体,例如调度器、执行器、 通过任务队列和 Airflow 相互通信的 worker 数据库并发送信号(如 SIGTERM)。下图显示了 Airflow 组件之间的互连概览。

Airflow 组件之间的交互
图 1. Airflow 组件之间的交互(点击可放大)

在 Airflow 等分布式系统中 或底层基础架构可能会出现间歇性问题; 这可能会导致任务失败并被重新安排 或任务可能无法成功完成(例如,僵尸 任务或卡在执行中的任务)。Airflow 具有 并自动恢复正常运行。已关注 以下部分介绍了 Airflow 在执行任务期间出现的常见问题: 僵尸任务、毒药和 SIGTERM 信号。

排查僵尸任务

Airflow 会检测任务与执行的进程之间的两种不匹配情况 任务:

  • 僵尸任务是指应该运行但并未运行的任务 。如果任务的进程已终止或不 响应(如果 Airflow 工作器未及时报告任务状态) 因为它过载,或者执行任务的虚拟机已关闭。 Airflow 会定期查找此类任务,并且失败或重试该任务。 具体取决于任务的设置。

    探索僵尸任务

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-scheduler")
    textPayload:"Detected zombie job"
  • 不死任务是指不应运行的任务。Airflow 发现 并终止这些任务。

下面列出了执行僵尸任务的最常见原因和解决方案。

Airflow 工作器内存不足

每个 Airflow 工作器最多可运行 [celery]worker_concurrency 个任务实例 。如果这些任务实例的累计内存消耗量 超出 Airflow 工作器的内存限制,其上会随机生成一个进程 以释放资源

发现 Airflow 工作器内存不足事件

resource.type="k8s_node"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
log_id("events")
jsonPayload.message:"Killed process"
jsonPayload.message:("airflow task" OR "celeryd")

解决方案

Airflow 工作器被逐出

Pod 逐出是在 Kubernetes 上运行工作负载的正常部分。如果 Pod 已用完存储空间或释放,GKE 会逐出 Pod 为优先级更高的工作负载分配资源

发现 Airflow 工作器逐出

resource.type="k8s_pod"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
resource.labels.pod_name:"airflow-worker"
log_id("events")
jsonPayload.reason="Evicted"

解决方案

Airflow 工作器已终止

Airflow 工作器可能会在外部移除。如果当前正在运行的任务 这些代码在正常终止期内完成,那么这些代码将会被中断,并且 最终会被检测为僵尸。

发现 Airflow 工作器 Pod 终止

resource.type="k8s_cluster"
resource.labels.cluster_name="GKE_CLUSTER_NAME"
protoPayload.methodName:"pods.delete"
protoPayload.response.metadata.name:"airflow-worker"

可能的场景和解决方案

  • Airflow 工作器会在环境修改期间重启,例如: 升级或软件包安装:

    探索 Composer 环境修改

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("cloudaudit.googleapis.com%2Factivity")

    您可以在没有运行任何关键任务时执行此类操作,也可以启用 任务重试。

  • 各种组件在维护期间可能暂时不可用 操作:

    发现 GKE 维护操作

    resource.type="gke_nodepool"
    resource.labels.cluster_name="GKE_CLUSTER_NAME"
    protoPayload.metadata.operationType="UPGRADE_NODES"

    您可以指定维护窗口,以尽可能缩短 与关键任务执行重叠。

  • 在早于 2.4.5 的 Cloud Composer 2 版本中,终止 Airflow 工作器可能会忽略 SIGTERM 信号并继续执行任务:

    了解如何通过 Composer 自动伸缩功能进行缩减

    resource.type="cloud_composer_environment"
    resource.labels.environment_name="ENVIRONMENT_NAME"
    log_id("airflow-worker-set")
    textPayload:"Workers deleted"

    您可以升级到更高版本的 Cloud Composer 问题已得到解决。

Airflow 工作器处于高负载状态

Airflow 工作器可用的 CPU 和内存资源量是有限的 由环境配置决定如果利用率接近上限 这会导致在任务执行过程中出现资源争用和不必要的延迟 执行。在极端情况下,即在较长时间内缺少资源 就可能导致执行僵尸任务

解决方案

Airflow 数据库负载过重

各种 Airflow 组件使用数据库来相互通信, 用于存储任务实例的检测信号。服务器上的资源短缺 数据库会导致查询时间变长,并且可能会影响任务执行。

解决方案

Airflow 数据库暂时不可用

Airflow 工作器可能需要一些时间来检测并妥善处理间歇性中断 例如临时连接问题可能会超出默认值 僵尸检测阈值。

发现 Airflow 检测信号超时

resource.type="cloud_composer_environment"
resource.labels.environment_name="ENVIRONMENT_NAME"
log_id("airflow-worker")
textPayload:"Heartbeat time limit exceeded"

解决方案

毒药问题排查

毒药丸是 Airflow 用于关闭 Airflow 任务的一种机制。

Airflow 在以下情况下使用“毒药丸”:

  • 调度器终止未按时完成的任务。
  • 任务超时或执行时间过长。

Airflow 使用毒药丸时,可以在执行该任务的 Airflow 工作器日志中看到以下日志条目:

  INFO - Subtask ... WARNING - State of this instance has been externally set
  to success. Taking the poison pill.
  INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
  INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.

可能的解决方案:

  • 检查任务代码是否存在可能导致任务运行时间过长的错误。
  • (Cloud Composer 2) 为 Airflow 增加 CPU 和内存 提高任务执行速度
  • 提高 [celery_broker_transport_options]visibility-timeout Airflow 配置 选项。

    因此,调度程序需要更长时间等待任务完成, 然后才能将任务视为僵尸任务。这一选项 适用于需要执行数小时的耗时任务。如果 值过低(例如,3 小时),则调度程序会认为 作为“挂起”运行 5 或 6 小时的任务(僵尸任务)。

  • 提高 [core]killed_task_cleanup_time Airflow 的值 配置选项。

    值越大, Airflow 工作器完成其任务的时间就越长 优雅地。如果该值过低,Airflow 任务可能会中断 突然停止,没有足够的时间优雅地完成工作。

对 SIGTERM 信号进行问题排查

SIGTERM 信号:由 Linux 使用, Kubernetes、Airflow 调度器和 Celery 来终止负责 运行 Airflow 工作器或 Airflow 任务。

在环境中发送 SIGTERM 信号可能有以下几种原因:

  • 一项任务变成了僵尸任务,必须被停止。

  • 调度器发现有一项任务重复,并向对方发送“毒药丸”并 SIGTERM 向任务发出信号以停止任务。

  • 横向 Pod 自动扩缩中,GKE 控制平面发送 SIGTERM 信号以移除不再存在的 Pod 所需的资源。

  • 调度程序可以向 DagFileProcessorManager 进程发送 SIGTERM 信号。 调度程序使用此类 SIGTERM 信号来管理 DagFileProcessorManager 进程生命周期,可以放心地忽略它。

    示例:

    Launched DagFileProcessorManager with pid: 353002
    Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: []
    Sending the signal Signals.SIGTERM to group 353002
    Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
    
  • 检测信号回调和退出回调之间的竞态条件, local_task_job:用于监控任务的执行情况。如果检测信号 但其无法辨别是否将任务标记为成功 任务本身成功完成,或者 Airflow 被告知考虑该任务 成功。尽管如此,它仍会终止任务运行程序,而不会等待 退出。

    您可以放心地忽略此类 SIGTERM 信号。该任务已在 整个 DAG 运行的执行将不会 。

    日志条目 Received SIGTERM. 是常规 退出和终止任务的成功状态。

    <ph type="x-smartling-placeholder">
    </ph> 检测信号与退出回调之间的竞态条件
    图 2.检测信号与退出回调之间的竞态条件(点击可放大)
  • Airflow 组件使用的资源(CPU、内存)超出了 集群节点。

  • GKE 服务负责执行维护操作 向要升级的节点上运行的 Pod 发送 SIGTERM 信号。 当任务实例通过 SIGTERM 终止时,您可以查看以下日志 执行任务的 Airflow 工作器日志中的条目:

{local_task_job.py:211} WARNING - State of this instance has been externally
set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
with exception

可能的解决方案:

当运行任务的虚拟机内存不足时,就会出现此问题。这不是 与 Airflow 配置相关 虚拟机。

增加内存取决于 Cloud Composer 版本 资源。例如:

  • 在 Cloud Composer 2 中,您可以为 Airflow 分配更多 CPU 和内存资源 worker。

  • 对于 Cloud Composer 1,您可以使用 性能更高的机器类型

  • 在这两个版本的 Cloud Composer 中,您可以降低 [celery]worker_concurrency 并发 Airflow 配置选项。 此选项决定了 Airflow 工作器。

如需详细了解如何优化 Cloud Composer 2 环境,请参阅 优化环境性能和费用

用于发现 Pod 重启或逐出原因的 Cloud Logging 查询

Cloud Composer 的环境使用 GKE 集群作为计算基础架构 层。在本部分中,您可以找到一些实用的查询, 找出 Airflow 工作器或 Airflow 调度器重启或逐出的原因。

可以通过以下方式调整下面显示的查询:

  • 您可以在 Cloud Logging 中指定感兴趣的时间轴; 例如过去 6 小时、过去 3 天

  • 您应指定 Cloud Composer 的 CLUSTER_NAME

  • 您还可以将搜索范围限制为特定 Pod,方法是添加 POD_NAME

发现重启的容器

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
  

将结果限制在特定 Pod 的替代查询:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"will be restarted"
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
  

发现容器因内存不足事件而关停

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    

将结果限制在特定 Pod 的替代查询:

    resource.type="k8s_node"
    log_id("events")
    (jsonPayload.reason:("OOMKilling" OR "SystemOOM")
      OR jsonPayload.message:("OOM encountered" OR "out of memory"))
    severity=WARNING
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

发现已停止执行的容器

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    

将结果限制在特定 Pod 的替代查询:

    resource.type="k8s_node"
    log_id("kubelet")
    jsonPayload.MESSAGE:"ContainerDied"
    severity=DEFAULT
    resource.labels.cluster_name="CLUSTER_NAME"
    "POD_NAME"
    

更新或升级操作对 Airflow 任务执行的影响

更新或升级操作会中断当前正在执行的 Airflow 任务, 除非有任务在“可延期模式”下执行

我们建议在预计影响极小的情况下执行这些操作 Airflow 任务执行,并在您的 DAG 和任务。

排查 KubernetesExecutor 任务问题

CeleryKubernetesExecutor 是 Cloud Composer 3 中的一种执行器 可同时使用 CeleryExecutor 和 KubernetesExecutor 。

如需了解详情,请参阅使用 CeleryKubernetesExecutor 页面 有关对使用 KubernetesExecutor 执行的任务进行问题排查的信息。

常见问题

以下部分介绍了一些常见 DAG 问题的症状和可行修复措施。

Airflow 任务被“Negsignal.SIGKILL”中断

有时,您的任务使用的内存可能比分配 Airflow 工作器的内存多。 在这种情况下,它可能会被 Negsignal.SIGKILL 中断。系统 并避免发送此信号 其他 Airflow 任务的执行在 Airflow 工作器日志中,您可能会看到 以下日志条目:

{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL

Negsignal.SIGKILL 也可能会显示为代码 -9

可能的解决方案:

  • Airflow 工作器的较低 worker_concurrency

  • 如果您使用的是 Cloud Composer 2,请增加 Airflow 工作器的内存。

  • 如果您使用的是 Cloud Composer 1,请升级到 Cloud Composer 集群。

  • 优化任务以减少使用的内存。

  • 使用 Cloud Composer 管理 Cloud Composer 中的资源密集型任务 KubernetesPodOperatorGKEStartPodOperator 任务隔离和自定义资源分配。

由于 DAG 解析错误,任务失败但未发出日志

有时可能会存在细微的 DAG 错误,从而导致出现 Airflow 调度器和 DAG 处理器能够 和分别解析 DAG 文件,但 Airflow 工作器无法执行任务 因为 python DAG 文件中存在编程错误。这可能会 会导致 Airflow 任务被标记为 Failed 的情况 并且没有任何关于其执行的日志。

解决方案:

  • 在 Airflow 工作器日志中验证以下情况 与缺少 DAG 或 DAG 解析错误相关的 Airflow 工作器。

  • 增加与 DAG 解析相关的参数:

  • 另请参阅 检查 DAG 处理器日志

任务因资源压力而失败,但未发出日志

症状:在执行任务期间,Airflow 工作器的子进程负责 Airflow 任务执行突然中断。 Airflow 工作器日志中显示的错误可能与以下错误类似:

...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task    R = retval = fun(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__    return self.run(*args, **kwargs)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command    _execute_in_fork(command_to_exec)  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...

解决方案:

任务因 Pod 逐出而失败,但未发出日志

Google Kubernetes Engine Pod Kubernetes Pod 生命周期和 Pod 逐出。任务 工作器高峰和协同调度是导致 Pod 逐出的两个最常见的原因 Cloud Composer。

当特定 pod 过度使用某个节点的资源(相对于该节点配置的资源消耗预期)时,可能会发生 pod 逐出。对于 例如,在一个 Pod 中运行多个内存占用量大的任务时,可能会发生逐出, 它们的总负载会导致运行此 Pod 的节点超出 内存使用限制

如果 Airflow 工作器 pod 被逐出,则该 pod 上运行的所有任务实例都会中断,之后被 Airflow 标记为失败。

日志被缓冲。如果工作器 pod 在缓冲区刷新之前被逐出,则不会发出日志。如果任务失败且没有日志,则表示 Airflow 工作器已因内存不足 (OOM) 重启。可能存在一些日志 (即使未发出 Airflow 日志)。

要查看日志,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 前往日志标签页。

  4. 所有日志 -> 下查看各个工作器的日志 Airflow 日志 ->工作器 ->(单个工作器)

DAG 执行存在内存限制。每个任务的执行都以两个 Airflow 进程开始,即任务执行进程与任务监控进程。每个节点最多可以执行 6 项并发任务(也就是说,Airflow 模块大约需要加载 12 个进程)。任务的执行可能会使用更多内存,具体取决于 DAG 的性质。

具体情况:

  1. 在 Google Cloud 控制台中,前往工作负载页面。

    进入“工作负载”

  2. 如果存在显示 Evictedairflow-worker pod,请点击每个被逐出的 pod 并查看窗口顶部的 The node was low on resource: memory 消息。

修复:

  • 在 Cloud Composer 1 中,使用以下命令创建一个新的 Cloud Composer 环境 比当前机器更大的机器类型 类型。
  • 在 Cloud Composer 2 中,提高内存限制 适用于 Airflow 工作器。
  • 检查 airflow-worker Pod 中的日志,了解可能的驱逐原因。有关 如需了解如何从各个 Pod 提取日志,请参阅 排查已部署的工作负载的问题
  • 确保 DAG 中的任务遵循幂等原则且可重试。
  • 避免将不必要的文件下载到 Airflow 工作器的本地文件系统。

    Airflow 工作器的本地文件系统容量有限。例如,在 Cloud Composer 2,则一个工作器可以拥有 1 GB 到 10 GB 的存储空间。当 存储空间耗尽,Airflow 工作器 Pod 会被 GKE 控制平面这会导致被逐出的所有任务失败 worker 正在执行。

    下面列举了一些有问题的操作示例:

    • 下载文件或对象并将其存储在本地 Airflow 中 worker。请改为将这些对象直接存储在合适的服务(例如 Cloud Storage 存储桶)中。
    • 通过 Airflow 工作器访问 /data 文件夹中的大型对象。 Airflow 工作器将对象下载到其本地文件系统中。请改为实现 DAG,以便在 Airflow 工作器 Pod 之外处理大文件。

DAG 加载导入超时

具体情况:

  • 在 Airflow 网页界面中,DAG 列表页面顶部的红色提醒消息 框中显示 Broken DAG: [/path/to/dagfile] Timeout
  • 在 Cloud Monitoring 中:airflow-scheduler 日志包含相关条目 类似于:

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

修复:

替换 dag_file_processor_timeout Airflow 配置选项,并留出更多时间进行 DAG 解析:

core dag_file_processor_timeout 新超时值

DAG 执行未在预期时间内结束

具体情况:

有时 DAG 运行并不会结束,因为 Airflow 任务卡住而 DAG 运行 持续时间比预期长。在正常情况下,Airflow 任务不会保留 无限期地处于排队或运行状态,因为 Airflow 有超时和 清除程序有助于避免这种情况

修复:

  • 使用 dagrun_timeout 参数例如:dagrun_timeout=timedelta(minutes=120)。因此,每个 DAG 运行都必须 在 DAG 运行超时时间内完成,且未完成的任务会标记为 为 FailedUpstream Failed。如需详细了解 Airflow 任务状态,请参阅 Apache Airflow 文档

  • 使用 任务执行超时 参数,用于定义基于 Apache 运行的任务的默认超时时间 Airflow 运算符。

未执行 DAG 运行

具体情况:

如果 DAG 的计划日期是动态设置的,可能会导致各种 意外的副作用。例如:

  • DAG 永远不会执行,也不会执行。

  • 过往 DAG 运行被标记为已执行且已成功完成(尽管未执行) 。

如需了解详情,请参阅 Apache Airflow 文档

修复:

  • 遵循 Apache Airflow 文档中的建议。

  • 为 DAG 设置静态 start_date。作为一种可选方案,您可以使用 catchup=False 停用针对过去日期的 DAG 运行。

  • 请避免使用 datetime.now()days_ago(<number of days>),除非您 我们应该认识到这种方法的副作用。

提升进出 Airflow 数据库的网络流量

您的环境的 GKE 集群和 Airflow 数据库之间的流量网络量取决于 DAG 数量、DAG 中的任务数量以及 DAG 访问 Airflow 数据库中的数据的方式。以下因素可能会影响网络用量:

  • 对 Airflow 数据库的查询。如果您的 DAG 执行了大量查询,则会生成大量流量。示例:在继续其他任务、查询 XCom 表、转储 Airflow 数据库内容之前检查任务的状态。

  • 任务数量过多。要安排的任务越多,产生的网络流量就越多。此考虑因素适用于 DAG 中的任务总数和调度频率。在 Airflow 调度器安排 DAG 运行时,它会向 Airflow 数据库发出查询并产生流量。

  • Airflow 网页界面会向 Airflow 数据库发出查询,因此会产生网络流量。大量使用包含图表、任务和图示的页面可能会产生大量网络流量。

DAG 导致 Airflow Web 服务器崩溃或导致其返回 502 gateway timeout 错误

有多种不同原因可能会导致 Web 服务器故障。查看 airflow-webserver 登录 Cloud Logging,以确定导致 502 gateway timeout 错误。

重量级计算负载

本部分仅适用于 Cloud Composer 1。

避免在 DAG 解析时运行重量级计算。

与工作器节点和调度器节点不同,它们的机器类型可以 具有更高的 CPU 和内存容量,那么 Web 服务器会使用固定的机器类型, 如果解析时计算过于复杂,可能会导致 DAG 解析失败 重量级。

请注意,Web 服务器具有 2 个 vCPU 和 2 GB 内存core-dagbag_import_timeout 的默认值为 30 秒。此超时值决定了 Airflow 在将 Python 模块载入 dags/ 文件夹时所花费的时长上限。

权限不正确

本部分仅适用于 Cloud Composer 1。

Web 服务器不是使用与工作器和调度器相同的服务账号运行。因此,工作器和调度器可能可以访问 Web 服务器无法访问的用户管理资源。

我们建议您避免在 DAG 解析期间访问非公开资源。在某些情况下,如果需要访问此类资源,您需要向 Web 服务器的服务账号授予相应权限。该服务账号名称来源于您的 Web 服务器网域。例如,如果域名 为 example-tp.appspot.com,服务账号为 example-tp@appspot.gserviceaccount.com.

DAG 错误

本部分仅适用于 Cloud Composer 1。

Web 服务器在 App Engine 上运行,并且独立于环境的 GKE 集群。Web 服务器会解析 DAG 定义文件;如果 DAG 中存在错误,则可能发生 502 gateway timeout。如果存在以下情况,则 Airflow 在没有正常运行的 Web 服务器的情况下 有问题的 DAG 不会中断 GKE 中运行的任何进程。 在这种情况下,您可以使用 gcloud composer environments run 检索 从您的环境中获取详细信息,并作为临时解决方法, 不可用。

在其他情况下,您可以在 GKE 中运行 DAG 解析,并查找抛出致命 Python 异常或发生超时(默认为 30 秒)的 DAG。 要进行问题排查,请连接到 Airflow 工作器容器中的远程 shell,并测试是否存在语法错误。 如需了解详情,请参阅测试 DAG

处理 dags 和 plugins 文件夹中的大量 DAG 和插件

系统会从以下来源同步 /dags/plugins 文件夹的内容: Airflow 工作器的本地文件系统存储数据 调度器

存储在这些文件夹中的数据越多,执行 同步。要解决此类情况,请执行以下操作:

  • 限制 /dags/plugins 文件夹中的文件数量。仅存储 所需文件的数量。

  • 如果可能,请增加可供 Airflow 调度器使用的磁盘空间,并 worker。

  • 如果可能,请增加 Airflow 调度器和工作器的 CPU 和内存, 以加快同步操作执行速度。

  • 如果 DAG 过多,请将 DAG 拆分为多个批次, 将它们放入 ZIP 归档文件中,并将这些归档文件部署到 /dags 文件夹中。 此方法可加快 DAG 同步过程。Airflow 组件 先解压缩 zip 归档文件,然后再处理 DAG。

  • 通过程序化方式生成 DAG 也可能是一种限制 存储在 /dags 文件夹中的 DAG 文件的数量。 请参阅程序化 DAG 部分,以避免 以编程方式生成的 DAG 调度和执行方面的问题。

请勿安排同时以程序化方式生成的 DAG

从 DAG 文件以编程方式生成 DAG 对象是一种高效的方法 编写许多只有细微差别的类似 DAG。

切勿立即安排所有此类 DAG 以执行。那里 Airflow 工作器很可能没有足够的 CPU 和内存 以执行在同一时间执行的所有任务。

为避免在安排程序化 DAG 时出现问题,请执行以下操作:

  • 提高工作器并发性和纵向扩容环境,以便 同时执行多个任务。
  • 生成 DAG 时,使其时间表随时间均匀分布, 可以避免同时调度上百个任务 有时间执行所有计划任务。

访问 Airflow Web 服务器时出现 504 错误

请参阅访问 Airflow 界面时出错 504

在任务执行期间或紧接其后抛出 Lost connection to Postgres server during query 异常

Lost connection to Postgres server during query 个例外情况 通常发生在满足以下条件时:

  • DAG 使用 PythonOperator 或自定义运算符。
  • DAG 向 Airflow 数据库发出查询。

如果通过可调用的函数进行了多次查询,则回溯可能会错误地指向 Airflow 代码中的 self.refresh_from_db(lock_for_update=True) 行;它是任务执行后的第一个数据库查询。导致出现异常的实际原因发生在这之前,即 SQLAlchemy 会话未正确关闭时。

SQLAlchemy 会话限定在线程范围内,并且在稍后可在 Airflow 代码中继续的可调用函数会话中创建。如果大量 查询之间的延迟,则连接可能已 由 Postgres 服务器关闭。Cloud Composer 环境中的连接超时设置为大约 10 分钟。

修复:

  • 使用 airflow.utils.db.provide_session 修饰器。此修饰器在 session 参数中提供到 Airflow 数据库的有效会话,并在函数结束时正确关闭会话。
  • 请勿使用单个长时间运行的函数。请改为将所有数据库查询移动到单独的函数中,以使多个函数具有 airflow.utils.db.provide_session 修饰器。在这种情况下,会话会在检索查询结果后自动关闭。

控制 DAG、任务和同一 DAG 的并行执行时间

如果您想控制特定 DAG 的单个 DAG 执行的时长 则您可以使用 dagrun_timeout DAG 参数 是这样。例如,如果您希望系统运行单个 DAG(不管是 执行完成(成功或失败)不得超过 1 小时, 然后将此参数设置为 3600 秒

您还可以控制单个 Airflow 任务的运行时长。待办事项 因此,您可以使用 execution_timeout

如果您想控制 那么您可以使用 [core]max-active-runs-per-dag 为此,请使用 Airflow 配置选项

如果您希望在给定时刻仅运行一个 DAG 实例,请将 将 max-active-runs-per-dag 参数设置为 1

影响 DAG 和插件同步到调度器、工作器和 Web 服务器的问题

Cloud Composer 会同步 /dags/plugins 文件夹的内容 调度器和工作器/dags/plugins 文件夹中的某些对象 可能会阻止此同步正常运行,或者至少会降低其运行速度。

  • /dags 文件夹会同步到调度器和工作器。此文件夹未同步 网络服务器,或者在 Cloud Composer 1 中启用 DAG Serialization

  • /plugins 文件夹会同步到调度器、工作器和网络服务器。

您可能会遇到以下问题:

  • 您上传的是使用 压缩转码/dags/plugins 文件夹中。如果您使用 gsutil cp -Z 命令上传 将数据传输到存储桶中

    解决方案:删除使用了压缩转码的对象,然后重新上传 复制到存储桶中

  • 其中一个对象名为“.”,此类对象不会同步到 调度器和工作器,它可能会完全停止同步。

    解决方案:重命名有问题的对象。

  • 文件夹和 DAG Python 文件具有相同的名称,例如 a.py。 在这种情况下,DAG 文件没有正确同步到 Airflow 组件。

    解决方案:移除与 DAG Python 文件同名的文件夹。

  • /dags/plugins 文件夹中的某个对象包含 / 符号 。此类对象可能会误导同步进程 因为 / 符号表示对象是文件夹,而不是文件。

    解决方案:从有问题的对象的名称中移除 / 符号。

  • 请勿将不必要的文件存储在 /dags/plugins 文件夹中。

    有时,您实现的 DAG 和插件会附带 例如存储这些组件的测试的文件。这些 文件会同步到工作器和调度器,这会影响 将这些文件复制到调度器、工作器和 Web 服务器。

    解决方案:不要在 /dags/plugins 个文件夹。

调度器和工作器生成了 Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...' 个错误

之所以出现此问题,是因为对象 Cloud Storage 中的重叠命名空间 调度器和工作器都使用传统的文件系统。例如,可以 将同名的文件夹和对象添加到环境的 存储桶。当存储桶同步到环境的调度器和工作器时, 系统就会生成此错误,这可能会导致任务失败。

要解决此问题,请确保 环境的存储桶中。例如,如果 /dags/misc(一个文件)和 /dags/misc/example_file.txt(另一个文件)位于存储桶中,则出现错误 由调度器生成

连接到 Airflow 元数据数据库时发生暂时性中断

Cloud Composer 在分布式云基础架构上运行, 也就是说,有时可能会出现一些暂时性问题, 会中断 Airflow 任务的执行。

在这种情况下,您可能会在 Airflow 工作器的日志:

"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"

"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"

此类间歇性问题也可能由维护操作引起 Cloud Composer 环境。

此类错误通常是间歇性的,并且如果您的 Airflow 任务具有幂等性, 并且您已经配置了重试,那么应该不会对这些错误产生影响。您还可以 可以考虑定义维护期

导致此类错误的另一个原因是,您的 环境的集群。在这种情况下,您可以扩大或优化 环境即说明 扩缩环境或 有关优化环境的说明。

DAG 运行被标记为成功,但没有已执行的任务

如果 DAG 运行作业 execution_date 早于 DAG 的 start_date,则 您可能会看到有些 DAG 运行没有运行任何任务,但仍被标记为成功。

<ph type="x-smartling-placeholder">
</ph> 成功的 DAG 在没有执行的任务的情况下运行
图 3.成功的 DAG 运行但未执行的任务(点击可放大)

原因

在以下某种情况下,可能会出现这种情况:

  • 不匹配是由于 DAG 和 execution_datestart_date。例如,当 使用 pendulum.parse(...) 设置 start_date

  • 例如,DAG 的 start_date 设置为动态值 airflow.utils.dates.days_ago(1)

解决方案

  • 请确保 execution_datestart_date 使用的是同一时区。

  • 指定一个静态 start_date,并将其与 catchup=False 结合使用,以避免 开始日期已过的正在运行的 DAG。

DAG 在 Airflow 界面或 DAG 界面中不可见,并且调度器未对其进行调度

DAG 处理器会解析每个 DAG,然后调度器才能对其进行调度 Airflow 界面DAG 界面

以下 Airflow 配置选项定义了解析 DAG 的超时:

如果某个 DAG 在 Airflow 界面或 DAG 界面中不可见:

  • 如果 DAG 处理器能够正确处理,请检查 DAG 处理器日志 您的 DAG。如果出现问题,您可能会看到以下日志条目 在 DAG 处理器或调度器日志中:
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
/usr/local/airflow/dags/example_dag.py with PID 21903 started at
2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
  • 检查调度器日志,查看调度器是否正常运行。如果 问题,您可能会在调度器日志中看到以下日志条目:
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
Process timed out, PID: 68496

解决方案

  • 修复所有 DAG 解析错误。DAG 处理器解析多个 DAG 在极少数情况下,如果一个 DAG 解析错误, 其他 DAG

  • 如果 DAG 解析花费的时间超过 [core]dagrun_import_timeout, 然后延长此超时。

  • 如果解析所有 DAG 花费的时间超过定义的秒数 在 [core]dag_file_processor_timeout, 然后延长此超时。

  • 如果您的 DAG 需要很长时间才能解析,这也可能意味着它不是 以最佳方式实施。例如,如果读取命令中的 环境变量,或者对外部服务或 Airflow 执行调用 数据库。请尽可能避免在 DAG 的全局部分。

  • 增加调度程序的 CPU 和内存资源,提高其运行速度。

  • 调整调度器的数量

  • 增加 DAG 处理器进程的数量,以便完成解析 。为此,您可以将 [scheduler]parsing_process.

  • 降低 DAG 解析频率

  • 减少 Airflow 数据库的负载

Airflow 数据库负载过重的症状

如需了解详情,请参阅 Airflow 数据库面临负载压力的症状

后续步骤