Solución de problemas de actualizaciones y mejoras de entornos

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

En esta página, se proporciona información para solucionar problemas relacionados con problemas que puedes encontrar cuando actualizas entornos de Cloud Composer.

Para obtener información sobre la solución de problemas relacionados con la creación de entornos, consulta Solución de problemas de creación de entornos.

Cuando se actualizan los entornos de Cloud Composer, la mayoría de los problemas ocurren debido a los siguientes motivos:

  • Problemas de permisos de la cuenta de servicio
  • Problemas de dependencia de PyPI
  • Tamaño de la base de datos de Airflow

No tienes permisos suficientes para actualizar un entorno

Si Cloud Composer no puede actualizar ni actualizar un entorno debido a si no tienes los permisos necesarios, se mostrará el siguiente mensaje de error:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

Solución: Asigna funciones a tu cuenta y a la cuenta de servicio de tu entorno, como se describe en Control de acceso.

La cuenta de servicio del entorno no tiene permisos suficientes.

Cuando se crea un entorno de Cloud Composer, se especifica un servicio que ejecuta los nodos del clúster de GKE del entorno. Si esta La cuenta de servicio no tiene permisos suficientes para la operación solicitada. Cloud Composer genera el siguiente error:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

Solución: Asigna funciones a tu cuenta y a la cuenta de servicio de tu entorno, como se describe en Control de acceso.

El tamaño de la base de datos de Airflow es demasiado grande para realizar la operación.

Una operación de actualización de Cloud Composer puede fallar porque el tamaño de La base de datos de Airflow es demasiado grande para que las operaciones de actualización se realicen correctamente.

Si el tamaño de la base de datos de Airflow supera los 16 GB, Cloud Composer genera el siguiente error:

Airflow database uses more than 16 GB. Please clean the database before upgrading.

Solución: Realiza la limpieza de la base de datos de Airflow, como se describe en Mantenimiento de la base de datos de Airflow.

Una actualización a una versión nueva de Cloud Composer falla debido a conflictos de paquetes de PyPI.

Cuando actualizas un entorno con paquetes de PyPI personalizados instalados, es posible que encuentres errores relacionados con conflictos de paquetes de PyPI. Esto puede ocurrir porque el nuevo La imagen de Cloud Composer contiene versiones más recientes de paquetes preinstalados causar conflictos de dependencias con los paquetes de PyPI que instalaste en tu en un entorno de nube.

Solución:

  • Para obtener información detallada sobre los conflictos de paquetes, ejecuta un verificación de actualizaciones.
  • Disminuye las restricciones de versiones para los paquetes personalizados de PyPI instalados. Por ejemplo: en lugar de especificar una versión como ==1.0.1, especifícala como >=1.0.1.
  • Para obtener más información sobre el cambio de los requisitos de la versión para resolver dependencias conflictivas, consulta documentación de pip.

No es posible actualizar un entorno a una versión que todavía es compatible

Los entornos de Cloud Composer solo se pueden actualizar a varias versiones anteriores y más recientes.

Las limitaciones de la versión para crear entornos nuevos y actualizar los existentes entornos son diferentes. La versión de Cloud Composer que elijas Cuando se crea un entorno nuevo, es posible que no esté disponible cuando se actualiza entornos de prueba.

Puedes realizar la operación de actualización con Google Cloud CLI, la API o en Terraform. En la consola de Google Cloud, solo están disponibles las versiones más recientes como opciones de actualización.

La falta de conectividad al DNS puede causar problemas al realizar actualizaciones o actualizaciones

Estos problemas de conectividad pueden generar entradas de registro como las siguientes:

WARNING - Compute Engine Metadata server unavailable attempt 1 of 5. Reason: [Errno -3] Temporary failure in name resolution Error

Por lo general, significa que no hay una ruta hacia el DNS, así que asegúrate de que metadata.google.internal El nombre de DNS se puede resolver como dirección IP desde el clúster, los Pods y las redes de servicios. Verifica si tienes el Acceso privado a Google activado en la VPC (en el proyecto host o de servicio) en la que se crea tu entorno.

Más información:

La CPU del activador supera el límite de 1 CPU virtual

Cloud Composer 2 en las versiones 2.4.4 y posteriores Se presenta una estrategia diferente de asignación de recursos del activador para mejorar el escalamiento del rendimiento. Si encuentras un error relacionado con la CPU del activador mientras realizas un entorno actualiza, significa que tus activadores actuales para usar más de 1 CPU virtual por activador.

Solución:

Inspeccione las advertencias de migración fallidas

Cuando se actualiza Airflow a una versión posterior, a veces se aplican restricciones nuevas que se aplican a la base de datos de Airflow. Si estas restricciones no se pueden aplicar, Airflow crea tablas nuevas para almacenar las filas en las que las restricciones no pudieron si se aplican. La IU de Airflow muestra un mensaje de advertencia hasta que se muevan las tablas de datos se les cambia el nombre o se descartan.

Solución:

Puedes usar los dos DAG que se muestran a continuación para inspeccionar los datos que se movieron y cambiarle el nombre en diferentes tipos de tablas particionadas.

El DAG list_moved_tables_after_upgrade_dag enumera las filas que se movieron de en todas las tablas en las que no se pudieron aplicar las restricciones. Inspecciona los datos y decide si quieres conservarlo. Para conservarlos, debes corregir manualmente los datos en la base de datos de Airflow. Por ejemplo, puedes volver a agregar las filas con los datos correctos.

Si no los necesitas o si ya los corregiste, puedes ejecutar el DAG rename_moved_tables_after_upgrade_dag. Este DAG cambia el nombre de las tablas que se movieron. Las tablas y sus datos no se borran, por lo que puedes revisar los datos de un más adelante.

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

¿Qué sigue?