解決更新及升級環境的相關問題

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面提供疑難排解資訊,協助您解決更新或升級 Cloud Composer 環境時可能遇到的問題。

如需建立環境的疑難排解資訊,請參閱「解決建立環境的相關問題」。

更新 Cloud Composer 環境時,大多數問題都是由下列原因造成:

  • 服務帳戶權限問題
  • PyPI 依附元件問題
  • Airflow 資料庫大小

權限不足,無法更新或升級環境

如果權限不足,導致 Cloud Composer 無法更新或升級環境,系統會輸出下列錯誤訊息:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

解決方法:如「存取權控管」一文所述,將角色指派給您的帳戶和環境的服務帳戶。

環境的服務帳戶權限不足

建立 Cloud Composer 環境時,您會指定服務帳戶,該帳戶會執行環境的大部分作業。如果這個服務帳戶沒有足夠的權限執行要求作業,Cloud Composer 會輸出錯誤:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

解決方法:如「存取權控管」一文所述,將角色指派給 Google 帳戶和環境的服務帳戶。

Airflow 資料庫過大,無法執行作業

如果 Airflow 資料庫過大,升級作業可能會失敗。

如果 Airflow 資料庫大小超過 20 GB,Cloud Composer 會輸出下列錯誤:

Airflow database uses more than 20 GB. Please clean the database before upgrading.

解決方案:按照「清除 Airflow 資料庫」一文的說明,清除 Airflow 資料庫。

由於 PyPI 套件衝突,升級至新版 Cloud Composer 失敗

升級已安裝自訂 PyPI 套件的環境時,可能會遇到與 PyPI 套件衝突相關的錯誤。這是因為新的 Airflow 建構版本包含較新版本的預先安裝套件。這可能會導致依附元件與您在環境中安裝的 PyPI 套件發生衝突。

解決方法

  • 如要取得套件衝突的詳細資訊,請執行升級檢查
  • 放寬已安裝自訂 PyPI 套件的版本限制。舉例來說,請將版本指定為 >=1.0.1,而不是 ==1.0.1
  • 如要進一步瞭解如何變更版本需求,解決依附元件衝突問題,請參閱 pip 說明文件

檢查遷移失敗警告

將 Airflow 升級至較新版本時,有時會對 Airflow 資料庫套用新的限制。如果無法套用這些限制,Airflow 會建立新資料表,儲存無法套用限制的資料列。在重新命名或捨棄已移動的資料表之前,Airflow UI 會顯示警告訊息。

解決方法

您可以使用下列兩個 DAG 檢查已遷移的資料,並重新命名資料表。

list_moved_tables_after_upgrade_dag DAG 會列出無法套用限制的每個資料表,以及從這些資料表移出的資料列。檢查資料並決定是否要保留。如要保留資料,請手動修正 Airflow 資料庫中的資料。例如,重新加入含有正確資料的資料列。

如果不需要這些資料,或已修正問題,可以執行 rename_moved_tables_after_upgrade_dag DAG。這個 DAG 會重新命名已移動的資料表。系統不會刪除表格及其資料,因此您稍後可以查看資料。

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

後續步驟