Fehlerbehebung bei Umgebungsupdates und -upgrades

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Auf dieser Seite finden Sie Informationen zur Fehlerbehebung bei Problemen, die beim Aktualisieren oder Upgraden von Cloud Composer-Umgebungen auftreten können.

Informationen zur Fehlerbehebung beim Erstellen von Umgebungen finden Sie unter Fehlerbehebung bei der Erstellung von Umgebungen.

Wenn Cloud Composer-Umgebungen aktualisiert werden, treten die meisten Probleme aus den folgenden Gründen auf:

  • Probleme mit Dienstkontoberechtigungen
  • Probleme mit der PyPI-Abhängigkeit
  • Größe der Airflow-Datenbank

Unzureichende Berechtigungen zum Aktualisieren oder Upgraden einer Umgebung

Wenn Cloud Composer eine Umgebung aus folgendem Grund nicht aktualisieren kann: unzureichenden Berechtigungen erhalten Sie folgende Fehlermeldung:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

Lösung: Weisen Sie Ihrem Konto und dem Dienstkonto Ihrer Umgebung Rollen zu, wie unter Zugriffssteuerung beschrieben.

Das Dienstkonto der Umgebung hat nicht die erforderlichen Berechtigungen

Beim Erstellen einer Cloud Composer-Umgebung geben Sie ein Dienstkonto an, das die GKE-Clusterknoten der Umgebung ausführt. Wenn dieses Dienstkonto nicht genügend Berechtigungen für den angeforderten Vorgang hat, gibt Cloud Composer einen Fehler aus:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

Lösung: Weisen Sie Ihrem Konto und dem Dienstkonto Ihrer Umgebung Rollen zu, wie unter Zugriffssteuerung beschrieben.

Die Airflow-Datenbank ist zu groß für den Vorgang durchzuführen

Ein Cloud Composer-Upgradevorgang ist möglicherweise nicht erfolgreich, da die Größe der Die Airflow-Datenbank ist zu groß, um Upgradevorgänge ausführen zu können.

Wenn die Größe der Airflow-Datenbank mehr als 16 GB beträgt, gibt Cloud Composer den folgenden Fehler aus:

Airflow database uses more than 16 GB. Please clean the database before upgrading.

Lösung: Führen Sie eine Airflow-Datenbankbereinigung durch, wie unter Airflow-Datenbankwartung beschrieben.

Ein Upgrade auf eine neue Cloud Composer-Version schlägt aufgrund von PyPI-Paketkonflikten fehl

Wenn Sie eine Umgebung mit installierten benutzerdefinierten PyPI-Paketen aktualisieren, können Fehler im Zusammenhang mit PyPI-Paketkonflikten auftreten. Dies kann daran liegen, dass das neue Cloud Composer-Image neuere Versionen vorinstallierter Pakete enthält, die Abhängigkeitskonflikte mit PyPI-Paketen verursachen, die Sie in Ihrer Umgebung installiert haben.

Lösung:

  • Um detaillierte Informationen zu Paketkonflikten zu erhalten, führen Sie einen Upgradeprüfung
  • Lockern Sie Versionseinschränkungen für installierte benutzerdefinierte PyPI-Pakete. Geben Sie für eine Version statt ==1.0.1 die Option >=1.0.1 ein.
  • Weitere Informationen zum Ändern von Versionsanforderungen, um das Problem zu beheben widersprüchlichen Abhängigkeiten finden Sie unter Dokumentation zu pip

Es ist nicht möglich, eine Umgebung auf eine Version zu aktualisieren, die noch unterstützt wird.

Cloud Composer-Umgebungen können nur auf mehrere aktuelle und frühere Versionen aktualisiert werden.

Die Versionseinschränkungen für das Erstellen neuer Umgebungen und das Upgrade vorhandener Umgebungen Umgebungen unterscheiden. Die Cloud Composer-Version, die Sie beim Erstellen einer neuen Umgebung auswählen, ist beim Aktualisieren vorhandener Umgebungen möglicherweise nicht verfügbar.

Sie können den Upgradevorgang mit der Google Cloud CLI, API oder Terraform In der Google Cloud Console sind nur die neuesten Versionen als Upgrade-Optionen verfügbar.

Eine fehlende Verbindung zum DNS kann Probleme bei Upgrades oder Updates verursachen.

Solche Verbindungsprobleme können zu folgenden Logeinträgen führen:

WARNING - Compute Engine Metadata server unavailable attempt 1 of 5. Reason: [Errno -3] Temporary failure in name resolution Error

Normalerweise bedeutet das, dass keine Route zum DNS vorhanden ist. Achten Sie daher darauf, dass metadata.google.internal Der DNS-Name kann aus Cluster-, Pods- und Dienstnetzwerken in IP-Adressen aufgelöst werden. Prüfen, ob in der VPC (im Host- oder Dienstprojekt) der privater Google-Zugriff aktiviert ist wo Ihre Umgebung erstellt wird.

Weitere Informationen:

Die Trigger-CPU überschreitet das Limit von 1 vCPU

In Cloud Composer 2 Version 2.4.4 und höher wird eine andere Trigger-Ressourcenzuweisungsstrategie eingeführt, um die Leistungsskalierung zu verbessern. Wenn beim Ausführen einer Umgebung ein Fehler im Zusammenhang mit der Trigger-CPU auftritt aktualisieren, bedeutet dies, dass Ihre aktuellen Trigger konfiguriert, mehr als 1 vCPU pro Triggerer zu verwenden.

Lösung:

Warnungen bei fehlgeschlagenen Migrationen

Wenn Sie Airflow auf eine neuere Version aktualisieren, werden manchmal neue Einschränkungen auf die Airflow-Datenbank angewendet. Wenn diese Einschränkungen nicht angewendet werden können, Airflow erstellt neue Tabellen zum Speichern der Zeilen, für die die Einschränkungen nicht angewendet werden. In der Airflow-UI wird eine Warnmeldung angezeigt, bis die verschobenen Datentabellen übertragen wurden. umbenannt oder verworfen wurden.

Lösung:

Mit den folgenden beiden DAGs können Sie die verschobenen Daten prüfen und die Tabellen umbenennen.

Im list_moved_tables_after_upgrade_dag-DAG sind Zeilen aufgeführt, die aus jeder Tabelle verschoben wurden, in der Einschränkungen nicht angewendet werden konnten. Daten prüfen und entscheiden ob Sie sie behalten möchten. Wenn Sie sie behalten möchten, müssen Sie die Daten in der Airflow-Datenbank manuell korrigieren. Beispielsweise, indem Sie die Zeilen mit den richtigen Daten wieder hinzufügen.

Wenn Sie die Daten nicht benötigen oder das Problem bereits behoben haben, können Sie den DAG für rename_moved_tables_after_upgrade_dag ausführen. Dieser DAG benennt die verschobenen Tabellen um. Die Tabellen und ihre Daten werden nicht gelöscht, sodass Sie sie später noch einmal aufrufen können.

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

Nächste Schritte