Cloud Composer 1 | Cloud Composer 2
您可以将 Cloud Monitoring 和 Cloud Logging 与 Cloud Composer 搭配使用。
Cloud Monitoring 可帮助您了解云应用的性能、正常运行时间和总体运行状况。Cloud Monitoring 会从 Cloud Composer 中收集并提取指标、事件和元数据,并在信息中心和图表中生成数据分析。您可以使用 Cloud Monitoring 来了解 Cloud Composer 环境的性能和运行状况以及 Airflow 指标。
Logging 可捕获环境集群中的调度器和工作器容器生成的日志。这些日志包含有助于调试的系统级和 Airflow 依赖项信息。如需了解如何查看日志,请参阅查看 Airflow 日志。
准备工作
如需访问 Cloud Composer 环境的日志和指标,您需要具备以下权限:
- 对日志和指标的只读权限:
logging.viewer
和monitoring.viewer
- 对日志(包括私密日志)的只读权限:
logging.privateLogViewer
- 对指标的读写权限:
monitoring.editor
如需详细了解 Cloud Composer 的其他权限和角色,请参阅访问权限控制。
- 对日志和指标的只读权限:
为避免出现重复的日志记录,请为 Google Kubernetes Engine 停用 Cloud Logging。
Cloud Logging 会针对 Google Cloud 项目中发生的每个状态和事件生成一个条目。您可以使用排除项过滤器来减少日志量,包括 Cloud Logging 为 Cloud Composer 生成的日志。
排除
jobs.py
中的日志可能会导致健康检查失败和CrashLoopBackOff
错误。您必须在排除过滤器中添加-jobs.py
,以免将其排除。Monitoring 无法为每分钟执行多次的 DAG 和任务绘制计数值,也无法为失败任务绘制指标。
环境指标
您可以使用环境指标来检查 Cloud Composer 环境的资源使用情况和运行状况。
环境健康状况
如需检查环境的健康状况,您可以使用以下健康状况指标:composer.googleapis.com/environment/healthy
。
Cloud Composer 会运行一个名为 airflow_monitoring
的活跃性 DAG,该 DAG 按时间表运行并报告环境健康状况,如下所示:
- 如果活跃性 DAG 运行成功完成,则运行状况为
True
。 - 如果活跃性 DAG 运行失败,则运行状况为
False
。
活跃性 DAG 存储在 dags/
文件夹中,并可在 Airflow 界面中查看。活跃性 DAG 的频率和内容不可更改,且不得修改。对活跃性 DAG 的更改不会保留。
环境的依赖项检查
Cloud Composer 会定期检查环境是否可以访问其操作所需的服务,以及它是否具有与这些服务交互的足够权限。环境运行所需的服务示例包括 Artifact Registry、Cloud Logging 和 Cloud Monitoring。
以下指标可用于检查环境的依赖项:
依赖项指标 | API | 说明 |
---|---|---|
依赖项检查次数 | composer.googleapis.com/environment/health/dependency_check_count |
此指标跟踪对环境操作所需的服务执行可达性检查的次数。 |
依赖项权限检查次数 | composer.googleapis.com/environment/health/dependency_permissions_check_count |
此指标跟踪对环境操作所需的服务执行权限检查的次数。 |
数据库健康状况
如需检查数据库的运行状况,您可以使用以下运行状况指标:composer.googleapis.com/environment/database_health
。
Airflow 监控 pod 每分钟会对数据库执行一次 ping 操作。如果可以建立 SQL 连接,则报告运行状况为 True
;如果不能建立 SQL 连接,则报告 False
。
数据库指标
Cloud Composer 环境使用的 Airflow 元数据数据库提供以下环境指标。您可以使用这些指标来监控环境的数据库实例的性能和资源使用情况。
例如,如果您的环境接近资源限制,您可能需要升级环境的 Cloud SQL 机器类型。或者,您可能想要通过执行数据库清理来优化与 Airflow 元数据数据库使用相关的费用,使存储空间低于特定阈值。
数据库指标 | API | 说明 |
---|---|---|
数据库 CPU 使用率 |
composer.googleapis.com/environment/database/cpu/usage_time
|
|
数据库 CPU 核心数 |
composer.googleapis.com/environment/database/cpu/reserved_cores
|
|
数据库 CPU 利用率 |
composer.googleapis.com/environment/database/cpu/utilization
|
|
数据库内存用量 |
composer.googleapis.com/environment/database/memory/bytes_used
|
|
数据库内存配额 |
composer.googleapis.com/environment/database/memory/quota
|
|
数据库内存利用率 |
composer.googleapis.com/environment/database/memory/utilization
|
|
数据库磁盘使用量 |
composer.googleapis.com/environment/database/disk/bytes_used
|
|
数据库磁盘配额 |
composer.googleapis.com/environment/database/disk/quota
|
|
数据库磁盘利用率 |
composer.googleapis.com/environment/database/disk/utilization
|
|
数据库连接限制 |
composer.googleapis.com/environment/database/network/max_connections
|
|
数据库连接 |
composer.googleapis.com/environment/database/network/connections
|
|
可用于故障切换的数据库 |
composer.googleapis.com/environment/database/available_for_failover
|
如果环境的 Cloud SQL 实例处于高可用性模式并已准备好进行故障切换,则该值为 True 。 |
数据库自动故障切换请求数 |
composer.googleapis.com/environment/database/auto_failover_request_count
|
环境的 Cloud SQL 实例的自动故障切换请求总数。 |
调度器指标
名称 | API | 说明 |
---|---|---|
活跃调度器 |
composer.googleapis.com/environment/active_schedulers
|
活跃调度器实例的数量。 |
触发器指标
以下触发器指标专为 Cloud Composer 提供:
名称 | API | 说明 |
---|---|---|
活跃触发器 |
composer.googleapis.com/environment/active_triggerers
|
活跃触发器实例的数量。 |
此外,以下 Airflow 指标可通过 Cloud Composer 指标获得:
名称 | API | Airflow 中的名称 | 说明 |
---|---|---|---|
正在运行的触发器总数 |
composer.googleapis.com/workload/triggerer/num_running_triggers
|
triggers.running
|
每个触发器实例正在运行的触发器数量。 |
屏蔽型触发器 |
composer.googleapis.com/environment/trigger/blocking_count
|
triggers.blocked_main_thread
|
阻塞主线程的触发器数量(可能是因为不是完全异步)。 |
失败的触发器 |
composer.googleapis.com/environment/trigger/failed_count
|
triggers.failed
|
由于发生错误而未能触发事件的触发器数量。 |
成功的触发器 |
composer.googleapis.com/environment/trigger/succeeded_count
|
triggers.succeeded
|
至少触发了一个事件的触发器数量。 |
网络服务器指标
以下环境指标适用于 Cloud Composer 环境使用的 Airflow Web 服务器。您可以使用这些指标来检查环境中的 Airflow Web 服务器实例的性能和资源使用情况。
例如,如果 Web 服务器机器类型总是接近资源限制,您可能需要升级该机器类型。
名称 | API | 说明 |
---|---|---|
Web 服务器 CPU 使用率 |
composer.googleapis.com/environment/web_server/cpu/usage_time
|
|
Web 服务器 CPU 配额 |
composer.googleapis.com/environment/web_server/cpu/reserved_cores
|
|
Web 服务器内存用量 |
composer.googleapis.com/environment/web_server/memory/bytes_used
|
|
Web 服务器内存配额 |
composer.googleapis.com/environment/web_server/memory/quota
|
|
活跃 Web 服务器 |
composer.googleapis.com/environment/active_webservers
|
活跃 Web 服务器实例的数量。 |
DAG 指标
为帮助您监控 DAG 运行的效率并确定导致高延迟的任务,我们提供了以下 DAG 指标。
DAG 指标 | API |
---|---|
DAG 运行次数 |
composer.googleapis.com/workflow/run_count |
每次 DAG 运行的时长 |
composer.googleapis.com/workflow/run_duration |
任务运行次数 |
composer.googleapis.com/workflow/task/run_count |
每次任务运行的时长 |
composer.googleapis.com/workflow/task/run_duration |
Cloud Monitoring 只会显示已完成运行的工作流和任务(无论成功还是失败)的指标。而对于正在运行的工作流和任务,以及没有工作流活动的情况,Stackdriver 不会显示任何数据。
Celery Executor 指标
您可以使用以下 Celery 执行程序指标。这些指标可帮助您确定环境中的工作器资源是否足够。
Celery Executor 指标 | API |
---|---|
队列中的任务数 |
composer.googleapis.com/environment/task_queue_length |
在线 Celery 工作器数量 |
composer.googleapis.com/environment/num_celery_workers |
Airflow 指标
您可以使用以下 Airflow 指标。这些指标对应于 Airflow 提供的指标。
名称 | API | Airflow 中的名称 | 说明 |
---|---|---|---|
Celery 任务非零退出代码 |
composer.googleapis.com/environment/celery/execute_command_failure_count
|
celery.execute_command.failure
|
Celery 任务中的非零退出代码数量。 |
Celery 任务发布超时 |
composer.googleapis.com/environment/celery/task_timeout_error_count
|
celery.task_timeout_error
|
将任务发布到 Celery 代理时引发的 AirflowTaskTimeout 错误数。 |
序列化 DAG 提取时长 |
composer.googleapis.com/environment/collect_db_dag_duration
|
collect_db_dags
|
从数据库中提取所有序列化 DAG 所用的时间。 |
DAG 刷新错误 |
composer.googleapis.com/environment/dag_callback/exception_count
|
dag.callback_exceptions
|
DAG 回调引发的异常数。如果发生这种情况,则表示 DAG 回调无法正常运行。 |
DAG 刷新错误 |
composer.googleapis.com/environment/dag_file/refresh_error_count
|
dag_file_refresh_error
|
加载任何 DAG 文件时失败的次数。 |
DAG 文件加载时间 |
composer.googleapis.com/environment/dag_processing/last_duration
|
dag_processing.last_duration.<dag_file>
|
加载特定 DAG 文件所用的时间。 |
自 DAG 文件处理以来经过的时间 |
composer.googleapis.com/environment/dag_processing/last_run_elapsed_time
|
dag_processing.last_run.seconds_ago.<dag_file>
|
自上次处理 DAG 文件后经过的秒数。 |
DagFileProcessorManager 停顿计数 |
composer.googleapis.com/environment/dag_processing/manager_stall_count
|
dag_processing.manager_stalls
|
停滞的 DagFileProcessorManager 进程的数量。 |
DAG 解析错误 |
composer.googleapis.com/environment/dag_processing/parse_error_count
|
dag_processing.import_errors
|
解析 DAG 文件时生成的错误数。 |
运行 DAG 解析过程 |
composer.googleapis.com/environment/dag_processing/processes
|
dag_processing.processes
|
当前正在运行的 DAG 解析进程的数量。 |
处理器超时 |
composer.googleapis.com/environment/dag_processing/processor_timeout_count
|
dag_processing.processor_timeouts
|
因耗时过长而被终止的文件处理器数量。 |
扫描和导入所有 DAG 文件所用的时间 |
composer.googleapis.com/environment/dag_processing/total_parse_time
|
dag_processing.total_parse_time
|
一次扫描和导入所有 DAG 文件所用的总时间。 |
当前的 DAG 文件包大小 |
composer.googleapis.com/environment/dagbag_size
|
dagbag_size
|
调度器根据其配置运行扫描时找到的 DAG 数量。 |
服务等级协议 (SLA) 失败的电子邮件通知 |
composer.googleapis.com/environment/email/sla_notification_failure_count
|
sla_email_notification_failure
|
尝试传送服务等级协议 (SLA) 失败的电子邮件通知的次数。 |
执行器上的空槽 |
composer.googleapis.com/environment/executor/open_slots
|
executor.open_slots
|
执行器上的开放槽数。 |
执行器上的已加入队列的任务 |
composer.googleapis.com/environment/executor/queued_tasks
|
executor.queued_tasks
|
执行器上已加入队列的任务数。 |
在执行器上运行任务 |
composer.googleapis.com/environment/executor/running_tasks
|
executor.running_tasks
|
执行器上运行的任务数量。 |
任务实例成功/失败 |
composer.googleapis.com/environment/finished_task_instance_count
|
ti_failures 、ti_successes
|
整体任务实例成功/失败。 |
已开始/已完成的作业 |
composer.googleapis.com/environment/job/count
|
<job_name>_start 、<job_name>_end
|
已开始/已完成的作业数,例如 SchedulerJob、LocalTaskJob。 |
作业检测信号失败 |
composer.googleapis.com/environment/job/heartbeat_failure_count
|
<job_name>_heartbeat_failure
|
作业的失败检测信号次数。 |
按操作器创建的任务 |
composer.googleapis.com/environment/operator/created_task_instance_count
|
task_instance_created-<operator_name>
|
为给定运算符创建的任务实例数。 |
运算符执行 |
composer.googleapis.com/environment/operator/finished_task_instance_count
|
operator_failures_<operator_name> 、operator_successes_<operator_name>
|
每个操作员的已完成任务实例数 |
池中的空槽 |
composer.googleapis.com/environment/pool/open_slots
|
pool.open_slots.<pool_name>
|
池中的开放槽数。 |
池中已加入队列的槽 |
composer.googleapis.com/environment/pool/queued_slots
|
pool.queued_slots.<pool_name>
|
池中已加入队列的槽数。 |
池中正在运行的槽 |
composer.googleapis.com/environment/pool/running_slots
|
pool.running_slots.<pool_name>
|
池中正在运行的槽数。 |
池中的饥饿任务 |
composer.googleapis.com/environment/pool/starving_tasks
|
pool.starving_tasks.<pool_name>
|
池中饥饿任务的数量。 |
在调度器的关键部分花费的时间 |
composer.googleapis.com/environment/scheduler/critical_section_duration
|
scheduler.critical_section_duration
|
调度器循环关键部分花费的时间。一次只能有一个调度器进入此循环。 |
严重部分锁定失败 |
composer.googleapis.com/environment/scheduler/critical_section_lock_failure_count
|
scheduler.critical_section_busy
|
调度器进程尝试锁定关键部分(需要将任务发送到执行器)并发现关键部分被其他进程锁定的次数。 |
外部终止的任务 |
composer.googleapis.com/environment/scheduler/task/externally_killed_count
|
scheduler.tasks.killed_externally
|
外部终止的任务数。 |
孤立任务 |
composer.googleapis.com/environment/scheduler/task/orphan_count
|
scheduler.orphaned_tasks.cleared 、scheduler.orphaned_tasks.adopted
|
调度器清除/合并的孤立任务数。 |
运行/耗尽资源/可执行任务 |
composer.googleapis.com/environment/scheduler/tasks
|
scheduler.tasks.running 、scheduler.tasks.starving 、scheduler.tasks.executable
|
正在运行的任务/正在耗尽的任务数/可执行的任务数。 |
调度器检测信号 |
composer.googleapis.com/environment/scheduler_heartbeat_count
|
scheduler_heartbeat
|
调度程序检测信号。 |
SLA 回调失败通知 |
composer.googleapis.com/environment/sla_callback_notification_failure_count
|
sla_callback_notification_failure
|
服务等级协议 (SLA) 未命中的回调通知尝试次数。 |
智能传感器触发异常失败 |
composer.googleapis.com/environment/smart_sensor/exception_failures
|
smart_sensor_operator.exception_failures
|
在上一个智能传感器触发循环中因异常而导致的失败次数。 |
智能传感器探查基础架构故障 |
composer.googleapis.com/environment/smart_sensor/infra_failures
|
smart_sensor_operator.infra_failures
|
上一个智能传感器触发循环中的基础架构故障次数。 |
智能传感器触发异常 |
composer.googleapis.com/environment/smart_sensor/poked_exception
|
smart_sensor_operator.poked_exception
|
上一个智能传感器 Ping 循环中的异常数。 |
智能传感器成功探查任务 |
composer.googleapis.com/environment/smart_sensor/poked_success
|
smart_sensor_operator.poked_success
|
在上一个弹跳循环中,智能传感器最新完成的任务数。 |
智能传感器触碰任务 |
composer.googleapis.com/environment/smart_sensor/poked_tasks
|
smart_sensor_operator.poked_tasks
|
在上一个 Ping 循环中,智能传感器触碰的任务数。 |
之前已成功完成的任务实例 |
composer.googleapis.com/environment/task_instance/previously_succeeded_count
|
previously_succeeded
|
之前成功的任务实例数。 |
杀死僵尸任务 |
composer.googleapis.com/environment/zombie_task_killed_count
|
zombies_killed
|
已终止的僵尸任务数量。 |
DAG 运行时长 |
composer.googleapis.com/workflow/dag/run_duration
|
dagrun.duration.success.<dag_id> 、dagrun.duration.failed.<dag_id>
|
DagRun 达到成功/失败状态所用的时间。 |
DAG 依赖项检查时长 |
composer.googleapis.com/workflow/dependency_check_duration
|
dagrun.dependency-check.<dag_id>
|
检查 DAG 依赖项所用的时间。此指标与环境的依赖项和权限检查指标不同,适用于 DAG |
DAG 运行时间表延迟 |
composer.googleapis.com/workflow/schedule_delay
|
dagrun.schedule_delay.<dag_id>
|
预定的 DagRun 开始日期与实际的 DagRun 开始日期之间的延迟时间。 |
已完成的任务 |
composer.googleapis.com/workflow/task_instance/finished_count
|
ti.finish.<dag_id>.<task_id>.<state>
|
给定 DAG 中已完成的任务数量。 |
任务实例运行时长 |
composer.googleapis.com/workflow/task_instance/run_duration
|
dag.<dag_id>.<task_id>.duration
|
完成任务所用的时间。 |
已启动的任务 |
composer.googleapis.com/workflow/task_instance/started_count
|
ti.start.<dag_id>.<task_id>
|
给定 DAG 中已启动的任务数。 |
从 DAG 中移除的任务 |
composer.googleapis.com/workflow/task/removed_from_dag_count
|
task_removed_from_dag.<dag_id>
|
为给定 DAG 移除的任务数(即 DAG 中已不存在任务)。 |
任务已恢复到 DAG |
composer.googleapis.com/workflow/task/restored_to_dag_count
|
task_restored_to_dag.<dag_id>
|
为指定 DAG 恢复的任务数量(即之前在数据库中处于“已移除”状态的任务实例会添加到 DAG 文件中)。 |
任务安排延迟 |
composer.googleapis.com/workflow/task/schedule_delay
|
dagrun.schedule_delay.<dag_id>
|
第一个任务 start_date 和 dagrun 预期开始之间经过的时间。 |
对 Cloud Composer 环境使用 Monitoring
控制台
您可以使用 Metrics Explorer 显示与您的环境和 DAG 相关的指标:
API 和 gcloud
您可以通过 Cloud Monitoring API 和 gcloud monitoring dashboards
命令创建和管理自定义信息中心及微件。如需了解详情,请参阅按 API 管理信息中心。
如需详细了解资源、指标和过滤条件,请参阅 Cloud Monitoring API 的参考文档:
使用 Cloud Monitoring 提醒
您可以创建提醒政策来监控指标的值,当这些指标违反条件时便会通知您。
-
在 Google Cloud 控制台的导航面板中,选择 Monitoring,然后选择 notifications 提醒:
- 如果您尚未创建通知渠道并希望收到通知,请点击修改通知渠道并添加通知渠道。添加渠道后,返回到提醒页面。
- 在提醒页面中,点击创建政策。
- 如需选择指标,请展开选择指标菜单,然后执行以下操作:
- 如需将菜单限制为相关条目,请在过滤栏中输入
Cloud Composer
。如果过滤菜单后没有显示任何结果,请停用仅显示活跃的资源和指标切换开关。 - 对于 Resource type,选择 Cloud Composer Environment 或 Cloud Composer Workflow。
- 选择指标类别和指标,然后选择应用。
- 如需将菜单限制为相关条目,请在过滤栏中输入
- 点击下一步。
- 配置提醒触发器页面中的设置决定了何时触发提醒。选择条件类型,并在必要时指定阈值。如需了解详情,请参阅创建指标阈值提醒政策。
- 点击下一步。
- 可选:如需将通知添加到您的提醒政策中,请点击通知渠道。在对话框中,从菜单中选择一个或多个通知渠道,然后点击确定。
- 可选:更新突发事件自动关闭持续时间。此字段用于确定在缺少指标数据的情况下 Monitoring 何时关闭突发事件。
- 可选:点击文档,然后添加您希望包含在通知消息中的任何信息。
- 点击提醒名称,然后输入提醒政策的名称。
- 点击 Create Policy(创建政策)。