Solucionar problemas de actualizaciones del entorno

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta página se proporciona información para solucionar problemas que pueden surgir al actualizar o cambiar a una versión superior los entornos de Cloud Composer.

Para obtener información sobre cómo solucionar problemas relacionados con la creación de entornos, consulta el artículo Solucionar problemas de creación de entornos.

Cuando se actualizan los entornos de Cloud Composer, la mayoría de los problemas se deben a los siguientes motivos:

  • Problemas con los permisos de la cuenta de servicio
  • Problemas de dependencias de PyPI
  • Tamaño de la base de datos de Airflow

Permisos insuficientes para actualizar o mejorar un entorno

Si Cloud Composer no puede actualizar o mejorar un entorno debido a que no tiene suficientes permisos, muestra el siguiente mensaje de error:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

Solución: Asigna roles tanto a tu cuenta como a la cuenta de servicio de tu entorno, tal como se describe en Control de acceso.

La cuenta de servicio del entorno no tiene permisos suficientes

Cuando creas un entorno de Cloud Composer, especificas una cuenta de servicio que realiza la mayoría de las operaciones del entorno. Si esta cuenta de servicio no tiene suficientes permisos para la operación solicitada, Cloud Composer mostrará un error:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

Solución: Asigna roles a tu cuenta de Google y a la cuenta de servicio de tu entorno tal como se describe en Control de acceso.

El tamaño de la base de datos de Airflow es demasiado grande para realizar la operación

Es posible que no se pueda realizar una operación de actualización porque el tamaño de la base de datos de Airflow sea demasiado grande.

Si el tamaño de la base de datos de Airflow es superior a 20 GB, Cloud Composer genera el siguiente error:

Airflow database uses more than 20 GB. Please clean the database before upgrading.

Solución: limpia la base de datos de Airflow tal como se describe en Limpiar la base de datos de Airflow.

No se puede actualizar a una nueva versión de Cloud Composer debido a conflictos de paquetes de PyPI

Cuando actualizas un entorno con paquetes PyPI personalizados instalados, es posible que se produzcan errores relacionados con conflictos de paquetes PyPI. Esto puede ocurrir porque la nueva compilación de Airflow contiene versiones posteriores de los paquetes preinstalados. Esto puede provocar conflictos de dependencias con los paquetes de PyPI que hayas instalado en tu entorno.

Solución:

  • Para obtener información detallada sobre los conflictos de paquetes, ejecuta una comprobación de actualización.
  • Relaja las restricciones de versión de los paquetes PyPI personalizados instalados. Por ejemplo, en lugar de especificar una versión como ==1.0.1, especifícala como >=1.0.1.
  • Para obtener más información sobre cómo cambiar los requisitos de versión para resolver dependencias conflictivas, consulta la documentación de pip.

Inspeccionar las advertencias de migración fallidas

Al actualizar Airflow a una versión posterior, a veces se aplican nuevas restricciones a la base de datos de Airflow. Si no se pueden aplicar estas restricciones, Airflow crea tablas nuevas para almacenar las filas en las que no se han podido aplicar. La interfaz de usuario de Airflow muestra un mensaje de advertencia hasta que se cambie el nombre o se eliminen las tablas de datos movidas.

Solución:

Puedes usar los dos DAGs siguientes para inspeccionar los datos movidos y cambiar el nombre de las tablas.

El list_moved_tables_after_upgrade_dag DAG muestra las filas que se han movido de cada tabla en la que no se han podido aplicar restricciones. Inspecciona los datos y decide si quieres conservarlos. Para conservarlo, debes corregir manualmente los datos en la base de datos de Airflow. Por ejemplo, añadiendo de nuevo las filas con los datos correctos.

Si no necesitas los datos o ya los has corregido, puedes ejecutar el rename_moved_tables_after_upgrade_dag DAG. Este DAG cambia el nombre de las tablas movidas. Las tablas y sus datos no se eliminan, por lo que puede revisar los datos más adelante.

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

Siguientes pasos