Résoudre les problèmes liés aux mises à jour et aux mises à niveau d'environnement

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Cette page fournit des informations de dépannage pour les problèmes que vous pouvez rencontrer lors de la mise à jour ou de la mise à niveau des environnements Cloud Composer.

Pour en savoir plus sur la création d'environnements, consultez la page Dépannage pour la création d'environnements.

Lorsque les environnements Cloud Composer sont mis à jour, la majorité des problèmes se produisent pour les raisons suivantes :

  • Problèmes d'autorisation de compte de service.
  • Problèmes de dépendance PyPI
  • Taille de la base de données Airflow

Autorisations insuffisantes pour mettre à jour ou mettre à niveau un environnement

Si Cloud Composer ne peut pas mettre à jour ou mettre à niveau un environnement en raison d'autorisations insuffisantes, le message d'erreur suivant s'affiche:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

Solution: Attribuez des rôles à votre compte et au compte de service de votre environnement, comme décrit dans la section Contrôle des accès.

Le compte de service de l'environnement ne dispose pas des autorisations nécessaires

Lorsque vous créez un environnement Cloud Composer, vous spécifiez un compte de service qui effectue la plupart des opérations de l'environnement. Si ce compte de service ne dispose pas des autorisations nécessaires pour l'opération demandée, Cloud Composer génère une erreur:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

Solution: Attribuez des rôles à votre compte Google et au compte de service de votre environnement, comme décrit dans la section Contrôle des accès.

La taille de la base de données Airflow est trop importante pour effectuer l'opération

Une opération de mise à niveau peut échouer, car la taille de la base de données Airflow est trop volumineuse.

Si la taille de la base de données Airflow est supérieure à 16 Go, Cloud Composer génère l'erreur suivante:

Airflow database uses more than 16 GB. Please clean the database before upgrading.

Solution: effectuez le nettoyage de la base de données Airflow, comme décrit dans la section Nettoyer la base de données Airflow.

Échec de la mise à niveau vers une nouvelle version de Cloud Composer en raison de conflits de packages PyPI

Lorsque vous mettez à niveau un environnement sur lequel des packages PyPI personnalisés sont installés, vous pouvez rencontrer des erreurs liées aux conflits de packages PyPI. Cela peut se produire, car la nouvelle image Cloud Composer contient des versions ultérieures des packages préinstallés. Cela peut entraîner des conflits de dépendance avec les packages PyPI que vous avez installés dans votre environnement.

Solution :

  • Pour obtenir des informations détaillées sur les conflits de packages, exécutez une vérification de mise à niveau.
  • Assouplissez les contraintes de version pour les packages PyPI personnalisés installés. Par exemple, au lieu de spécifier une version en tant que ==1.0.1, spécifiez-la en tant que >=1.0.1.
  • Pour en savoir plus sur la modification des exigences de version pour résoudre les conflits de dépendances, consultez la documentation de pip.

Il n'est pas possible de mettre à niveau un environnement vers une version toujours prise en charge.

Les environnements Cloud Composer ne peuvent être mis à niveau que vers plusieurs des dernières versions et versions précédentes.

Les limites de version pour la création d'environnements et la mise à niveau d'environnements existants sont différentes. La version Cloud Composer que vous choisissez lorsque vous créez un environnement peut ne pas être disponible lors de la mise à niveau d'environnements existants.

Vous pouvez effectuer l'opération de mise à niveau à l'aide de la Google Cloud CLI, de l'API ou de Terraform. Dans la console Google Cloud, seules les dernières versions sont disponibles comme options de mise à niveau.

L'absence de connectivité au DNS peut entraîner des problèmes lors des mises à niveau ou des mises à jour.

De tels problèmes de connectivité peuvent entraîner des entrées de journal comme suit:

WARNING - Compute Engine Metadata server unavailable attempt 1 of 5. Reason: [Errno -3] Temporary failure in name resolution Error

Cela signifie généralement qu'il n'y a pas de route vers le DNS. Assurez-vous donc que le nom DNS metadata.google.internal peut être résolu en adresse IP à partir des réseaux de cluster, de pods et de services. Vérifiez si l'accès privé à Google est activé dans le VPC (dans le projet hôte ou de service) où votre environnement est créé.

Le processeur du déclencheur dépasse la limite de 1 vCPU

Les versions 2.4.4 et ultérieures de Cloud Composer introduisent une stratégie d'allocation des ressources de déclencheur différente pour améliorer la mise à l'échelle des performances. Si vous rencontrez une erreur liée au processeur du déclencheur lors de la mise à jour d'un environnement, cela signifie que vos déclencheurs actuels sont configurés pour utiliser plus d'un vCPU par déclencheur.

Solution :

Inspecter les avertissements de migration ayant échoué

Lorsque vous mettez à niveau Airflow vers une version ultérieure, de nouvelles contraintes sont parfois appliquées à la base de données Airflow. Si ces contraintes ne peuvent pas être appliquées, Airflow crée des tables pour stocker les lignes pour lesquelles les contraintes n'ont pas pu être appliquées. L'interface utilisateur d'Airflow affiche un message d'avertissement jusqu'à ce que les tables de données déplacées soient renommées ou supprimées.

Solution :

Vous pouvez utiliser les deux DAG suivants pour inspecter les données déplacées et renommer les tables.

Le DAG list_moved_tables_after_upgrade_dag liste les lignes qui ont été déplacées de chaque table où des contraintes n'ont pas pu être appliquées. Examinez les données et décidez si vous souhaitez les conserver. Pour le conserver, vous devez corriger manuellement les données dans la base de données Airflow. Par exemple, en ajoutant à nouveau les lignes avec les données correctes.

Si vous n'avez pas besoin des données ou si vous avez déjà résolu le problème, vous pouvez exécuter le DAG rename_moved_tables_after_upgrade_dag. Ce DAG renomme les tables déplacées. Les tables et leurs données ne sont pas supprimées. Vous pouvez donc les consulter ultérieurement.

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

Étape suivante