Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本页面介绍常见工作流问题的问题排查步骤和信息。
某些 DAG 执行问题可能是由 Airflow 调度程序无法正常或最佳运行所致。请按照调度程序问题排查说明解决这些问题。
排查工作流问题
要开始排查问题,请按如下所述操作:
检查 Airflow 日志。
您可以通过替换以下 Airflow 配置选项来提高 Airflow 的日志记录级别。
部分 键 值 logging
(在 Airflow 1 中为core
)logging_level
默认值为 INFO
。设置为DEBUG
可在日志消息中获取更多详细信息。查看监控信息中心。
查看 Cloud Monitoring。
在 Google Cloud 控制台中,检查环境的组件对应的页面上是否存在错误。
在 Airflow 网页界面中,检查相应 DAG 的 Graph View 中是否显示失败的任务实例。
部分 键 值 webserver
dag_orientation
LR
、TB
、RL
或BT
调试运算符故障
要调试某个操作器的故障,请执行以下操作:
- 检查任务特定的错误。
- 检查 Airflow 日志。
- 查看 Cloud Monitoring。
- 检查该运算符特定的日志。
- 修复错误。
- 将 DAG 上传到
/dags
文件夹。 - 在 Airflow 网页界面中,清除该 DAG 的过往状态。
- 恢复或运行该 DAG。
排查任务执行问题
Airflow 是一个分布式系统,其中包含许多实体(例如调度器、执行器、工作器),这些实体通过任务队列和 Airflow 数据库相互通信,并发送信号(例如 SIGTERM)。下图概述了 Airflow 组件之间的互连。
在 Airflow 等分布式系统中,可能会存在一些网络连接问题,或者底层基础架构可能会出现间歇性问题;这可能会导致任务失败并重新安排执行,或者任务可能无法成功完成(例如僵尸任务或执行过程中卡住的任务)。Airflow 有机制来处理此类情况,并自动恢复正常运行。以下部分介绍了 Airflow 执行任务期间发生的常见问题:僵尸任务、终止实例和 SIGTERM 信号。
排查僵尸任务问题
Airflow 会检测任务与执行任务的进程之间存在的两种不匹配情况:
僵尸任务是指应该运行但未运行的任务。如果任务的进程被终止或未响应、Airflow 工作器因过载而未及时报告任务状态,或者执行任务的虚拟机已关闭,就可能会发生这种情况。Airflow 会定期查找此类任务,并根据任务的设置失败或重试任务。
发现僵尸任务
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-scheduler") textPayload:"Detected zombie job"
僵尸任务是指不应运行的任务。Airflow 会定期查找此类任务并将其终止。
以下部分介绍了僵尸任务的最常见原因和解决方法。
Airflow 工作器内存不足
每个 Airflow 工作器最多可以同时运行 [celery]worker_concurrency
个任务实例。如果这些任务实例的累计内存用量超出 Airflow 工作器的内存限制,系统会终止其上的一个随机进程以释放资源。
发现 Airflow 工作器内存不足事件
resource.type="k8s_node" resource.labels.cluster_name="GKE_CLUSTER_NAME" log_id("events") jsonPayload.message:"Killed process" jsonPayload.message:("airflow task" OR "celeryd")
有时,Airflow 工作器的内存不足可能会导致在 SQL Alchemy 会话期间向数据库、DNS 服务器或 DAG 调用的任何其他服务发送格式错误的数据包。在这种情况下,连接的另一端可能会拒绝或丢弃来自 Airflow 工作器的连接。例如:
"UNKNOWN:Error received from peer
{created_time:"2024-11-31T10:09:52.217738071+00:00", grpc_status:14,
grpc_message:"failed to connect to all addresses; last error: UNKNOWN:
ipv4:<ip address>:443: handshaker shutdown"}"
解决方案:
优化任务以减少内存用量,例如避免使用顶级代码。
降低
[celery]worker_concurrency
。在 Cloud Composer 1 中,升级到更大的机器类型。
Airflow 工作器被驱逐
Pod 逐出是在 Kubernetes 上运行工作负载的正常部分。如果 Pod 耗尽存储空间,或者为了为优先级更高的工作负载释放资源,GKE 会逐出 Pod。
发现 Airflow 工作器驱逐
resource.type="k8s_pod" resource.labels.cluster_name="GKE_CLUSTER_NAME" resource.labels.pod_name:"airflow-worker" log_id("events") jsonPayload.reason="Evicted"
解决方案:
- 如果驱逐是由于存储空间不足而导致的,您可以减少存储空间用量,或者在不需要临时文件时立即将其移除。或者,您也可以增加可用存储空间,或使用
KubernetesPodOperator
在专用 pod 中运行工作负载。
Airflow 工作器已终止
Airflow 工作器可能会被外部移除。如果当前正在运行的任务未在正常终止期限内完成,则会被中断,并最终可能会被检测为僵尸进程。
发现 Airflow 工作器 Pod 终止
resource.type="k8s_cluster" resource.labels.cluster_name="GKE_CLUSTER_NAME" protoPayload.methodName:"pods.delete" protoPayload.response.metadata.name:"airflow-worker"
可能出现的情况和解决方案:
在环境修改(例如升级或软件包安装)期间,Airflow 工作器会重启:
查看 Composer 环境修改
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("cloudaudit.googleapis.com%2Factivity")
您可以在没有任何关键任务正在运行时执行此类操作,或者启用任务重试。
在维护操作期间,各种组件可能会暂时无法使用。
了解 GKE 维护操作
resource.type="gke_nodepool" resource.labels.cluster_name="GKE_CLUSTER_NAME" protoPayload.metadata.operationType="UPGRADE_NODES"
您可以指定维护窗口,以尽量减少
与关键任务执行重叠。
Airflow 工作器负载过重
Airflow 工作器可用的 CPU 和内存资源量受环境配置的限制。如果资源利用率接近上限,可能会导致任务执行期间出现资源争用和不必要的延迟。在极端情况下,如果长时间缺少资源,可能会导致僵尸任务。
解决方案:
- 监控工作器的 CPU 和内存用量,并进行调整以避免超过 80%。
Airflow 数据库承受着繁重负载
各种 Airflow 组件使用数据库进行相互通信,尤其是存储任务实例的心跳。数据库资源不足会导致查询时间延长,并且可能会影响任务执行。
有时,Airflow 工作器的日志中会出现以下错误:
(psycopg2.OperationalError) connection to server at <IP address>,
port 3306 failed: server closed the connection unexpectedly
This probably means the server terminated abnormally before or while
processing the request.
解决方案:
- 避免在顶级 DAG 代码中使用大量
Variables.get
指令。请改用 Jinja 模板检索 Airflow 变量的值。 - 优化(减少)顶级 DAG 代码中 Jinja 模板中 xcom_push 和 xcom_pull 指令的使用。
- 考虑升级到较大的环境大小(中或大)。
- 减少调度器数量
- 降低 DAG 解析频率。
- 监控数据库 CPU 和内存用量。
Airflow 数据库暂时不可用
Airflow 工作器可能需要一些时间来检测并妥善处理间歇性错误(例如暂时性连接问题)。它可能会超出默认的僵尸检测阈值。
了解 Airflow 心跳超时
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-worker") textPayload:"Heartbeat time limit exceeded"
解决方案:
延长僵尸任务的超时时间,并替换
[scheduler]scheduler_zombie_task_threshold
Airflow 配置选项的值:部分 键 值 备注 scheduler
scheduler_zombie_task_threshold
新超时(以秒为单位) 默认值为 300
排查实例终止问题
Airflow 使用终止实例机制来关闭 Airflow 任务。此机制适用于以下情况:
- 当调度程序终止未按时完成的任务时。
- 任务超时或执行时间过长。
当 Airflow 终止任务实例时,您可以在执行了任务的 Airflow 工作器的日志中看到以下日志条目:
INFO - Subtask ... WARNING - State of this instance has been externally set
to success. Terminating instance.
INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.
可能的解决方案:
检查任务代码,看看是否存在可能会导致其运行时间过长的错误。
增加
[celery_broker_transport_options]visibility-timeout
Airflow 配置选项的值。因此,调度器会等待更长时间,以便任务完成,然后再将任务视为僵尸任务。对于耗时数小时的任务,此选项尤为有用。如果此值过低(例如 3 小时),则调度器会将运行 5 到 6 小时的任务视为“挂起”(僵尸任务)。
增加
[core]killed_task_cleanup_time
Airflow 配置选项的值。值越大,Airflow 工作器完成任务所需的时间就越长。如果该值过低,Airflow 任务可能会突然中断,没有足够的时间来正常完成工作。
排查 SIGTERM 信号问题
Linux、Kubernetes、Airflow 调度程序和 Celery 使用 SIGTERM 信号来终止负责运行 Airflow 工作器或 Airflow 任务的进程。
导致在某个环境中发送 SIGTERM 信号的原因可能有以下几种:
某个任务已变为僵尸任务,必须停止。
调度程序发现了任务的副本,并向任务发送了“终止实例”和 SIGTERM 信号以停止该任务。
在Pod 横向自动扩缩中,GKE 控制平面会发送 SIGTERM 信号来移除不再需要的 Pod。
调度程序可以向 DagFileProcessorManager 进程发送 SIGTERM 信号。调度程序使用此类 SIGTERM 信号来管理 DagFileProcessorManager 进程生命周期,因此可以安全地忽略。
示例:
Launched DagFileProcessorManager with pid: 353002 Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: [] Sending the signal Signals.SIGTERM to group 353002 Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
local_task_job(用于监控任务执行情况)中的心跳回调和退出回调之间存在竞态条件。如果心跳检测到某个任务被标记为成功,则无法区分是任务本身成功了,还是 Airflow 被告知将该任务视为成功。不过,它会终止任务运行程序,而不会等待其退出。
您可以放心地忽略此类 SIGTERM 信号。任务已处于成功状态,整个 DAG 运行的执行不会受到影响。
日志条目
Received SIGTERM.
是正常退出与成功状态下任务终止之间的唯一区别。图 2. 心跳回调和退出回调之间的竞态条件(点击可放大) Airflow 组件使用的资源(CPU、内存)超出了集群节点允许的范围。
GKE 服务会执行维护操作,并向即将升级的节点上运行的 Pod 发送 SIGTERM 信号。
使用 SIGTERM 终止任务实例时,您可以在执行任务的 Airflow 工作器的日志中看到以下日志条目:
{local_task_job.py:211} WARNING - State of this instance has been externally set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed with exception
可能的解决方案:
当运行任务的虚拟机内存不足时,就会出现此问题。这与 Airflow 配置无关,而是与虚拟机的可用内存量有关。
在 Cloud Composer 1 中,您可以使用性能更高的机器类型重新创建环境。
您可以降低
[celery]worker_concurrency
并发 Airflow 配置选项的值。此选项决定了给定 Airflow 工作器并发执行的任务数量。
使用 Cloud Logging 查询来了解 Pod 重启或驱逐的原因
Cloud Composer 的环境使用 GKE 集群作为计算基础架构层。在本部分中,您可以找到一些实用查询,这些查询有助于查找 Airflow 工作器或 Airflow 调度程序重启或驱逐的原因。
您可以通过以下方式调整系统进一步显示的查询:
您可以在 Cloud Logging 中指定所需的时间范围。例如,过去 6 小时、3 天,或者您也可以定义自定义时间范围。
您必须在 CLUSTER_NAME 中指定环境集群的名称。
您可以通过添加 POD_NAME 将搜索范围限定为特定 Pod。
发现已重启的容器
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"will be restarted" resource.labels.cluster_name="CLUSTER_NAME"
将结果限制为特定 Pod 的替代查询:
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"will be restarted" resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
发现因内存不足事件而关闭的容器
resource.type="k8s_node" log_id("events") (jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory")) severity=WARNING resource.labels.cluster_name="CLUSTER_NAME"
将结果限制为特定 Pod 的替代查询:
resource.type="k8s_node" log_id("events") (jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory")) severity=WARNING resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
发现已停止执行的容器
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"ContainerDied" severity=DEFAULT resource.labels.cluster_name="CLUSTER_NAME"
将结果限制为特定 Pod 的替代查询:
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"ContainerDied" severity=DEFAULT resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
更新或升级操作对 Airflow 任务执行的影响
更新或升级操作会中断当前正在执行的 Airflow 任务,除非任务以可延迟模式执行。
我们建议您在预计对 Airflow 任务执行的影响最小时执行这些操作,并在 DAG 和任务中设置适当的重试机制。
常见问题
以下部分介绍了一些常见 DAG 问题的症状和可行修复措施。
Airflow 任务被 Negsignal.SIGKILL
中断
有时,您的任务使用的内存可能超过 Airflow 工作器分配的内存。在这种情况下,它可能会被 Negsignal.SIGKILL
中断。系统发送此信号是为了避免进一步消耗内存,这可能会影响其他 Airflow 任务的执行。在 Airflow 工作器的日志中,您可能会看到以下日志条目:
{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL
Negsignal.SIGKILL
可能还会显示为代码 -9
。
可能的解决方案:
降低 Airflow 工作器的
worker_concurrency
。升级到 Cloud Composer 集群中使用的更大机器类型。
优化任务以减少使用的内存。
由于 DAG 解析错误,任务失败且没有发出日志
有时,可能会出现细微的 DAG 错误,导致 Airflow 调度器可以安排任务以供执行,DAG 处理器可以解析 DAG 文件,但 Airflow 工作器因 DAG 文件中存在编程错误而无法执行 DAG 中的任务。这可能会导致 Airflow 任务被标记为 Failed
,并且没有执行日志。
解决方案:
在 Airflow 工作器日志中,验证 Airflow 工作器是否未引发与 DAG 缺失或 DAG 解析错误相关的错误。
增加与 DAG 解析相关的参数:
将 dagbag-import-timeout 增加到至少 120 秒(如有需要,可以更长)。
将 dag-file-processor-timeout 增加到至少 180 秒(如果需要,可以更长)。此值必须高于
dagbag-import-timeout
。
另请参阅检查 DAG 处理器日志。
由于资源压力,任务失败且没有发出日志
问题:在执行任务期间,负责 Airflow 任务执行的 Airflow 工作器的子进程会突然中断。Airflow 工作器日志中显示的错误可能类似于以下内容:
...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task R = retval = fun(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__ return self.run(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command _execute_in_fork(command_to_exec) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...
解决方案:
在 Cloud Composer 1 中,创建一个资源规模比当前机器类型更大的机器类型的新环境。考虑向您的环境添加更多节点,并降低工作器的
[celery]worker_concurrency
。如果您的环境还会生成僵尸任务,请参阅排查僵尸任务问题。
如需有关调试内存不足问题的教程,请参阅调试内存不足和存储空间不足的 DAG 问题。
由于 Pod 被驱逐,任务失败且没有发出日志
Google Kubernetes Engine pod 受到 Kubernetes pod 生命周期和 pod 逐出的约束。在 Cloud Composer 中,任务高峰和工作器的协同调度是 pod 逐出的两个最常见原因。
当特定 pod 过度使用某个节点的资源(相对于该节点配置的资源消耗预期)时,可能会发生 pod 逐出。例如,当一个 pod 中运行多个内存密集型任务时,系统可能会执行逐出,并且这些任务的组合负载会导致该 pod 运行的节点超出内存消耗限制。
如果 Airflow 工作器 pod 被逐出,则该 pod 上运行的所有任务实例都会中断,之后被 Airflow 标记为失败。
日志被缓冲。如果工作器 pod 在缓冲区刷新之前被逐出,则不会发出日志。如果任务失败且没有日志,则表示 Airflow 工作器已因内存不足 (OOM) 重启。即使未发出 Airflow 日志,Cloud Logging 中也可能存在一些日志。
要查看日志,请执行以下操作:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
前往日志标签页。
依次选择所有日志 > Airflow 日志 > 工作器,查看各个 Airflow 工作器的日志。
具体情况:
在 Google Cloud 控制台中,前往工作负载页面。
如果存在显示
Evicted
的airflow-worker
pod,请点击每个被逐出的 pod 并查看窗口顶部的The node was low on resource: memory
消息。
解决方案:
使用比当前机器类型更大的机器类型创建新的 Cloud Composer 1 环境。
检查来自
airflow-worker
pod 的日志,了解可能的逐出原因。如需详细了解如何从各个 pod 提取日志,请参阅已部署工作负载的问题排查。确保 DAG 中的任务遵循幂等原则且可重试。
避免将不必要的文件下载到 Airflow 工作器的本地文件系统。
Airflow 工作器的本地文件系统容量有限。 当存储空间用尽时,GKE 控制平面会驱逐 Airflow 工作器 pod。这会导致被驱逐的 Worker 正在执行的所有任务都失败。
存在问题的操作示例:
- 下载文件或对象,并将其存储在 Airflow 工作器的本地。请改为直接将这些对象存储在适当的服务(例如 Cloud Storage 存储分区)中。
- 从 Airflow 工作器访问
/data
文件夹中的大对象。 Airflow 工作器会将对象下载到其本地文件系统。请改为实现 DAG,以便在 Airflow 工作器 pod 之外处理大型文件。
DAG 加载导入超时
具体情况:
- 在 Airflow 网页界面中,DAG 列表页面顶部出现一个红色提醒框,其中显示
Broken DAG: [/path/to/dagfile] Timeout
。 在 Cloud Monitoring 中:
airflow-scheduler
日志包含类似于以下内容的条目:ERROR - Process timed out
ERROR - Failed to import: /path/to/dagfile
AirflowTaskTimeout: Timeout
修复:
替换 dag_file_processor_timeout
Airflow 配置选项,并留出更多时间来进行 DAG 解析:
部分 | 键 | 值 |
---|---|---|
core |
dag_file_processor_timeout |
新超时值 |
DAG 执行未在预期时间内结束
具体情况:
有时,DAG 运行不会结束,因为 Airflow 任务会卡住,导致 DAG 运行时间超出预期。在正常情况下,Airflow 任务不会无限期地保持在队列中或运行状态,因为 Airflow 具有超时和清理程序,有助于避免这种情况。
修复:
为 DAG 使用
dagrun_timeout
参数。例如:dagrun_timeout=timedelta(minutes=120)
。因此,每个 DAG 运行都必须在 DAG 运行超时时间内完成。未完成的任务会标记为Failed
或Upstream Failed
。如需详细了解 Airflow 任务状态,请参阅 Apache Airflow 文档。使用任务执行超时参数为基于 Apache Airflow 运算符运行的任务定义默认超时。
未执行 DAG 运行作业
具体情况:
动态设置 DAG 的安排日期可能会导致各种意外副作用。例如:
DAG 执行时间始终在未来,并且 DAG 永远不会执行。
过去的 DAG 运行会被标记为已执行且成功,即使未执行也是如此。
如需了解详情,请参阅 Apache Airflow 文档。
可能的解决方案:
请按照 Apache Airflow 文档中的建议操作。
为 DAG 设置静态
start_date
。您可以选择使用catchup=False
停用针对过去日期运行 DAG。除非您了解此方法的副作用,否则请避免使用
datetime.now()
或days_ago(<number of days>)
。
提升进出 Airflow 数据库的网络流量
您的环境的 GKE 集群和 Airflow 数据库之间的流量网络量取决于 DAG 数量、DAG 中的任务数量以及 DAG 访问 Airflow 数据库中的数据的方式。以下因素可能会影响网络用量:
对 Airflow 数据库的查询。如果您的 DAG 执行了大量查询,则会生成大量流量。示例:在继续其他任务、查询 XCom 表、转储 Airflow 数据库内容之前检查任务的状态。
任务数量过多。要安排的任务越多,产生的网络流量就越多。此考虑因素适用于 DAG 中的任务总数和调度频率。在 Airflow 调度器安排 DAG 运行时,它会向 Airflow 数据库发出查询并产生流量。
Airflow 网页界面会向 Airflow 数据库发出查询,因此会产生网络流量。大量使用包含图表、任务和图示的页面可能会产生大量网络流量。
DAG 导致 Airflow Web 服务器崩溃或导致其返回“502 gateway timeout”错误
有多种不同原因可能会导致 Web 服务器故障。检查 Cloud Logging 中的 airflow-webserver 日志,确定 502 gateway timeout
错误的原因。
重负载计算
本部分仅适用于 Cloud Composer 1。
与工作器和调度器节点不同(它们可以通过自定义机器类型来提高 CPU 和内存容量),Web 服务器使用的是固定机器类型,因此,如果解析时的计算量过于庞大,可能会导致 DAG 解析失败。
请注意,Web 服务器具有 2 个 vCPU 和 2 GB 内存。
core-dagbag_import_timeout
的默认值为 30 秒。此超时值决定了 Airflow 在将 Python 模块载入 /dags
文件夹时所花费的时长上限。
权限不正确
本部分仅适用于 Cloud Composer 1。
Web 服务器不是使用与工作器和调度器相同的服务账号运行。因此,工作器和调度器可能可以访问 Web 服务器无法访问的用户管理资源。
我们建议您避免在 DAG 解析期间访问非公开资源。在某些情况下,如果需要访问此类资源,您需要向 Web 服务器的服务账号授予相应权限。该服务账号名称来源于您的 Web 服务器网域。例如,如果网域为 example-tp.appspot.com
,则服务账号为 example-tp@appspot.gserviceaccount.com
。
DAG 错误
本部分仅适用于 Cloud Composer 1。
Web 服务器在 App Engine 上运行,并且独立于环境的 GKE 集群。Web 服务器会解析 DAG 定义文件;如果 DAG 中存在错误,则可能发生 502 gateway timeout
。如果有问题的 DAG 没有导致 GKE 中正在运行的任何进程中断,则 Airflow 会在 Web 服务器未正常发挥作用的情况下正常工作。在这种情况下,您可以使用 gcloud composer environments run
来从您的环境中检索详细信息,并可在 Web 服务器变得不可用时使用它来解决问题。
在其他情况下,您可以在 GKE 中运行 DAG 解析,并查找抛出致命 Python 异常或发生超时(默认为 30 秒)的 DAG。 要进行问题排查,请连接到 Airflow 工作器容器中的远程 shell,并测试是否存在语法错误。 如需了解详情,请参阅测试 DAG。
在 DAG 和插件文件夹中处理大量 DAG 和插件
/dags
和 /plugins
文件夹中的内容会从环境的存储分区同步到 Airflow 工作器和调度器的本地文件系统。
这些文件夹中存储的数据越多,执行同步所需的时间就越长。如需解决此类情况,请执行以下操作:
限制
/dags
和/plugins
文件夹中的文件数量。仅存储所需的最低限度文件。增加 Airflow 调度程序和工作器可用的磁盘空间。
增加 Airflow 调度器和工作器的 CPU 和内存,以加快同步操作的执行速度。
如果 DAG 数量非常多,请将 DAG 划分为批次,将其压缩为 ZIP 归档文件,然后将这些归档文件部署到
/dags
文件夹中。这种方法可以加快 DAG 同步流程。Airflow 组件会先解压缩 ZIP 归档文件,然后再处理 DAG。以编程方式生成 DAG 也可能是限制
/dags
文件夹中存储的 DAG 文件数量的方法。请参阅程序化 DAG 部分,以避免在调度和执行程序化生成的 DAG 时出现问题。
请勿同时安排以程序化方式生成的 DAG
通过程序化方式从 DAG 文件生成 DAG 对象是一种高效的方法,可用于编写许多仅存在细微差异的相似 DAG。
请务必不要立即安排所有此类 DAG 执行。Airflow 工作器很可能没有足够的 CPU 和内存资源来执行同时调度的所有任务。
为避免在调度程序化 DAG 时出现问题,请执行以下操作:
- 增加工作器并发数并扩容环境,以便环境能够同时执行更多任务。
- 生成 DAG 时,应使其时间表在时间上均匀分布,以避免同时调度数百个任务,从而让 Airflow 工作器有时间执行所有已调度的任务。
访问 Airflow Web 服务器时出现 504 错误
在任务执行期间或紧接其后,抛出查询异常时与 Postgres / MySQL 服务器的连接中断
满足以下条件时,通常会发生 Lost connection to Postgres / MySQL server during query
异常:
- DAG 使用
PythonOperator
或自定义运算符。 - DAG 向 Airflow 数据库发出查询。
如果通过可调用的函数进行了多次查询,则回溯可能会错误地指向 Airflow 代码中的 self.refresh_from_db(lock_for_update=True)
行;它是任务执行后的第一个数据库查询。导致出现异常的实际原因发生在这之前,即 SQLAlchemy 会话未正确关闭时。
SQLAlchemy 会话限定在线程范围内,并且在稍后可在 Airflow 代码中继续的可调用函数会话中创建。如果一次会话中的查询之间存在严重延迟,则表示 Postgres 或 MySQL 服务器可能已经关闭了连接。Cloud Composer 环境中的连接超时设置为大约 10 分钟。
解决方案:
- 使用
airflow.utils.db.provide_session
修饰器。此修饰器在session
参数中提供到 Airflow 数据库的有效会话,并在函数结束时正确关闭会话。 - 请勿使用单个长时间运行的函数。请改为将所有数据库查询移动到单独的函数中,以使多个函数具有
airflow.utils.db.provide_session
修饰器。在这种情况下,会话会在检索查询结果后自动关闭。
控制 DAG、任务以及同一 DAG 的并行执行的执行时间
如果您想控制特定 DAG 的单次 DAG 执行时长,可以使用 dagrun_timeout
DAG 参数来实现。例如,如果您希望单次 DAG 运行(无论执行是否成功完成)不得超过 1 小时,请将此参数设置为 3600 秒。
您还可以控制单个 Airflow 任务的运行时长。为此,您可以使用 execution_timeout
。
如果您想控制特定 DAG 的活跃 DAG 运行次数,可以使用 [core]max-active-runs-per-dag
Airflow 配置选项来实现此目的。
如果您希望在给定时间只运行 DAG 的一个实例,请将 max-active-runs-per-dag
参数设置为 1
。
影响 DAG 和插件同步到调度器、工作器和 Web 服务器的问题
Cloud Composer 会将 /dags
和 /plugins
文件夹中的内容同步到调度器和工作器。/dags
和 /plugins
文件夹中的某些对象可能会导致此同步无法正常运行或速度变慢。
/dags
文件夹会同步到调度器和工作器。如果您在 Cloud Composer 1 中启用
DAG Serialization
,此文件夹不会同步到 Web 服务器。/plugins
文件夹会同步到调度器、工作器和 Web 服务器。
您可能会遇到以下问题:
您已将使用压缩转码的 gzip 压缩文件上传到
/dags
和/plugins
文件夹。如果您在gcloud storage cp
命令中使用--gzip-local-all
标志将数据上传到存储分区,通常会发生这种情况。解决方法:删除使用压缩转码的对象,然后将其重新上传到存储分区。
其中一个对象名为“.” - 此类对象不会与调度程序和工作器同步,并且可能会完全停止同步。
解决方案:重命名对象。
文件夹和 DAG Python 文件同名,例如
a.py
。在这种情况下,DAG 文件未正确同步到 Airflow 组件。解决方法:移除与 DAG Python 文件同名的文件夹。
/dags
或/plugins
文件夹中的某个对象的名称末尾包含/
符号。此类对象可能会干扰同步流程,因为/
符号表示对象是文件夹,而不是文件。解决方法:从有问题的对象的名称中移除
/
符号。请勿在
/dags
和/plugins
文件夹中存储不必要的文件。有时,您实现的 DAG 和插件会附带其他文件,例如用于存储这些组件的测试的文件。这些文件会同步到工作器和调度器,并会影响将这些文件复制到调度器、工作器和网络服务器所需的时间。
解决方法:请勿在
/dags
和/plugins
文件夹中存储任何其他不必要的文件。
调度程序和工作器生成“Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...'”错误
之所以会出现此问题,是因为对象在 Cloud Storage 中可以具有重叠的命名空间,而调度程序和工作器同时使用传统文件系统。例如,您可以将同名文件夹和对象添加到环境的存储分区中。当存储分区同步到环境的调度程序和工作器时,系统会生成此错误,这可能会导致任务失败。
如需解决此问题,请确保环境的存储分区中没有重叠的命名空间。例如,如果 /dags/misc
(一个文件)和 /dags/misc/example_file.txt
(另一个文件)都位于同一个存储分区中,则调度程序会生成错误。
连接到 Airflow 元数据数据库时出现短暂中断
Cloud Composer 在分布式基础架构之上运行。这意味着,可能会不时出现一些暂时性问题,这些问题可能会中断 Airflow 任务的执行。
在这种情况下,您可能会在 Airflow 工作器的日志中看到以下错误消息:
"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"
或
"Can't connect to Postgres / MySQL server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"
此类间歇性问题也可能由为 Cloud Composer 环境执行的维护操作导致。
通常,此类错误是间歇性的,如果您的 Airflow 任务是幂等的,并且您已配置重试,则不会受到影响。您还可以考虑定义维护窗口。
导致此类错误的另一个原因可能是环境的集群中缺少资源。在这种情况下,您可以按照扩缩环境或优化环境中的说明扩容或优化环境。
DAG 运行标记为成功,但没有执行任何任务
如果 DAG 运行 execution_date
早于 DAG 的 start_date
,那么您可能会看到没有任何任务运行的 DAG 运行,但仍被标记为成功。
![成功运行的 DAG,但没有执行任何任务](https://cloud.google.com/static/composer/docs/images/dag-no-taskruns.png?authuser=7&hl=zh-cn)
原因
这种情况可能会在以下某种情况下发生:
不匹配是由 DAG 的
execution_date
和start_date
之间的时区差异造成的。例如,使用pendulum.parse(...)
设置start_date
时可能会发生这种情况。DAG 的
start_date
设置为动态值,例如airflow.utils.dates.days_ago(1)
解决方案
确保
execution_date
和start_date
使用相同的时区。指定静态
start_date
并将其与catchup=False
结合使用,以避免运行具有过去开始日期的 DAG。
DAG 在 Airflow 界面或 DAG 界面中不可见,并且调度器不会对其进行调度
DAG 处理器会解析每个 DAG,之后调度器才能对这些 DAG 进行调度,并且 DAG 才能在 Airflow 界面或 DAG 界面中显示。
以下 Airflow 配置选项用于定义解析 DAG 的超时设置:
[core]dagrun_import_timeout
定义了 DAG 处理器解析单个 DAG 所需的时间。[core]dag_file_processor_timeout
定义了 DAG 处理器在解析所有 DAG 时可以花费的总时间。
如果 DAG 未显示在 Airflow 界面或 DAG 界面中,请执行以下操作:
检查 DAG 处理器日志,确认 DAG 处理器能否正确处理您的 DAG。如果出现问题,您可能会在 DAG 处理器或调度程序日志中看到以下日志条目:
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for /usr/local/airflow/dags/example_dag.py with PID 21903 started at 2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
检查调度程序日志,了解调度程序是否正常运行。如果出现问题,您可能会在调度程序日志中看到以下日志条目:
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it Process timed out, PID: 68496
解决方案:
修正所有 DAG 解析错误。DAG 处理器会解析多个 DAG,在极少数情况下,一个 DAG 的解析错误可能会对其他 DAG 的解析产生负面影响。
如果 DAG 的解析时间超过
[core]dagrun_import_timeout
中定义的秒数,请延长此超时时间。如果解析所有 DAG 所需的时间超过
[core]dag_file_processor_timeout
中定义的秒数,请延长此超时时间。如果 DAG 需要很长时间才能解析,也可能表示它未以最佳方式实现。例如,如果它读取许多环境变量,或者执行对外部服务或 Airflow 数据库的调用。请尽可能避免在 DAG 的全局部分执行此类操作。
增加调度器的 CPU 和内存资源,以便其更快地运行。
增加 DAG 处理器进程的数量,以加快解析速度。您可以通过增加
[scheduler]parsing_process
的值来实现此目的。
Airflow 数据库承受重负载的症状
如需了解详情,请参阅 Airflow 数据库承受负载压力的症状。