Solución de problemas de actualizaciones y mejoras de entornos

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

En esta página, se proporciona información para solucionar problemas relacionados con problemas que puedes encontrar cuando actualizas entornos de Cloud Composer.

Para obtener información sobre la solución de problemas relacionados con la creación de entornos, consulta Solución de problemas de creación de entornos.

Cuando se actualizan los entornos de Cloud Composer, la mayoría de los problemas ocurren debido a los siguientes motivos:

  • Problemas de permisos de la cuenta de servicio
  • Problemas de dependencia de PyPI
  • Tamaño de la base de datos de Airflow

No tienes permisos suficientes para actualizar un entorno

Si Cloud Composer no puede actualizar ni actualizar un entorno debido a si no tienes los permisos necesarios, se mostrará el siguiente mensaje de error:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

Solución: Asigna roles a tu cuenta y a la cuenta de servicio de tu entorno como se describe en Control de acceso.

La cuenta de servicio del entorno no tiene permisos suficientes.

Cuando se crea un entorno de Cloud Composer, se especifica un servicio que ejecuta los nodos del clúster de GKE del entorno. Si esta La cuenta de servicio no tiene permisos suficientes para la operación solicitada. Cloud Composer genera el siguiente error:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

Solución: Asigna funciones a tu cuenta y a la cuenta de servicio de tu entorno, como se describe en Control de acceso.

El tamaño de la base de datos de Airflow es demasiado grande para realizar la operación.

Una operación de actualización de Cloud Composer puede fallar porque el tamaño de La base de datos de Airflow es demasiado grande para que las operaciones de actualización se realicen correctamente.

Si el tamaño de la base de datos de Airflow supera los 16 GB, Cloud Composer genera el siguiente error:

Airflow database uses more than 16 GB. Please clean the database before upgrading.

Solución: Realiza la limpieza de la base de datos de Airflow, como se describe en Mantenimiento de la base de datos de Airflow.

Una actualización a una versión nueva de Cloud Composer falla debido a conflictos de paquetes de PyPI.

Cuando actualizas un entorno con paquetes de PyPI personalizados instalados, es posible que encuentres errores relacionados con conflictos de paquetes de PyPI. Esto puede ocurrir porque el nuevo La imagen de Cloud Composer contiene versiones más recientes de paquetes preinstalados causar conflictos de dependencias con los paquetes de PyPI que instalaste en tu en un entorno de nube.

Solución:

  • Para obtener información detallada sobre los conflictos de paquetes, ejecuta un verificación de actualizaciones.
  • Disminuye las restricciones de versiones para los paquetes personalizados de PyPI instalados. Por ejemplo: en lugar de especificar una versión como ==1.0.1, especifícala como >=1.0.1.
  • Obtén más información sobre el cambio de los requisitos de la versión para resolver dependencias conflictivas, consulta documentación de pip.

No es posible actualizar un entorno a una versión que aún sea compatible.

Los entornos de Cloud Composer solo se pueden actualizar a varias versiones anteriores y más recientes.

Las limitaciones de la versión para crear entornos nuevos y actualizar los existentes entornos son diferentes. La versión de Cloud Composer que elijas Cuando se crea un entorno nuevo, es posible que no esté disponible cuando se actualiza entornos de prueba.

Puedes realizar la operación de actualización con Google Cloud CLI, la API o en Terraform. En la consola de Google Cloud, solo las versiones más recientes están disponibles como opciones de actualización.

La falta de conectividad con el DNS puede causar problemas durante las actualizaciones.

Estos problemas de conectividad pueden generar entradas de registro como las siguientes:

WARNING - Compute Engine Metadata server unavailable attempt 1 of 5. Reason: [Errno -3] Temporary failure in name resolution Error

Por lo general, significa que no hay una ruta hacia el DNS, así que asegúrate de que el nombre de DNS metadata.google.internal se pueda resolver en una dirección IP desde las redes de clústeres, pods y servicios. Verifica si tienes el Acceso privado a Google activado en la VPC (en el proyecto host o de servicio) en la que se creó tu entorno.

Más información:

La CPU del activador supera el límite de 1 CPU virtual

Cloud Composer 2 en las versiones 2.4.4 y posteriores Se presenta una estrategia diferente de asignación de recursos del activador para mejorar el escalamiento del rendimiento. Si encuentras un error relacionado con la CPU del activador mientras realizas un entorno actualiza, significa que tus activadores actuales para usar más de 1 CPU virtual por activador.

Solución:

Inspecciona las advertencias de migración fallida

Cuando se actualiza Airflow a una versión posterior, a veces se aplican nuevas restricciones a la base de datos de Airflow. Si no se pueden aplicar estas restricciones, Airflow crea tablas nuevas para almacenar las filas para las que no se pudieron aplicar las restricciones. La IU de Airflow muestra un mensaje de advertencia hasta que se cambie el nombre de las tablas de datos que se movieron o se descarten.

Solución:

Puedes usar los dos DAG que se muestran a continuación para inspeccionar los datos que se movieron y cambiarle el nombre en diferentes tipos de tablas particionadas.

El DAG list_moved_tables_after_upgrade_dag enumera las filas que se movieron de todas las tablas en las que no se pudieron aplicar restricciones. Inspecciona los datos y decide si quieres conservarlos. Para conservarlos, debes corregir manualmente los datos en la base de datos de Airflow. Por ejemplo, puedes volver a agregar las filas con los datos correctos.

Si no necesitas los datos o ya los corregiste, puedes ejecutar el DAG de rename_moved_tables_after_upgrade_dag. Este DAG cambia el nombre de las tablas movidas. Las tablas y sus datos no se borran, por lo que puedes revisarlos más adelante.

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

¿Qué sigue?