在 Monitoring 信息中心内使用关键指标监控环境健康状况和性能

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

本页面介绍了如何使用 Monitoring 信息中心内的关键指标来监控 Cloud Composer 环境的整体健康状况和性能。

简介

本教程重点介绍关键的 Cloud Composer 监控指标,这些指标可让您全面了解环境级运行状况和性能。

Cloud Composer 提供了多个描述环境整体状态的指标。本教程中的监控指南基于 Cloud Composer 环境的 Monitoring 信息中心上公开的指标。

在本教程中,您将了解可作为环境性能和健康状况问题的主要指标的关键指标,以及将每个指标解释为纠正措施以保持环境健康的指南。您还将为每个指标设置提醒规则,运行示例 DAG,并使用这些指标和提醒来优化环境性能。

目标

费用

本教程使用 Google Cloud 的以下收费组件:

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

准备工作

本部分介绍了在开始学习本教程之前必须执行的操作。

创建和配置项目

在本教程中,您需要一个 Google Cloud 项目。按以下方式配置项目:

  1. 在 Google Cloud 控制台中,选择或创建项目

    转到项目选择器

  2. 确保您的项目已启用结算功能。 了解如何检查项目是否已启用结算功能

  3. 如需创建必要的资源,请确保您的 Google Cloud 项目用户具有以下角色

    • Environment and Storage Object Administrator (roles/composer.environmentAndStorageObjectAdmin)
    • Compute Admin (roles/compute.admin)
    • Monitoring Editor (roles/monitoring.editor)

为您的项目启用 API

启用 Cloud Composer API。

启用 API

创建 Cloud Composer 环境

创建一个 Cloud Composer 2 环境

在此过程中,您需要向 Composer Service Agent 帐号授予 Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext) 角色。Cloud Composer 使用此账号在您的 Google Cloud 项目中执行操作。

探索有关环境级健康状况和性能的关键指标

本教程重点介绍一些关键指标,这些指标可帮助您全面了解环境的整体健康状况和性能。

Google Cloud 控制台中的 Monitoring 信息中心包含各种指标和图表,可用于监控环境中的趋势并找出 Airflow 组件和 Cloud Composer 资源的问题。

每个 Cloud Composer 环境都有自己的 Monitoring 信息中心。

熟悉以下关键指标,并在 Monitoring 信息中心内找到每个指标:

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到监控标签页。

  4. 选择概览部分,在信息中心内找到环境概览项,然后观察环境健康状况(Airflow 监控 DAG)指标。

    • 此时间轴显示 Cloud Composer 环境的健康状况。环境健康状况栏的绿色表示环境健康状况良好,而环境健康状况不佳时则用红色表示。

    • Cloud Composer 会每隔几分钟执行一个名为 airflow_monitoring 的活跃性 DAG。如果活跃性 DAG 运行成功完成,则运行状况为 True。如果活跃性 DAG 运行失败(例如,由于 Pod 逐出、外部进程终止或维护),则运行状况为 False

  5. 选择 SQL 数据库部分,在信息中心内找到数据库运行状况项,然后观察数据库运行状况指标。

    • 此时间轴显示与您的环境的 Cloud SQL 实例的连接状态。绿色的数据库运行状况栏表示连接,而连接失败则用红色表示。

    • Airflow 监控 Pod 会定期 ping 数据库,如果可以建立连接,将运行状况报告为 True,否则报告为 False

  6. Database health 项中,观察 Database CPU usageDatabase memory usage 指标。

    • 数据库 CPU 使用率图指示您环境的 Cloud SQL 数据库实例的 CPU 核心使用率与可用的总数据库 CPU 限制的比率。

    • 数据库内存用量图表表示您环境的 Cloud SQL 数据库实例的内存用量与可用总数据库内存上限的比值。

  7. 选择调度器部分,在信息中心内找到调度器检测信号项,然后观察调度器检测信号指标。

    • 此时间轴显示 Airflow 调度器的健康状况。检查红色区域,以识别 Airflow 调度器问题。如果您的环境有多个调度器,那么只要至少有一个调度器在响应,检测信号状态为良好。

    • 如果在当前时间之前超过 30 秒(默认值)收到最后一次检测信号,则调度程序会被视为运行状况不佳。

  8. 选择 DAG 统计信息部分,在信息中心内找到 Zombie tasks killed 项,然后观察 Zombie tasks killed 指标。

    • 此图表显示在短时间内终止的僵尸任务数量。僵尸任务通常是由 Airflow 进程在外部终止(例如任务进程被终止)导致的。

    • Airflow 调度器会定期终止僵尸任务,正如此图表所示。

  9. 选择工作器部分,在信息中心内找到工作器容器重启项,然后观察工作器容器重启指标。

    • 表示各个工作器容器的重启总次数。容器重启次数过多可能会影响您的服务或下游将它用作依赖项的其他服务的可用性。

了解关键指标的基准和可能的纠正措施

以下列表介绍了可以指明问题以及可用于解决这些问题的纠正措施的基准值。

  • 环境健康状况(Airflow 监控 DAG)

    • 在 4 小时的时间段内,成功率低于 90%

    • 由于环境过载或发生故障,失败可能意味着 Pod 被逐出或工作器终止。环境运行状况时间轴上的红色区域通常与各个环境组件的其他健康条形中的红色区域相关。通过查看 Monitoring 信息中心内的其他指标,确定根本原因。

  • 数据库健康状况

    • 在 4 小时的时间段内,成功率低于 95%

    • 故障意味着与 Airflow 数据库的连接出现问题,原因可能是数据库过载而发生数据库崩溃或停机(例如,CPU 或内存用量过高,或者连接到数据库时的延迟时间较长)。这些症状最常见的原因是次最佳 DAG,例如当 DAG 使用许多全局定义的 Airflow 或环境变量时。查看 SQL 数据库资源使用情况指标以确定根本原因。您还可以检查调度器日志中是否存在与数据库连接相关的错误

  • 数据库 CPU 和内存用量

    • 在 12 小时的时间段内,CPU 或内存的平均使用率超过 80%

    • 数据库可能已过载。分析 DAG 运行与数据库 CPU 或内存用量高峰之间的关系。

  • 调度程序检测信号

    • 在 4 小时的时间段内,成功率低于 90%

    • 为调度器分配更多资源,或将调度器的数量从 1 增加到 2(推荐)。

  • 已终止的僵尸任务

    • 每 24 小时内只能执行一项僵尸任务

    • 执行僵尸任务的最常见原因是环境的集群中缺少 CPU 或内存资源。查看工作器资源使用情况图表并为工作器分配更多资源,或延长僵尸任务超时时间,以便调度程序在将任务视为僵尸之前等待更长时间。

  • 工作器容器重启

    • 每 24 小时重启多次

    • 最常见的原因是缺少工作器内存或存储空间。请查看工作器资源使用情况,并为工作器分配更多内存或存储空间。如果缺少资源不是造成工作器重启的原因,请查看排查工作器重启突发事件,并使用 Logging 查询查找工作器重启的原因。

创建通知渠道

按照创建通知渠道中列出的说明创建电子邮件通知渠道。

如需详细了解通知渠道,请参阅管理通知渠道

创建提醒政策

根据本教程前面部分中提供的基准创建提醒政策,以便持续监控指标的值,并在这些指标违反条件时收到通知。

控制台

您可以点击相应项角落的铃铛图标,为 Monitoring 信息中心中显示的每个指标设置提醒:

为监控信息中心上显示的指标创建提醒
图 1.为监控信息中心上显示的指标创建提醒(点击可放大)
  1. 在 Monitoring 信息中心找到要监控的每个指标,然后点击指标项角落的铃铛图标。此时将打开创建提醒政策页面。

  2. 转换数据部分中,执行以下操作:

    1. 按照指标的提醒政策配置中的说明,配置在每个时序内部分。

    2. 点击下一步,然后按照指标的提醒政策配置中的说明配置配置提醒触发器部分。

  3. 点击下一步

  4. 配置通知。展开通知渠道菜单,然后选择您在上一步中创建的通知渠道。

  5. 点击 OK(确定)。

  6. 为提醒政策命名部分中,填写提醒政策名称字段。为每个指标使用描述性名称。按照指标的提醒政策配置中的说明,使用“为提醒政策命名”值。

  7. 点击下一步

  8. 查看提醒政策,然后点击创建政策

环境健康状况(Airflow 监控 DAG)指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 健康状况良好
  • API:composer.googleapis.com/environment/healthy
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序内:

    • 滚动窗口:自定义
    • 自定义值:4
    • 自定义单位:小时
    • 滚动窗口函数:true 分数
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的情况
    • 阈值位置:低于阈值
    • 阈值:90
    • 条件名称:环境健康状况
  • 配置通知并完成提醒:

    • 将提醒政策命名为“Airflow Environment Health”

数据库运行状况指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 数据库运行状况
  • API:composer.googleapis.com/environment/database_health
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序内:

    • 滚动窗口:自定义
    • 自定义值:4
    • 自定义单位:小时
    • 滚动窗口函数:true 分数
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的情况
    • 阈值位置:低于阈值
    • 阈值:95
    • 条件名称:数据库健康状况
  • 配置通知并完成提醒:

    • 将提醒政策命名为:Airflow Database Health

数据库 CPU 用量指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 数据库 CPU 利用率
  • API:composer.googleapis.com/environment/database/cpu/utilization
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序内:

    • 滚动窗口:自定义
    • 自定义值:12
    • 自定义单位:小时
    • 滚动窗口函数:均值
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的情况
    • 阈值位置:高于阈值
    • 阈值:80
    • 条件名称:数据库 CPU 使用条件
  • 配置通知并完成提醒:

    • 将提醒政策命名为:Airflow Database CPU Usage

数据库 CPU 用量指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 数据库内存利用率
  • API:composer.googleapis.com/environment/database/memory/utilization
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序内:

    • 滚动窗口:自定义
    • 自定义值:12
    • 自定义单位:小时
    • 滚动窗口函数:均值
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的情况
    • 阈值位置:高于阈值
    • 阈值:80
    • 条件名称:数据库内存用量
  • 配置通知并完成提醒:

    • 将提醒政策命名为:Airflow Database Memory Usage

调度器检测信号指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 调度器检测信号
  • API:composer.googleapis.com/environment/scheduler_heartbeat_count
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序内:

    • 滚动窗口:自定义
    • 自定义值:4
    • 自定义单位:小时
    • 滚动窗口函数:count
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的情况
    • 阈值位置:低于阈值
    • 阈值:216

      1. 如需获取此数字,您可以在 Metrics Explorer 查询编辑器中运行汇总 _scheduler_heartbeat_count_mean 值的查询。
    • 条件名称:调度器检测信号条件

  • 配置通知并完成提醒:

    • 将提醒政策命名为:Airflow 调度程序检测信号

终止的僵尸任务数指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 已终止的僵尸任务
  • API:composer.googleapis.com/environment/zombie_task_killed_count
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序内:

    • 滚动窗口:1 天
    • 滚动窗口函数:sum
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的情况
    • 阈值位置:高于阈值
    • 阈值:1
    • 条件名称:僵尸任务条件
  • 配置通知并完成提醒:

    • 将提醒政策命名为“Airflow Zombie Tasks”。

工作器容器重启指标 - 提醒政策配置

  • 指标名称:Cloud Composer 环境 - 已终止的僵尸任务
  • API:composer.googleapis.com/environment/zombie_task_killed_count
  • 过滤条件:

    environment_name = [ENVIRONMENT_NAME]
    location = [CLUSTER_LOCATION]
    
  • 转换数据 > 在每个时序内:

    • 滚动窗口:1 天
    • 滚动窗口函数:sum
  • 配置提醒触发器:

    • 条件类型:阈值
    • 提醒触发器:任何违反时序的情况
    • 阈值位置:高于阈值
    • 阈值:1
    • 条件名称:僵尸任务条件
  • 配置通知并完成提醒:

    • 将提醒政策命名为“Airflow Zombie Tasks”。

Terraform

运行一个 Terraform 脚本,该脚本会创建一个电子邮件通知渠道,并根据本教程中提供的关键指标各自的基准上传提醒政策:

  1. 将示例 Terraform 文件保存在本地计算机上。
  2. 请替换以下内容:

    • PROJECT_ID:您的项目 ID。例如 example-project
    • EMAIL_ADDRESS:在触发提醒时必须通知的电子邮件地址。
    • ENVIRONMENT_NAME:您的 Cloud Composer 环境的名称。例如 example-composer-environment
    • CLUSTER_NAME:您的环境集群名称,可在 Google Cloud 控制台的“环境配置”>“资源”>“GKE 集群”下找到。
resource "google_monitoring_notification_channel" "basic" {
  project      = "PROJECT_ID"
  display_name = "Test Notification Channel"
  type         = "email"
  labels = {
    email_address = "EMAIL_ADDRESS"
  }
  # force_delete = false
}

resource "google_monitoring_alert_policy" "environment_health_metric" {
  project      = "PROJECT_ID"
  display_name = "Airflow Environment Health"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Environment health condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/healthy\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_LT"
      threshold_value = 0.9
      aggregations {
        alignment_period   = "14400s"
        per_series_aligner = "ALIGN_FRACTION_TRUE"
      }
    }
  }

}

resource "google_monitoring_alert_policy" "database_health_metric" {
  project      = "PROJECT_ID"
  display_name = "Airflow Database Health"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Database health condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/database_health\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_LT"
      threshold_value = 0.95
      aggregations {
        alignment_period   = "14400s"
        per_series_aligner = "ALIGN_FRACTION_TRUE"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_database_cpu_usage" {
  project      = "PROJECT_ID"
  display_name = "Airflow Database CPU Usage"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Database CPU usage condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/database/cpu/utilization\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_GT"
      threshold_value = 80
      aggregations {
        alignment_period   = "43200s"
        per_series_aligner = "ALIGN_MEAN"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_database_memory_usage" {
  project      = "PROJECT_ID"
  display_name = "Airflow Database Memory Usage"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Database memory usage condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/database/memory/utilization\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_GT"
      threshold_value = 80
      aggregations {
        alignment_period   = "43200s"
        per_series_aligner = "ALIGN_MEAN"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_scheduler_heartbeat" {
  project      = "PROJECT_ID"
  display_name = "Airflow Scheduler Heartbeat"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Scheduler heartbeat condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/scheduler_heartbeat_count\" AND resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_LT"
      threshold_value = 216 // Threshold is 90% of the average for composer.googleapis.com/environment/scheduler_heartbeat_count metric in an idle environment
      aggregations {
        alignment_period   = "14400s"
        per_series_aligner = "ALIGN_COUNT"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_zombie_task" {
  project      = "PROJECT_ID"
  display_name = "Airflow Zombie Tasks"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Zombie tasks condition"
    condition_threshold {
      filter     = "resource.type = \"cloud_composer_environment\" AND metric.type=\"composer.googleapis.com/environment/zombie_task_killed_count\" AND  resource.label.environment_name=\"ENVIRONMENT_NAME\""
      duration   = "60s"
      comparison = "COMPARISON_GT"
      threshold_value = 1
      aggregations {
        alignment_period   = "86400s"
        per_series_aligner = "ALIGN_SUM"
      }
    }
  }
}

resource "google_monitoring_alert_policy" "alert_worker_restarts" {
  project      = "PROJECT_ID"
  display_name = "Airflow Worker Restarts"
  combiner     = "OR"
  notification_channels = [google_monitoring_notification_channel.basic.name] // To manually add a notification channel add it with the syntax "projects/[PROJECT_ID]/notificationChannels/[CHANNEL_ID]"
  conditions {
    display_name = "Worker container restarts condition"
    condition_threshold {
      filter     = "resource.type = \"k8s_container\" AND (resource.labels.cluster_name = \"CLUSTER_NAME\" AND resource.labels.container_name = monitoring.regex.full_match(\"airflow-worker|base\") AND resource.labels.pod_name = monitoring.regex.full_match(\"airflow-worker-.*|airflow-k8s-worker-.*\")) AND metric.type = \"kubernetes.io/container/restart_count\""

      duration   = "60s"
      comparison = "COMPARISON_GT"
      threshold_value = 1
      aggregations {
        alignment_period   = "86400s"
        per_series_aligner = "ALIGN_RATE"
      }
    }
  }
}

测试提醒政策

本部分介绍如何测试创建的提醒政策并解读结果。

上传示例 DAG

本教程中提供的示例 DAG memory_consumption_dag.py 模拟了密集型工作器内存利用率。此 DAG 包含 4 项任务,每项任务都会将数据写入示例字符串,并占用 380 MB 的内存。示例 DAG 按计划每 2 分钟运行一次,将在您将其上传到 Composer 环境后自动开始运行。

将以下示例 DAG 上传到您在先前步骤中创建的环境:

from datetime import datetime
import sys
import time

from airflow import DAG
from airflow.operators.python import PythonOperator


def ram_function():
    data = ""
    start = time.time()
    for i in range(38):
        data += "a" * 10 * 1000**2
        time.sleep(0.2)
        print(f"{i}, {round(time.time() - start, 4)}, {sys.getsizeof(data) / (1000 ** 3)}")
    print(f"Size={sys.getsizeof(data) / (1000 ** 3)}GB")
    time.sleep(30 - (time.time() - start))
    print(f"Complete in {round(time.time() - start, 2)} seconds!")


with DAG(
    dag_id="memory_consumption_dag",
    start_date=datetime(2023, 1, 1, 1, 1, 1),
    schedule="1/2 * * * *",
    catchup=False,
) as dag:
    for i in range(4):
        PythonOperator(
            task_id=f"task_{i+1}",
            python_callable=ram_function,
            retries=0,
            dag=dag,
        )

解读 Monitoring 中的提醒和指标

在示例 DAG 开始运行后,等待大约 10 分钟再评估测试结果:

  1. 检查您的电子邮件邮箱,验证您是否收到了 Google Cloud Alerting 发送的主题行以 [ALERT] 开头的通知。此消息的内容包含提醒政策突发事件详细信息。

  2. 点击电子邮件通知中的查看突发事件按钮。您会被重定向到 Metrics Explorer。查看提醒突发事件的详细信息:

    提醒突发事件的详细信息
    图 2.提醒突发事件的详细信息(点击可放大)

    突发事件指标图表表明您创建的指标超出了阈值 1,这意味着 Airflow 检测到并终止了 1 项以上的僵尸任务。

  3. 在 Cloud Composer 环境中,转到 Monitoring 标签页,打开 DAG 统计信息部分,然后找到 Zombie tasks killed 图表:

    僵尸任务图表
    图 3.僵尸任务图表(点击可放大)

    该图表表明,Airflow 在运行示例 DAG 的前 10 分钟内就终止了大约 20 个僵尸任务。

  4. 根据基准测试和纠正措施,执行僵尸任务的最常见原因是缺少工作器内存或 CPU。通过分析工作器资源利用率来确定僵尸任务的根本原因。

    在 Monitoring 信息中心内打开“工作器”部分,并查看工作器 CPU 和内存使用情况指标:

    工作器 CPU 和内存使用情况指标
    图 4. 工作器 CPU 和内存使用情况指标(点击可放大)

    工作器 CPU 总使用率图表表明,工作器 CPU 使用率始终低于可用总上限的 50%,因此可用的 CPU 就足够了。工作器总内存用量图表显示,运行示例 DAG 会导致达到可分配的内存上限,接近图表所示的总内存上限的 75%(GKE 预留前 4 GiB 内存的 25%,并在每个节点上额外预留 100 MiB 内存来处理 Pod 逐出)。

    您可以得出结论,工作器缺少成功运行示例 DAG 的内存资源。

优化您的环境并评估其性能

根据对工作器资源利用率的分析,您需要为工作器分配更多内存,以便 DAG 中的所有任务能够成功完成。

  1. 在 Composer 环境中,打开 DAG 标签页,点击示例 DAG 的名称 (memory_consumption_dag),然后点击暂停 DAG

  2. 分配额外的工作器内存:

    1. 在“环境配置”标签页中,找到资源 > 工作负载配置,然后点击修改

    2. Worker 项中,提高内存限制。在本教程中,请使用 3.25 GB。

    3. 保存更改,然后等待几分钟,让工作器重启。

  3. 打开 DAGs 标签页,点击示例 DAG 的名称 (memory_consumption_dag),然后点击 Unpause DAG

转到 Monitoring 并验证更新工作器资源限制后是否没有新的僵尸任务出现:

内存限制发生变化后的僵尸任务图表
图 5.内存限制发生变化后的僵尸任务图表(点击可放大)

摘要

在本教程中,您了解了关键的环境级运行状况和性能指标、如何为每个指标设置提醒政策,以及如何将每个指标解读为纠正措施。然后,您运行了一个示例 DAG,借助提醒和 Monitoring 图表确定了环境运行状况问题的根本原因,并通过为工作器分配了更多内存来优化环境。但是,建议先优化 DAG 以减少工作器资源消耗,因为不可能将资源增加到超过特定阈值。

清理

为避免系统因本教程中使用的资源向您的 Google Cloud 帐号收取费用,请删除包含这些资源的项目,或者保留项目并删除各个资源。

删除项目

  1. 在 Google Cloud 控制台中,进入管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除各个资源

如果您打算探索多个教程和快速入门,重复使用项目可以帮助您避免超出项目配额上限。

控制台

  1. 删除 Cloud Composer 环境。在此过程中,您还将删除环境的存储桶。
  2. 删除您在 Cloud Monitoring 中创建的每个提醒政策

Terraform

  1. 请确保您的 Terraform 脚本不包含项目仍需要的资源的条目。例如,您可能想使某些 API 保持启用状态,同时仍分配 IAM 权限(如果您向 Terraform 脚本添加了此类定义)。
  2. 运行 terraform destroy
  3. 手动删除环境的存储桶。Cloud Composer 不会自动将其删除。您可以通过 Google Cloud 控制台或 Google Cloud CLI 执行此操作。

后续步骤