Fehlerbehebung bei Umgebungsupdates und -upgrades

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Auf dieser Seite finden Sie Informationen zur Fehlerbehebung bei Problemen, die beim Aktualisieren oder Upgraden von Cloud Composer-Umgebungen auftreten können.

Informationen zur Fehlerbehebung beim Erstellen von Umgebungen finden Sie unter Fehlerbehebung bei der Erstellung von Umgebungen.

Wenn Cloud Composer-Umgebungen aktualisiert werden, treten die meisten Probleme aus den folgenden Gründen auf:

  • Probleme mit Dienstkontoberechtigungen
  • Probleme mit der PyPI-Abhängigkeit
  • Größe der Airflow-Datenbank

Unzureichende Berechtigungen zum Aktualisieren oder Upgraden einer Umgebung

Wenn Cloud Composer eine Umgebung aufgrund unzureichender Berechtigungen nicht aktualisieren oder updaten kann, wird die folgende Fehlermeldung ausgegeben:

ERROR: (gcloud.composer.environments.update) PERMISSION_DENIED: The caller does not have permission

Lösung: Weisen Sie Ihrem Konto und dem Dienstkonto Ihrer Umgebung Rollen zu, wie unter Zugriffssteuerung beschrieben.

Das Dienstkonto der Umgebung hat nicht die erforderlichen Berechtigungen

Beim Erstellen einer Cloud Composer-Umgebung geben Sie ein Dienstkonto an, das die GKE-Clusterknoten der Umgebung ausführt. Wenn dieses Dienstkonto nicht genügend Berechtigungen für den angeforderten Vorgang hat, gibt Cloud Composer einen Fehler aus:

    UPDATE operation on this environment failed 3 minutes ago with the
    following error message:
    Composer Backend timed out. Currently running tasks are [stage:
    CP_COMPOSER_AGENT_RUNNING
    description: "No agent response published."
    response_timestamp {
      seconds: 1618203503
      nanos: 291000000
    }
    ].

Lösung: Weisen Sie Ihrem Konto und dem Dienstkonto Ihrer Umgebung Rollen zu, wie unter Zugriffssteuerung beschrieben.

Die Airflow-Datenbank ist zu groß für den Vorgang durchzuführen

Ein Cloud Composer-Upgradevorgang ist möglicherweise nicht erfolgreich, da die Größe der Die Airflow-Datenbank ist zu groß, um Upgradevorgänge ausführen zu können.

Wenn die Größe der Airflow-Datenbank mehr als 16 GB beträgt, gibt Cloud Composer den folgenden Fehler aus:

Airflow database uses more than 16 GB. Please clean the database before upgrading.

Lösung: Führen Sie eine Airflow-Datenbankbereinigung durch, wie unter Airflow-Datenbankwartung beschrieben.

Ein Upgrade auf eine neue Cloud Composer-Version schlägt aufgrund von PyPI-Paketkonflikten fehl

Wenn Sie eine Umgebung mit installierten benutzerdefinierten PyPI-Paketen aktualisieren, können Fehler im Zusammenhang mit PyPI-Paketkonflikten auftreten. Dies kann daran liegen, dass das neue Das Cloud Composer-Image enthält neuere Versionen vorinstallierter Pakete, die Abhängigkeitskonflikte mit PyPI-Paketen verursachen, die Sie in Ihrem zu verbessern.

Lösung:

  • Führen Sie eine Upgradeprüfung durch, um detaillierte Informationen zu Paketkonflikten zu erhalten.
  • Lockern Sie Versionseinschränkungen für installierte benutzerdefinierte PyPI-Pakete. Geben Sie für eine Version statt ==1.0.1 die Option >=1.0.1 ein.
  • Weitere Informationen zum Ändern von Versionsanforderungen, um das Problem zu beheben widersprüchlichen Abhängigkeiten finden Sie unter Dokumentation zu pip

Es ist nicht möglich, ein Upgrade einer Umgebung auf eine Version durchzuführen, die noch unterstützt wird

Upgrades für Cloud Composer-Umgebungen können nur auf neuesten und älteren Versionen.

Die Versionseinschränkungen für das Erstellen neuer Umgebungen und das Upgraden vorhandener Umgebungen unterscheiden sich. Die ausgewählte Cloud Composer-Version beim Erstellen einer neuen Umgebung ist beim Upgrade einer vorhandenen Umgebung möglicherweise nicht verfügbar. Umgebungen.

Sie können das Upgrade mit der Google Cloud CLI, der API oder Terraform ausführen. In der Google Cloud Console sind nur die neuesten Versionen verfügbar. als Upgradeoptionen.

Eine fehlende Verbindung zum DNS kann Probleme bei Upgrades oder Updates verursachen.

Solche Verbindungsprobleme können zu folgenden Logeinträgen führen:

WARNING - Compute Engine Metadata server unavailable attempt 1 of 5. Reason: [Errno -3] Temporary failure in name resolution Error

Normalerweise bedeutet das, dass keine Route zum DNS vorhanden ist. Achten Sie daher darauf, dass metadata.google.internal Der DNS-Name kann aus Cluster-, Pods- und Dienstnetzwerken in IP-Adressen aufgelöst werden. Prüfen Sie, ob der private Google-Zugriff im VPC (im Host- oder Dienstprojekt) aktiviert ist, in dem Ihre Umgebung erstellt wird.

Weitere Informationen:

Die Trigger-CPU überschreitet das Limit von 1 vCPU

Cloud Composer 2 in Versionen 2.4.4 und höher führt eine andere Triggerer-Ressourcenzuweisungsstrategie ein. um die Leistungsskalierung zu verbessern. Wenn beim Ausführen einer Umgebung ein Fehler im Zusammenhang mit der Trigger-CPU auftritt aktualisieren, bedeutet dies, dass Ihre aktuellen Trigger konfiguriert, mehr als 1 vCPU pro Triggerer zu verwenden.

Lösung:

Warnungen bei fehlgeschlagenen Migrationen

Beim Upgrade von Airflow auf eine neuere Version werden manchmal neue Einschränkungen auf die Airflow-Datenbank angewendet. Wenn diese Einschränkungen nicht angewendet werden können, Airflow erstellt neue Tabellen zum Speichern der Zeilen, für die die Einschränkungen nicht angewendet werden. In der Airflow-UI wird eine Warnmeldung angezeigt, bis die verschobenen Datentabellen übertragen wurden. umbenannt oder verworfen wurden.

Lösung:

Mit den folgenden beiden DAGs können Sie die verschobenen Daten prüfen und die Tabellen umbenennen.

Im list_moved_tables_after_upgrade_dag-DAG sind Zeilen aufgeführt, die aus jeder Tabelle verschoben wurden, in der Einschränkungen nicht angewendet werden konnten. Daten prüfen und entscheiden ob Sie sie behalten möchten. Wenn Sie sie behalten möchten, müssen Sie die Daten in der Airflow-Datenbank manuell korrigieren. Beispielsweise können Sie die Zeilen mit den richtigen Daten wieder hinzufügen.

Wenn Sie die Daten nicht benötigen oder das Problem bereits behoben haben, können Sie den DAG für rename_moved_tables_after_upgrade_dag ausführen. Dieser DAG benennt die verschobenen Tabellen um. Die Tabellen und die zugehörigen Daten werden nicht gelöscht, sodass Sie die Daten jederzeit späteren Punkt.

"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata base.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
  Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
  Renames the table which contains the failed migrations. This will remove the
  warning message from airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )

with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

Nächste Schritte