排查环境更新和升级问题

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本页面针对更新或升级 Cloud Composer 环境时可能遇到的问题提供了问题排查信息。

如需了解与创建环境相关的问题排查信息,请参阅排查环境创建问题

在更新 Cloud Composer 环境时,导致大多数问题的原因如下:

  • 服务账号权限问题
  • PyPI 依赖项问题
  • Airflow 数据库的大小

权限不足,无法更新或升级环境

如果 Cloud Composer 因以下原因而无法更新或升级环境: 权限不足,则它会输出以下错误消息:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

解决方案:将角色同时分配给您的账号和服务账号 如访问权限控制中所述,对您的环境进行管理。

环境的服务账号权限不足

创建 Cloud Composer 环境时,您需要指定一项服务 运行环境的 GKE 集群节点的账号。如果 服务账号没有足够的权限来执行请求的操作, Cloud Composer 会输出错误:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

解决方案:将角色同时分配给您的账号和服务账号 如访问权限控制中所述,对您的环境进行管理。

Airflow 数据库太大,无法执行此操作

Cloud Composer 升级操作可能会失败, Airflow 数据库过大,导致升级操作无法成功。

如果 Airflow 数据库的大小超过 16 GB,Cloud Composer 会输出以下错误:

Airflow database uses more than 16 GB. Please clean the database before upgrading.

解决方案:执行 Airflow 数据库清理,如 Airflow 数据库维护中所述。

由于 PyPI 软件包冲突,升级到新的 Cloud Composer 版本失败

将环境升级为 已安装的自定义 PyPI 软件包中,您可能会遇到 与 PyPI 软件包冲突相关的错误。这可能是因为新的 Cloud Composer 映像包含较新版本的预安装软件包, 导致与在容器映像中安装的 PyPI 软件包发生依赖项冲突 环境

解决方案

  • 如需获取有关软件包冲突的详细信息,请运行 升级检查
  • 放宽已安装的自定义 PyPI 软件包的版本限制条件。例如: 请将其指定为 >=1.0.1,而不是 ==1.0.1
  • 详细了解如何通过更改版本要求来 请参阅 pip 文档

无法将环境升级到仍受支持的版本

Cloud Composer 环境只能升级到 多个最新版本和之前的版本

创建新环境和升级现有环境的版本限制 环境也有所不同您选择的 Cloud Composer 版本 创建新环境时, 升级现有环境时可能不可用 环境

您可以使用 Google Cloud CLI、API 或 Terraform。在 Google Cloud 控制台中,仅提供最新版本 作为升级选项

与 DNS 的连接不足可能会导致在执行升级或更新时出现问题

此类连接问题可能会产生如下日志条目:

WARNING - Compute Engine Metadata server unavailable attempt 1 of 5. Reason: [Errno -3] Temporary failure in name resolution Error

这通常表示没有通向 DNS 的路由,因此请确保 metadata.google.internal DNS 名称可以从集群、Pod 和 Service 网络内部解析为 IP 地址。 检查您是否在 VPC 中(在宿主项目或服务项目中)启用了专用 Google 访问通道 和创建环境的位置

更多信息:

触发器 CPU 数量超过 1 个 vCPU 的限制

2.4.4 及更高版本中的 Cloud Composer 2 它引入了一种不同的触发器资源分配策略 以提高性能伸缩能力 如果您在执行环境时遇到与触发器 CPU 相关的错误 则表示您当前的触发器是 配置为每个触发器使用 1 个以上的 vCPU。

解决方案

查看迁移失败警告

将 Airflow 升级到更高版本时,有时会出现新的限制 应用于 Airflow 数据库。如果无法应用这些限制条件 Airflow 会创建新表来存储限制条件无法执行的行 。Airflow 界面会显示一条警告消息,直到移动的数据表 重命名或舍弃操作

解决方案

您可以使用以下两个 DAG 来检查移动的数据并重命名 表格。

list_moved_tables_after_upgrade_dag DAG 列出了从原位置移出的行 无法应用限制条件的每个表。检查数据并确定 来决定是否要保留如要保留,你需要手动修正 Airflow 数据库例如,通过添加包含正确数据的行。

如果您不需要数据,或者您已经修复了数据,则可以运行 rename_moved_tables_after_upgrade_dag DAG。此 DAG 会对移动的表进行重命名。 这些表及其数据不会被删除,因此您可以在 。

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

后续步骤