使用 Cloud Monitoring 监控环境

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

您可以使用 Cloud Monitoring 和 将 Cloud Logging 与 Cloud Composer 搭配使用。

Cloud Monitoring 可帮助您了解云应用的性能、正常运行时间以及总体运行状况。Cloud Monitoring 会从 Cloud Composer 中收集并提取指标、事件和元数据,并在信息中心和图表中生成数据分析。您可以使用 Cloud Monitoring 来了解 Cloud Composer 环境的性能和运行状况以及 Airflow 指标。

Logging 可捕获环境集群中的调度器和工作器容器生成的日志。这些日志包含系统级和 Airflow 依赖项信息,可协助您进行调试。如需了解如何查看日志,请参阅查看 Airflow 日志

准备工作

  • 如需访问 Cloud Composer 环境的相关日志和指标,您需要具备以下权限:

    • 对日志和指标的只读权限logging.viewermonitoring.viewer
    • 对日志(包括私密日志)的只读权限logging.privateLogViewer
    • 对指标的读写权限monitoring.editor

    如需详细了解 Cloud Composer 的其他权限和角色,请参阅访问权限控制

  • 为避免出现重复的日志记录,请为 Google Kubernetes Engine 停用 Cloud Logging。

  • Cloud Logging 会针对 Google Cloud 项目中发生的每个状态和事件生成一个条目。您可以使用 排除项过滤器 来减少日志量,包括 Cloud Logging 为 Cloud Composer 生成的内容

    jobs.py 中排除日志可能会导致健康检查失败和 CrashLoopBackOff 错误。您必须在排除过滤器中添加“-jobs.py” 以防止其被排除。

  • Monitoring 无法为每分钟执行多次的 DAG 和任务绘制计数值图,也无法为失败的任务绘制指标图。

环境指标

您可以使用环境指标来检查 Cloud Composer 环境。

环境运行状况

如需检查环境的运行状况,您可以使用以下运行状况指标:composer.googleapis.com/environment/healthy

Cloud Composer 运行一个名为 airflow_monitoring 的活跃性 DAG, 该命令会按时间表运行,并按如下方式报告环境健康状况:

  • 如果活跃性 DAG 运行成功完成,则运行状况为 True
  • 如果活跃性 DAG 运行失败,则运行状况为 False

活跃性 DAG 存储在 dags/ 文件夹中,并可在以下位置查看: Airflow 界面活跃性 DAG 的频率和内容不可更改,也不得修改。对活跃性 DAG 的更改不会保留。

环境的依赖项检查

Cloud Composer 会定期检查环境是否可以访问 及其运行所需的服务,并且它具有足够的权限 互动环境运行所需的服务示例包括 Artifact Registry、Cloud Logging 和 Cloud Monitoring。

以下指标可用于环境的依赖项检查:

依赖项指标 API 说明
依赖项检查次数 composer.googleapis.com/environment/health/dependency_check_count 此指标跟踪对环境运行所需服务执行可单手操作性检查的次数。
依赖项权限检查次数 composer.googleapis.com/environment/health/dependency_permissions_check_count 此指标跟踪对以下文件执行权限检查的次数: 环境运行所需的服务。

数据库运行状况

如需检查数据库的运行状况,您可以使用以下运行状况指标:composer.googleapis.com/environment/database_health

Airflow 监控 Pod 每分钟对数据库执行一次 ping 操作并报告健康状况 如果可以建立 SQL 连接,则状态为 TrueFalse

数据库指标

以下环境指标适用于 Airflow 元数据 Cloud Composer 环境使用的数据库。您可以使用这些指标来监控环境的数据库实例的性能和资源使用情况。

例如,如果环境接近资源限制,您可能需要升级环境的 Cloud SQL 机器类型。或者,您可能希望优化 支付与 Airflow 元数据数据库的使用相关的费用 数据库清理 以便将存储空间控制在一定的阈值以下

数据库指标 API 说明
数据库 CPU 使用率 composer.googleapis.com/environment/database/cpu/usage_time
数据库 CPU 核心 composer.googleapis.com/environment/database/cpu/reserved_cores
数据库 CPU 利用率 composer.googleapis.com/environment/database/cpu/utilization
数据库内存用量 composer.googleapis.com/environment/database/memory/bytes_used
数据库内存配额 composer.googleapis.com/environment/database/memory/quota
数据库内存利用率 composer.googleapis.com/environment/database/memory/utilization
数据库磁盘使用量 composer.googleapis.com/environment/database/disk/bytes_used
数据库磁盘配额 composer.googleapis.com/environment/database/disk/quota
数据库磁盘利用率 composer.googleapis.com/environment/database/disk/utilization
数据库连接限制 composer.googleapis.com/environment/database/network/max_connections
数据库连接 composer.googleapis.com/environment/database/network/connections
可用于故障切换的数据库 composer.googleapis.com/environment/database/available_for_failover 如果环境的 Cloud SQL 实例处于高风险级别,则为 True 并且已准备好进行故障切换
数据库自动故障切换请求数 composer.googleapis.com/environment/database/auto_failover_request_count 环境的 Cloud SQL 实例的自动故障切换请求总数。

调度器指标

名称 API 说明
活跃调度器 composer.googleapis.com/environment/active_schedulers 活跃调度器实例数。

触发器指标

以下触发器指标专为 Cloud Composer 提供:

名称 API 说明
活跃触发器数 composer.googleapis.com/environment/active_triggerers 活跃触发器实例数。

此外,您还可以通过 Cloud Composer 指标:

名称 API Airflow 中的名称 说明
正在运行的触发器总数 composer.googleapis.com/workload/triggerer/num_running_triggers triggers.running 每个触发器实例的正在运行的触发器数量。
屏蔽型触发器 composer.googleapis.com/environment/trigger/blocking_count triggers.blocked_main_thread 阻塞了主线程的触发器数量(可能是因为未完全异步)。
失败的触发器 composer.googleapis.com/environment/trigger/failed_count triggers.failed 在触发事件之前因出错而失败的触发器数量。
成功的触发器 composer.googleapis.com/environment/trigger/succeeded_count triggers.succeeded 至少触发过一个事件的触发器的数量。

网络服务器指标

以下环境指标适用于 Airflow Web 服务器 Cloud Composer 环境。您可以使用这些指标 检查环境的 Airflow Web 的性能和资源使用情况 服务器实例。

例如,如果网络服务器机器类型资源经常用尽,您可能需要升级网络服务器机器类型

名称 API 说明
网络服务器 CPU 使用率 composer.googleapis.com/environment/web_server/cpu/usage_time
Web 服务器 CPU 配额 composer.googleapis.com/environment/web_server/cpu/reserved_cores
Web 服务器内存用量 composer.googleapis.com/environment/web_server/memory/bytes_used
网络服务器内存配额 composer.googleapis.com/environment/web_server/memory/quota
活跃 Web 服务器 composer.googleapis.com/environment/active_webservers 活跃的 Web 服务器实例数量。

DAG 指标

为了帮助您监控 DAG 运行效率并识别导致高延迟的任务,我们提供了以下 DAG 指标。

DAG 指标 API
DAG 运行次数 composer.googleapis.com/workflow/run_count
每个 DAG 运行的时长 composer.googleapis.com/workflow/run_duration
任务运行次数 composer.googleapis.com/workflow/task/run_count
每次任务运行的时长 composer.googleapis.com/workflow/task/run_duration

Cloud Monitoring 只会显示已完成运行的工作流和任务(无论成功还是失败)的指标。而对于正在运行的工作流和任务,以及没有工作流活动的情况,Stackdriver 不会显示任何数据

Celery Executor 指标

您可以使用以下 Celery 执行程序指标。这些指标可帮助您确定环境中的工作器资源是否足够。

Celery Executor 指标 API
队列中的任务数 composer.googleapis.com/environment/task_queue_length
在线 Celery 工作器数量 composer.googleapis.com/environment/num_celery_workers

Airflow 指标

您可以使用以下 Airflow 指标。这些指标 对应于 Airflow 提供的指标

名称 API Airflow 中的名称 说明
Celery 任务非零退出代码 composer.googleapis.com/environment/celery/execute_command_failure_count celery.execute_command.failure Celery 任务中的非零退出代码数量。
Celery 任务发布超时 composer.googleapis.com/environment/celery/task_timeout_error_count celery.task_timeout_error 向 Celery 代理发布任务时出现的 AirflowTaskTimeout 错误数量。
序列化 DAG 提取时长 composer.googleapis.com/environment/collect_db_dag_duration collect_db_dags 从数据库中提取所有序列化 DAG 所用的时间。
DAG 刷新错误 composer.googleapis.com/environment/dag_callback/exception_count dag.callback_exceptions 从 DAG 回调引发的异常数量。出现这种情况意味着 DAG 回调无法正常运行。
DAG 刷新错误 composer.googleapis.com/environment/dag_file/refresh_error_count dag_file_refresh_error 加载任何 DAG 文件时发生的失败次数。
DAG 文件加载时间 composer.googleapis.com/environment/dag_processing/last_duration dag_processing.last_duration.<dag_file> 加载特定 DAG 文件所需的时间。
DAG 文件处理完成后经过的时间 composer.googleapis.com/environment/dag_processing/last_run_elapsed_time dag_processing.last_run.seconds_ago.<dag_file> 自上次处理 DAG 文件后经过的秒数。
DagFileProcessorManager 失速计数 composer.googleapis.com/environment/dag_processing/manager_stall_count dag_processing.manager_stalls 已暂停的 DagFileProcessorManager 进程的数量。
DAG 解析错误 composer.googleapis.com/environment/dag_processing/parse_error_count dag_processing.import_errors 解析 DAG 文件时生成的错误数量。
运行 DAG 解析进程 composer.googleapis.com/environment/dag_processing/processes dag_processing.processes 当前正在运行的 DAG 解析进程的数量。
处理器超时 composer.googleapis.com/environment/dag_processing/processor_timeout_count dag_processing.processor_timeouts 因用时过长而被终止的文件处理器数量。
扫描和导入所有 DAG 文件所用的时间 composer.googleapis.com/environment/dag_processing/total_parse_time dag_processing.total_parse_time 扫描和导入一次所有 DAG 文件所用的总时间。
当前 DAG 包大小 composer.googleapis.com/environment/dagbag_size dagbag_size 调度器根据其配置运行扫描时发现的 DAG 数量。
未达到服务等级协议 (SLA) 规定的错过电子邮件通知 composer.googleapis.com/environment/email/sla_notification_failure_count sla_email_notification_failure 未能成功发送服务等级协议违规提醒电子邮件的尝试次数。
执行器上的打开槽数 composer.googleapis.com/environment/executor/open_slots executor.open_slots 执行器上的空闲槽数。
执行器上的已加入队列的任务 composer.googleapis.com/environment/executor/queued_tasks executor.queued_tasks 执行器上已加入队列的任务数。
在执行器上运行任务 composer.googleapis.com/environment/executor/running_tasks executor.running_tasks 执行器上正在运行的任务数。
任务实例成功/失败 composer.googleapis.com/environment/finished_task_instance_count ti_failuresti_successes 总体任务实例成功/失败次数。
已启动/已完成的作业 composer.googleapis.com/environment/job/count <job_name>_start<job_name>_end 已开始/已完成的作业(例如 SchedulerJob、LocalTaskJob)的数量。
作业检测信号失败 composer.googleapis.com/environment/job/heartbeat_failure_count <job_name>_heartbeat_failure 作业的失败检测信号次数。
每个运维人员创建的任务数量 composer.googleapis.com/environment/operator/created_task_instance_count task_instance_created-<operator_name> 为给定操作器创建的任务实例数。
运算符执行 composer.googleapis.com/environment/operator/finished_task_instance_count operator_failures_<operator_name>operator_successes_<operator_name> 每个运算符的已完成任务实例数
池中的空闲槽 composer.googleapis.com/environment/pool/open_slots pool.open_slots.<pool_name> 池中开放槽的数量。
池中的队列槽 composer.googleapis.com/environment/pool/queued_slots pool.queued_slots.<pool_name> 池中已加入队列的槽数。
池中的运行槽 composer.googleapis.com/environment/pool/running_slots pool.running_slots.<pool_name> 池中正在运行的槽数。
池中的任务匮乏 composer.googleapis.com/environment/pool/starving_tasks pool.starving_tasks.<pool_name> 池中资源匮乏的任务数。
调度程序关键部分的运行时间 composer.googleapis.com/environment/scheduler/critical_section_duration scheduler.critical_section_duration 在调度程序循环的关键部分所花费的时间。一次只能有一个调度程序进入此循环。
关键部分锁定失败 composer.googleapis.com/environment/scheduler/critical_section_lock_failure_count scheduler.critical_section_busy 调度器进程尝试锁定关键部分(需要将任务发送到执行器)并发现该关键部分被其他进程锁定的次数。
外部终止的任务 composer.googleapis.com/environment/scheduler/task/externally_killed_count scheduler.tasks.killed_externally 外部终止的任务数量。
孤立任务 composer.googleapis.com/environment/scheduler/task/orphan_count scheduler.orphaned_tasks.clearedscheduler.orphaned_tasks.adopted 调度程序清除/采用的孤立任务的数量。
正在运行/饥饿/可执行的任务 composer.googleapis.com/environment/scheduler/tasks scheduler.tasks.runningscheduler.tasks.starvingscheduler.tasks.executable 正在运行/饥饿/可执行的任务数。
调度器检测信号 composer.googleapis.com/environment/scheduler_heartbeat_count scheduler_heartbeat 调度程序心跳。
服务等级协议 (SLA) 回拨通知失败 composer.googleapis.com/environment/sla_callback_notification_failure_count sla_callback_notification_failure 未能按服务等级协议发出回电通知的尝试次数。
智能传感器探测异常失败 composer.googleapis.com/environment/smart_sensor/exception_failures smart_sensor_operator.exception_failures 由上一个智能传感器弹跳循环中的异常导致的失败次数。
智能传感器捕获基础架构故障 composer.googleapis.com/environment/smart_sensor/infra_failures smart_sensor_operator.infra_failures 上一个智能传感器探测循环中的基础架构故障数量。
智能传感器探测异常 composer.googleapis.com/environment/smart_sensor/poked_exception smart_sensor_operator.poked_exception 上一个智能传感器探测循环中的异常数量。
智能传感器成功戳中任务 composer.googleapis.com/environment/smart_sensor/poked_success smart_sensor_operator.poked_success 智能传感器在之前的 poking 循环中触发的新成功任务的数量。
智能传感器戳任务 composer.googleapis.com/environment/smart_sensor/poked_tasks smart_sensor_operator.poked_tasks 智能传感器在上一个探测循环中探测到的任务数量。
之前成功的任务实例 composer.googleapis.com/environment/task_instance/previously_succeeded_count previously_succeeded 之前成功的任务实例数。
已终止的僵尸任务 composer.googleapis.com/environment/zombie_task_killed_count zombies_killed 已终止的僵尸任务数量。
DAG 运行时长 composer.googleapis.com/workflow/dag/run_duration dagrun.duration.success.<dag_id>dagrun.duration.failed.<dag_id> DagRun 达到成功/失败状态所用的时间。
DAG 依赖项检查时长 composer.googleapis.com/workflow/dependency_check_duration dagrun.dependency-check.<dag_id> 检查 DAG 依赖项所需的时间。此指标不同于环境的依赖项和权限检查指标,适用于 DAG
DAG 运行时间表延迟 composer.googleapis.com/workflow/schedule_delay dagrun.schedule_delay.<dag_id> 预定的 DagRun 开始日期与实际的 DagRun 开始日期之间的延迟时间。
已完成的任务 composer.googleapis.com/workflow/task_instance/finished_count ti.finish.<dag_id>.<task_id>.<state> 给定 DAG 中已完成的任务数量。
任务实例运行时长 composer.googleapis.com/workflow/task_instance/run_duration dag.<dag_id>.<task_id>.duration 完成任务所需的时间。
已启动的任务 composer.googleapis.com/workflow/task_instance/started_count ti.start.<dag_id>.<task_id> 给定 DAG 中已启动的任务数量。
任务队列排队时长 composer.googleapis.com/workflow/task_instance/queued_duration dag.<dag_id>.<task_id>.queued_duration 任务在切换到“正在运行”状态之前处于“已排队”状态的时间。
从 DAG 中移除的任务 composer.googleapis.com/workflow/task/removed_from_dag_count task_removed_from_dag.<dag_id> 为指定 DAG 移除的任务数量(即 DAG 中不再存在的任务)。
任务已恢复到 DAG composer.googleapis.com/workflow/task/restored_to_dag_count task_restored_to_dag.<dag_id> 为给定 DAG 恢复的任务数量(即,之前在数据库中处于“已移除”状态的任务实例被添加到 DAG 文件中)。
任务调度延迟 composer.googleapis.com/workflow/task/schedule_delay dagrun.schedule_delay.<dag_id> 从第一个任务 start_date 到 dagrun 预期启动时间之间经过的时间。

针对 Cloud Composer 环境使用 Monitoring

控制台

您可以使用 Metrics Explorer 显示与环境和 DAG 相关的指标:

  • Cloud Composer Environment 资源包含环境指标。

    如需显示特定环境的指标,请按 environment_name 标签过滤指标。您还可以按其他标签过滤,例如环境的位置或映像版本。

  • Cloud Composer Workflow 资源包含 DAG 的指标。

    如需显示特定 DAG 或任务的指标,请执行以下操作: 过滤指标workflow_nametask_name 标签。您还可以按其他标签(例如任务状态或 Airflow 操作名称)进行过滤。

API 和 gcloud

您可以通过 Cloud Monitoring API 和 gcloud monitoring dashboards 命令创建和管理自定义信息中心和微件。如需了解详情,请参阅通过 API 管理信息中心

如需详细了解资源、指标和过滤条件,请参阅 Cloud Monitoring API 的以下功能:

使用 Cloud Monitoring 提醒

您可以创建提醒政策来监控指标的值,当这些指标违反条件时便会通知您。

  1. 在 Google Cloud 控制台中,转到 提醒页面:

    进入提醒

    如果您使用搜索栏查找此页面,请选择子标题为监控的结果。

  2. 如果您尚未创建通知渠道并希望收到通知,请点击修改通知渠道并添加通知渠道。添加渠道后,返回到提醒页面。
  3. 提醒页面中,点击创建政策
  4. 如需选择指标,请展开选择指标菜单,然后执行以下操作:
    1. 如需将菜单限制为相关条目,请在过滤栏中输入 Cloud Composer。如果过滤菜单后没有显示任何结果,请停用仅显示活跃的资源和指标切换开关。
    2. 对于资源类型,请选择 Cloud Composer EnvironmentCloud Composer Workflow
    3. 选择指标类别指标,然后选择应用
  5. 点击下一步
  6. 配置提醒触发器页面中的设置决定了何时触发提醒。 选择条件类型,并在必要时指定阈值。如需了解详情,请参阅创建指标阈值提醒政策
  7. 点击下一步
  8. 可选:如需将通知添加到您的提醒政策中,请点击通知渠道。在对话框中,从菜单中选择一个或多个通知渠道,然后点击确定
  9. 可选:更新突发事件自动关闭持续时间。此字段用于确定在缺少指标数据的情况下 Monitoring 何时关闭突发事件。
  10. 可选:点击文档,然后添加您希望包含在通知消息中的任何信息。
  11. 点击提醒名称,然后输入提醒政策的名称。
  12. 点击 Create Policy(创建政策)。
如需了解详情,请参阅提醒政策

后续步骤