Risoluzione dei problemi relativi agli upgrade e agli aggiornamenti degli ambienti

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Questa pagina fornisce informazioni sulla risoluzione dei problemi che potresti riscontrare durante l'aggiornamento o l'upgrade degli ambienti Cloud Composer.

Per informazioni sulla risoluzione dei problemi relativi alla creazione di ambienti, consulta Risoluzione dei problemi relativi alla creazione dell'ambiente.

Quando gli ambienti Cloud Composer vengono aggiornati, la maggior parte dei problemi si verifica per i seguenti motivi:

  • Problemi relativi alle autorizzazioni dell'account di servizio
  • Problemi di dipendenza PyPI
  • Dimensione del database Airflow

Autorizzazioni insufficienti per l'aggiornamento o l'upgrade di un ambiente

Se Cloud Composer non può aggiornare o eseguire l'upgrade di un ambiente a causa di autorizzazioni insufficienti, viene visualizzato il seguente messaggio di errore:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

Soluzione: assegna i ruoli sia al tuo account sia all'account di servizio del tuo ambiente come descritto in Controllo dell'accesso.

L'account di servizio dell'ambiente non dispone di autorizzazioni sufficienti

Quando crei un ambiente Cloud Composer, specifichi un account di servizio che esegue i nodi del cluster GKE dell'ambiente. Se questo account di servizio non dispone di autorizzazioni sufficienti per l'operazione richiesta, Cloud Composer restituisce un errore:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

Soluzione: assegna i ruoli sia al tuo account sia all'account di servizio del tuo ambiente come descritto in Controllo dell'accesso.

La dimensione del database Airflow è troppo grande per eseguire l'operazione

Un'operazione di upgrade di Cloud Composer potrebbe non riuscire perché la dimensione del database Airflow è troppo grande per la riuscita delle operazioni di upgrade.

Se la dimensione del database Airflow è superiore a 16 GB, Cloud Composer restituisce il seguente errore:

Airflow database uses more than 16 GB. Please clean the database before upgrading.

Soluzione: esegui la pulizia del database Airflow, come descritto in Manutenzione del database Airflow.

L'upgrade a una nuova versione di Cloud Composer non va a buon fine a causa di conflitti tra i pacchetti PyPI

Quando esegui l'upgrade di un ambiente con pacchetti PyPI personalizzati installati, potresti riscontrare errori relativi a conflitti di pacchetti PyPI. Questo può accadere perché la nuova immagine Cloud Composer contiene versioni più recenti di pacchetti preinstallati che causano conflitti di dipendenza con pacchetti PyPI installati nel tuo ambiente.

Soluzione:

  • Per informazioni dettagliate sui conflitti di pacchetti, esegui un controllo degli upgrade.
  • Allenta i vincoli di versione per i pacchetti PyPI personalizzati installati. Ad esempio, invece di specificare una versione come ==1.0.1, specificala come >=1.0.1.
  • Per ulteriori informazioni sulla modifica dei requisiti di versione per risolvere le dipendenze in conflitto, consulta la documentazione di pip.

Non è possibile eseguire l'upgrade di un ambiente a una versione ancora supportata

È possibile eseguire l'upgrade degli ambienti Cloud Composer solo a diverse versioni più recenti e precedenti.

Le limitazioni della versione per la creazione di nuovi ambienti e l'upgrade di ambienti esistenti sono diverse. La versione di Cloud Composer che scegli durante la creazione di un nuovo ambiente potrebbe non essere disponibile durante l'upgrade degli ambienti esistenti.

Puoi eseguire l'operazione di upgrade utilizzando Google Cloud CLI, API o Terraform. Nella console Google Cloud, sono disponibili solo le versioni più recenti come opzioni di upgrade.

La mancanza di connettività al DNS può causare problemi durante l'esecuzione di upgrade o aggiornamenti

Tali problemi di connettività potrebbero causare la visualizzazione di voci di log simili alla seguente:

WARNING - Compute Engine Metadata server unavailable attempt 1 of 5. Reason: [Errno -3] Temporary failure in name resolution Error

Di solito significa che non esiste una route al DNS, quindi assicurati che il nome DNS metadata.google.internal possa essere risolto in un indirizzo IP dalle reti di cluster, pod e servizi. Controlla se hai attivato l'accesso privato Google all'interno del VPC (nel progetto host o di servizio) in cui viene creato l'ambiente.

Ulteriori informazioni:

La CPU del triggerer supera il limite di 1 vCPU

Cloud Composer 2 nelle versioni 2.4.4 e successive introduce una diversa strategia di allocazione delle risorse degli attivatori per migliorare la scalabilità delle prestazioni. Se si verifica un errore relativo alla CPU del triggerer quando esegui un aggiornamento dell'ambiente, significa che i triggerer attuali sono configurati in modo da utilizzare più di 1 vCPU per triggerer.

Soluzione:

Ispeziona gli avvisi di migrazione non riuscita

Quando esegui l'upgrade di Airflow a una versione successiva, a volte vengono applicati nuovi vincoli al database Airflow. Se non è possibile applicare questi vincoli, Airflow crea nuove tabelle per archiviare le righe per le quali non è stato possibile applicare i vincoli. La UI di Airflow mostra un messaggio di avviso fino a quando le tabelle di dati spostati non vengono rinominate o eliminate.

Soluzione:

Puoi utilizzare i due DAG seguenti per ispezionare i dati spostati e rinominare le tabelle.

Il DAG list_moved_tables_after_upgrade_dag elenca le righe spostate da ogni tabella in cui non è stato possibile applicare vincoli. Esamina i dati e decidi se vuoi conservarli. Per conservarli, devi correggere manualmente i dati nel database Airflow. Ad esempio, aggiungendo le righe di nuovo con i dati corretti.

Se non hai bisogno dei dati o se li hai già corretti, puoi eseguire il DAG rename_moved_tables_after_upgrade_dag. Questo DAG rinomina le tabelle spostate. Le tabelle e i relativi dati non vengono eliminati, quindi puoi esaminarli in un secondo momento.

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

Passaggi successivi