在 Monitoring 信息中心内使用关键指标监控环境健康状况和性能

Cloud Composer 1 | Cloud Composer 2

本页面介绍如何使用 Monitoring 信息中心内的关键指标监控 Cloud Composer 环境的整体运行状况和性能。

简介

本教程重点介绍关键的 Cloud Composer 监控指标,这些指标可让您很好地了解环境级别的运行状况和性能。

Cloud Composer 提供了多种指标,用于描述环境的总体状态。本教程中的监控准则基于 Cloud Composer 环境的 Monitoring 信息中心内显示的指标。

在本教程中,您将了解可作为环境性能和运行状况问题的主要指标的关键指标,以及将每个指标解释为保持环境健康的纠正措施的指南。您还将为每个指标设置提醒规则,运行示例 DAG,并使用这些指标和提醒来优化环境的性能。

目标

费用

本教程使用 Google Cloud 的以下收费组件:

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

准备工作

本部分介绍了在开始学习本教程之前需要执行的操作。

创建和配置项目

在本教程中,您需要一个 Google Cloud 项目。按如下方式配置项目:

  1. 在 Google Cloud 控制台中,选择或创建项目

    前往项目选择器

  2. 确保您的项目已启用结算功能。 了解如何检查项目是否已启用结算功能

  3. 请确保您的 Google Cloud 项目用户具有以下角色,以便创建必要的资源:

    • Environment and Storage Object Administrator (roles/composer.environmentAndStorageObjectAdmin)
    • Compute Admin (roles/compute.admin)
    • Monitoring Editor (roles/monitoring.editor)

为您的项目启用 API

启用 Cloud Composer API。

启用 API

创建 Cloud Composer 环境

创建 Cloud Composer 2 环境

在此过程中,您要向 Composer Service Agent 帐号授予 Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext) 角色。Cloud Composer 使用此账号在您的 Google Cloud 项目中执行操作。

探索有关环境级健康状况和性能的关键指标

本教程重点介绍一些关键指标,这些指标可以让您全面了解环境的整体运行状况和性能。

Google Cloud 控制台中的 Monitoring 信息中心包含各种指标和图表,可让您监控环境中的趋势并识别 Airflow 组件和 Cloud Composer 资源的问题。

每个 Cloud Composer 环境都有自己的 Monitoring 信息中心。

请熟悉以下关键指标,并在 Monitoring 信息中心内找到每个指标:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到监控标签页。

  4. 选择概览部分,在信息中心内找到环境概览项,然后观察环境运行状况(Airflow 监控 DAG)指标。

    • 此时间轴显示了 Cloud Composer 环境的运行状况。环境运行状况栏的绿色表示环境良好,不健康的环境状态则以红色表示。

    • Cloud Composer 每隔几分钟就会执行一个名为 airflow_monitoring 的活跃性 DAG。如果活跃性 DAG 运行成功完成,则运行状况为 True。如果活跃性 DAG 运行失败(例如,由于 Pod 逐出、外部进程终止或维护),则运行状况为 False

  5. 选择 SQL 数据库部分,在信息中心内找到数据库健康状况项,然后观察数据库健康状况指标。

    • 此时间轴显示与您环境的 Cloud SQL 实例的连接状态。绿色的数据库运行状况条表示连接,而连接失败则显示为红色。

    • Airflow 监控 Pod 会定期对数据库执行 ping 操作。如果可以建立连接,则将运行状况报告为 True;如果无法建立连接,则报告为 False

  6. 数据库健康状况项中,观察数据库 CPU 使用率数据库内存使用率指标。

    • 数据库 CPU 使用率图表会显示环境中的 Cloud SQL 数据库实例 CPU 核心使用率与可用数据库总 CPU 限制的对比情况。

    • 数据库内存用量图表显示了您环境中的 Cloud SQL 数据库实例的内存用量与可用数据库总内存限制的对比情况。

  7. 选择调度器部分,在信息中心内找到调度器检测信号项,然后观察调度器检测信号指标。

    • 此时间轴显示 Airflow 调度器的运行状况。检查是否有红色区域,以找出 Airflow 调度器问题。如果您的环境有多个调度器,那么只要至少有一个调度器在响应,检测信号状态就是正常的。

    • 如果上次收到检测信号的时间比当前时间早 30 秒(默认值)时,则调度程序会被视为运行状况不佳。

  8. 选择 DAG 统计信息部分,在信息中心内找到已终止的僵尸任务项,然后观察终止的僵尸任务指标。

    • 此图显示了在短时间内终止的僵尸任务数量。僵尸任务通常是由 Airflow 进程的外部终止(例如任务进程被终止时)引起的。

    • Airflow 调度器会定期终止僵尸任务,如此图表所示。

  9. 选择工作器部分,在信息中心内找到工作器容器重启项,然后观察工作器容器重启指标。

    • 一张图表表示各个工作器容器的重启总次数。过多的容器重启可能会影响您的服务或其他将其用作依赖项的下游服务的可用性。

了解针对关键指标的基准和可能的纠正措施

以下列表介绍了可以指示问题的基准值,并提供您可以采取的纠正措施来解决这些问题。

  • 环境健康状况(Airflow 监控 DAG)

    • 在 4 小时的时间范围内,成功率低于 90%

    • 失败可能意味着由于环境过载或发生故障而导致 Pod 被逐出或终止工作器。环境运行状况时间轴上的红色区域通常与相应环境组件的其他运行状况条中的红色区域对应。通过查看 Monitoring 信息中心内的其他指标确定根本原因。

  • 数据库健康状况

    • 在 4 小时的时间范围内,成功率低于 95%

    • 故障意味着与 Airflow 数据库的连接出现问题,这可能是由于数据库过载(例如,由于连接到数据库的 CPU 或内存用量较高或延迟时间较长)而导致数据库崩溃或停机导致的。这些症状最常见的是由于优化程度不够理想的 DAG,例如 DAG 使用许多全局定义的 Airflow 或环境变量。查看 SQL 数据库资源使用情况指标,找出根本原因。您还可以检查调度器日志,查找与数据库连接相关的错误

  • 数据库 CPU 和内存用量

    • 12 小时内的平均 CPU 或内存使用率超过 80%

    • 数据库可能过载。分析 DAG 运行与数据库 CPU 或内存使用量峰值之间的相关性。

  • 调度程序检测信号

    • 在 4 小时的时间范围内,成功率低于 90%

    • 为调度器分配更多资源,或将调度器数量从 1 增加到 2(推荐)。

  • 已终止的僵尸任务

    • 每 24 小时有多个僵尸任务

    • 发生僵尸任务的最常见原因是环境集群中的 CPU 或内存资源不足。查看工作器资源使用情况图,并为工作器分配更多资源,或增加僵尸任务超时时间,使调度器等待更长时间才能将任务视为僵尸。

  • 工作器容器重启

    • 每 24 小时多次重启

    • 最常见的原因是工作器内存不足或存储空间不足。调查工作器资源消耗情况,并为您的工作器分配更多内存或存储空间。如果资源不足不是原因,请参阅排查工作器重启突发事件的问题,并使用 Logging 查询找出工作器重启的原因。

创建通知渠道

按照创建通知渠道中列出的说明创建电子邮件通知渠道。

如需详细了解通知渠道,请参阅管理通知渠道

创建提醒政策

根据本教程前面部分中提供的基准创建提醒政策,持续监控指标的值,并在这些指标违反条件时收到通知。

控制台

您可以点击相应项一角的铃铛图标,为 Monitoring 信息中心中显示的每个指标设置提醒:

为监控信息中心内显示的指标创建提醒
图 1.为监控信息中心上显示的指标创建提醒(点击可放大)
  1. 在 Monitoring 信息中心内找到要监控的每个指标,然后点击指标项一角的铃铛图标。此时将打开创建提醒政策页面。

  2. 转换数据部分中,执行以下操作:

    1. 按照指标的提醒政策配置所述,配置在每个时序内部分。

    2. 点击下一步,然后按照指标的提醒政策配置所述配置配置提醒触发器部分。

  3. 点击下一步

  4. 配置通知。展开通知渠道菜单,然后选择您在上一步中创建的通知渠道。

  5. 点击确定

  6. 为提醒政策命名部分中,填写提醒政策名称字段。为每个指标使用描述性名称。按照指标的提醒政策配置中所述,使用“为提醒政策命名”值。

  7. 点击下一步

  8. 查看提醒政策,然后点击创建政策

环境健康状况(Airflow 监控 DAG)指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 健康状况良好
  • API:composer.googleapis.com/environment/healthy
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序内:

    • 滚动窗口:自定义
    • 自定义值:4
    • 自定义单位:小时
    • 滚动窗口函数:分数 true
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的情况
    • 阈值位置:低于阈值
    • 阈值:90
    • 条件名称:环境健康状况
  • 配置通知并最终确定提醒:

    • 将提醒政策命名为:Airflow 环境健康状况

数据库运行状况指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 数据库运行状况良好
  • API:composer.googleapis.com/environment/database_health
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序内:

    • 滚动窗口:自定义
    • 自定义值:4
    • 自定义单位:小时
    • 滚动窗口函数:分数 true
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的情况
    • 阈值位置:低于阈值
    • 阈值:95
    • 条件名称:数据库健康状况
  • 配置通知并最终确定提醒:

    • 将提醒政策命名为:Airflow 数据库健康状况

数据库 CPU 使用率指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 数据库 CPU 利用率
  • API:composer.googleapis.com/environment/database/cpu/utilization
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序内:

    • 滚动窗口:自定义
    • 自定义值:12
    • 自定义单位:小时
    • 滚动窗口函数:平均值
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的情况
    • 阈值位置:高于阈值
    • 阈值:80
    • 条件名称:数据库 CPU 使用条件
  • 配置通知并最终确定提醒:

    • 将提醒政策命名为:Airflow 数据库 CPU 使用率

数据库 CPU 使用率指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 数据库内存利用率
  • API:composer.googleapis.com/environment/database/memory/utilization
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序内:

    • 滚动窗口:自定义
    • 自定义值:12
    • 自定义单位:小时
    • 滚动窗口函数:平均值
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的情况
    • 阈值位置:高于阈值
    • 阈值:80
    • 条件名称:数据库内存用量条件
  • 配置通知并最终确定提醒:

    • 将提醒政策命名为:Airflow 数据库内存用量

调度器检测信号指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 调度器检测信号
  • API:composer.googleapis.com/environment/scheduler_heartbeat_count
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序内:

    • 滚动窗口:自定义
    • 自定义值:4
    • 自定义单位:小时
    • 滚动窗口函数:计数
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的情况
    • 阈值位置:低于阈值
    • 阈值:216

      1. 如需获取此数字,您可以在 Metrics Explorer 查询编辑器中运行对值 _scheduler_heartbeat_count_mean 进行汇总的查询。
    • 条件名称:调度器检测信号条件

  • 配置通知并最终确定提醒:

    • 将提醒政策命名为:Airflow 调度器检测信号

终止的僵尸任务数量指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 终止的僵尸任务数量
  • API:composer.googleapis.com/environment/zombie_task_killed_count
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序内:

    • 滚动窗口:1 天
    • 滚动窗口函数:sum
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的情况
    • 阈值位置:高于阈值
    • 阈值:1
    • 条件名称:僵尸任务条件
  • 配置通知并最终确定提醒:

    • 将提醒政策命名为:Airflow Zombie Tasks

工作器容器重启指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 终止的僵尸任务数量
  • API:composer.googleapis.com/environment/zombie_task_killed_count
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序内:

    • 滚动窗口:1 天
    • 滚动窗口函数:sum
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的情况
    • 阈值位置:高于阈值
    • 阈值:1
    • 条件名称:僵尸任务条件
  • 配置通知并最终确定提醒:

    • 将提醒政策命名为:Airflow Zombie Tasks

Terraform

运行一个 Terraform 脚本,用于创建电子邮件通知渠道,并根据各自的基准为本教程中提供的关键指标上传提醒政策:

  1. 将示例 Terraform 文件保存到您的本地计算机上。
  2. 请替换以下内容:

    • PROJECT_ID:您的项目的项目 ID。例如 example-project
    • EMAIL_ADDRESS:必须在触发提醒时接收通知的电子邮件地址。
    • ENVIRONMENT_NAME:您的 Cloud Composer 环境的名称。例如 example-composer-environment
    • CLUSTER_NAME:您的环境集群名称,可在 Google Cloud 控制台中的“环境配置”>“资源”>“GKE 集群”下找到。
resource "google_monitoring_notification_channel" "basic" {
  project      = "PROJECT_ID"
  display_name = "Test Notification Channel"
  type         = "email"
  labels = {
    email_address = "EMAIL_ADDRESS"
  }
  # force_delete = false
}

resource "google_monitoring_alert_policy" "environment_health_metric" {
  project      = "PROJECT_ID"
  display_name = "Airflow Environment Health"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Environment health condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/healthy\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_LT"
      threshold_value = 0.9
      aggregations {
        alignment_period   = "14400s"
        per_series_aligner = "ALIGN_FRACTION_TRUE"
      }
    }
  }

}

resource "google_monitoring_alert_policy" "database_health_metric" {
  project      = "PROJECT_ID"
  display_name = "Airflow Database Health"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Database health condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/database_health\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_LT"
      threshold_value = 0.95
      aggregations {
        alignment_period   = "14400s"
        per_series_aligner = "ALIGN_FRACTION_TRUE"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_database_cpu_usage" {
  project      = "PROJECT_ID"
  display_name = "Airflow Database CPU Usage"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Database CPU usage condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/database/cpu/utilization\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_GT"
      threshold_value = 80
      aggregations {
        alignment_period   = "43200s"
        per_series_aligner = "ALIGN_MEAN"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_database_memory_usage" {
  project      = "PROJECT_ID"
  display_name = "Airflow Database Memory Usage"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Database memory usage condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/database/memory/utilization\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_GT"
      threshold_value = 80
      aggregations {
        alignment_period   = "43200s"
        per_series_aligner = "ALIGN_MEAN"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_scheduler_heartbeat" {
  project      = "PROJECT_ID"
  display_name = "Airflow Scheduler Heartbeat"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Scheduler heartbeat condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/scheduler_heartbeat_count\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_LT"
      threshold_value = 216 // Threshold is 90% of the average for composer.googleapis.com/environment/scheduler_heartbeat_count metric in an idle environment
      aggregations {
        alignment_period   = "14400s"
        per_series_aligner = "ALIGN_COUNT"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_zombie_task" {
  project      = "PROJECT_ID"
  display_name = "Airflow Zombie Tasks"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Zombie tasks condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/zombie_task_killed_count\" AND  resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_GT"
      threshold_value = 1
      aggregations {
        alignment_period   = "86400s"
        per_series_aligner = "ALIGN_SUM"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_worker_restarts" {
  project      = "PROJECT_ID"
  display_name = "Airflow Worker Restarts"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Worker container restarts condition"
    condition_threshold {
      filter     = "resource.type = \"k8s_container\" AND (resource.labels.cluster_name = \"CLUSTER_NAME\" AND resource.labels.container_name = monitoring.regex.full_match(\"airflow-worker|base\") AND resource.labels.pod_name = monitoring.regex.full_match(\"airflow-worker-.*|airflow-k8s-worker-.*\")) AND metric.type = \"kubernetes.io/container/restart_count\""

      duration   = "60s"
      comparison = "COMPARISON_GT"
      threshold_value = 1
      aggregations {
        alignment_period   = "86400s"
        per_series_aligner = "ALIGN_RATE"
      }
    }
  }
}

测试提醒政策

本部分介绍如何测试创建的提醒政策和解读结果。

上传示例 DAG

本教程中提供的示例 DAG memory_consumption_dag.py 会模拟密集的工作器内存利用率。该 DAG 包含 4 个任务,每项任务都会将数据写入示例字符串,这占用 380 MB 的内存。示例 DAG 安排每 2 分钟运行一次,并将在您将它上传到 Composer 环境后自动开始运行。

将以下示例 DAG 上传到您在先前步骤中创建的环境:

from datetime import datetime
import sys
import time

from airflow import DAG
from airflow.operators.python import PythonOperator


def ram_function():
    data = ""
    start = time.time()
    for i in range(38):
        data += "a" * 10 * 1000**2
        time.sleep(0.2)
        print(f"{i}, {round(time.time() - start, 4)}, {sys.getsizeof(data) / (1000 ** 3)}")
    print(f"Size={sys.getsizeof(data) / (1000 ** 3)}GB")
    time.sleep(30 - (time.time() - start))
    print(f"Complete in {round(time.time() - start, 2)} seconds!")


with DAG(
    dag_id="memory_consumption_dag",
    start_date=datetime(2023, 1, 1, 1, 1, 1),
    schedule="1/2 * * * *",
    catchup=False,
) as dag:
    for i in range(4):
        PythonOperator(
            task_id=f"task_{i+1}",
            python_callable=ram_function,
            retries=0,
            dag=dag,
        )

解读 Monitoring 中的提醒和指标

在示例 DAG 开始运行后等待大约 10 分钟,并评估测试结果:

  1. 请查看您的电子邮件邮箱,确认您是否收到了来自 Google Cloud Alerting(主题行以 [ALERT] 开头的通知)的通知。此消息的内容包含提醒政策突发事件详情。

  2. 点击电子邮件通知中的查看突发事件按钮。您将被重定向到 Metrics Explorer。查看提醒突发事件的详细信息:

    提醒突发事件的详细信息
    图 2.提醒突发事件的详细信息(点击可放大)

    突发事件指标图表显示您创建的指标超出了阈值 1,这意味着 Airflow 检测到并终止了超过 1 项僵尸任务。

  3. 在 Cloud Composer 环境中,转到监控标签页,打开 DAG 统计信息部分,找到终止的僵尸任务图:

    僵尸任务图表
    图 3.僵尸任务图(点击可放大)

    该图表显示,Airflow 在运行示例 DAG 的前 10 分钟内就终止了约 20 项僵尸任务。

  4. 根据基准测试和纠正操作,发生僵尸任务的最常见原因是缺少工作器内存或 CPU。通过分析工作器资源利用率,确定僵尸任务的根本原因。

    打开 Monitoring 信息中心内的“Workers”部分,查看工作器 CPU 和内存用量指标:

    工作器 CPU 和内存用量指标
    图 4. 工作器 CPU 和内存用量指标(点击可放大)

    工作器总 CPU 使用率图表显示工作器 CPU 使用率始终低于可用总上限的 50%,因此可用的 CPU 已足够。“总工作器内存用量”图表显示,运行示例 DAG 会达到可分配的内存限制,接近图表上显示的总内存限制的 75%(GKE 会在每个节点上预留前 4 GiB 内存的 25%,并在每个节点上另外保留 100 MiB 的内存来处理 Pod 逐出)。

    您可以得出这样的结论:工作器缺少内存资源,无法成功运行示例 DAG。

优化您的环境并评估其性能

根据对工作器资源利用率的分析,您需要为工作器分配更多内存,以使 DAG 中的所有任务取得成功。

  1. 在 Composer 环境中,打开 DAG 标签页,点击示例 DAG 的名称 (memory_consumption_dag),然后点击暂停 DAG

  2. 分配额外的工作器内存:

    1. 在“环境配置”标签页中,找到资源 > 工作负载配置,然后点击修改

    2. Worker 项中,提高 Memory 上限。在本教程中,请使用 3.25 GB。

    3. 保存更改,并等待几分钟,让工作器重启。

  3. 打开 DAG 标签页,点击示例 DAG 的名称 (memory_consumption_dag),然后点击取消暂停 DAG

转到 Monitoring,验证在更新工作器资源限制后是否未出现任何新的僵尸任务:

内存限制更改后的僵尸任务图表
图 5.内存限制更改后的僵尸任务图表(点击可放大)

摘要

在本教程中,您了解了关键的环境级运行状况和性能指标、如何为每个指标设置提醒政策,以及如何将每个指标解释为纠正措施。然后,您运行了一个示例 DAG,借助提醒和 Monitoring 图表确定了环境运行状况问题的根本原因,并通过为工作器分配更多内存来优化您的环境。但是,建议您优化 DAG 以从根本上减少工作器资源消耗量,因为无法将资源增加到超过特定阈值。

清理

为避免系统因本教程中使用的资源向您的 Google Cloud 帐号收取费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除项目

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除各个资源

如果您打算探索多个教程和快速入门,重复使用项目可以帮助您避免超出项目配额上限。

控制台

  1. 删除 Cloud Composer 环境。在此过程中,您还需要删除环境的存储桶。
  2. 删除您在 Cloud Monitoring 中创建的每项提醒政策

Terraform

  1. 确保您的 Terraform 脚本不包含项目仍需要的资源的条目。例如,您可能希望让某些 API 保持启用状态,并且仍分配 IAM 权限(如果您已将此类定义添加到 Terraform 脚本中)。
  2. 运行 terraform destroy
  3. 手动删除环境的存储桶。Cloud Composer 不会自动将其删除。您可以通过 Google Cloud 控制台或 Google Cloud CLI 执行此操作。

后续步骤