Solución de problemas de actualizaciones y mejoras de entornos

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

En esta página, se proporciona información para solucionar problemas relacionados con problemas que puedes encontrar cuando actualizas entornos de Cloud Composer.

Para obtener información sobre la solución de problemas relacionados con la creación de entornos, consulta Solución de problemas de creación de entornos.

Cuando se actualizan los entornos de Cloud Composer, la mayoría de los problemas ocurren debido a los siguientes motivos:

  • Problemas de permisos de la cuenta de servicio
  • Problemas de dependencia de PyPI
  • Tamaño de la base de datos de Airflow

No tienes permisos suficientes para actualizar un entorno

Si Cloud Composer no puede actualizar un entorno debido a permisos insuficientes, se mostrará el siguiente mensaje de error:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

Solución: Asigna roles a tu cuenta y a la cuenta de servicio de tu entorno como se describe en Control de acceso.

La cuenta de servicio del entorno no tiene permisos suficientes.

Cuando creas un entorno de Cloud Composer, especificas una cuenta de servicio que ejecuta los nodos del clúster de GKE del entorno. Si esta La cuenta de servicio no tiene permisos suficientes para la operación solicitada. Cloud Composer genera el siguiente error:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

Solución: Asigna roles a tu cuenta y a la cuenta de servicio de tu entorno como se describe en Control de acceso.

El tamaño de la base de datos de Airflow es demasiado grande para realizar la operación.

Es posible que una operación de actualización de Cloud Composer no se realice de forma correcta porque el tamaño de la base de datos de Airflow es demasiado grande para que las operaciones de actualización se realicen con éxito.

Si el tamaño de la base de datos de Airflow supera los 16 GB, Cloud Composer genera el siguiente error:

Airflow database uses more than 16 GB. Please clean the database before upgrading.

Solución: Realiza la limpieza de la base de datos de Airflow, como se describe en Mantenimiento de la base de datos de Airflow.

Una actualización a una versión nueva de Cloud Composer falla debido a conflictos de paquetes de PyPI.

Cuando actualizas un entorno con paquetes de PyPI personalizados instalados, es posible que encuentres errores relacionados con conflictos de paquetes de PyPI. Esto puede ocurrir porque el nuevo La imagen de Cloud Composer contiene versiones más recientes de paquetes preinstalados causar conflictos de dependencias con los paquetes de PyPI que instalaste en tu en un entorno de nube.

Solución:

  • Para obtener información detallada sobre los conflictos de paquetes, ejecuta un verificación de actualizaciones.
  • Disminuye las restricciones de versiones para los paquetes personalizados de PyPI instalados. Por ejemplo, en lugar de especificar una versión como ==1.0.1, especifícala como >=1.0.1.
  • Para obtener más información sobre cómo cambiar los requisitos de versión para resolver dependencias en conflicto, consulta la documentación de pip.

No es posible actualizar un entorno a una versión que todavía sea compatible

Los entornos de Cloud Composer solo pueden actualizarse a varias versiones anteriores y más recientes.

Las limitaciones de versión para crear entornos nuevos y actualizar entornos existentes son diferentes. La versión de Cloud Composer que elijas Cuando se crea un entorno nuevo, es posible que no esté disponible cuando se actualiza entornos de prueba.

Puedes realizar la operación de actualización con Google Cloud CLI, la API o en Terraform. En la consola de Google Cloud, solo las versiones más recientes están disponibles como opciones de actualización.

La falta de conectividad con el DNS puede causar problemas durante las actualizaciones.

Estos problemas de conectividad pueden generar entradas de registro como las siguientes:

WARNING - Compute Engine Metadata server unavailable attempt 1 of 5. Reason: [Errno -3] Temporary failure in name resolution Error

Por lo general, significa que no hay una ruta hacia el DNS, así que asegúrate de que metadata.google.internal El nombre de DNS se puede resolver como dirección IP desde el clúster, los Pods y las redes de servicios. Verifica si tienes el Acceso privado a Google activado en la VPC (en el proyecto host o de servicio) en la que se crea tu entorno.

Más información:

La CPU del activador supera el límite de 1 vCPU.

Cloud Composer 2 en las versiones 2.4.4 y posteriores Se presenta una estrategia diferente de asignación de recursos del activador para mejorar el escalamiento del rendimiento. Si encuentras un error relacionado con la CPU del activador cuando realizas una actualización de entorno, significa que tus activadores actuales están configurados para usar más de 1 vCPU por activador.

Solución:

Inspecciona las advertencias de migración fallida

Cuando se actualiza Airflow a una versión posterior, a veces se aplican nuevas restricciones a la base de datos de Airflow. Si no se pueden aplicar estas restricciones, Airflow crea tablas nuevas para almacenar las filas para las que no se pudieron aplicar las restricciones. La IU de Airflow muestra un mensaje de advertencia hasta que se muevan las tablas de datos se les cambia el nombre o se descartan.

Solución:

Puedes usar los siguientes dos DAG para inspeccionar los datos que se movieron y cambiar el nombre de las tablas.

El DAG list_moved_tables_after_upgrade_dag enumera las filas que se movieron de todas las tablas en las que no se pudieron aplicar restricciones. Inspecciona los datos y decide si quieres conservarlos. Para conservarlos, debes corregir manualmente los datos en la base de datos de Airflow. Por ejemplo, puedes volver a agregar las filas con los datos correctos.

Si no necesitas los datos o ya los corregiste, puedes ejecutar el DAG de rename_moved_tables_after_upgrade_dag. Este DAG cambia el nombre de las tablas que se movieron. Las tablas y sus datos no se borran, por lo que puedes revisarlos más adelante.

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

¿Qué sigue?