Risoluzione dei problemi relativi agli upgrade e agli aggiornamenti degli ambienti

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Questa pagina fornisce informazioni per la risoluzione dei problemi che potresti durante l'aggiornamento o l'upgrade degli ambienti Cloud Composer.

Per informazioni sulla risoluzione dei problemi relativi alla creazione di ambienti, consulta Risolvere i problemi di creazione dell'ambiente.

Quando gli ambienti Cloud Composer vengono aggiornati, la maggior parte dei problemi perché si verificano per i seguenti motivi:

  • Problemi relativi alle autorizzazioni dell'account di servizio
  • Problemi di dipendenza PyPI
  • Dimensione del database Airflow

Autorizzazioni insufficienti per l'aggiornamento o l'upgrade di un ambiente

Se Cloud Composer non può aggiornare o eseguire l'upgrade di un ambiente a causa autorizzazioni insufficienti, viene visualizzato il seguente messaggio di errore:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

Soluzione: assegna i ruoli sia al tuo account sia all'account di servizio del tuo ambiente come descritto in Controllo dell'accesso.

L'account di servizio dell'ambiente non dispone di autorizzazioni sufficienti

Quando crei un ambiente Cloud Composer, specifichi un servizio che esegue i nodi del cluster GKE dell'ambiente. Se questo l'account di servizio non dispone di autorizzazioni sufficienti per l'operazione richiesta Cloud Composer restituisce un errore:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

Soluzione: assegna i ruoli sia al tuo account sia all'account di servizio del tuo ambiente come descritto in Controllo dell'accesso.

La dimensione del database Airflow è troppo grande per eseguire l'operazione

Un'operazione di upgrade di Cloud Composer potrebbe non riuscire perché le dimensioni il database Airflow è troppo grande per la riuscita delle operazioni di upgrade.

Se la dimensione del database Airflow è superiore a 16 GB, Cloud Composer restituisce il seguente errore:

Airflow database uses more than 16 GB. Please clean the database before upgrading.

Soluzione: esegui la pulizia del database Airflow, come descritto in Manutenzione del database Airflow.

L'upgrade a una nuova versione di Cloud Composer non va a buon fine a causa di conflitti tra i pacchetti PyPI

Quando esegui l'upgrade di un ambiente con pacchetti PyPI personalizzati installati, potresti riscontrare errori relativi a conflitti di pacchetti PyPI. Ciò potrebbe accadere perché il nuovo L'immagine Cloud Composer contiene versioni più recenti di pacchetti preinstallati che causare conflitti di dipendenza con i pacchetti PyPI installati completamente gestito di Google Cloud.

Soluzione:

  • Per ottenere informazioni dettagliate sui conflitti di pacchetto, esegui un controllo degli upgrade.
  • Allenta i vincoli di versione per i pacchetti PyPI personalizzati installati. Ad esempio: anziché specificare una versione come ==1.0.1, specificala come >=1.0.1.
  • Per ulteriori informazioni sulla modifica dei requisiti di versione da risolvere sono in conflitto con le dipendenze, documentazione pip.

Non è possibile eseguire l'upgrade di un ambiente a una versione ancora supportata

È possibile eseguire l'upgrade degli ambienti Cloud Composer solo diverse versioni più recenti e precedenti.

Le limitazioni della versione per la creazione di nuovi ambienti e l'upgrade di quelli esistenti ambienti sono diversi. La versione di Cloud Composer che scegli quando si crea un nuovo ambiente potrebbero non essere disponibili durante l'upgrade di quelle esistenti ambienti cloud-native.

Puoi eseguire l'operazione di upgrade utilizzando Google Cloud CLI, l'API o con Terraform. Nella console Google Cloud sono disponibili solo le versioni più recenti come scelte di upgrade.

La mancanza di connettività al DNS può causare problemi durante l'esecuzione di upgrade o aggiornamenti

Tali problemi di connettività potrebbero causare la visualizzazione di voci di log simili alla seguente:

WARNING - Compute Engine Metadata server unavailable attempt 1 of 5. Reason: [Errno -3] Temporary failure in name resolution Error

Di solito significa che non esiste una route per il DNS, quindi assicurati che metadata.google.internal Il nome DNS può essere risolto in un indirizzo IP dalle reti di cluster, pod e servizi. Controlla se hai attivato l'accesso privato Google all'interno del VPC (nel progetto host o di servizio) in cui viene creato l'ambiente.

Ulteriori informazioni:

La CPU del triggerer supera il limite di 1 vCPU

Cloud Composer 2 nelle versioni 2.4.4 e successive introduce una diversa strategia di allocazione delle risorse dell'attivatore per migliorare la scalabilità del rendimento. Se si verifica un errore relativo alla CPU del triggerer durante l'esecuzione di un ambiente aggiornato, significa che gli attivatori correnti configurate per utilizzare più di 1 vCPU per triggerer.

Soluzione:

Ispeziona gli avvisi di migrazione non riuscita

Quando esegui l'upgrade di Airflow a una versione successiva, a volte vengono aggiunti nuovi vincoli applicata al database Airflow. Se questi vincoli non possono essere applicati, Airflow crea nuove tabelle per archiviare le righe per le quali i vincoli non possono . La UI di Airflow mostra un messaggio di avviso fino a quando le tabelle di dati spostate vengono rinominati o eliminati.

Soluzione:

Puoi utilizzare i due DAG seguenti per ispezionare i dati spostati e rinominare tabelle.

Il DAG list_moved_tables_after_upgrade_dag elenca le righe spostate da tutte le tabelle in cui non era possibile applicare vincoli. Esamina i dati e decidi se vuoi mantenerlo. Per conservarli, devi correggere manualmente i dati in il database Airflow. Ad esempio, aggiungendo le righe di nuovo con i dati corretti.

Se i dati non ti servono o se li hai già corretti, puoi eseguire rename_moved_tables_after_upgrade_dag DAG. Questo DAG rinomina le tabelle spostate. Le tabelle e i relativi dati non vengono eliminati, quindi puoi esaminarli in un secondo momento in un secondo momento.

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

Passaggi successivi