Risoluzione dei problemi relativi agli upgrade e agli aggiornamenti degli ambienti

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Questa pagina fornisce informazioni per la risoluzione dei problemi che potresti riscontrare durante l'aggiornamento o l'upgrade degli ambienti Cloud Composer.

Per informazioni sulla risoluzione dei problemi relativi alla creazione di ambienti, consulta Risolvere i problemi di creazione dell'ambiente.

Quando gli ambienti Cloud Composer vengono aggiornati, la maggior parte dei problemi perché si verificano per i seguenti motivi:

  • Problemi relativi alle autorizzazioni dell'account di servizio
  • Problemi di dipendenza PyPI
  • Dimensione del database Airflow

Autorizzazioni insufficienti per l'aggiornamento o l'upgrade di un ambiente

Se Cloud Composer non riesce ad aggiornare o eseguire l'upgrade di un ambiente a causa di autorizzazioni insufficienti, viene visualizzato il seguente messaggio di errore:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

Soluzione: assegna i ruoli sia al tuo account sia all'account di servizio del tuo ambiente come descritto in Controllo dell'accesso.

L'account di servizio dell'ambiente dispone di autorizzazioni insufficienti

Quando crei un ambiente Cloud Composer, specifichi un account di servizio che esegue i nodi del cluster GKE dell'ambiente. Se questo l'account di servizio non dispone di autorizzazioni sufficienti per l'operazione richiesta Cloud Composer restituisce un errore:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

Soluzione: assegna i ruoli sia al tuo account sia all'account di servizio del tuo ambiente come descritto in Controllo dell'accesso.

La dimensione del database Airflow è troppo grande per eseguire l'operazione

Un'operazione di upgrade di Cloud Composer potrebbe non riuscire perché le dimensioni il database Airflow è troppo grande per la riuscita delle operazioni di upgrade.

Se la dimensione del database Airflow è superiore a 16 GB, Cloud Composer restituisce il seguente errore:

Airflow database uses more than 16 GB. Please clean the database before upgrading.

Soluzione: esegui la pulizia del database Airflow, come descritto in Manutenzione del database Airflow.

L'upgrade a una nuova versione di Cloud Composer non riesce a causa di conflitti dei pacchetti PyPI

Quando esegui l'upgrade di un ambiente con pacchetti PyPI personalizzati installati, potresti riscontrare errori relativi a conflitti di pacchetti PyPI. Questo potrebbe accadere perché la nuova immagine Cloud Composer contiene versioni più recenti dei pacchetti preinstallati che causano conflitti di dipendenza con i pacchetti PyPI installati nel tuo ambiente.

Soluzione:

  • Per informazioni dettagliate sui conflitti dei pacchetti, esegui un controllo di upgrade.
  • Allenta i vincoli di versione per i pacchetti PyPI personalizzati installati. Ad esempio: anziché specificare una versione come ==1.0.1, specificala come >=1.0.1.
  • Per ulteriori informazioni sulla modifica dei requisiti delle versioni per risolvere le dipendenze in conflitto, consulta la documentazione di pip.

Non è possibile eseguire l'upgrade di un ambiente a una versione ancora supportata

È possibile eseguire l'upgrade degli ambienti Cloud Composer solo diverse versioni più recenti e precedenti.

Le limitazioni delle versioni per la creazione di nuovi ambienti e l'upgrade di quelli esistenti sono diverse. La versione di Cloud Composer che scegli quando crei un nuovo ambiente potrebbe non essere disponibile durante l'upgrade degli ambienti esistenti.

Puoi eseguire l'operazione di upgrade utilizzando Google Cloud CLI, l'API o con Terraform. Nella console Google Cloud, come opzioni di upgrade sono disponibili solo le versioni più recenti.

La mancanza di connettività al DNS può causare problemi durante l'esecuzione di upgrade o aggiornamenti

Questi problemi di connettività potrebbero comportare voci di log come questa:

WARNING - Compute Engine Metadata server unavailable attempt 1 of 5. Reason: [Errno -3] Temporary failure in name resolution Error

Di solito significa che non esiste una route per il DNS, quindi assicurati che metadata.google.internal Il nome DNS può essere risolto in un indirizzo IP dalle reti di cluster, pod e servizi. Verifica di aver attivato l'accesso privato Google all'interno del VPC (nel progetto host o di servizio) in cui è stato creato il tuo ambiente.

Ulteriori informazioni:

La CPU dell'attivatore supera il limite di 1 vCPU

Cloud Composer 2 nelle versioni 2.4.4 e successive introduce una strategia di allocazione delle risorse degli attivatori diversa per migliorare la scalabilità delle prestazioni. Se riscontri un errore relativo alla CPU dell'attivatore durante l'esecuzione di un aggiornamento dell'ambiente, significa che gli attivatori attuali sono configurati per utilizzare più di 1 vCPU per attivatore.

Soluzione:

Controllare gli avvisi di migrazione non riuscita

Quando esegui l'upgrade di Airflow a una versione successiva, a volte vengono applicati nuovi vincoli al database Airflow. Se questi vincoli non possono essere applicati, Airflow crea nuove tabelle per archiviare le righe per le quali i vincoli non possono . La UI di Airflow mostra un messaggio di avviso fino a quando le tabelle di dati spostate vengono rinominati o eliminati.

Soluzione:

Puoi utilizzare i due DAG seguenti per ispezionare i dati spostati e rinominare le tabelle.

Il DAG list_moved_tables_after_upgrade_dag elenca le righe spostate da ogni tabella in cui non è stato possibile applicare i vincoli. Esamina i dati e decidi se vuoi mantenerlo. Per conservarli, devi correggere manualmente i dati in il database Airflow. Ad esempio, aggiungendo le righe di nuovo con i dati corretti.

Se i dati non ti servono o se li hai già corretti, puoi eseguire rename_moved_tables_after_upgrade_dag DAG. Questo DAG rinomina le tabelle spostate. Le tabelle e i relativi dati non vengono eliminati, quindi puoi esaminarli in un secondo momento in un secondo momento.

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

Passaggi successivi