使用 Cloud Monitoring 监控环境

Cloud Composer 1 | Cloud Composer 2

您可以将 Cloud MonitoringCloud Logging 与 Cloud Composer 搭配使用。

Cloud Monitoring 可帮助您了解云应用的性能、正常运行时间和总体运行状况。Cloud Monitoring 会从 Cloud Composer 中收集并提取指标、事件和元数据,并在信息中心和图表中生成数据分析。您可以使用 Cloud Monitoring 来了解 Cloud Composer 环境的性能和运行状况以及 Airflow 指标。

Logging 可捕获环境集群中的调度器和工作器容器生成的日志。这些日志包含有助于调试的系统级和 Airflow 依赖项信息。如需了解如何查看日志,请参阅查看 Airflow 日志

准备工作

  • 如需访问 Cloud Composer 环境的日志和指标,您需要具备以下权限:

    • 对日志和指标的只读权限logging.viewermonitoring.viewer
    • 对日志(包括私密日志)的只读权限logging.privateLogViewer
    • 对指标的读写权限monitoring.editor

    如需详细了解 Cloud Composer 的其他权限和角色,请参阅访问权限控制

  • 为避免出现重复的日志记录,请为 Google Kubernetes Engine 停用 Cloud Logging。

  • Cloud Logging 会针对 Google Cloud 项目中发生的每个状态和事件生成一个条目。您可以使用排除项过滤器来减少日志量,包括 Cloud Logging 为 Cloud Composer 生成的日志。

    排除 jobs.py 中的日志可能会导致健康检查失败和 CrashLoopBackOff 错误。您必须在排除过滤器中添加 -jobs.py,以免将其排除。

  • Monitoring 无法为每分钟执行多次的 DAG 和任务绘制计数值,也无法为失败任务绘制指标。

环境指标

您可以使用环境指标来检查 Cloud Composer 环境的资源使用情况和运行状况。

环境健康状况

如需检查环境的健康状况,您可以使用以下健康状况指标:composer.googleapis.com/environment/healthy

Cloud Composer 会运行一个名为 airflow_monitoring 的活跃性 DAG,该 DAG 按时间表运行并报告环境健康状况,如下所示:

  • 如果活跃性 DAG 运行成功完成,则运行状况为 True
  • 如果活跃性 DAG 运行失败,则运行状况为 False

活跃性 DAG 存储在 dags/ 文件夹中,并可在 Airflow 界面中查看。活跃性 DAG 的频率和内容不可更改,且不得修改。对活跃性 DAG 的更改不会保留。

环境的依赖项检查

Cloud Composer 会定期检查环境是否可以访问其操作所需的服务,以及它是否具有与这些服务交互的足够权限。环境运行所需的服务示例包括 Artifact Registry、Cloud Logging 和 Cloud Monitoring。

以下指标可用于检查环境的依赖项:

依赖项指标 API 说明
依赖项检查次数 composer.googleapis.com/environment/health/dependency_check_count 此指标跟踪对环境操作所需的服务执行可达性检查的次数。
依赖项权限检查次数 composer.googleapis.com/environment/health/dependency_permissions_check_count 此指标跟踪对环境操作所需的服务执行权限检查的次数。

数据库健康状况

如需检查数据库的运行状况,您可以使用以下运行状况指标:composer.googleapis.com/environment/database_health

Airflow 监控 pod 每分钟会对数据库执行一次 ping 操作。如果可以建立 SQL 连接,则报告运行状况为 True;如果不能建立 SQL 连接,则报告 False

数据库指标

Cloud Composer 环境使用的 Airflow 元数据数据库提供以下环境指标。您可以使用这些指标来监控环境的数据库实例的性能和资源使用情况。

例如,如果您的环境接近资源限制,您可能需要升级环境的 Cloud SQL 机器类型。或者,您可能想要通过执行数据库清理来优化与 Airflow 元数据数据库使用相关的费用,使存储空间低于特定阈值

数据库指标 API 说明
数据库 CPU 使用率 composer.googleapis.com/environment/database/cpu/usage_time
数据库 CPU 核心数 composer.googleapis.com/environment/database/cpu/reserved_cores
数据库 CPU 利用率 composer.googleapis.com/environment/database/cpu/utilization
数据库内存用量 composer.googleapis.com/environment/database/memory/bytes_used
数据库内存配额 composer.googleapis.com/environment/database/memory/quota
数据库内存利用率 composer.googleapis.com/environment/database/memory/utilization
数据库磁盘使用量 composer.googleapis.com/environment/database/disk/bytes_used
数据库磁盘配额 composer.googleapis.com/environment/database/disk/quota
数据库磁盘利用率 composer.googleapis.com/environment/database/disk/utilization
数据库连接限制 composer.googleapis.com/environment/database/network/max_connections
数据库连接 composer.googleapis.com/environment/database/network/connections
可用于故障切换的数据库 composer.googleapis.com/environment/database/available_for_failover 如果环境的 Cloud SQL 实例处于高可用性模式并已准备好进行故障切换,则该值为 True
数据库自动故障切换请求数 composer.googleapis.com/environment/database/auto_failover_request_count 环境的 Cloud SQL 实例的自动故障切换请求总数。

调度器指标

名称 API 说明
活跃调度器 composer.googleapis.com/environment/active_schedulers 活跃调度器实例的数量。

触发器指标

以下触发器指标专为 Cloud Composer 提供:

名称 API 说明
活跃触发器 composer.googleapis.com/environment/active_triggerers 活跃触发器实例的数量。

此外,以下 Airflow 指标可通过 Cloud Composer 指标获得:

名称 API Airflow 中的名称 说明
正在运行的触发器总数 composer.googleapis.com/workload/triggerer/num_running_triggers triggers.running 每个触发器实例正在运行的触发器数量。
屏蔽型触发器 composer.googleapis.com/environment/trigger/blocking_count triggers.blocked_main_thread 阻塞主线程的触发器数量(可能是因为不是完全异步)。
失败的触发器 composer.googleapis.com/environment/trigger/failed_count triggers.failed 由于发生错误而未能触发事件的触发器数量。
成功的触发器 composer.googleapis.com/environment/trigger/succeeded_count triggers.succeeded 至少触发了一个事件的触发器数量。

网络服务器指标

以下环境指标适用于 Cloud Composer 环境使用的 Airflow Web 服务器。您可以使用这些指标来检查环境中的 Airflow Web 服务器实例的性能和资源使用情况。

例如,如果 Web 服务器机器类型总是接近资源限制,您可能需要升级该机器类型

名称 API 说明
Web 服务器 CPU 使用率 composer.googleapis.com/environment/web_server/cpu/usage_time
Web 服务器 CPU 配额 composer.googleapis.com/environment/web_server/cpu/reserved_cores
Web 服务器内存用量 composer.googleapis.com/environment/web_server/memory/bytes_used
Web 服务器内存配额 composer.googleapis.com/environment/web_server/memory/quota
活跃 Web 服务器 composer.googleapis.com/environment/active_webservers 活跃 Web 服务器实例的数量。

DAG 指标

为帮助您监控 DAG 运行的效率并确定导致高延迟的任务,我们提供了以下 DAG 指标。

DAG 指标 API
DAG 运行次数 composer.googleapis.com/workflow/run_count
每次 DAG 运行的时长 composer.googleapis.com/workflow/run_duration
任务运行次数 composer.googleapis.com/workflow/task/run_count
每次任务运行的时长 composer.googleapis.com/workflow/task/run_duration

Cloud Monitoring 只会显示已完成运行的工作流和任务(无论成功还是失败)的指标。而对于正在运行的工作流和任务,以及没有工作流活动的情况,Stackdriver 不会显示任何数据

Celery Executor 指标

您可以使用以下 Celery 执行程序指标。这些指标可帮助您确定环境中的工作器资源是否足够。

Celery Executor 指标 API
队列中的任务数 composer.googleapis.com/environment/task_queue_length
在线 Celery 工作器数量 composer.googleapis.com/environment/num_celery_workers

Airflow 指标

您可以使用以下 Airflow 指标。这些指标对应于 Airflow 提供的指标

名称 API Airflow 中的名称 说明
Celery 任务非零退出代码 composer.googleapis.com/environment/celery/execute_command_failure_count celery.execute_command.failure Celery 任务中的非零退出代码数量。
Celery 任务发布超时 composer.googleapis.com/environment/celery/task_timeout_error_count celery.task_timeout_error 将任务发布到 Celery 代理时引发的 AirflowTaskTimeout 错误数。
序列化 DAG 提取时长 composer.googleapis.com/environment/collect_db_dag_duration collect_db_dags 从数据库中提取所有序列化 DAG 所用的时间。
DAG 刷新错误 composer.googleapis.com/environment/dag_callback/exception_count dag.callback_exceptions DAG 回调引发的异常数。如果发生这种情况,则表示 DAG 回调无法正常运行。
DAG 刷新错误 composer.googleapis.com/environment/dag_file/refresh_error_count dag_file_refresh_error 加载任何 DAG 文件时失败的次数。
DAG 文件加载时间 composer.googleapis.com/environment/dag_processing/last_duration dag_processing.last_duration.<dag_file> 加载特定 DAG 文件所用的时间。
自 DAG 文件处理以来经过的时间 composer.googleapis.com/environment/dag_processing/last_run_elapsed_time dag_processing.last_run.seconds_ago.<dag_file> 自上次处理 DAG 文件后经过的秒数。
DagFileProcessorManager 停顿计数 composer.googleapis.com/environment/dag_processing/manager_stall_count dag_processing.manager_stalls 停滞的 DagFileProcessorManager 进程的数量。
DAG 解析错误 composer.googleapis.com/environment/dag_processing/parse_error_count dag_processing.import_errors 解析 DAG 文件时生成的错误数。
运行 DAG 解析过程 composer.googleapis.com/environment/dag_processing/processes dag_processing.processes 当前正在运行的 DAG 解析进程的数量。
处理器超时 composer.googleapis.com/environment/dag_processing/processor_timeout_count dag_processing.processor_timeouts 因耗时过长而被终止的文件处理器数量。
扫描和导入所有 DAG 文件所用的时间 composer.googleapis.com/environment/dag_processing/total_parse_time dag_processing.total_parse_time 一次扫描和导入所有 DAG 文件所用的总时间。
当前的 DAG 文件包大小 composer.googleapis.com/environment/dagbag_size dagbag_size 调度器根据其配置运行扫描时找到的 DAG 数量。
服务等级协议 (SLA) 失败的电子邮件通知 composer.googleapis.com/environment/email/sla_notification_failure_count sla_email_notification_failure 尝试传送服务等级协议 (SLA) 失败的电子邮件通知的次数。
执行器上的空槽 composer.googleapis.com/environment/executor/open_slots executor.open_slots 执行器上的开放槽数。
执行器上的已加入队列的任务 composer.googleapis.com/environment/executor/queued_tasks executor.queued_tasks 执行器上已加入队列的任务数。
在执行器上运行任务 composer.googleapis.com/environment/executor/running_tasks executor.running_tasks 执行器上运行的任务数量。
任务实例成功/失败 composer.googleapis.com/environment/finished_task_instance_count ti_failuresti_successes 整体任务实例成功/失败。
已开始/已完成的作业 composer.googleapis.com/environment/job/count <job_name>_start<job_name>_end 已开始/已完成的作业数,例如 SchedulerJob、LocalTaskJob。
作业检测信号失败 composer.googleapis.com/environment/job/heartbeat_failure_count <job_name>_heartbeat_failure 作业的失败检测信号次数。
按操作器创建的任务 composer.googleapis.com/environment/operator/created_task_instance_count task_instance_created-<operator_name> 为给定运算符创建的任务实例数。
运算符执行 composer.googleapis.com/environment/operator/finished_task_instance_count operator_failures_<operator_name>operator_successes_<operator_name> 每个操作员的已完成任务实例数
池中的空槽 composer.googleapis.com/environment/pool/open_slots pool.open_slots.<pool_name> 池中的开放槽数。
池中已加入队列的槽 composer.googleapis.com/environment/pool/queued_slots pool.queued_slots.<pool_name> 池中已加入队列的槽数。
池中正在运行的槽 composer.googleapis.com/environment/pool/running_slots pool.running_slots.<pool_name> 池中正在运行的槽数。
池中的饥饿任务 composer.googleapis.com/environment/pool/starving_tasks pool.starving_tasks.<pool_name> 池中饥饿任务的数量。
在调度器的关键部分花费的时间 composer.googleapis.com/environment/scheduler/critical_section_duration scheduler.critical_section_duration 调度器循环关键部分花费的时间。一次只能有一个调度器进入此循环。
严重部分锁定失败 composer.googleapis.com/environment/scheduler/critical_section_lock_failure_count scheduler.critical_section_busy 调度器进程尝试锁定关键部分(需要将任务发送到执行器)并发现关键部分被其他进程锁定的次数。
外部终止的任务 composer.googleapis.com/environment/scheduler/task/externally_killed_count scheduler.tasks.killed_externally 外部终止的任务数。
孤立任务 composer.googleapis.com/environment/scheduler/task/orphan_count scheduler.orphaned_tasks.clearedscheduler.orphaned_tasks.adopted 调度器清除/合并的孤立任务数。
运行/耗尽资源/可执行任务 composer.googleapis.com/environment/scheduler/tasks scheduler.tasks.runningscheduler.tasks.starvingscheduler.tasks.executable 正在运行的任务/正在耗尽的任务数/可执行的任务数。
调度器检测信号 composer.googleapis.com/environment/scheduler_heartbeat_count scheduler_heartbeat 调度程序检测信号。
SLA 回调失败通知 composer.googleapis.com/environment/sla_callback_notification_failure_count sla_callback_notification_failure 服务等级协议 (SLA) 未命中的回调通知尝试次数。
智能传感器触发异常失败 composer.googleapis.com/environment/smart_sensor/exception_failures smart_sensor_operator.exception_failures 在上一个智能传感器触发循环中因异常而导致的失败次数。
智能传感器探查基础架构故障 composer.googleapis.com/environment/smart_sensor/infra_failures smart_sensor_operator.infra_failures 上一个智能传感器触发循环中的基础架构故障次数。
智能传感器触发异常 composer.googleapis.com/environment/smart_sensor/poked_exception smart_sensor_operator.poked_exception 上一个智能传感器 Ping 循环中的异常数。
智能传感器成功探查任务 composer.googleapis.com/environment/smart_sensor/poked_success smart_sensor_operator.poked_success 在上一个弹跳循环中,智能传感器最新完成的任务数。
智能传感器触碰任务 composer.googleapis.com/environment/smart_sensor/poked_tasks smart_sensor_operator.poked_tasks 在上一个 Ping 循环中,智能传感器触碰的任务数。
之前已成功完成的任务实例 composer.googleapis.com/environment/task_instance/previously_succeeded_count previously_succeeded 之前成功的任务实例数。
杀死僵尸任务 composer.googleapis.com/environment/zombie_task_killed_count zombies_killed 已终止的僵尸任务数量。
DAG 运行时长 composer.googleapis.com/workflow/dag/run_duration dagrun.duration.success.<dag_id>dagrun.duration.failed.<dag_id> DagRun 达到成功/失败状态所用的时间。
DAG 依赖项检查时长 composer.googleapis.com/workflow/dependency_check_duration dagrun.dependency-check.<dag_id> 检查 DAG 依赖项所用的时间。此指标与环境的依赖项和权限检查指标不同,适用于 DAG
DAG 运行时间表延迟 composer.googleapis.com/workflow/schedule_delay dagrun.schedule_delay.<dag_id> 预定的 DagRun 开始日期与实际的 DagRun 开始日期之间的延迟时间。
已完成的任务 composer.googleapis.com/workflow/task_instance/finished_count ti.finish.<dag_id>.<task_id>.<state> 给定 DAG 中已完成的任务数量。
任务实例运行时长 composer.googleapis.com/workflow/task_instance/run_duration dag.<dag_id>.<task_id>.duration 完成任务所用的时间。
已启动的任务 composer.googleapis.com/workflow/task_instance/started_count ti.start.<dag_id>.<task_id> 给定 DAG 中已启动的任务数。
从 DAG 中移除的任务 composer.googleapis.com/workflow/task/removed_from_dag_count task_removed_from_dag.<dag_id> 为给定 DAG 移除的任务数(即 DAG 中已不存在任务)。
任务已恢复到 DAG composer.googleapis.com/workflow/task/restored_to_dag_count task_restored_to_dag.<dag_id> 为指定 DAG 恢复的任务数量(即之前在数据库中处于“已移除”状态的任务实例会添加到 DAG 文件中)。
任务安排延迟 composer.googleapis.com/workflow/task/schedule_delay dagrun.schedule_delay.<dag_id> 第一个任务 start_date 和 dagrun 预期开始之间经过的时间。

对 Cloud Composer 环境使用 Monitoring

控制台

您可以使用 Metrics Explorer 显示与您的环境和 DAG 相关的指标:

  • Cloud Composer Environment 资源包含环境指标。

    如需显示特定环境的指标,请按 environment_name 标签过滤指标。您还可以按其他标签(例如环境位置或映像版本)进行过滤。

  • Cloud Composer 工作流资源包含 DAG 指标。

    如需显示特定 DAG 或任务的指标,请按 workflow_nametask_name 标签过滤指标。您还可以按其他标签(例如任务状态或 Airflow 操作器名称)进行过滤。

API 和 gcloud

您可以通过 Cloud Monitoring API 和 gcloud monitoring dashboards 命令创建和管理自定义信息中心及微件。如需了解详情,请参阅按 API 管理信息中心

如需详细了解资源、指标和过滤条件,请参阅 Cloud Monitoring API 的参考文档:

使用 Cloud Monitoring 提醒

您可以创建提醒政策来监控指标的值,当这些指标违反条件时便会通知您。

  1. 在 Google Cloud 控制台的导航面板中,选择 Monitoring,然后选择  提醒

    进入提醒

  2. 如果您尚未创建通知渠道并希望收到通知,请点击修改通知渠道并添加通知渠道。添加渠道后,返回到提醒页面。
  3. 提醒页面中,点击创建政策
  4. 如需选择指标,请展开选择指标菜单,然后执行以下操作:
    1. 如需将菜单限制为相关条目,请在过滤栏中输入 Cloud Composer。如果过滤菜单后没有显示任何结果,请停用仅显示活跃的资源和指标切换开关。
    2. 对于 Resource type,选择 Cloud Composer EnvironmentCloud Composer Workflow
    3. 选择指标类别指标,然后选择应用
  5. 点击下一步
  6. 配置提醒触发器页面中的设置决定了何时触发提醒。选择条件类型,并在必要时指定阈值。如需了解详情,请参阅创建指标阈值提醒政策
  7. 点击下一步
  8. 可选:如需将通知添加到您的提醒政策中,请点击通知渠道。在对话框中,从菜单中选择一个或多个通知渠道,然后点击确定
  9. 可选:更新突发事件自动关闭持续时间。此字段用于确定在缺少指标数据的情况下 Monitoring 何时关闭突发事件。
  10. 可选:点击文档,然后添加您希望包含在通知消息中的任何信息。
  11. 点击提醒名称,然后输入提醒政策的名称。
  12. 点击 Create Policy(创建政策)。
如需了解详情,请参阅提醒政策

后续步骤