在 Monitoring 信息中心内,使用关键指标监控环境的健康状况和性能

Cloud Composer 1 | Cloud Composer 2

本页面介绍如何使用 Monitoring 信息中心内的关键指标监控 Cloud Composer 环境的整体运行状况和性能。

简介

本教程重点介绍重要的 Cloud Composer 监控指标,这些指标可让您全面了解环境级别的运行状况和性能。

Cloud Composer 提供了多个描述环境总体状态的指标。本教程中的监控准则基于 Cloud Composer 环境的 Monitoring 信息中心内提供的指标。

在本教程中,您将了解作为环境性能和运行状况问题的主要指标的关键指标,以及将每个指标解读为纠正措施以保持环境运行状况的准则。您还将为每个指标设置提醒规则,运行示例 DAG,并使用这些指标和提醒来优化环境的性能。

目标

费用

本教程使用 Google Cloud 的以下收费组件:

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

须知事项

本部分介绍了在开始学习本教程之前需要执行的操作。

创建和配置项目

在本教程中,您需要一个 Google Cloud 项目。通过以下方式配置项目:

  1. 在 Google Cloud 控制台中,选择或创建项目

    转到项目选择器

  2. 确保您的项目已启用结算功能。 了解如何查看项目是否已启用结算功能

  3. 请确保您的 Google Cloud 项目用户具有以下角色,以便创建必要的资源:

    • Environment and Storage Object Administrator (roles/composer.environmentAndStorageObjectAdmin)
    • Compute Admin (roles/compute.admin)
    • Monitoring 编辑器 (roles/monitoring.editor)

为您的项目启用 API

启用 Cloud Composer API。

启用 API

创建 Cloud Composer 环境

创建一个 Cloud Composer 2 环境

在此过程中,您要向 Composer Service Agent 帐号授予 Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext) 角色。Cloud Composer 使用此帐号在您的 Google Cloud 项目中执行操作。

探索有关环境级别的运行状况和性能的关键指标

本教程重点介绍关键指标,这些指标可让您全面了解环境的整体运行状况和性能。

Google Cloud 控制台中的 Monitoring 信息中心包含各种指标和图表,可让您监控环境中的趋势并识别 Airflow 组件和 Cloud Composer 资源存在的问题。

每个 Cloud Composer 环境都有自己的 Monitoring 信息中心。

请熟悉以下关键指标,并在 Monitoring 信息中心内找到每个指标:

  1. 在 Google Cloud 控制台中,转到环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到监控标签页。

  4. 选择概览部分,在信息中心内找到环境概览项,并观察环境运行状况(Airflow 监控 DAG)指标。

    • 此时间轴显示了 Cloud Composer 环境的运行状况。环境运行状况条的绿色表示环境运行状况良好,而环境运行状况不佳则以红色表示。

    • Cloud Composer 每隔几分钟就会执行一个名为 airflow_monitoring 的活跃性 DAG。如果活跃性 DAG 运行成功完成,则运行状况为 True。如果活跃性 DAG 运行失败(例如,由于 Pod 逐出、外部进程终止或维护),则运行状况为 False

  5. 选择 SQL 数据库部分,在信息中心内找到数据库健康状况项,然后观察数据库健康状况指标。

    • 此时间轴显示与环境的 Cloud SQL 实例的连接状态。绿色的数据库运行状况条表示连接,而连接失败时则用红色表示。

    • Airflow 监控 Pod 会定期对数据库执行 ping 操作,如果可以建立连接,则将运行状况报告为 True;如果不能建立连接,则报告 False 状态。

  6. 数据库运行状况项中,观察数据库 CPU 使用率数据库内存使用率指标。

    • 数据库 CPU 使用率图表显示了您环境中的 Cloud SQL 数据库实例 CPU 核心用量与可用数据库 CPU 总数限制的对比情况。

    • 数据库内存用量图表显示了您的环境中的 Cloud SQL 数据库实例的内存用量与可用数据库内存总量限制的对比情况。

  7. 选择调度器部分,在信息中心内找到调度器检测信号项,然后观察调度器检测信号指标。

    • 此时间轴显示 Airflow 调度器的运行状况。检查红色区域以确定 Airflow 调度器问题。如果您的环境中有多个调度器,只要至少有一个调度器在响应,检测信号状态就处于良好状态。

    • 如果在当前时间前超过 30 秒(默认值)收到最后一个检测信号,则调度程序会被视为运行状况不佳。

  8. 选择 DAG 统计信息部分,在信息中心内找到终止的僵尸任务项,然后观察终止的僵尸任务指标。

    • 此图表显示了较短时间内终止的僵尸任务数量。僵尸任务通常是由 Airflow 进程的外部终止(例如任务的进程终止)引起的。

    • Airflow 调度器会定期终止僵尸任务,如此图表所示。

  9. 选择工作器部分,在信息中心内找到工作器容器重启项,然后观察工作器容器重启指标。

    • 一张图表显示了各个工作器容器的重启总次数。容器重启次数过多可能会影响使用它作为依赖项的服务或其他下游服务的可用性。

了解针对关键指标的基准和可能的纠正措施

以下列表介绍了可以指出问题的基准值,并提供了解决这些问题可采取的纠正措施。

  • 环境运行状况(Airflow 监控 DAG)

    • 在 4 小时长的时间范围内,成功率低于 90%

    • 故障可能意味着由于环境过载或故障而导致 Pod 逐出或工作器终止。环境运行状况时间轴上的红色区域通常与各个环境组件的其他运行状况条中的红色区域相关。通过查看 Monitoring 信息中心内的其他指标来确定根本原因。

  • 数据库健康状况

    • 在 4 小时长的时间范围内,成功率低于 95%

    • 失败表示与 Airflow 数据库的连接存在问题,如果数据库过载(例如,CPU 或内存用量较高,或者连接到数据库时的延迟时间较长),数据库崩溃或停机可能会导致数据库崩溃或停机。这些症状通常是由非最佳 DAG 引起的,例如,当 DAG 使用许多全局定义的 Airflow 或环境变量时。查看 SQL 数据库资源使用情况指标,找出根本原因。您还可以检查调度器日志,查找与数据库连接相关的错误

  • 数据库 CPU 和内存用量

    • 在 12 小时的时段内,平均 CPU 或内存使用率超过 80%

    • 数据库可能过载。分析 DAG 运行与数据库 CPU 或内存使用量峰值之间的相关性。

  • 调度程序检测信号

    • 在 4 小时长的时间范围内,成功率低于 90%

    • 为调度器分配更多资源,或将调度器数量从 1 增加到 2(推荐)。

  • 已终止的僵尸任务

    • 每 24 小时有多个僵尸任务

    • 执行僵尸任务的最常见原因是环境的集群中的 CPU 或内存资源短缺。查看工作器资源使用情况图并为工作器分配更多资源,或增加僵尸任务超时,使调度程序等待更长时间才能将任务视为僵尸。

  • 工作器容器重启

    • 每 24 小时多次重启

    • 最常见的原因是工作器内存不足或存储空间不足。调查工作器资源消耗情况,并为工作器分配更多内存或存储空间。如果资源不足不是原因所在,请查看排查工作器重启事件问题,并使用 Logging 查询找出工作器重启的原因。

创建通知渠道

按照创建通知渠道中的说明来创建电子邮件通知渠道。

如需详细了解通知渠道,请参阅管理通知渠道

创建提醒政策

根据本教程前面部分中提供的基准创建提醒政策,以持续监控指标的值,并在这些指标违反条件时接收通知。

控制台

您可以通过点击相应项角落的铃铛图标为 Monitoring 信息中心中显示的每个指标设置提醒:

为 Monitoring 信息中心内显示的指标创建提醒
图 1.为监控信息中心内显示的指标创建提醒(点击可放大)
  1. 在 Monitoring 信息中心中找到要监控的每个指标,然后点击指标项一角的铃铛图标。此时将打开创建提醒政策页面。

  2. 转换数据部分中,执行以下操作:

    1. 按照指标的提醒政策配置中的说明,配置在每个时序内部分。

    2. 点击下一步,然后按照指标的提醒政策配置所述配置配置提醒触发器部分。

  3. 点击下一步

  4. 配置通知。展开通知渠道菜单,然后选择您在上一步中创建的通知渠道。

  5. 点击 OK(确定)。

  6. 为提醒政策命名部分中,填写提醒政策名称字段。为每个指标使用描述性名称。使用“为提醒政策命名”值,如指标的提醒政策配置中所述。

  7. 点击下一步

  8. 查看提醒政策,然后点击创建政策

环境运行状况(Airflow 监控 DAG)指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 运行状况良好
  • API:composer.googleapis.com/environment/healthy
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序内:

    • 滚动窗口:自定义
    • 自定义值:4
    • 自定义单位:小时
    • 滚动窗口函数:true 分数
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的行为
    • 阈值排名:低于阈值
    • 阈值:90
    • 条件名称:环境健康状况
  • 配置通知并最终确定提醒:

    • 将提醒政策命名为:Airflow 环境运行状况

数据库运行状况指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 数据库运行状况良好
  • API:composer.googleapis.com/environment/database_health
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序内:

    • 滚动窗口:自定义
    • 自定义值:4
    • 自定义单位:小时
    • 滚动窗口函数:true 分数
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的行为
    • 阈值排名:低于阈值
    • 阈值:95
    • 条件名称:数据库健康状况
  • 配置通知并最终确定提醒:

    • 将提醒政策命名为:Airflow 数据库运行状况

数据库 CPU 使用率指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 数据库 CPU 利用率
  • API:composer.googleapis.com/environment/database/cpu/utilization
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序内:

    • 滚动窗口:自定义
    • 自定义值:12
    • 自定义单位:小时
    • 滚动窗口函数:平均值
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的行为
    • 阈值排名:高于阈值
    • 阈值:80
    • 条件名称:数据库 CPU 使用条件
  • 配置通知并最终确定提醒:

    • 将提醒政策命名为:Airflow 数据库 CPU 使用率

数据库 CPU 使用率指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 数据库内存利用率
  • API:composer.googleapis.com/environment/database/memory/utilization
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序内:

    • 滚动窗口:自定义
    • 自定义值:12
    • 自定义单位:小时
    • 滚动窗口函数:平均值
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的行为
    • 阈值排名:高于阈值
    • 阈值:80
    • 条件名称:数据库内存使用条件
  • 配置通知并最终确定提醒:

    • 将提醒政策命名为:Airflow 数据库内存用量

调度器检测信号指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 调度器检测信号
  • API:composer.googleapis.com/environment/scheduler_heartband_count
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序内:

    • 滚动窗口:自定义
    • 自定义值:4
    • 自定义单位:小时
    • 滚动窗口函数:计数
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的行为
    • 阈值排名:低于阈值
    • 阈值:216

      1. 您可以通过在 Metrics Explorer 查询编辑器中运行汇总值 _scheduler_heartbeat_count_mean 的查询来获取此数字。
    • 条件名称:调度器检测信号条件

  • 配置通知并最终确定提醒:

    • 将提醒政策命名为:Airflow 调度器检测信号

“终止的僵尸任务数量”指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 僵尸任务已终止
  • API:composer.googleapis.com/environment/zombie_task_killed_count
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序内:

    • 滚动窗口:1 天
    • 滚动窗口函数:sum
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的行为
    • 阈值排名:高于阈值
    • 阈值:1
    • 条件名称:僵尸任务条件
  • 配置通知并最终确定提醒:

    • 将提醒政策命名为:Airflow Zombie Tasks

工作器容器重启指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 僵尸任务已终止
  • API:composer.googleapis.com/environment/zombie_task_killed_count
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序内:

    • 滚动窗口:1 天
    • 滚动窗口函数:sum
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的行为
    • 阈值排名:高于阈值
    • 阈值:1
    • 条件名称:僵尸任务条件
  • 配置通知并最终确定提醒:

    • 将提醒政策命名为:Airflow Zombie Tasks

Terraform

运行一个 Terraform 脚本,该脚本会创建一个电子邮件通知渠道,并根据各自的基准针对本教程中提供的关键指标上传提醒政策:

  1. 将示例 Terraform 文件保存到您的本地计算机上。
  2. 替换以下内容:

    • PROJECT_ID:项目的项目 ID。例如 example-project
    • EMAIL_ADDRESS:触发提醒时必须通知的电子邮件地址。
    • ENVIRONMENT_NAME:您的 Cloud Composer 环境的名称。例如 example-composer-environment
    • CLUSTER_NAME:您的环境集群名称,可在 Google Cloud 控制台中的“环境配置”>“资源”>“GKE 集群”下找到。
resource "google_monitoring_notification_channel" "basic" {
  project      = "PROJECT_ID"
  display_name = "Test Notification Channel"
  type         = "email"
  labels = {
    email_address = "EMAIL_ADDRESS"
  }
  # force_delete = false
}

resource "google_monitoring_alert_policy" "environment_health_metric" {
  project      = "PROJECT_ID"
  display_name = "Airflow Environment Health"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Environment health condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/healthy\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_LT"
      threshold_value = 0.9
      aggregations {
        alignment_period   = "14400s"
        per_series_aligner = "ALIGN_FRACTION_TRUE"
      }
    }
  }

}

resource "google_monitoring_alert_policy" "database_health_metric" {
  project      = "PROJECT_ID"
  display_name = "Airflow Database Health"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Database health condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/database_health\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_LT"
      threshold_value = 0.95
      aggregations {
        alignment_period   = "14400s"
        per_series_aligner = "ALIGN_FRACTION_TRUE"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_database_cpu_usage" {
  project      = "PROJECT_ID"
  display_name = "Airflow Database CPU Usage"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Database CPU usage condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/database/cpu/utilization\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_GT"
      threshold_value = 80
      aggregations {
        alignment_period   = "43200s"
        per_series_aligner = "ALIGN_MEAN"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_database_memory_usage" {
  project      = "PROJECT_ID"
  display_name = "Airflow Database Memory Usage"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Database memory usage condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/database/memory/utilization\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_GT"
      threshold_value = 80
      aggregations {
        alignment_period   = "43200s"
        per_series_aligner = "ALIGN_MEAN"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_scheduler_heartbeat" {
  project      = "PROJECT_ID"
  display_name = "Airflow Scheduler Heartbeat"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Scheduler heartbeat condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/scheduler_heartbeat_count\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_LT"
      threshold_value = 216 // Threshold is 90% of the average for composer.googleapis.com/environment/scheduler_heartbeat_count metric in an idle environment
      aggregations {
        alignment_period   = "14400s"
        per_series_aligner = "ALIGN_COUNT"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_zombie_task" {
  project      = "PROJECT_ID"
  display_name = "Airflow Zombie Tasks"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Zombie tasks condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/zombie_task_killed_count\" AND  resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_GT"
      threshold_value = 1
      aggregations {
        alignment_period   = "86400s"
        per_series_aligner = "ALIGN_SUM"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_worker_restarts" {
  project      = "PROJECT_ID"
  display_name = "Airflow Worker Restarts"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Worker container restarts condition"
    condition_threshold {
      filter     = "resource.type = \"k8s_container\" AND (resource.labels.cluster_name = \"CLUSTER_NAME\" AND resource.labels.container_name = monitoring.regex.full_match(\"airflow-worker|base\") AND resource.labels.pod_name = monitoring.regex.full_match(\"airflow-worker-.*|airflow-k8s-worker-.*\")) AND metric.type = \"kubernetes.io/container/restart_count\""

      duration   = "60s"
      comparison = "COMPARISON_GT"
      threshold_value = 1
      aggregations {
        alignment_period   = "86400s"
        per_series_aligner = "ALIGN_RATE"
      }
    }
  }
}

测试提醒政策

本部分介绍如何测试创建的提醒政策和解读结果。

上传示例 DAG

本教程中提供的示例 DAG memory_consumption_dag.py 模拟了密集型工作器内存利用率。该 DAG 包含 4 个任务,每个任务都会将数据写入示例字符串,这会产生 380 MB 的内存。示例 DAG 安排每 2 分钟运行一次,将在您上传到 Composer 环境后自动开始运行。

将以下示例 DAG 上传到您在先前步骤中创建的环境:

from datetime import datetime
import sys
import time

from airflow import DAG
from airflow.operators.python import PythonOperator

def ram_function():
    data = ""
    start = time.time()
    for i in range(38):
        data += "a" * 10 * 1000**2
        time.sleep(0.2)
        print(f"{i}, {round(time.time() - start, 4)}, {sys.getsizeof(data) / (1000 ** 3)}")
    print(f"Size={sys.getsizeof(data) / (1000 ** 3)}GB")
    time.sleep(30 - (time.time() - start))
    print(f"Complete in {round(time.time() - start, 2)} seconds!")

with DAG(
    dag_id="memory_consumption_dag",
    start_date=datetime(2023, 1, 1, 1, 1, 1),
    schedule="1/2 * * * *",
    catchup=False,
) as dag:
    for i in range(4):
        PythonOperator(
            task_id=f"task_{i+1}",
            python_callable=ram_function,
            retries=0,
            dag=dag,
        )

解读 Monitoring 中的提醒和指标

示例 DAG 开始运行后,等待大约 10 分钟,并评估测试结果:

  1. 请查看您的电子邮件邮箱,确认您是否收到了来自 Google Cloud Alerting 且主题行以 [ALERT] 开头的通知。此消息的内容包含提醒政策突发事件详情。

  2. 点击电子邮件通知中的查看突发事件按钮。您将被重定向到 Metrics Explorer。查看提醒突发事件的详细信息:

    提醒突发事件的详细信息
    图 2.提醒突发事件的详细信息(点击可放大)

    突发事件指标图表显示您创建的指标超出了阈值 1,这意味着 Airflow 检测到并终止了超过 1 项僵尸任务。

  3. 在 Cloud Composer 环境中,转到 Monitoring 标签页,打开 DAG 统计信息部分,找到 Zombie tasks killed 图表:

    僵尸任务图表
    图 3.僵尸任务图(点击可放大)

    该图表显示,Airflow 在运行示例 DAG 后的仅仅 10 分钟内就终止了大约 20 项僵尸任务。

  4. 根据基准测试和纠正操作,僵尸任务最常见的原因是缺少工作器内存或 CPU。通过分析工作器资源利用率,确定僵尸任务的根本原因。

    打开 Monitoring 信息中心内的“Workers”部分,并查看工作器 CPU 和内存用量指标:

    工作器 CPU 和内存用量指标
    图 4. 工作器 CPU 和内存用量指标(点击可放大)

    工作器总 CPU 使用率图表显示工作器 CPU 使用率始终低于可用总限制的 50%,因此可用 CPU 就足够了。工作器总内存用量图表显示,运行示例 DAG 导致达到可分配的内存限制,约为图表上显示的总内存限制的 75%(GKE 会在每个节点上预留前 4 GiB 内存的 25%,并在每个节点上额外预留 100 MiB 内存来处理 Pod 逐出)。

    您可以这样结论:工作器缺少内存资源,无法成功运行示例 DAG。

优化您的环境并评估其性能

根据对工作器资源利用率的分析,您需要为工作器分配更多内存,以使 DAG 中的所有任务能够成功执行。

  1. 在 Composer 环境中,打开 DAG 标签页,点击示例 DAG 的名称 (memory_consumption_dag),然后点击暂停 DAG

  2. 分配额外的工作器内存:

    1. 在“环境配置”标签页中,找到资源 > 工作负载配置,然后点击修改

    2. Worker 项中,提高内存限制。在本教程中,请使用 3.25 GB。

    3. 保存更改并等待几分钟,让 worker 重启。

  3. 打开 DAG 标签页,点击示例 DAG 的名称 (memory_consumption_dag),然后点击取消暂停 DAG

转到 Monitoring,验证在更新工作器资源限制后是否未出现新的僵尸任务:

内存限制发生更改后的僵尸任务图表
图 5.内存限制更改后的僵尸任务图表(点击可放大)

摘要

在本教程中,您了解了关键的环境级运行状况和性能指标、如何为每个指标设置提醒政策,以及如何将每个指标解读为纠正措施。然后,您运行了一个示例 DAG,借助提醒和 Monitoring 图表确定了环境运行状况问题的根本原因,并通过为工作器分配了更多内存来优化您的环境。但是,建议您优化 DAG 以从源头减少工作器资源消耗,因为不可能将资源增加到超过特定阈值。

清理

为避免因本教程中使用的资源导致您的 Google Cloud 帐号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除项目

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除各个资源

如果您打算探索多个教程和快速入门,重复使用项目可以帮助您避免超出项目配额上限。

控制台

  1. 删除 Cloud Composer 环境。在此过程中,您还需要删除环境的存储桶。
  2. 删除您在 Cloud Monitoring 中创建的每个提醒政策

Terraform

  1. 确保您的 Terraform 脚本不包含项目仍需要的资源的条目。例如,您可能希望使某些 API 保持启用状态,并且仍分配 IAM 权限(如果您将此类定义添加到 Terraform 脚本中)。
  2. 运行 terraform destroy
  3. 手动删除环境的存储桶。Cloud Composer 不会自动将其删除。您可以通过 Google Cloud 控制台或 Google Cloud CLI 执行此操作。

后续步骤