Fehlerbehebung bei Umgebungsupdates und ‑upgrades

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Auf dieser Seite finden Sie Informationen zur Fehlerbehebung bei Problemen, die beim Aktualisieren oder Upgraden von Cloud Composer-Umgebungen auftreten können.

Informationen zur Fehlerbehebung beim Erstellen von Umgebungen finden Sie unter Fehlerbehebung bei der Erstellung von Umgebungen.

Wenn Cloud Composer-Umgebungen aktualisiert werden, treten die meisten Probleme aus den folgenden Gründen auf:

  • Probleme mit Dienstkontoberechtigungen
  • Probleme mit der PyPI-Abhängigkeit
  • Größe der Airflow-Datenbank

Unzureichende Berechtigungen zum Aktualisieren oder Upgraden einer Umgebung

Wenn Cloud Composer eine Umgebung aufgrund unzureichender Berechtigungen nicht aktualisieren oder upgraden kann, wird die folgende Fehlermeldung ausgegeben:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

Lösung: Weisen Sie Ihrem Konto und dem Dienstkonto Ihrer Umgebung Rollen zu, wie unter Zugriffssteuerung beschrieben.

Das Dienstkonto der Umgebung hat nicht die erforderlichen Berechtigungen

Beim Erstellen einer Cloud Composer-Umgebung geben Sie ein Dienstkonto an, das die GKE-Clusterknoten der Umgebung ausführt. Wenn dieses Dienstkonto nicht die erforderlichen Berechtigungen für den angeforderten Vorgang hat, gibt Cloud Composer einen Fehler aus:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

Lösung: Weisen Sie Ihrem Konto und dem Dienstkonto Ihrer Umgebung Rollen zu, wie unter Zugriffssteuerung beschrieben.

Die Airflow-Datenbank ist zu groß für den Vorgang durchzuführen

Ein Cloud Composer-Upgradevorgang ist möglicherweise nicht erfolgreich, da die Airflow-Datenbank zu groß für Upgradevorgänge ist.

Wenn die Größe der Airflow-Datenbank mehr als 16 GB beträgt, gibt Cloud Composer den folgenden Fehler aus:

Airflow database uses more than 16 GB. Please clean the database before upgrading.

Lösung: Führen Sie eine Airflow-Datenbankbereinigung durch, wie unter Airflow-Datenbankwartung beschrieben.

Ein Upgrade auf eine neue Cloud Composer-Version schlägt aufgrund von PyPI-Paketkonflikten fehl

Wenn Sie eine Umgebung mit installierten benutzerdefinierten PyPI-Paketen aktualisieren, können Fehler im Zusammenhang mit PyPI-Paketkonflikten auftreten. Dies kann daran liegen, dass das neue Cloud Composer-Image neuere Versionen von vorinstallierten Paketen enthält, die zu Abhängigkeitskonflikten mit den in Ihrer Umgebung installierten PyPI-Paketen führen.

Lösung:

  • Ausführliche Informationen zu Paketkonflikten erhalten Sie, wenn Sie eine Upgradeprüfung ausführen.
  • Lockern Sie Versionseinschränkungen für installierte benutzerdefinierte PyPI-Pakete. Anstatt eine Version als ==1.0.1 anzugeben, geben Sie sie beispielsweise als >=1.0.1 an.
  • Weitere Informationen zum Ändern von Versionsanforderungen, um in Konflikt stehende Abhängigkeiten zu beheben, finden Sie in der Dokumentation zu pip.

Es ist nicht möglich, ein Upgrade einer Umgebung auf eine Version durchzuführen, die noch unterstützt wird

Für Cloud Composer-Umgebungen ist nur ein Upgrade auf mehrere aktuelle und frühere Versionen möglich.

Die Versionseinschränkungen für das Erstellen neuer Umgebungen und für das Upgrade vorhandener Umgebungen unterscheiden sich. Die Cloud Composer-Version, die Sie beim Erstellen einer neuen Umgebung auswählen, ist beim Upgrade vorhandener Umgebungen möglicherweise nicht verfügbar.

Sie können den Upgradevorgang mit der Google Cloud CLI, API oder Terraform ausführen. In der Google Cloud Console sind nur die neuesten Versionen als Upgradeoptionen verfügbar.

Eine mangelnde DNS-Verbindung kann zu Problemen bei Upgrades oder Updates führen

Solche Verbindungsprobleme können zu folgenden Logeinträgen führen:

WARNING - Compute Engine Metadata server unavailable attempt 1 of 5. Reason: [Errno -3] Temporary failure in name resolution Error

Dies bedeutet in der Regel, dass keine Route zum DNS vorhanden ist. Achten Sie daher darauf, dass der DNS-Name „metadata.google.internal“ von Cluster-, Pod- und Dienstnetzwerken in IP-Adressen aufgelöst werden kann. Prüfen Sie, ob der privater Google-Zugriff in der VPC (im Host- oder Dienstprojekt), in der Ihre Umgebung erstellt wurde, aktiviert ist.

Weitere Informationen:

Triggerer-CPU überschreitet Limit von 1 vCPU

Cloud Composer 2 in den Versionen 2.4.4 und höher führt eine andere Triggerer-Ressourcenzuweisungsstrategie ein, um die Leistungsskalierung zu verbessern. Wenn beim Ausführen einer Umgebungsaktualisierung ein Fehler in Bezug auf die Trigger-CPU auftritt, bedeutet dies, dass Ihre aktuellen Trigger so konfiguriert sind, dass sie mehr als eine vCPU pro Trigger verwenden.

Lösung:

Warnungen bei fehlgeschlagenen Migrationen

Beim Upgrade von Airflow auf eine neuere Version werden manchmal neue Einschränkungen auf die Airflow-Datenbank angewendet. Wenn diese Einschränkungen nicht angewendet werden können, erstellt Airflow neue Tabellen zum Speichern der Zeilen, für die die Einschränkungen nicht angewendet werden konnten. In der Airflow-UI wird eine Warnmeldung angezeigt, bis die verschobenen Datentabellen umbenannt oder gelöscht werden.

Lösung:

Sie können die folgenden beiden DAGs verwenden, um die verschobenen Daten zu überprüfen und die Tabellen umzubenennen.

Der DAG list_moved_tables_after_upgrade_dag listet Zeilen auf, die aus jeder Tabelle verschoben wurden, in der keine Einschränkungen angewendet werden konnten. Überprüfen Sie die Daten und entscheiden Sie, ob Sie sie behalten möchten. Damit sie beibehalten werden, müssen Sie die Daten in der Airflow-Datenbank manuell korrigieren. Beispielsweise, indem Sie die Zeilen mit den richtigen Daten wieder hinzufügen.

Wenn Sie die Daten nicht benötigen oder bereits korrigiert haben, können Sie den DAG rename_moved_tables_after_upgrade_dag ausführen. Dieser DAG benennt die verschobenen Tabellen um. Die Tabellen und ihre Daten werden nicht gelöscht, sodass Sie die Daten später überprüfen können.

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

Nächste Schritte