Risoluzione dei problemi relativi agli upgrade e agli aggiornamenti degli ambienti

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Questa pagina fornisce informazioni per la risoluzione dei problemi che potresti riscontrare durante l'aggiornamento o l'upgrade degli ambienti Cloud Composer.

Per informazioni sulla risoluzione dei problemi relativi alla creazione degli ambienti, consulta Risoluzione dei problemi di creazione dell'ambiente.

Quando gli ambienti Cloud Composer vengono aggiornati, la maggior parte dei problemi si verifica per i seguenti motivi:

  • Problemi relativi alle autorizzazioni dell'account di servizio
  • Problemi di dipendenza PyPI
  • Dimensioni del database Airflow

Autorizzazioni insufficienti per aggiornare o eseguire l'upgrade di un ambiente

Se Cloud Composer non riesce ad aggiornare o eseguire l'upgrade di un ambiente a causa di autorizzazioni insufficienti, viene visualizzato il seguente messaggio di errore:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

Soluzione: assegna i ruoli sia al tuo account sia all'account di servizio del tuo ambiente come descritto in Controllo dell'accesso.

L'account di servizio dell'ambiente dispone di autorizzazioni insufficienti

Quando crei un ambiente Cloud Composer, specifichi un account di servizio che esegue la maggior parte delle operazioni dell'ambiente. Se questo account di servizio non dispone di autorizzazioni sufficienti per l'operazione richiesta, Cloud Composer genera un errore:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

Soluzione: assegna i ruoli al tuo Account Google e all'account di servizio del tuo ambiente come descritto in Controllo dell'accesso.

Le dimensioni del database Airflow sono troppo grandi per eseguire l'operazione

Un'operazione di upgrade potrebbe non riuscire perché le dimensioni del database Airflow sono troppo grandi per il buon esito delle operazioni di upgrade.

Se le dimensioni del database Airflow superano i 20 GB, Cloud Composer genera il seguente errore:

Airflow database uses more than 20 GB. Please clean the database before upgrading.

Soluzione: esegui la pulizia del database Airflow, come descritto in Pulizia del database Airflow.

L'upgrade a una nuova versione di Cloud Composer non riesce a causa di conflitti dei pacchetti PyPI

Quando esegui l'upgrade di un ambiente con pacchetti PyPI personalizzati installati, potresti riscontrare errori relativi a conflitti dei pacchetti PyPI. Questo potrebbe accadere perché la nuova compilazione di Airflow contiene versioni successive dei pacchetti preinstallati. Ciò può causare conflitti di dipendenza con i pacchetti PyPI che hai installato nel tuo ambiente.

Soluzione:

  • Per informazioni dettagliate sui conflitti dei pacchetti, esegui un controllo di upgrade.
  • Allenta i vincoli di versione per i pacchetti PyPI personalizzati installati. Ad esempio, invece di specificare una versione come ==1.0.1, specificala come >=1.0.1.
  • Per ulteriori informazioni sulla modifica dei requisiti delle versioni per risolvere le dipendenze in conflitto, consulta la documentazione di pip.

Controllare gli avvisi di migrazione non riuscita

Quando esegui l'upgrade di Airflow a una versione successiva, a volte vengono applicati nuovi vincoli al database Airflow. Se non è possibile applicare questi vincoli, Airflow crea nuove tabelle per memorizzare le righe per le quali non è stato possibile applicare i vincoli. L'interfaccia utente di Airflow mostra un messaggio di avviso finché le tabelle di dati spostate non vengono rinominate o eliminate.

Soluzione:

Puoi utilizzare i due DAG seguenti per ispezionare i dati spostati e rinominare le tabelle.

Il DAG list_moved_tables_after_upgrade_dag elenca le righe spostate da ogni tabella in cui non è stato possibile applicare i vincoli. Controlla i dati e decidi se conservarli. Per conservarlo, devi correggere manualmente i dati nel database Airflow. Ad esempio, aggiungendo di nuovo le righe con i dati corretti.

Se non hai bisogno dei dati o se hai già risolto il problema, puoi eseguire il DAGrename_moved_tables_after_upgrade_dag. Questo DAG rinomina le tabelle spostate. Le tabelle e i relativi dati non vengono eliminati, quindi puoi esaminarli in un secondo momento.

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

Passaggi successivi