Solución de problemas de actualizaciones y mejoras de entornos

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

En esta página, se proporciona información para solucionar problemas relacionados con problemas que puedes encontrar cuando actualizas entornos de Cloud Composer.

Para obtener información sobre la solución de problemas relacionados con la creación de entornos, consulta Solución de problemas de creación de entornos.

Cuando se actualizan los entornos de Cloud Composer, la mayoría de los problemas ocurren debido a los siguientes motivos:

  • Problemas de permisos de la cuenta de servicio
  • Problemas de dependencia de PyPI
  • Tamaño de la base de datos de Airflow

No tienes permisos suficientes para actualizar un entorno

Si Cloud Composer no puede actualizar ni actualizar un entorno debido a si no tienes los permisos necesarios, se mostrará el siguiente mensaje de error:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

Solución: Asigna funciones a tu cuenta y a la cuenta de servicio de tu entorno, como se describe en Control de acceso.

La cuenta de servicio del entorno no tiene permisos suficientes.

Cuando creas un entorno de Cloud Composer, especificas una cuenta de servicio que ejecuta los nodos del clúster de GKE del entorno. Si esta La cuenta de servicio no tiene permisos suficientes para la operación solicitada. Cloud Composer genera el siguiente error:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

Solución: Asigna funciones a tu cuenta y a la cuenta de servicio de tu entorno, como se describe en Control de acceso.

El tamaño de la base de datos de Airflow es demasiado grande para realizar la operación.

Es posible que una operación de actualización de Cloud Composer no se realice de forma correcta porque el tamaño de la base de datos de Airflow es demasiado grande para que las operaciones de actualización se realicen con éxito.

Si el tamaño de la base de datos de Airflow supera los 16 GB, Cloud Composer genera el siguiente error:

Airflow database uses more than 16 GB. Please clean the database before upgrading.

Solución: Realiza la limpieza de la base de datos de Airflow, como se describe en Mantenimiento de la base de datos de Airflow.

Una actualización a una versión nueva de Cloud Composer falla debido a conflictos de paquetes de PyPI.

Cuando actualizas un entorno con paquetes PyPI personalizados instalados, es posible que encuentres errores relacionados con conflictos de paquetes de PyPI. Esto puede ocurrir porque el nuevo La imagen de Cloud Composer contiene versiones más recientes de paquetes preinstalados causar conflictos de dependencias con los paquetes de PyPI que instalaste en tu en un entorno de nube.

Solución:

  • Para obtener información detallada sobre los conflictos de paquetes, ejecuta un verificación de actualizaciones.
  • Disminuye las restricciones de versiones para los paquetes personalizados de PyPI instalados. Por ejemplo: en lugar de especificar una versión como ==1.0.1, especifícala como >=1.0.1.
  • Para obtener más información sobre cómo cambiar los requisitos de versión para resolver dependencias en conflicto, consulta la documentación de pip.

No es posible actualizar un entorno a una versión que aún sea compatible.

Los entornos de Cloud Composer solo se pueden actualizar a varias versiones anteriores y más recientes.

Las limitaciones de la versión para crear entornos nuevos y actualizar los existentes entornos son diferentes. Es posible que la versión de Cloud Composer que elijas cuando crees un entorno nuevo no esté disponible cuando actualices entornos existentes.

Puedes realizar la operación de actualización con Google Cloud CLI, la API o Terraform. En la consola de Google Cloud, solo están disponibles las versiones más recientes como opciones de actualización.

La falta de conectividad al DNS puede causar problemas al realizar actualizaciones o actualizaciones

Estos problemas de conectividad pueden generar entradas de registro como las siguientes:

WARNING - Compute Engine Metadata server unavailable attempt 1 of 5. Reason: [Errno -3] Temporary failure in name resolution Error

Por lo general, significa que no hay una ruta hacia el DNS, así que asegúrate de que el nombre de DNS metadata.google.internal se pueda resolver en una dirección IP desde las redes de clústeres, pods y servicios. Verifica si tienes el Acceso privado a Google activado en la VPC (en el proyecto host o de servicio) en la que se creó tu entorno.

Más información:

La CPU del activador supera el límite de 1 vCPU.

Cloud Composer 2 en las versiones 2.4.4 y posteriores presenta una estrategia de asignación de recursos del activador diferente para mejorar la escalabilidad del rendimiento. Si encuentras un error relacionado con la CPU del activador mientras realizas un entorno actualiza, significa que tus activadores actuales para usar más de 1 CPU virtual por activador.

Solución:

Inspecciona las advertencias de migración fallida

Cuando se actualiza Airflow a una versión posterior, a veces se aplican nuevas restricciones a la base de datos de Airflow. Si no se pueden aplicar estas restricciones, Airflow crea tablas nuevas para almacenar las filas para las que no se pudieron aplicar las restricciones. La IU de Airflow muestra un mensaje de advertencia hasta que se cambie el nombre de las tablas de datos que se movieron o se descarten.

Solución:

Puedes usar los siguientes dos DAG para inspeccionar los datos que se movieron y cambiar el nombre de las tablas.

El DAG list_moved_tables_after_upgrade_dag enumera las filas que se movieron de todas las tablas en las que no se pudieron aplicar restricciones. Inspecciona los datos y decide si quieres conservarlo. Para conservarlo, debes corregir los datos de forma manual en la base de datos de Airflow. Por ejemplo, puedes volver a agregar las filas con los datos correctos.

Si no los necesitas o si ya los corregiste, puedes ejecutar el DAG rename_moved_tables_after_upgrade_dag. Este DAG cambia el nombre de las tablas que se movieron. Las tablas y sus datos no se borran, por lo que puedes revisarlos más adelante.

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

¿Qué sigue?