Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
您可以将 Cloud Monitoring 和 Cloud Logging 与 Cloud Composer 搭配使用。
Cloud Monitoring 可帮助您了解云应用的性能、正常运行时间和总体运行状况。Cloud Monitoring 会从 Cloud Composer 中收集并提取指标、事件和元数据,并在信息中心和图表中生成数据洞见。您可以使用 Cloud Monitoring 来了解 Cloud Composer 环境的性能和运行状况以及 Airflow 指标。
Logging 可捕获环境集群中的调度器和工作器容器生成的日志。这些日志包含有助于调试的系统级和 Airflow 依赖项信息。如需了解如何查看日志,请参阅查看 Airflow 日志。
准备工作
如需访问 Cloud Composer 环境的日志和指标,您需要具备以下权限:
- 对日志和指标的只读权限:
logging.viewer
和monitoring.viewer
- 对日志(包括私密日志)的只读权限:
logging.privateLogViewer
- 对指标的读写权限:
monitoring.editor
如需详细了解 Cloud Composer 的其他权限和角色,请参阅访问权限控制。
- 对日志和指标的只读权限:
为避免出现重复的日志记录,请为 Google Kubernetes Engine 停用 Cloud Logging。
Cloud Logging 会针对 Google Cloud 项目中发生的每个状态和事件生成一个条目。您可以使用排除项过滤器来减少日志量,包括 Cloud Logging 为 Cloud Composer 生成的日志。
从
jobs.py
中排除日志可能会导致健康检查失败和CrashLoopBackOff
错误。您必须在排除项过滤器中添加-jobs.py
,以防止它被排除。Monitoring 不能为每分钟执行多次的 DAG 和任务绘制计数值图,也不能为失败任务绘制指标图。
环境指标
您可以使用环境指标来检查 Cloud Composer 环境的资源使用情况和健康状况。
环境健康状况
如需检查环境的健康状况,您可以使用以下健康状况指标:composer.googleapis.com/environment/healthy
。
Cloud Composer 运行一个名为 airflow_monitoring
的活跃性 DAG,该 DAG 按时间表运行,并按如下方式报告环境运行状况:
- 如果活跃性 DAG 运行成功完成,则运行状况为
True
。 - 如果活跃性 DAG 运行失败,则运行状况为
False
。
活跃性 DAG 存储在 dags/
文件夹中,并显示在 Airflow 界面中。活跃性 DAG 的频率和内容不可更改,也不得修改。对活跃性 DAG 的更改不会保留。
环境的依赖项检查
Cloud Composer 会定期检查环境是否可以访问其操作所需的服务,以及它是否具有足够的权限来与这些服务进行交互。环境运行所需的服务示例包括 Artifact Registry、Cloud Logging 和 Cloud Monitoring。
以下指标可用于环境的依赖项检查:
依赖项指标 | API | 说明 |
---|---|---|
依赖性检查的次数 | composer.googleapis.com/environment/health/dependency_check_count |
此指标跟踪对环境运行所需的服务执行可达性检查的次数。 |
依赖项权限检查次数 | composer.googleapis.com/environment/health/dependency_permissions_check_count |
此指标跟踪对环境操作所需服务执行权限检查的次数。 |
数据库健康状况
如需检查数据库的运行状况,可以使用以下运行状况指标:composer.googleapis.com/environment/database_health
。
Airflow 监控 Pod 每分钟对数据库执行一次 ping 操作,如果可以建立 SQL 连接,则报告健康状况为 True
,否则报告为 False
。
数据库指标
以下环境指标适用于 Cloud Composer 环境使用的 Airflow 元数据数据库。您可以使用这些指标来监控环境的数据库实例的性能和资源使用情况。
例如,如果您的环境接近资源限制,您可能需要增加环境的大小。或者,您可能需要执行数据库清理来优化数据库的大小。
数据库指标 | API | 说明 |
---|---|---|
数据库 CPU 用量 |
composer.googleapis.com/environment/database/cpu/usage_time
|
|
数据库 CPU 核心数 |
composer.googleapis.com/environment/database/cpu/reserved_cores
|
|
数据库 CPU 利用率 |
composer.googleapis.com/environment/database/cpu/utilization
|
|
数据库内存用量 |
composer.googleapis.com/environment/database/memory/bytes_used
|
|
数据库内存配额 |
composer.googleapis.com/environment/database/memory/quota
|
|
数据库内存利用率 |
composer.googleapis.com/environment/database/memory/utilization
|
|
数据库磁盘用量 |
composer.googleapis.com/environment/database/disk/bytes_used
|
|
数据库磁盘配额 |
composer.googleapis.com/environment/database/disk/quota
|
|
数据库磁盘利用率 |
composer.googleapis.com/environment/database/disk/utilization
|
|
数据库连接限制 |
composer.googleapis.com/environment/database/network/max_connections
|
|
数据库连接 |
composer.googleapis.com/environment/database/network/connections
|
|
可用于故障切换的数据库 |
composer.googleapis.com/environment/database/available_for_failover
|
如果环境的 Cloud SQL 实例处于高可用性模式且已准备好进行故障切换,则为 True 。 |
数据库自动故障切换请求数 |
composer.googleapis.com/environment/database/auto_failover_request_count
|
环境的 Cloud SQL 实例的自动故障切换请求总数。 |
工作器指标
以下环境指标适用于 Cloud Composer 2 环境使用的 Airflow 工作器。
此指标用于自动扩缩环境中的工作器数量。Pod 横向自动扩缩器会设置此指标,然后 Airflow 工作器设置控制器环境组件使用此指标根据此指标的值增加或减少 Airflow 工作器数量。
工作器指标 | API |
---|---|
缩放系数目标 |
composer.googleapis.com/environment/worker/scale_factor_target |
调度器指标
名称 | API | 说明 |
---|---|---|
活跃调度器 |
composer.googleapis.com/environment/active_schedulers
|
活跃调度器实例数。 |
触发器指标
以下触发器指标专为 Cloud Composer 提供:
名称 | API | 说明 |
---|---|---|
活跃触发器 |
composer.googleapis.com/environment/active_triggerers
|
处于活跃状态的触发器实例的数量。 |
此外,您还可以通过 Cloud Composer 指标获得以下 Airflow 指标:
名称 | API | Airflow 中的名称 | 说明 |
---|---|---|---|
正在运行的触发器总数 |
composer.googleapis.com/workload/triggerer/num_running_triggers
|
triggers.running
|
每个触发器实例正在运行的触发器数。 |
屏蔽型触发器 |
composer.googleapis.com/environment/trigger/blocking_count
|
triggers.blocked_main_thread
|
阻塞主线程的触发器数量(可能是因为未完全异步)。 |
失败的触发器 |
composer.googleapis.com/environment/trigger/failed_count
|
triggers.failed
|
在触发事件之前因出现错误而失败的触发器数量。 |
成功的触发器 |
composer.googleapis.com/environment/trigger/succeeded_count
|
triggers.succeeded
|
至少触发了一个事件的触发器数量。 |
网络服务器指标
以下环境指标适用于 Cloud Composer 环境使用的 Airflow Web 服务器。您可以使用这些指标检查环境的 Airflow Web 服务器实例的性能和资源使用情况。
例如,如果 Web 服务器不断接近资源限制,您可能需要增加 Web 服务器的规模和性能参数。
名称 | API | 说明 |
---|---|---|
Web 服务器 CPU 用量 |
composer.googleapis.com/environment/web_server/cpu/usage_time
|
|
Web 服务器 CPU 配额 |
composer.googleapis.com/environment/web_server/cpu/reserved_cores
|
|
Web 服务器内存用量 |
composer.googleapis.com/environment/web_server/memory/bytes_used
|
|
Web 服务器内存配额 |
composer.googleapis.com/environment/web_server/memory/quota
|
|
活跃 Web 服务器 |
composer.googleapis.com/environment/active_webservers
|
活跃 Web 服务器实例的数量。 |
DAG 指标
为了帮助您监控 DAG 运行效率并识别导致高延迟的任务,我们提供了以下 DAG 指标。
DAG 指标 | API |
---|---|
DAG 运行次数 |
composer.googleapis.com/workflow/run_count |
每个 DAG 运行的时长 |
composer.googleapis.com/workflow/run_duration |
任务运行次数 |
composer.googleapis.com/workflow/task/run_count |
每次任务运行的时长 |
composer.googleapis.com/workflow/task/run_duration |
Cloud Monitoring 只会显示已完成运行的工作流和任务(无论成功还是失败)的指标。而对于正在运行的工作流和任务,以及没有工作流活动的情况,Stackdriver 不会显示任何数据。
Celery Executor 指标
您可以使用以下 Celery 执行程序指标。这些指标可帮助您确定环境中的工作器资源是否足够。
Celery Executor 指标 | API |
---|---|
队列中的任务数 |
composer.googleapis.com/environment/task_queue_length |
在线 Celery 工作器数量 |
composer.googleapis.com/environment/num_celery_workers |
Airflow 指标
您可以使用以下 Airflow 指标。这些指标对应于 Airflow 提供的指标。
名称 | API | Airflow 中的名称 | 说明 |
---|---|---|---|
Celery 任务非零退出代码 |
composer.googleapis.com/environment/celery/execute_command_failure_count
|
celery.execute_command.failure
|
Celery 任务中的非零退出代码数量。 |
Celery 任务发布超时 |
composer.googleapis.com/environment/celery/task_timeout_error_count
|
celery.task_timeout_error
|
将任务发布到 Celery Broker 时引发的 AirflowTaskTimeout 错误数。 |
序列化 DAG 提取时长 |
composer.googleapis.com/environment/collect_db_dag_duration
|
collect_db_dags
|
从数据库中提取所有序列化 DAG 所用的时间。 |
DAG 刷新错误 |
composer.googleapis.com/environment/dag_callback/exception_count
|
dag.callback_exceptions
|
DAG 回调引发的异常数。如果发生这种情况,则表示 DAG 回调无法正常使用。 |
DAG 刷新错误 |
composer.googleapis.com/environment/dag_file/refresh_error_count
|
dag_file_refresh_error
|
加载任何 DAG 文件时发生的失败次数。 |
DAG 文件加载时间 |
composer.googleapis.com/environment/dag_processing/last_duration
|
dag_processing.last_duration.<dag_file>
|
加载特定 DAG 文件所用的时间。 |
距离 DAG 文件处理的时间 |
composer.googleapis.com/environment/dag_processing/last_run_elapsed_time
|
dag_processing.last_run.seconds_ago.<dag_file>
|
自上次处理 DAG 文件以来经过的秒数。 |
DagFileProcessorManager 失速计数 |
composer.googleapis.com/environment/dag_processing/manager_stall_count
|
dag_processing.manager_stalls
|
已停止的 DagFileProcessorManager 进程的数量。 |
DAG 解析错误 |
composer.googleapis.com/environment/dag_processing/parse_error_count
|
dag_processing.import_errors
|
解析 DAG 文件时生成的错误数量。 |
运行 DAG 解析进程 |
composer.googleapis.com/environment/dag_processing/processes
|
dag_processing.processes
|
当前正在运行的 DAG 解析进程的数量。 |
处理器超时 |
composer.googleapis.com/environment/dag_processing/processor_timeout_count
|
dag_processing.processor_timeouts
|
因用时过长而终止的文件处理器数量。 |
扫描和导入所有 DAG 文件所花费的时间 |
composer.googleapis.com/environment/dag_processing/total_parse_time
|
dag_processing.total_parse_time
|
扫描和导入一次所有 DAG 文件所用的总时间。 |
当前 DAG 包大小 |
composer.googleapis.com/environment/dagbag_size
|
dagbag_size
|
调度器根据其配置运行扫描时发现的 DAG 数量。 |
未能按照服务等级协议 (SLA) 接收电子邮件通知 |
composer.googleapis.com/environment/email/sla_notification_failure_count
|
sla_email_notification_failure
|
尝试发送服务等级协议失败的电子邮件通知的次数。 |
在执行器上打开槽 |
composer.googleapis.com/environment/executor/open_slots
|
executor.open_slots
|
执行器上的开放槽数。 |
执行器上的已加入队列的任务 |
composer.googleapis.com/environment/executor/queued_tasks
|
executor.queued_tasks
|
执行器上已加入队列的任务数。 |
在执行器上运行任务 |
composer.googleapis.com/environment/executor/running_tasks
|
executor.running_tasks
|
执行器上正在运行的任务数。 |
任务实例成功/失败 |
composer.googleapis.com/environment/finished_task_instance_count
|
ti_failures 、ti_successes
|
总体任务实例成功/失败次数。 |
已开始/已完成的作业 |
composer.googleapis.com/environment/job/count
|
<job_name>_start 、<job_name>_end
|
已开始/已完成的作业(例如 SchedulerJob、LocalTaskJob)的数量。 |
作业检测信号失败 |
composer.googleapis.com/environment/job/heartbeat_failure_count
|
<job_name>_heartbeat_failure
|
作业的失败检测信号次数。 |
按操作员创建的任务 |
composer.googleapis.com/environment/operator/created_task_instance_count
|
task_instance_created-<operator_name>
|
为给定操作器创建的任务实例数。 |
运算符执行 |
composer.googleapis.com/environment/operator/finished_task_instance_count
|
operator_failures_<operator_name> 、operator_successes_<operator_name>
|
每个操作器的已完成任务实例数 |
池中的开放槽 |
composer.googleapis.com/environment/pool/open_slots
|
pool.open_slots.<pool_name>
|
池中开放槽的数量。 |
池中已加入队列的槽 |
composer.googleapis.com/environment/pool/queued_slots
|
pool.queued_slots.<pool_name>
|
池中已加入队列的槽数。 |
池中的运行槽 |
composer.googleapis.com/environment/pool/running_slots
|
pool.running_slots.<pool_name>
|
池中正在运行的槽数。 |
资源池中的任务匮乏 |
composer.googleapis.com/environment/pool/starving_tasks
|
pool.starving_tasks.<pool_name>
|
池中资源匮乏的任务数。 |
在调度器的关键部分花费的时间 |
composer.googleapis.com/environment/scheduler/critical_section_duration
|
scheduler.critical_section_duration
|
花费在调度器循环关键部分的时间。一次只能有一个调度器进入此循环。 |
关键部分锁定失败 |
composer.googleapis.com/environment/scheduler/critical_section_lock_failure_count
|
scheduler.critical_section_busy
|
调度器进程尝试锁定关键部分(需要将任务发送到执行器)并发现该关键部分被其他进程锁定的次数。 |
外部终止的任务 |
composer.googleapis.com/environment/scheduler/task/externally_killed_count
|
scheduler.tasks.killed_externally
|
在外部终止的任务数。 |
孤立任务 |
composer.googleapis.com/environment/scheduler/task/orphan_count
|
scheduler.orphaned_tasks.cleared 、scheduler.orphaned_tasks.adopted
|
调度程序清除/采用的孤立任务的数量。 |
正在运行/已耗尽/可执行任务 |
composer.googleapis.com/environment/scheduler/tasks
|
scheduler.tasks.running 、scheduler.tasks.starving 、scheduler.tasks.executable
|
正在运行/即将耗尽/可执行任务的数量。 |
调度器检测信号 |
composer.googleapis.com/environment/scheduler_heartbeat_count
|
scheduler_heartbeat
|
调度器检测信号。 |
服务等级协议 (SLA) 回调通知失败 |
composer.googleapis.com/environment/sla_callback_notification_failure_count
|
sla_callback_notification_failure
|
尝试发送失败的服务等级协议 (SLA) 未命中回调通知的次数。 |
智能传感器触发异常失败 |
composer.googleapis.com/environment/smart_sensor/exception_failures
|
smart_sensor_operator.exception_failures
|
由上一个智能传感器弹跳循环中的异常导致的失败次数。 |
智能传感器捕获基础架构故障 |
composer.googleapis.com/environment/smart_sensor/infra_failures
|
smart_sensor_operator.infra_failures
|
上一个智能传感器弹出循环中的基础架构故障次数。 |
智能传感器弹出操作例外情况 |
composer.googleapis.com/environment/smart_sensor/poked_exception
|
smart_sensor_operator.poked_exception
|
上一个智能传感器弹跳循环中的异常数。 |
智能传感器成功戳中任务 |
composer.googleapis.com/environment/smart_sensor/poked_success
|
smart_sensor_operator.poked_success
|
智能传感器在上一个“戳记”循环中新成功的任务数。 |
智能传感器戳记任务 |
composer.googleapis.com/environment/smart_sensor/poked_tasks
|
smart_sensor_operator.poked_tasks
|
智能传感器在上一个“戳记”循环中卡住的任务数。 |
之前成功的任务实例 |
composer.googleapis.com/environment/task_instance/previously_succeeded_count
|
previously_succeeded
|
之前成功的任务实例数。 |
已终止的僵尸任务 |
composer.googleapis.com/environment/zombie_task_killed_count
|
zombies_killed
|
已终止的僵尸任务数量。 |
DAG 运行时长 |
composer.googleapis.com/workflow/dag/run_duration
|
dagrun.duration.success.<dag_id> 、dagrun.duration.failed.<dag_id>
|
DagRun 达到成功/失败状态所用的时间。 |
DAG 依赖项检查时长 |
composer.googleapis.com/workflow/dependency_check_duration
|
dagrun.dependency-check.<dag_id>
|
检查 DAG 依赖项所用的时间。此指标不同于环境的依赖项和权限检查指标,适用于 DAG |
DAG 运行时间表延迟 |
composer.googleapis.com/workflow/schedule_delay
|
dagrun.schedule_delay.<dag_id>
|
预定的 DagRun 开始日期与实际 DagRun 开始日期之间的延迟时间。 |
已完成的任务 |
composer.googleapis.com/workflow/task_instance/finished_count
|
ti.finish.<dag_id>.<task_id>.<state>
|
给定 DAG 中已完成的任务数。 |
任务实例运行时长 |
composer.googleapis.com/workflow/task_instance/run_duration
|
dag.<dag_id>.<task_id>.duration
|
完成一项任务所用的时间。 |
已开始的任务 |
composer.googleapis.com/workflow/task_instance/started_count
|
ti.start.<dag_id>.<task_id>
|
给定 DAG 中已启动的任务的数量。 |
从 DAG 中移除的任务 |
composer.googleapis.com/workflow/task/removed_from_dag_count
|
task_removed_from_dag.<dag_id>
|
为指定 DAG 移除的任务数量(即 DAG 中不再存在的任务)。 |
任务已恢复到 DAG |
composer.googleapis.com/workflow/task/restored_to_dag_count
|
task_restored_to_dag.<dag_id>
|
为给定 DAG 恢复的任务数(即添加到 DAG 文件中之前在数据库中处于“已移除”状态的任务实例)。 |
任务调度延迟 |
composer.googleapis.com/workflow/task/schedule_delay
|
dagrun.schedule_delay.<dag_id>
|
从第一个任务 start_date 到 dagrun 预期启动时间之间经过的时间。 |
对 Cloud Composer 环境使用 Monitoring
控制台
您可以使用 Metrics Explorer 显示与您的环境和 DAG 相关的指标:
API 和 gcloud
您可以通过 Cloud Monitoring API 和 gcloud monitoring dashboards
命令创建和管理自定义信息中心及微件。如需了解详情,请参阅按 API 管理信息中心。
如需详细了解资源、指标和过滤条件,请参阅 Cloud Monitoring API 的参考文档:
使用 Cloud Monitoring 提醒
您可以创建提醒政策来监控指标的值,当这些指标违反条件时便会通知您。
-
在 Google Cloud 控制台中,转到 notifications 提醒页面:
如果您使用搜索栏查找此页面,请选择子标题为监控的结果。
- 如果您尚未创建通知渠道并希望收到通知,请点击修改通知渠道并添加通知渠道。添加渠道后,返回到提醒页面。
- 在提醒页面中,点击创建政策。
- 如需选择指标,请展开选择指标菜单,然后执行以下操作:
- 如需将菜单限制为相关条目,请在过滤栏中输入
Cloud Composer
。如果过滤菜单后没有显示任何结果,请停用仅显示活跃的资源和指标切换开关。 - 对于 Resource type,选择 Cloud Composer Environment 或 Cloud Composer Workflow。
- 选择指标类别和指标,然后选择应用。
- 如需将菜单限制为相关条目,请在过滤栏中输入
- 点击下一步。
- 配置提醒触发器页面中的设置决定了何时触发提醒。 选择条件类型,并在必要时指定阈值。如需了解详情,请参阅创建指标阈值提醒政策。
- 点击下一步。
- 可选:如需将通知添加到您的提醒政策中,请点击通知渠道。在对话框中,从菜单中选择一个或多个通知渠道,然后点击确定。
- 可选:更新突发事件自动关闭持续时间。此字段用于确定在缺少指标数据的情况下 Monitoring 何时关闭突发事件。
- 可选:点击文档,然后添加您希望包含在通知消息中的任何信息。
- 点击提醒名称,然后输入提醒政策的名称。
- 点击 Create Policy(创建政策)。