Résoudre les problèmes liés aux mises à jour et aux mises à niveau d'environnement

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Cette page fournit des informations de dépannage pour les problèmes que vous pouvez rencontrer lors de la mise à jour ou de la mise à niveau des environnements Cloud Composer.

Pour en savoir plus sur la création d'environnements, consultez la page Dépannage pour la création d'environnements.

Lorsque les environnements Cloud Composer sont mis à jour, la majorité des problèmes se produisent pour les raisons suivantes :

  • Problèmes d'autorisation de compte de service.
  • Problèmes de dépendance PyPI
  • Taille de la base de données Airflow

Autorisations insuffisantes pour mettre à jour ou mettre à niveau un environnement

Si Cloud Composer ne peut pas mettre à jour ou mettre à niveau un environnement en raison de autorisations insuffisantes, le message d'erreur suivant s'affiche:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

Solution: Attribuez des rôles à votre compte et au compte de service. de votre environnement, comme décrit dans la section Contrôle des accès.

Le compte de service de l'environnement ne dispose pas des autorisations nécessaires

Lorsque vous créez un environnement Cloud Composer, vous spécifiez un service qui exécute les nœuds du cluster GKE de l'environnement. Si cette ne dispose pas des autorisations suffisantes pour effectuer l'opération demandée, Cloud Composer génère une erreur:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

Solution: Attribuez des rôles à votre compte et au compte de service. de votre environnement, comme décrit dans la section Contrôle des accès.

La taille de la base de données Airflow est trop importante pour effectuer l'opération

Une opération de mise à niveau Cloud Composer peut échouer, car la taille la base de données Airflow est trop volumineuse pour que les opérations de mise à niveau aboutissent.

Si la taille de la base de données Airflow est supérieure à 16 Go, Cloud Composer génère l'erreur suivante :

Airflow database uses more than 16 GB. Please clean the database before upgrading.

Solution : effectuez le nettoyage de la base de données Airflow, comme décrit dans la section Maintenance de la base de données Airflow.

Échec de la mise à niveau vers une nouvelle version de Cloud Composer en raison de conflits de packages PyPI

Lorsque vous mettez à niveau un environnement package PyPI personnalisé, vous pouvez rencontrer liées à des conflits de packages PyPI. Cela peut se produire lorsque le nouveau L'image Cloud Composer contient des versions plus récentes de packages préinstallés des conflits de dépendances avec les packages PyPI que vous avez installés environnement.

Solution :

  • Pour obtenir des informations détaillées sur les conflits de packages, exécutez une vérification des mises à niveau.
  • Assouplissez les contraintes de version pour les packages PyPI personnalisés installés. Par exemple : au lieu de spécifier une version en tant que ==1.0.1, indiquez >=1.0.1.
  • Pour en savoir plus sur la modification des exigences de version pour résoudre dépendances en conflit, consultez documentation pip.

Il n'est pas possible de mettre à niveau un environnement vers une version encore compatible

Les environnements Cloud Composer ne peuvent être mis à niveau plusieurs versions (dernières et précédentes).

Limites de version pour la création d'environnements et la mise à niveau sont différents. Version de Cloud Composer choisie lors de la création d'un environnement peut ne pas être disponible lors de la mise à niveau de l'infrastructure.

Vous pouvez effectuer l'opération de mise à niveau à l'aide de Google Cloud CLI, de l'API ou Terraform. Dans la console Google Cloud, seules les dernières versions sont disponibles comme options de mise à niveau.

Le manque de connectivité au DNS peut entraîner des problèmes lors de l'exécution des mises à niveau

De tels problèmes de connectivité peuvent générer des entrées de journal comme celles-ci:

WARNING - Compute Engine Metadata server unavailable attempt 1 of 5. Reason: [Errno -3] Temporary failure in name resolution Error

Cela signifie généralement qu'il n'y a pas de route vers le DNS. Assurez-vous donc que metadata.google.internal Le nom DNS peut être résolu en adresse IP depuis les réseaux des clusters, des pods et des services. Vérifier si l'accès privé à Google est activé dans le VPC (dans le projet hôte ou de service) où votre environnement est créé.

Plus d'informations :

Le processeur du déclencheur dépasse la limite de 1 vCPU

Cloud Composer 2 dans les versions 2.4.4 et ultérieures introduit une autre stratégie d'allocation des ressources de déclencheur pour améliorer le scaling des performances. Si vous rencontrez une erreur liée au processeur du déclencheur lors de l'exécution d'un environnement cela signifie que vos déclencheurs actuels configuré pour utiliser plus de 1 vCPU par déclencheur.

Solution :

Inspecter les avertissements de migration ayant échoué

Lors de la mise à niveau d'Airflow vers une version ultérieure, il arrive que de nouvelles contraintes soient à la base de données Airflow. Si ces contraintes ne peuvent être appliquées, Airflow crée des tables pour stocker les lignes pour lesquelles les contraintes n'ont pas pu être définies être appliqué. L'interface utilisateur d'Airflow affiche un message d'avertissement jusqu'à ce que les tables de données déplacées sont renommées ou supprimées.

Solution :

Vous pouvez utiliser les deux DAG suivants pour inspecter les données déplacées et renommer le tableaux.

Le DAG list_moved_tables_after_upgrade_dag répertorie les lignes qui ont été déplacées tous les tableaux où des contraintes n'ont pas pu être appliquées. Inspecter les données et décider si vous souhaitez le conserver. Pour la conserver, vous devez corriger manuellement les données dans dans la base de données Airflow. Par exemple, en ajoutant les lignes avec les bonnes données.

Si vous n'avez pas besoin de ces données ou si vous les avez déjà corrigées, vous pouvez exécuter la rename_moved_tables_after_upgrade_dag DAG. Ce DAG renomme les tables déplacées. Les tableaux et leurs données ne sont pas supprimés. Vous pouvez donc consulter les données ultérieurement.

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

Étape suivante