使用 Monitoring 信息中心内的关键指标监控环境的运行状况和性能

Cloud Composer 1 | Cloud Composer 2

本页面介绍如何使用 Monitoring 信息中心内的关键指标监控 Cloud Composer 环境的整体运行状况和性能。

简介

本教程重点介绍关键的 Cloud Composer 监控指标,这些指标可让您很好地了解环境级别的运行状况和性能。

Cloud Composer 提供了多个描述环境整体状态的指标。本教程中的监控准则基于 Cloud Composer 环境的 Monitoring 信息中心内显示的指标。

在本教程中,您将了解可作为环境性能和运行状况问题的主要指标的关键指标,以及将每个指标解读为纠正措施以保持环境运行状况的准则。您还将为每个指标设置提醒规则,运行示例 DAG,并使用这些指标和提醒来优化环境的性能。

目标

费用

本教程使用 Google Cloud 的以下收费组件:

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

准备工作

本部分介绍了在开始学习本教程之前需要执行的操作。

创建和配置项目

在本教程中,您需要一个 Google Cloud 项目。通过以下方式配置项目:

  1. 在 Google Cloud 控制台中,选择或创建项目

    前往项目选择器

  2. 确保您的项目已启用结算功能。 了解如何检查项目是否已启用结算功能

  3. 请确保您的 Google Cloud 项目用户拥有以下角色以创建必要的资源:

    • 环境和存储对象管理员 (roles/composer.environmentAndStorageObjectAdmin)
    • Compute Admin (roles/compute.admin)
    • Monitoring Editor (roles/monitoring.editor)

为您的项目启用 API

启用 Cloud Composer API。

启用 API

创建 Cloud Composer 环境

创建 Cloud Composer 2 环境

在此过程中,您要向 Composer Service Agent 帐号授予 Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext) 角色。Cloud Composer 会使用此帐号在您的 Google Cloud 项目中执行操作。

探索反映环境层面的健康状况和性能的关键指标

本教程重点介绍一些关键指标,这些指标有助于您全面了解环境的整体运行状况和性能。

Google Cloud 控制台中的 Monitoring 信息中心包含各种指标和图表,可让您监控环境中的趋势并识别 Airflow 组件和 Cloud Composer 资源的问题。

每个 Cloud Composer 环境都有自己的 Monitoring 信息中心。

熟悉以下关键指标,并在 Monitoring 信息中心内找到每个指标:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到监控标签页。

  4. 选择概览部分,找到信息中心内的环境概览项,然后观察环境运行状况(Airflow 监控 DAG)指标。

    • 此时间轴显示了 Cloud Composer 环境的运行状况。环境运行状况栏的绿色表示环境运行状况良好,而环境运行状况不佳的状态则用红色表示。

    • Cloud Composer 每隔几分钟就会执行一个名为 airflow_monitoring 的活跃性 DAG。如果活跃性 DAG 运行成功完成,则运行状况为 True。如果活跃性 DAG 运行失败(例如,由于 Pod 逐出、外部进程终止或维护),则运行状况为 False

  5. 选择 SQL 数据库部分,在信息中心内找到数据库健康状况项,然后观察数据库健康状况指标。

    • 此时间轴显示与您环境的 Cloud SQL 实例的连接状态。绿色的数据库运行状况条表示连接,而连接失败则用红色表示。

    • Airflow 监控 Pod 会定期对数据库执行 ping 操作;如果可以建立连接,则会将运行状况报告为 True;如果不能建立连接,则报告为 False

  6. 数据库健康状况项中,观察数据库 CPU 使用率数据库内存用量指标。

    • 数据库 CPU 使用率图表可指示环境中的 Cloud SQL 数据库实例占用的 CPU 核心数与可用数据库 CPU 总限制的比值。

    • 数据库内存用量图表显示了您环境中的 Cloud SQL 数据库实例的内存用量与可用数据库内存总量限制的对比情况。

  7. 选择 Schedulers(调度器)部分,在信息中心内找到 Scheduler heartbeat(调度程序检测信号)项,并观察 Scheduler heartbeat(调度程序检测信号)指标。

    • 此时间轴会显示 Airflow 调度器的运行状况。检查红色区域,以确定 Airflow 调度器问题。如果您的环境中有多个调度器,那么只要至少有一个调度器正在响应,检测信号状态就是正常的。

    • 如果上次收到检测信号的时间比当前时间早 30 秒(默认值)以上,则视为调度程序运行状况不佳。

  8. 选择 DAG 统计信息部分,在信息中心内找到终止的僵尸任务数项,然后观察终止的僵尸任务指标。

    • 此图显示了小时间段内终止的僵尸任务数量。僵尸任务通常是由于 Airflow 进程的外部终止(例如任务的进程被终止时)引起的。

    • Airflow 调度器会定期终止僵尸任务,具体体现在此图表中。

  9. 选择工作器部分,在信息中心内找到工作器容器重启项,并观察工作器容器重启指标。

    • 一个图表,显示各个工作器容器的重启总次数。容器重启次数过多可能会影响您的服务或其他将其用作依赖项的下游服务的可用性。

了解针对关键指标的基准测试和可能的纠正措施

以下列表介绍了可以指出问题的基准值,并提供了为解决这些问题而可以采取的纠正措施。

  • 环境健康状况(Airflow 监控 DAG)

    • 在 4 小时的窗口期内,成功率低于 90%

    • 故障可能意味着由于环境过载或发生故障而导致 Pod 被逐出或终止。环境运行状况时间轴上的红色区域通常与各个环境组件的其他运行状况栏中的红色区域相关。通过查看 Monitoring 信息中心内的其他指标来确定根本原因。

  • 数据库健康状况

    • 在 4 小时的窗口期内,成功率低于 95%

    • 故障表示与 Airflow 数据库的连接出现问题,这可能是由于数据库过载(例如,由于连接到数据库时 CPU 或内存用量较高,或者延迟时间较长)而导致的数据库崩溃或停机。这些症状最常见的是由于并非最佳的 DAG,例如当 DAG 使用许多全局定义的 Airflow 或环境变量时。查看 SQL 数据库资源使用情况指标,找出根本原因。您还可以检查调度器日志,查找与数据库连接相关的错误

  • 数据库 CPU 和内存用量

    • 12 小时内的平均 CPU 或内存使用率超过 80%

    • 数据库可能过载。分析 DAG 运行与数据库 CPU 或内存使用量峰值之间的相关性。

  • 调度程序检测信号

    • 在 4 小时的窗口期内,成功率低于 90%

    • 为调度器分配更多资源,或者将调度器的数量从 1 增加到 2(推荐)。

  • 已终止的僵尸任务

    • 每 24 小时有多个僵尸任务

    • 发生僵尸任务的最常见原因是环境集群中的 CPU 或内存资源不足。查看工作器资源使用情况图并为工作器分配更多资源,或增加僵尸任务超时时间,使调度程序等待更长时间才能将任务视为僵尸。

  • 工作器容器重启

    • 每 24 小时多次重启

    • 最常见的原因是工作器内存或存储空间不足。调查工作器资源消耗情况,并为工作器分配更多内存或存储空间。如果资源不足不是原因,请参阅排查工作器重启突发事件的问题,并使用 Logging 查询来发现工作器重启的原因。

创建通知渠道

按照创建通知渠道中的说明创建电子邮件通知渠道。

如需详细了解通知渠道,请参阅管理通知渠道

创建提醒政策

根据本教程前面部分提供的基准创建提醒政策,以持续监控指标的值,并在这些指标违反条件时接收通知。

控制台

您可以点击相应项一角的铃铛图标,为 Monitoring 信息中心中显示的每个指标设置提醒:

为 Monitoring 信息中心内显示的指标创建提醒
图 1.为监控信息中心内显示的指标创建提醒(点击可放大)
  1. 在 Monitoring 信息中心内找到要监控的每个指标,然后点击指标项一角的铃铛图标。此时将打开创建提醒政策页面。

  2. 转换数据部分中,执行以下操作:

    1. 按照指标的提醒政策配置中的说明,配置在每个时序内部分。

    2. 点击下一步,然后按照指标的提醒政策配置中的说明配置配置提醒触发器部分。

  3. 点击下一步

  4. 配置通知。展开通知渠道菜单,然后选择您在上一步中创建的通知渠道。

  5. 点击 OK(确定)。

  6. 为提醒政策命名部分中,填写提醒政策名称字段。为每个指标使用描述性名称。使用“为提醒政策命名”值,如指标的提醒政策配置中所述。

  7. 点击下一步

  8. 查看提醒政策,然后点击 Create policy

环境健康状况(Airflow 监控 DAG)指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 运行状况良好
  • API:composer.googleapis.com/environment/healthy
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序中:

    • 滚动窗口:自定义
    • 自定义值:4
    • 自定义单位:小时
    • 滚动窗口函数:true 分数
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的行为
    • 阈值位置:低于阈值
    • 阈值:90
    • 条件名称:环境健康状况
  • 配置通知并最终确定提醒:

    • 将提醒政策命名为:Airflow Environment Health

数据库健康状况指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 数据库运行状况良好
  • API:composer.googleapis.com/environment/database_health
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序中:

    • 滚动窗口:自定义
    • 自定义值:4
    • 自定义单位:小时
    • 滚动窗口函数:true 分数
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的行为
    • 阈值位置:低于阈值
    • 阈值:95
    • 条件名称:数据库健康状况
  • 配置通知并最终确定提醒:

    • 将提醒政策命名为:Airflow 数据库健康状况

数据库 CPU 使用率指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 数据库 CPU 利用率
  • API:composer.googleapis.com/environment/database/cpu/utilization
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序中:

    • 滚动窗口:自定义
    • 自定义值:12
    • 自定义单位:小时
    • 滚动窗口函数:平均值
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的行为
    • 阈值位置:高于阈值
    • 阈值:80
    • 条件名称:数据库 CPU 使用条件
  • 配置通知并最终确定提醒:

    • 将提醒政策命名为:Airflow 数据库 CPU 使用率

数据库 CPU 使用率指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 数据库内存利用率
  • API:composer.googleapis.com/environment/database/memory/utilization
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序中:

    • 滚动窗口:自定义
    • 自定义值:12
    • 自定义单位:小时
    • 滚动窗口函数:平均值
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的行为
    • 阈值位置:高于阈值
    • 阈值:80
    • 条件名称:数据库内存使用条件
  • 配置通知并最终确定提醒:

    • 将提醒政策命名为:Airflow 数据库内存用量

调度器检测信号指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 调度程序检测信号
  • API:composer.googleapis.com/environment/scheduler_heartbeat_count
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序中:

    • 滚动窗口:自定义
    • 自定义值:4
    • 自定义单位:小时
    • 滚动窗口函数:计数
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的行为
    • 阈值位置:低于阈值
    • 阈值:216

      1. 如需获取此数值,您可以在 Metrics Explorer 查询编辑器中运行汇总值 _scheduler_heartbeat_count_mean 的查询。
    • 条件名称:调度程序检测信号条件

  • 配置通知并最终确定提醒:

    • 将提醒政策命名为:Airflow 调度器检测信号

终止的僵尸任务数量指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 终止的僵尸任务数
  • API:composer.googleapis.com/environment/zombie_task_killed_count
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序中:

    • 滚动窗口:1 天
    • 滚动窗口函数:求和
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的行为
    • 阈值位置:高于阈值
    • 阈值:1
    • 条件名称:僵尸任务条件
  • 配置通知并最终确定提醒:

    • 将提醒政策命名为:Airflow Zombie Tasks

工作器容器重启次数指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 终止的僵尸任务数
  • API:composer.googleapis.com/environment/zombie_task_killed_count
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序中:

    • 滚动窗口:1 天
    • 滚动窗口函数:求和
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的行为
    • 阈值位置:高于阈值
    • 阈值:1
    • 条件名称:僵尸任务条件
  • 配置通知并最终确定提醒:

    • 将提醒政策命名为:Airflow Zombie Tasks

Terraform

运行一个 Terraform 脚本,用于创建电子邮件通知渠道,并根据各自的基准针对本教程中提供的关键指标上传提醒政策:

  1. 将示例 Terraform 文件保存到您的本地计算机上。
  2. 替换以下内容:

    • PROJECT_ID:项目的项目 ID。例如 example-project
    • EMAIL_ADDRESS:必须在触发提醒时接收通知的电子邮件地址。
    • ENVIRONMENT_NAME:您的 Cloud Composer 环境的名称。例如 example-composer-environment
    • CLUSTER_NAME:您的环境集群名称,可在 Google Cloud 控制台中的“环境配置”>“资源”>“GKE 集群”下找到。
resource "google_monitoring_notification_channel" "basic" {
  project      = "PROJECT_ID"
  display_name = "Test Notification Channel"
  type         = "email"
  labels = {
    email_address = "EMAIL_ADDRESS"
  }
  # force_delete = false
}

resource "google_monitoring_alert_policy" "environment_health_metric" {
  project      = "PROJECT_ID"
  display_name = "Airflow Environment Health"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Environment health condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/healthy\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_LT"
      threshold_value = 0.9
      aggregations {
        alignment_period   = "14400s"
        per_series_aligner = "ALIGN_FRACTION_TRUE"
      }
    }
  }

}

resource "google_monitoring_alert_policy" "database_health_metric" {
  project      = "PROJECT_ID"
  display_name = "Airflow Database Health"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Database health condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/database_health\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_LT"
      threshold_value = 0.95
      aggregations {
        alignment_period   = "14400s"
        per_series_aligner = "ALIGN_FRACTION_TRUE"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_database_cpu_usage" {
  project      = "PROJECT_ID"
  display_name = "Airflow Database CPU Usage"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Database CPU usage condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/database/cpu/utilization\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_GT"
      threshold_value = 80
      aggregations {
        alignment_period   = "43200s"
        per_series_aligner = "ALIGN_MEAN"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_database_memory_usage" {
  project      = "PROJECT_ID"
  display_name = "Airflow Database Memory Usage"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Database memory usage condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/database/memory/utilization\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_GT"
      threshold_value = 80
      aggregations {
        alignment_period   = "43200s"
        per_series_aligner = "ALIGN_MEAN"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_scheduler_heartbeat" {
  project      = "PROJECT_ID"
  display_name = "Airflow Scheduler Heartbeat"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Scheduler heartbeat condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/scheduler_heartbeat_count\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_LT"
      threshold_value = 216 // Threshold is 90% of the average for composer.googleapis.com/environment/scheduler_heartbeat_count metric in an idle environment
      aggregations {
        alignment_period   = "14400s"
        per_series_aligner = "ALIGN_COUNT"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_zombie_task" {
  project      = "PROJECT_ID"
  display_name = "Airflow Zombie Tasks"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Zombie tasks condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/zombie_task_killed_count\" AND  resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_GT"
      threshold_value = 1
      aggregations {
        alignment_period   = "86400s"
        per_series_aligner = "ALIGN_SUM"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_worker_restarts" {
  project      = "PROJECT_ID"
  display_name = "Airflow Worker Restarts"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Worker container restarts condition"
    condition_threshold {
      filter     = "resource.type = \"k8s_container\" AND (resource.labels.cluster_name = \"CLUSTER_NAME\" AND resource.labels.container_name = monitoring.regex.full_match(\"airflow-worker|base\") AND resource.labels.pod_name = monitoring.regex.full_match(\"airflow-worker-.*|airflow-k8s-worker-.*\")) AND metric.type = \"kubernetes.io/container/restart_count\""

      duration   = "60s"
      comparison = "COMPARISON_GT"
      threshold_value = 1
      aggregations {
        alignment_period   = "86400s"
        per_series_aligner = "ALIGN_RATE"
      }
    }
  }
}

测试提醒政策

本部分介绍如何测试创建的提醒政策和解读结果。

上传示例 DAG

本教程中提供的示例 DAG memory_consumption_dag.py 会模拟密集的工作器内存利用率。该 DAG 包含 4 个任务,每个任务都会将数据写入示例字符串中,这会产生 380 MB 的内存。示例 DAG 安排每 2 分钟运行一次,并会在您将其上传到 Composer 环境后自动开始运行。

将以下示例 DAG上传到您在先前步骤中创建的环境:

from datetime import datetime
import sys
import time

from airflow import DAG
from airflow.operators.python import PythonOperator

def ram_function():
    data = ""
    start = time.time()
    for i in range(38):
        data += "a" * 10 * 1000**2
        time.sleep(0.2)
        print(f"{i}, {round(time.time() - start, 4)}, {sys.getsizeof(data) / (1000 ** 3)}")
    print(f"Size={sys.getsizeof(data) / (1000 ** 3)}GB")
    time.sleep(30 - (time.time() - start))
    print(f"Complete in {round(time.time() - start, 2)} seconds!")

with DAG(
    dag_id="memory_consumption_dag",
    start_date=datetime(2023, 1, 1, 1, 1, 1),
    schedule="1/2 * * * *",
    catchup=False,
) as dag:
    for i in range(4):
        PythonOperator(
            task_id=f"task_{i+1}",
            python_callable=ram_function,
            retries=0,
            dag=dag,
        )

解读 Monitoring 中的提醒和指标

示例 DAG 开始运行后,等待大约 10 分钟,并评估测试结果:

  1. 请检查您的电子邮件邮箱,确认是否收到了 Google Cloud Alerting 发送的主题行以 [ALERT] 开头的通知。此消息的内容包含提醒政策突发事件详情。

  2. 点击电子邮件通知中的查看突发事件按钮。您将被重定向到 Metrics Explorer。查看提醒突发事件的详细信息:

    提醒突发事件的详细信息
    图 2.提醒突发事件的详细信息(点击可放大)

    突发事件指标图表指示您创建的指标超出了阈值 1,这意味着 Airflow 检测到并终止了超过 1 项僵尸任务。

  3. 在 Cloud Composer 环境中,转到监控标签页,打开 DAG 统计信息部分,然后找到终止的僵尸任务图表:

    僵尸任务图表
    图 3.僵尸任务图表(点击可放大)

    该图表显示,Airflow 在运行示例 DAG 的前 10 分钟内就终止了约 20 项僵尸任务。

  4. 根据基准测试和纠正操作,僵尸任务的最常见原因是缺少工作器内存或 CPU。通过分析工作器资源利用率,确定造成僵尸任务的根本原因。

    打开 Monitoring 信息中心内的“Workers”部分,并查看工作器 CPU 和内存用量指标:

    工作器 CPU 和内存用量指标
    图 4. 工作器 CPU 和内存用量指标(点击可放大)

    工作器总 CPU 使用率图表显示工作器 CPU 使用率始终低于总可用限制的 50%,因此可用的 CPU 已足够。工作器总内存用量图表显示,运行示例 DAG 会达到可分配的内存限制,接近图表上显示的总内存限制的 75%(GKE 会在每个节点的前 4 GiB 内存中预留 25% 的内存,并在每个节点上额外预留 100 MiB 的内存来处理 Pod 逐出)。

    您可以推断出工作器缺少内存资源,无法成功运行示例 DAG。

优化您的环境并评估其性能

根据对工作器资源利用率的分析,您需要为工作器分配更多内存,以确保 DAG 中的所有任务能够成功。

  1. 在 Composer 环境中,打开 DAG 标签页,点击示例 DAG 的名称 (memory_consumption_dag),然后点击暂停 DAG

  2. 分配额外的工作器内存:

    1. 在“环境配置”标签页中,找到资源 > 工作负载配置,然后点击修改

    2. Worker 项中,提高内存限制。在本教程中,将使用 3.25 GB。

    3. 保存更改,并等待几分钟,让工作器重启。

  3. 打开 DAG 标签页,点击示例 DAG 的名称 (memory_consumption_dag),然后点击取消暂停 DAG

转到 Monitoring,验证在更新工作器资源限制后是否未出现新的僵尸任务:

内存限制更改后的僵尸任务图表
图 5.内存限制更改后的僵尸任务图表(点击可放大)

摘要

在本教程中,您了解了关键的环境级运行状况和性能指标、如何为每个指标设置提醒政策,以及如何将每个指标解读为纠正措施。然后,您运行了一个示例 DAG,借助提醒和 Monitoring 图表确定环境运行状况问题的根本原因,并通过为工作器分配更多内存来优化您的环境。但是,建议您优化 DAG 以从源头减少工作器资源消耗,因为不可能将资源增加到超过特定阈值。

清理

为避免因本教程中使用的资源导致您的 Google Cloud 帐号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除项目

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除各个资源

如果您打算探索多个教程和快速入门,重复使用项目可以帮助您避免超出项目配额上限。

控制台

  1. 删除 Cloud Composer 环境。在此过程中,您还需要删除环境的存储桶。
  2. 删除您在 Cloud Monitoring 中创建的每个提醒政策

Terraform

  1. 确保您的 Terraform 脚本不包含项目仍需要的资源的条目。例如,您可能想要让某些 API 保持启用状态,并且仍然分配 IAM 权限(如果您已将此类定义添加到 Terraform 脚本中)。
  2. 运行 terraform destroy
  3. 手动删除环境的存储桶。Cloud Composer 不会自动将其删除。您可以通过 Google Cloud 控制台或 Google Cloud CLI 执行此操作。

后续步骤