Fehlerbehebung bei Umgebungsupdates und ‑upgrades

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Auf dieser Seite finden Sie Informationen zur Fehlerbehebung bei Problemen, die beim Aktualisieren oder Upgraden von Cloud Composer-Umgebungen auftreten können.

Informationen zur Fehlerbehebung beim Erstellen von Umgebungen finden Sie unter Fehlerbehebung bei der Erstellung von Umgebungen.

Wenn Cloud Composer-Umgebungen aktualisiert werden, treten die meisten Probleme aus den folgenden Gründen auf:

  • Probleme mit Dienstkontoberechtigungen
  • Probleme mit der PyPI-Abhängigkeit
  • Größe der Airflow-Datenbank

Unzureichende Berechtigungen zum Aktualisieren oder Upgraden einer Umgebung

Wenn Cloud Composer eine Umgebung aufgrund unzureichender Berechtigungen nicht aktualisieren oder updaten kann, wird die folgende Fehlermeldung ausgegeben:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

Lösung: Weisen Sie Ihrem Konto und dem Dienstkonto Ihrer Umgebung Rollen zu, wie unter Zugriffssteuerung beschrieben.

Das Dienstkonto der Umgebung hat nicht die erforderlichen Berechtigungen

Wenn Sie eine Cloud Composer-Umgebung erstellen, geben Sie ein Dienstkonto an, das die meisten Vorgänge in der Umgebung ausführt. Wenn dieses Dienstkonto nicht genügend Berechtigungen für den angeforderten Vorgang hat, gibt Cloud Composer einen Fehler aus:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

Lösung: Weisen Sie Ihrem Google-Konto und dem Dienstkonto Ihrer Umgebung Rollen zu, wie unter Zugriffssteuerung beschrieben.

Die Airflow-Datenbank ist zu groß für den Vorgang durchzuführen

Ein Upgrade-Vorgang ist möglicherweise nicht erfolgreich, da die Größe der Airflow-Datenbank für einen erfolgreichen Upgrade zu groß ist.

Wenn die Größe der Airflow-Datenbank mehr als 20 GB beträgt, gibt Cloud Composer den folgenden Fehler aus:

Airflow database uses more than 20 GB. Please clean the database before upgrading.

Lösung: Führen Sie eine Airflow-Datenbankbereinigung durch, wie unter Airflow-Datenbank bereinigen beschrieben.

Ein Upgrade auf eine neue Cloud Composer-Version schlägt aufgrund von PyPI-Paketkonflikten fehl

Wenn Sie eine Umgebung mit installierten benutzerdefinierten PyPI-Paketen aktualisieren, können Fehler im Zusammenhang mit PyPI-Paketkonflikten auftreten. Dies kann daran liegen, dass der neue Airflow-Build neuere Versionen vorinstallierter Pakete enthält. Dies kann zu Abhängigkeitskonflikten mit PyPI-Paketen führen, die Sie in Ihrer Umgebung installiert haben.

Lösung:

  • Führen Sie eine Upgradeprüfung durch, um detaillierte Informationen zu Paketkonflikten zu erhalten.
  • Lockern Sie Versionseinschränkungen für installierte benutzerdefinierte PyPI-Pakete. Geben Sie für eine Version statt ==1.0.1 die Option >=1.0.1 ein.
  • Weitere Informationen zum Ändern von Versionsanforderungen, um in Konflikt stehende Abhängigkeiten zu beheben, finden Sie in der Dokumentation zu pip.

Warnungen zur fehlgeschlagenen Migration prüfen

Wenn Sie Airflow auf eine neuere Version aktualisieren, werden manchmal neue Einschränkungen auf die Airflow-Datenbank angewendet. Wenn diese Einschränkungen nicht angewendet werden können, erstellt Airflow neue Tabellen, um die Zeilen zu speichern, für die die Einschränkungen nicht angewendet werden konnten. In der Airflow-Benutzeroberfläche wird eine Warnmeldung angezeigt, bis die verschobenen Datentabellen umbenannt oder gelöscht wurden.

Lösung:

Mit den folgenden beiden DAGs können Sie die verschobenen Daten prüfen und die Tabellen umbenennen.

Im list_moved_tables_after_upgrade_dag-DAG sind Zeilen aufgeführt, die aus jeder Tabelle verschoben wurden, in der Einschränkungen nicht angewendet werden konnten. Prüfen Sie die Daten und entscheiden Sie, ob Sie sie behalten möchten. Wenn Sie sie behalten möchten, müssen Sie die Daten in der Airflow-Datenbank manuell korrigieren. Beispielsweise können Sie die Zeilen mit den richtigen Daten wieder hinzufügen.

Wenn Sie die Daten nicht benötigen oder das Problem bereits behoben haben, können Sie den DAG für rename_moved_tables_after_upgrade_dag ausführen. In diesem DAG werden die verschobenen Tabellen umbenannt. Die Tabellen und ihre Daten werden nicht gelöscht, sodass Sie sie später noch einmal aufrufen können.

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

Nächste Schritte