Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本页面介绍 Airflow 调度程序常见问题和问题排查步骤和信息。
确定问题的来源
如需开始进行问题排查,请确定问题是出现在 DAG 解析时间还是在执行时处理任务的时候。如需详细了解解析时间和执行时间,请参阅 DAG 解析时间与 DAG 执行时间之间的差异。
检查 DAG 处理器日志
如果您有复杂的 DAG,由调度器运行的 DAG 处理器可能无法解析您的所有 DAG。这可能会导致许多问题,这些问题具有以下症状。
症状:
如果 DAG 处理器在解析 DAG 时遇到问题,则可能会导致以下问题同时出现。与静态 DAG 相比,如果 DAG 是动态生成的,这些问题的影响可能会更大。
DAG 在 Airflow 界面和 DAG 界面中不可见。
DAG 不会安排执行时间。
DAG 处理器日志中存在错误,例如:
dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR - Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
或
dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR - Processor for /home/airflow/gcs/dags/dag-example.py exited with return code 1.
Airflow 调度器出现问题,导致调度器重启。
调度执行的 Airflow 任务会被取消,并且无法解析的 DAG 的 DAG 运行作业可能会被标记为
failed
。例如:airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1 manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag 'dag-example'. Marking it as removed.
解决方案:
增加与 DAG 解析相关的参数:
将 dagbag-import-timeout 增加到至少 120 秒(如有需要,可以更长)。
将 dag-file-processor-timeout 增加到至少 180 秒(如果需要,可以更长)。此值必须高于
dagbag-import-timeout
。
更正或移除会导致 DAG 处理器出现问题的 DAG。
检查 DAG 解析时间
如需验证问题是否发生在 DAG 解析时间,请执行以下步骤。
控制台
在 Google Cloud 控制台中,您可以使用 Monitoring 页面和 Logs 标签页来检查 DAG 解析时间。
使用 Cloud Composer“监控”页面检查 DAG 解析时间:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。系统随即会打开监控页面。
在 Monitoring 标签页中,查看 DAG 运行部分中的所有 DAG 文件的总解析时间图表,并找出可能的问题。
使用 Cloud Composer 日志标签页检查 DAG 解析时间:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。系统随即会打开监控页面。
前往日志标签页,然后从所有日志导航树中选择 DAG 处理器管理器部分。
查看
dag-processor-manager
日志并确定可能存在的问题。
gcloud - Airflow 1
使用带有 -r
标志的 list_dags
命令可查看所有 DAG 的解析时间。
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
list_dags -- -r
您需要进行如下替换:
ENVIRONMENT_NAME
替换为环境的名称。LOCATION
替换为环境所在的区域。
该命令的输出类似于以下内容:
-------------------------------------------------------------------
DagBag loading stats for /home/airflow/gcs/dags
-------------------------------------------------------------------
Number of DAGs: 5
Total task number: 13
DagBag parsing time: 0.6765180000000001
-----------+----------+---------+----------+-----------------------
file | duration | dag_num | task_num | dags
-----------+----------+---------+----------+-----------------------
/dag_1.py | 0.6477 | 1 | 2 | ['dag_1']
/dag_2.py | 0.018652 | 1 | 2 | ['dag_2']
/dag_3.py | 0.004024 | 1 | 6 | ['dag_3']
/dag_4.py | 0.003476 | 1 | 2 | ['dag_4']
/dag_5.py | 0.002666 | 1 | 1 | ['dag_5']
-----------+----------+---------+----------+-----------------------
查找 DagBag 解析时间值。如果值较大,则可能表示某个 DAG 未以最佳方式实施。从输出表中,您可以确定哪些 DAG 的解析时间较长。
gcloud - Airflow 2
使用 dags report
命令可查看所有 DAG 的解析时间。
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags report
您需要进行如下替换:
ENVIRONMENT_NAME
替换为环境的名称。LOCATION
替换为环境所在的区域。
该命令的输出类似于以下内容:
Executing within the following Kubernetes cluster namespace: composer-2-0-31-airflow-2-3-3
file | duration | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py | 0:00:00.038334 | 2 | 10 | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1 | 1 | airflow_monitoring
查找表格中列出的每个 DAG 的时长值。如果值较大,则可能表示某个 DAG 未以最佳方式实施。从输出表中,您可以确定哪些 DAG 的解析时间较长。
监控正在运行和已加入队列的任务
如需检查是否有任务卡在队列中,请执行以下步骤。
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。 环境详情页面会打开。
转到监控标签页。
在 Monitoring 标签页中,查看 DAG 运行部分中的 Airflow 任务图表,以确定潜在问题。Airflow 任务是指在 Airflow 中处于队列状态的任务,它们可以进入 Celery 或 Kubernetes Executor 代理队列。“Celery 队列中的任务数”是指已进入 Celery 代理队列的任务实例数。
DAG 解析时排查问题
以下部分介绍了 DAG 解析时一些常见问题的症状和可能的修复。
Cloud Composer 1 和 Airflow 1 中的 DAG 解析和调度
Airflow 2 显著提高了 DAG 解析效率。如果您遇到与 DAG 解析和调度相关的性能问题,请考虑迁移到 Airflow 2。
在 Cloud Composer 1 中,调度程序与其他 Cloud Composer 组件一起在集群节点上运行。因此,与其他节点相比,单个集群节点的负载可能会更高或更低。调度程序的性能(DAG 解析和调度)可能会因调度程序运行的节点而异。此外,由于升级或维护操作,运行调度程序的单个节点可能会发生变化。Cloud Composer 2 解决了这个问题,您可以向调度程序分配 CPU 和内存资源,并且调度程序的性能不依赖于集群节点的负载。
受限线程数量
仅允许 DAG 处理器管理器(用于处理 DAG 文件的调度器部分)使用有限数量的线程可能会影响您的 DAG 解析时间。
如需解决此问题,请替换以下 Airflow 配置选项:
对于 Airflow 1.10.12 及更低版本,请替换
max_threads
参数:部分 键 值 备注 scheduler
max_threads
NUMBER_OF_CORES_IN_MACHINE - 1
将 NUMBER_OF_CORES_IN_MACHINE
替换为工作器节点中核心的数量
。对于 Airflow 1.10.14 及更高版本,请替换
parsing_processes
参数:部分 键 值 备注 scheduler
parsing_processes
NUMBER_OF_CORES_IN_MACHINE - 1
将 NUMBER_OF_CORES_IN_MACHINE
替换为工作器节点中核心的数量
。
任务的数量和时间分布
已知 Airflow 在安排大量小任务时会出现问题。在这种情况下,您应该选择使用较少的整合任务。
同时调度大量 DAG 或任务也可能是问题的来源。如需避免此问题,请随着时间推移更平均地分配您的任务。
在正在运行和已加入队列的任务中排查问题
以下部分介绍正在运行和已加入队列的任务的一些常见问题的症状和可能的修复。
任务队列过长
在某些情况下,调度器的任务队列可能会过长。如需了解如何优化工作器和 celcel 参数,请参阅将 Cloud Composer 环境与您的业务一起扩缩。
使用 Airflow 调度程序的时间表功能
从 Airflow 2.2 开始,您可以使用名为 TimeTable 的新功能为 DAG 定义时间表。
您可以使用以下任一方法定义时间表:
受限集群资源
本部分仅适用于 Cloud Composer 1。
如果环境的 GKE 集群太小,无法处理所有 DAG 和任务,您可能会遇到性能问题。在这种情况下,请尝试以下解决方案之一:
- 使用性能更高的机器类型创建一个新环境,并将 DAG 迁移到该环境中。
- 创建更多 Cloud Composer 环境并在它们之间拆分 DAG。
- 按照升级 GKE 节点的机器类型中的说明更改 GKE 节点的机器类型。由于此过程容易出错,因此这是最不建议的选项。
- 升级在您的环境中运行 Airflow 数据库的 Cloud SQL 实例的机器类型,例如,使用
gcloud composer environments update
命令。Airflow 数据库性能不佳可能是调度器较慢的原因。
避免在维护窗口内进行任务调度
您可以为环境定义特定的维护窗口。在这些时间段内,会出现 Cloud SQL 和 GKE 的维护事件。
让 Airflow 调度器忽略不必要的文件
您可以通过跳过 DAGs 文件夹中的不必要文件来提高 Airflow 调度器的性能。Airflow 调度器会忽略 .airflowignore
文件中指定的文件和文件夹。
如需让 Airflow 调度器忽略不必要的文件,请执行以下操作:
- 创建
.airflowignore
文件。 - 在此文件中,列出应忽略的文件和文件夹。
- 将此文件上传到环境的存储分区中的
/dags
文件夹。
如需详细了解 .airflowignore
文件格式,请参阅 Airflow 文档。
Airflow 调度器进程暂停 DAG
Airflow 用户会暂停 DAG 以避免执行。这样可以节省 Airflow 工作器处理周期。
Airflow 调度器会继续解析已暂停的 DAG。如果您确实想要提高 Airflow 调度器性能,请使用 .airflowignore
或从 DAGs 文件夹中删除已暂停的 DAG。
在 DAG 中使用“wait_for_downstream”
如果您将 DAG 中的参数 wait_for_downstream
设置为 True
,那么如果您希望某个任务成功,则该任务的所有直接下行任务都必须成功。这意味着,由于执行之前的 DAG 运行中的任务,属于特定 DAG 运行的任务的执行可能会变慢。如需了解详情,请参阅 Airflow 文档。
排队时间过长的任务将被取消并重新安排
如果 Airflow 任务在队列中保留的时间过长,则调度器会再次重新安排它以执行(在低于 2.3.1 的 Airflow 版本中,如果任务符合重试条件,系统还会将其标记为失败并重试)。
如需观察此类情况的症状,一种方法是查看队列任务数量的图表(Cloud Composer 界面中的“监控”标签页),如果此图表中的峰值在约两小时内没有下降,则任务很可能会重新调度(没有日志),然后调度程序日志中会出现“Adopted tasks were still pending ...”(已采用的任务仍处于待处理状态)日志条目。在这种情况下,您可能会在 Airflow 任务日志中看到“Log file not found...”消息,因为该任务未执行。
一般来说,预期此任务失败,并且计划任务的下一个实例应该根据时间表执行。如果您在 Cloud Composer 环境中观察到许多此类情况,则可能表示您的环境中没有足够的 Airflow 工作器来处理所有计划任务。
解决方法:如需解决此问题,您需要确保 Airflow 工作器始终具有容量来运行已加入队列的任务。例如,您可以增加工作器数量或提高工作器并发 (worker_concurrency) 值。您还可以调整并行性或池,以防止队列任务超出您具有的容量。
有时,过时的任务可能会阻止特定 DAG 的执行
在一般情况下,Airflow 调度器应该能够处理这样的情况:队列中存在过时任务,并且由于某些原因导致无法正确执行它们(例如,过时任务所属的 DAG 被删除)。
如果调度器未清除这些过时的任务,则您可能需要手动删除这些任务。例如,您可以在 Airflow 界面中执行此操作 - 您可以导航到菜单 > 浏览器 > 任务实例,找到属于过时 DAG 的已加入队列的任务并将其删除。
如需解决此问题,请将您的环境升级到 Cloud Composer 2.1.12 或更高版本。
Cloud Composer 方法 [scheduler]min_file_process_interval
参数
Cloud Composer 更改了 Airflow 调度器使用 [scheduler]min_file_process_interval
的方式。
Airflow 1
对于使用 Airflow 1 的 Cloud Composer,用户可以将 [scheduler]min_file_process_interval
的值设置为 0 到 600 秒。高于 600 秒的值将产生与将 [scheduler]min_file_process_interval
设置为 600 秒相同的结果。
Airflow 2
在 Airflow 2 中,[scheduler]min_file_process_interval
只能与 1.19.9 和 2.0.26 或更高版本搭配使用
低于 1.19.9 和 2.0.26 的 Cloud Composer 版本
在这些版本中,系统会忽略
[scheduler]min_file_process_interval
。Cloud Composer 版本 1.19.9 或 2.0.26 或更高版本
在调度所有 DAG 的次数达到一定数目后,Airflow 调度器会重启,并且
[scheduler]num_runs
参数用于控制调度器执行此操作的次数。当调度程序到达[scheduler]num_runs
调度循环时,它会重启 - 调度程序是无状态组件,此类重启是对调度程序可能遇到的任何问题的自动修复机制。如果未指定,系统会应用[scheduler]num_runs
的默认值,即 5000。[scheduler]min_file_process_interval
可用于配置 DAG 解析的频率,但此参数不得超过调度器在调度 DAG 时执行[scheduler]num_runs
循环所需的时间。
扩缩 Airflow 配置
Airflow 提供了 Airflow 配置选项,可用于控制 Airflow 可以同时执行的任务数量和 DAG 数量。如需设置这些配置选项,请根据您的环境替换相应值。
-
参数
[celery]worker_concurrency
控制 Airflow 工作器可以同时执行的最大任务数。如果将此参数的值乘以您的 Cloud Composer 环境中的 Airflow 工作器的数量,将得出在您的环境中,在给定的时刻可执行的最大任务数。此数字受[core]parallelism
Airflow 配置选项的限制,之后将进一步说明此内容。在 Cloud Composer 2 环境中,
[celery]worker_concurrency
的默认值会自动计算对于 Airflow 版本 2.3.3 及更高版本,
[celery]worker_concurrency
会设置为 32、12 * worker_CPU 和 8 * worker_memory 中的最小值。对于 Airflow 版本 2.2.5 或更低版本,
[celery]worker_concurrency
设为 12 * 工作器的 CPU 数量。
-
[core]max_active_runs_per_dag
Airflow 配置选项控制每个 DAG 的活跃 DAG 运行次数的上限。如果次数达到上限,则调度器不会继续创建更多 DAG 运行。如果此参数设置不正确,您可能会遇到调度器限制 DAG 执行的问题,因为它无法在给定时刻创建更多 DAG 运行实例。
-
[core]max_active_tasks_per_dag
Airflow 配置选项可控制每个 DAG 中可以并发运行的任务实例数上限。它是 DAG 级别的参数。如果此参数设置不正确,您可能会遇到单个 DAG 实例执行缓慢的问题,因为在给定时间可执行的 DAG 任务数有限。
解决方案:提高
[core]max_active_tasks_per_dag
。 最大并行数量和池大小
[core]parallelism
Airflow 配置选项可控制在满足任务的所有依赖项后,Airflow 调度器可将多少任务添加到执行程序的队列中。这是整个 Airflow 设置的全局参数。
任务会排入队列并在池中执行。Cloud Composer 环境仅使用一个池。此池的大小可控制调度器在给定时间可将多少任务加入执行队列。如果池太小,则即使尚未达到阈值(
[core]parallelism
配置选项和[celery]worker_concurrency
配置选项之和乘以 Airflow 工作器数量所得的值),相应调度器也无法将任务加入执行队列。您可以在 Airflow 界面中配置池大小(菜单 > 管理 > 池)。将池大小调整为您的环境中预期的最大并行数量。
通常,
[core]parallelism
设置为工作器数量上限和 [celery]worker_concurrency 的乘积。
由于 DAG 处理器超时,调度器无法调度 DAG
如需详细了解此问题,请参阅排查 DAG 问题。
在达到 dagrun_timeout
后将任务标记为失败
如果 DAG 运行作业未在 dagrun_timeout
(DAG 参数)内完成,调度程序会将未完成的任务(正在运行、已调度和已加入队列的任务)标记为失败。
解决方案:
延长
dagrun_timeout
以满足超时要求。
Airflow 数据库承受负载压力的症状
有时,您可能会在 Airflow 调度程序日志中看到以下警告日志条目:
Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"
您可能还会在 Airflow 工作器日志中观察到类似的症状:
对于 MySQL:
(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"
对于 PostgreSQL:
psycopg2.OperationalError: connection to server at ... failed
此类错误或警告可能是 Airflow 数据库因调度器或其他 Airflow 组件(例如工作器、触发器和 Web 服务器)同时执行的打开连接数量或查询数量过多而导致的。
可能的解决方案:
扩容 Airflow 数据库:
- (Cloud Composer 1)更改存储您环境的 Airflow 数据库的 Cloud SQL 实例的机器类型。
- (Cloud Composer 2)调整环境大小。
减少调度器的数量。在大多数情况下,一个或两个调度器就足以解析和调度 Airflow 任务;除非有充分的理由,否则不建议配置超过两个调度器。
避免在 Airflow DAG 中使用全局变量:Cloud Composer 环境变量和 Airflow 变量。
将 [scheduler]scheduler-heartbeat-sec 设置为更大的值,例如 15 秒或更长时间。
将 [scheduler]job-heartbeat-sec 设置为更大的值,例如 30 秒或更长时间。
将 [scheduler]scheduler_health_check_threshold 设置为等于
[scheduler]job-heartbeat-sec
乘以4
的值。
Web 服务器显示“调度器似乎未运行”警告
调度程序会定期向 Airflow 数据库报告其心跳。Airflow Web 服务器会根据这些信息确定调度程序是否处于活动状态。
有时,如果调度程序负载过重,则可能无法每隔 [scheduler]scheduler-heartbeat-sec 报告一次心跳。
在这种情况下,Airflow Web 服务器可能会显示以下警告:
The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.
可能的解决方案:
增加调度器的 CPU 和内存资源。
优化 DAG,以加快其解析和调度速度,并避免消耗过多调度器资源。
避免在 Airflow DAG 中使用全局变量:Cloud Composer 环境变量和 Airflow 变量。
提高 [scheduler]scheduler-health-check-threshold 的值,以便 Web 服务器等待更长时间后再报告调度程序不可用。
针对在回填 DAG 期间遇到的问题的解决方法
有时,您可能需要重新运行已执行的 DAG。您可以使用 Airflow 命令行工具按如下方式执行此操作:
Airflow 1
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
如需仅重新运行特定 DAG 的失败任务,还应使用 --rerun_failed_tasks
参数。
Airflow 2
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
dags backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
如需仅重新运行特定 DAG 的失败任务,还应使用 --rerun-failed-tasks
参数。
您需要进行如下替换:
ENVIRONMENT_NAME
替换为环境的名称。LOCATION
替换为环境所在的区域。START_DATE
,其中包含start_date
DAG 参数的值,格式为YYYY-MM-DD
。END_DATE
,其中包含end_date
DAG 参数的值,格式为YYYY-MM-DD
。DAG_NAME
替换为 DAG 名称。
回填操作有时可能会导致死锁情况,即由于任务上存在锁定,无法进行回填。例如:
2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill
在某些情况下,您可以使用以下解决方法来解决死锁问题:
将
[core]schedule-after-task-execution
替换为False
,以替换 mini-Scheduler。针对较短的日期范围运行回填。例如,设置
START_DATE
和END_DATE
以指定仅 1 天的时段。