DAG-Probleme mit unzureichendem Arbeitsspeicher und fehlendem Speicher beheben

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Diese Anleitung enthält Schritte zum Debuggen eines fehlgeschlagenen Airflow-DAG in Cloud Composer und zur Diagnose von Worker-Ressourcenproblemen wie fehlendem Worker-Arbeitsspeicher oder Speicherplatz mithilfe von Logs und Umgebungsmonitoring.

Einleitung

In dieser Anleitung geht es vorrangig um ressourcenbezogene Probleme, um Möglichkeiten für die Fehlerbehebung in einem DAG aufzuzeigen.

Ein Mangel an zugewiesenen Worker-Ressourcen führt zu DAG-Fehlern. Wenn für eine Airflow-Aufgabe nicht genügend Arbeitsspeicher oder Speicherplatz vorhanden ist, wird möglicherweise eine Airflow-Ausnahme wie die folgende angezeigt:

WARNING airflow.exceptions.AirflowException: Task received SIGTERM signal
INFO - Marking task as FAILED.

oder

Task exited with return code Negsignal.SIGKILL

In solchen Fällen besteht die allgemeine Empfehlung darin, die Anzahl der Airflow-Worker-Ressourcen zu erhöhen oder die Anzahl der Aufgaben pro Worker zu reduzieren. Da Airflow-Ausnahmen jedoch allgemein sein können, kann es schwierig sein, die jeweilige Ressource zu identifizieren, die das Problem verursacht.

In dieser Anleitung wird erläutert, wie Sie den Grund für einen DAG-Fehler diagnostizieren und den Ressourcentyp einer Ressource identifizieren können, indem Sie zwei Beispiel-DAGs beheben, die aufgrund von zu wenig Worker-Arbeitsspeicher und -Speicher fehlschlagen.

Lernziele

  • Führen Sie Beispiel-DAGs aus, die aus folgenden Gründen fehlschlagen:

    • Fehlender Worker-Arbeitsspeicher
    • Fehlender Worker-Speicherplatz
  • Diagnose der Fehlerursachen

  • Zugewiesene Worker-Ressourcen erhöhen

  • DAGs mit neuen Ressourcenlimits testen

Kosten

In dieser Anleitung werden die folgenden kostenpflichtigen Komponenten von Google Cloud verwendet:

Nach Abschluss dieser Anleitung können Sie weitere Kosten durch Löschen von erstellten Ressourcen vermeiden. Weitere Informationen finden Sie unter Bereinigen.

Hinweise

In diesem Abschnitt werden die Aktionen beschrieben, die vor dem Start der Anleitung erforderlich sind.

Projekt erstellen und konfigurieren

Für diese Anleitung benötigen Sie ein Google Cloud-Projekt. Konfigurieren Sie das Projekt so:

  1. Wählen Sie in der Google Cloud Console ein Projekt aus oder erstellen Sie eines:

    Zur Projektauswahl

  2. Die Abrechnung für Ihr Projekt muss aktiviert sein. Hier erfahren Sie, wie Sie prüfen, ob die Abrechnung für ein Projekt aktiviert ist.

  3. Sorgen Sie dafür, dass Ihr Google Cloud-Projektnutzer die folgenden Rollen hat, um die erforderlichen Ressourcen zu erstellen:

    • Administrator für Umgebung und Storage-Objekte (roles/composer.environmentAndStorageObjectAdmin)
    • Compute-Administrator (roles/compute.admin)
    • Monitoring-Bearbeiter (roles/monitoring.editor)

Die APIs für Ihr Projekt aktivieren

Cloud Composer API aktivieren.

Aktivieren Sie die API

Cloud Composer-Umgebung erstellen

Erstellen Sie eine Cloud Composer 2-Umgebung.

Beim Erstellen der Umgebung gewähren Sie dem Composer-Dienst-Agent-Konto die Rolle Dienst-Agent-Erweiterung für Cloud Composer v2 API (roles/composer.ServiceAgentV2Ext). Cloud Composer verwendet dieses Konto, um Vorgänge in Ihrem Google Cloud-Projekt auszuführen.

Limits für Worker-Ressourcen prüfen

Prüfen Sie die Airflow-Worker-Ressourcenlimits in Ihrer Umgebung:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Rufen Sie den Tab Umgebungskonfiguration auf.

  4. Gehen Sie zu Ressourcen > Arbeitslastkonfiguration > Worker.

  5. Die Werte müssen 0,5 vCPUs, 1,875 GB Arbeitsspeicher und 1 GB Speicher lauten. Dies sind die Limits für die Airflow-Worker-Ressourcen, mit denen Sie in den nächsten Schritten dieser Anleitung arbeiten.

Beispiel: Probleme mit unzureichendem Arbeitsspeicher diagnostizieren

Laden Sie den folgenden Beispiel-DAG in die Umgebung hoch, die Sie in den vorherigen Schritten erstellt haben. In dieser Anleitung heißt dieser DAG create_list_with_many_strings.

Dieser DAG enthält eine Aufgabe, die die folgenden Schritte ausführt:

  1. Erstellt eine leere Liste s.
  2. Führt einen Zyklus aus, um den String More an die Liste anzufügen.
  3. Gibt an, wie viel Arbeitsspeicher die Liste verbraucht, und wartet bei jeder einminütigen Iteration 1 Sekunde.
import time

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import sys
from datetime import timedelta

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(
    'create_list_with_many_strings',
    default_args=default_args,
    schedule_interval=None)


def consume():
    s = []
    for i in range(120):
        for j in range(1000000):
            s.append("More")
        print(f"i={i}; size={sys.getsizeof(s) / (1000**3)}GB")
        time.sleep(1)


t1 = PythonOperator(
    task_id='task0',
    python_callable=consume,
    dag=dag,
    depends_on_past=False,
    retries=0
)

Beispiel-DAG auslösen

Lösen Sie den Beispiel-DAG create_list_with_many_strings aus:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.

  3. Klicken Sie in der Airflow-Weboberfläche auf der Seite DAGs in der Spalte Links für Ihren DAG auf die Schaltfläche Trigger DAG.

  4. Klickbasierter Trigger

  5. Klicken Sie auf der Seite DAGs auf die Aufgabe, die Sie ausgelöst haben, und prüfen Sie anhand der Ausgabelogs, ob der DAG ausgeführt wird.

Während die Aufgabe ausgeführt wird, wird in den Ausgabelogs die vom DAG verwendete Arbeitsspeichergröße in GB ausgegeben.

Nach einigen Minuten schlägt die Aufgabe fehl, da sie das Airflow-Arbeitsspeicherlimit von 1,875 GB überschreitet.

Fehlgeschlagenen DAG diagnostizieren

Wenn Sie zum Zeitpunkt des Fehlers mehrere Aufgaben ausgeführt haben, sollten Sie nur eine Aufgabe ausführen und die Ressourcenauslastung in dieser Zeit diagnostizieren, um festzustellen, welche Aufgaben Ressourcenauslastung verursachen und welche Ressourcen erhöht werden müssen.

Airflow-Aufgabenlogs prüfen

Beachten Sie, dass die Aufgabe aus dem DAG create_list_with_many_strings den Status Failed hat.

Überprüfen Sie die Logs der Aufgabe. Sie sehen den folgenden Logeintrag:

```none
{local_task_job.py:102} INFO - Task exited with return code
Negsignal.SIGKILL
```

`Netsignal.SIGKILL` might be an indication of your task using more memory
than the Airflow worker is allocated. The system sends
the `Negsignal.SIGKILL` signal to avoid further memory consumption.

Arbeitslasten überprüfen

Überprüfen Sie die Arbeitslasten, um sicherzustellen, dass die Auslastung Ihrer Aufgabe nicht dazu führt, dass der Knoten, auf dem der Pod ausgeführt wird, das Limit für die Arbeitsspeichernutzung überschreitet:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Rufen Sie den Tab Umgebungskonfiguration auf.

  4. Klicken Sie unter Ressourcen > GKE-Cluster > Arbeitslasten auf Clusterarbeitslasten ansehen.

  5. Prüfen Sie, ob einige der Arbeitslast-Pods einen ähnlichen Status wie den folgenden haben:

    Error with exit code 137 and 1 more issue.
    ContainerStatusUnknown with exit code 137 and 1 more issue
    

    Exit code 137 bedeutet, dass ein Container oder Pod versucht, mehr Arbeitsspeicher als zulässig zu verwenden. Der Prozess wird beendet, um eine Arbeitsspeichernutzung zu verhindern.

Monitoring des Umgebungszustands und des Ressourcenverbrauchs prüfen

Prüfen Sie das Monitoring des Umgebungszustands und des Ressourcenverbrauchs:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Wechseln Sie zum Tab Monitoring und wählen Sie Übersicht aus.

  4. Suchen Sie im Bereich Umgebungsübersicht nach dem Diagramm Umgebungszustand (Airflow Monitoring-DAG). Es enthält einen roten Bereich, der dem Zeitpunkt entspricht, zu dem die Logs mit dem Drucken von Fehlern beginnen.

  5. Wählen Sie Worker aus und gehen Sie zur Grafik Gesamte Arbeitsspeichernutzung der Worker. Beachten Sie, dass die Zeile Arbeitsspeichernutzung zu dem Zeitpunkt, an dem die Aufgabe ausgeführt wurde, einen Anstieg aufweist.

Die Zeile „Arbeitsspeichernutzung“ hat zu dem Zeitpunkt, als die Aufgabe ausgeführt wurde, eine Spitze erreicht
Abbildung 1. Diagramm zur Arbeitsspeichernutzung der gesamten Worker (zum Vergrößern klicken)

Auch wenn die Linie für die Arbeitsspeichernutzung in der Grafik nicht das Limit erreicht, müssen Sie bei der Diagnose der Fehlerursachen nur den zuweisbaren Arbeitsspeicher berücksichtigen. Die Zeile Arbeitsspeicherlimit im Diagramm steht für den insgesamt verfügbaren Arbeitsspeicher (einschließlich der von GKE reservierten Kapazität).

In diesem Beispiel ist das Worker-Arbeitsspeicherlimit auf 1, 875 GB festgelegt. GKE reserviert 25% der ersten 4 GiB Arbeitsspeicher. GKE reserviert außerdem einen zusätzlichen Schwellenwert für die Bereinigung: 100 MiB Arbeitsspeicher pro Knoten für die Kubelet-Bereinigung.

Der zuweisbare Arbeitsspeicher wird so berechnet:

ALLOCATABLE = CAPACITY - RESERVED - EVICTION-THRESHOLD

Wenn das Arbeitsspeicherlimit 1,875 GB beträgt, ist der tatsächlich zuweisbare Arbeitsspeicher:

1.75 GiB (1.875GB) - 0.44 (25% GiB reserved) - 0.1 = 1.21 GiB (~1.3 GB).

Wenn Sie dieses tatsächliche Limit an das Diagramm der Arbeitsspeichernutzung anhängen, sehen Sie, dass der Anstieg der Arbeitsspeichernutzung der Aufgabe das tatsächliche Arbeitsspeicherlimit erreicht. Sie können daraus schließen, dass die Aufgabe aufgrund unzureichenden Worker-Arbeitsspeichers fehlgeschlagen ist.

Worker-Arbeitsspeicherlimit erhöhen

Weisen Sie zusätzlichen Worker-Arbeitsspeicher zu, damit der Beispiel-DAG erfolgreich ist:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Rufen Sie den Tab Umgebungskonfiguration auf.

  4. Suchen Sie die Konfiguration unter Ressourcen > Arbeitslasten und klicken Sie auf Bearbeiten.

  5. Geben Sie im Abschnitt Worker im Feld Arbeitsspeicher das neue Arbeitsspeicherlimit für Airflow-Worker an. Verwenden Sie in dieser Anleitung 3 GB.

  6. Speichern Sie die Änderungen und warten Sie einige Minuten, bis die Airflow-Worker neu gestartet werden.

DAG mit neuem Arbeitsspeicherlimit testen

Lösen Sie den DAG create_list_with_many_strings noch einmal aus und warten Sie, bis die Ausführung abgeschlossen ist.

  1. In den Ausgabelogs der DAG-Ausführung sehen Sie Marking task as SUCCESS und der Status der Aufgabe lautet Success.

  2. Prüfen Sie auf dem Tab Monitoring im Abschnitt Umgebungsübersicht, ob rote Bereiche zu sehen sind.

  3. Klicken Sie auf den Abschnitt Worker und suchen Sie die Grafik Gesamte Arbeitsspeichernutzung der Worker. Sie sehen, dass die Zeile Speicherlimit die Änderung des Arbeitsspeicherlimits widerspiegelt und die Zeile Arbeitsspeichernutzung weit unter dem tatsächlichen Limit für den zuweisbaren Arbeitsspeicher liegt.

Beispiel: Probleme mit unzureichendem Speicherplatz diagnostizieren

In diesem Schritt laden Sie zwei DAGs hoch, die große Dateien erstellen. Der erste DAG erstellt eine große Datei. Der zweite DAG erstellt eine große Datei und imitiert einen lang andauernden Vorgang.

Die Datei ist in beiden DAGs größer als das standardmäßige Speicherlimit von Airflow-Workern von 1 GB. Der zweite DAG hat jedoch eine zusätzliche Warteaufgabe, um die Dauer künstlich zu verlängern.

In den nächsten Schritten untersuchen Sie die Unterschiede im Verhalten beider DAGs.

DAG hochladen, der eine große Datei erstellt

Laden Sie den folgenden Beispiel-DAG in die Umgebung hoch, die Sie in den vorherigen Schritten erstellt haben. In dieser Anleitung heißt dieser DAG create_large_txt_file_print_logs.

Dieser DAG enthält eine Aufgabe, die die folgenden Schritte ausführt:

  1. Schreibt eine 1,5 GB große Datei localfile.txt in den Airflow-Worker-Speicher.
  2. Gibt die Größe der erstellten Datei mit dem Python-Modul os aus.
  3. Gibt die Dauer der DAG-Ausführung jede Minute aus.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(
    'create_large_txt_file_print_logs',
    default_args=default_args,
    schedule_interval=None)


def consume():
    size = 1000**2  # bytes in 1 MB
    amount = 100

    def create_file():
        print(f"Start creating a huge file")
        with open("localfile.txt", "ab") as f:
            for j in range(15):
                f.write(os.urandom(amount) * size)
        print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")

    create_file()
    print("Success!")


t1 = PythonOperator(
    task_id='create_huge_file',
    python_callable=consume,
    dag=dag,
    depends_on_past=False,
    retries=0)

DAG hochladen, der eine große Datei in einem lang andauernden Vorgang erstellt

Wenn Sie einen DAG mit langer Ausführungszeit imitieren und die Auswirkungen der Aufgabendauer auf den Endzustand untersuchen möchten, laden Sie den zweiten Beispiel-DAG in Ihre Umgebung hoch. In dieser Anleitung heißt dieser DAG long_running_create_large_txt_file_print_logs.

Dieser DAG enthält eine Aufgabe, die die folgenden Schritte ausführt:

  1. Schreibt eine 1,5 GB große Datei localfile.txt in den Airflow-Worker-Speicher.
  2. Gibt die Größe der erstellten Datei mit dem Python-Modul os aus.
  3. Wartet 1 Stunde und 15 Minuten, um die für Vorgänge mit der Datei erforderliche Zeit nachzuahmen, z. B. das Lesen aus der Datei.
  4. Gibt die Dauer der DAG-Ausführung jede Minute aus.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(
    'long_running_create_large_txt_file_print_logs',
    default_args=default_args,
    schedule_interval=None)


def consume():
    size = 1000**2  # bytes in 1 MB
    amount = 100

    def create_file():
        print(f"Start creating a huge file")
        with open("localfile.txt", "ab") as f:
            for j in range(15):
                f.write(os.urandom(amount) * size)
        print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")

    create_file()
    for k in range(75):
        time.sleep(60)
        print(f"{k+1} minute")

    print("Success!")


t1 = PythonOperator(
    task_id='create_huge_file',
    python_callable=consume,
    dag=dag,
    depends_on_past=False,
    retries=0)

Beispiel-DAGs auslösen

Lösen Sie den ersten DAG create_large_txt_file_print_logs aus:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.

  3. Klicken Sie in der Airflow-Weboberfläche auf der Seite DAGs in der Spalte Links für Ihren DAG auf die Schaltfläche Trigger DAG.

  4. Klickbasierter Trigger

  5. Klicken Sie auf der Seite DAGs auf die Aufgabe, die Sie ausgelöst haben, und prüfen Sie anhand der Ausgabelogs, ob der DAG ausgeführt wird.

  6. Warten Sie, bis die mit dem DAG create_large_txt_file_print_logs erstellte Aufgabe abgeschlossen ist. Dies kann einige Minuten dauern.

  7. Klicken Sie auf der Seite DAGs auf die DAG-Ausführung. Sie sehen, dass Ihre Aufgabe den Status Success hat, obwohl das Speicherlimit überschritten wurde.

Prüfen Sie die Airflow-Logs der Aufgabe:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

    1. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

    2. Wechseln Sie zum Tab Logs und klicken Sie dann auf Alle Logs > Airflow-Logs > Worker > Im Log-Explorer ansehen.

    3. Filtern Sie die Logs nach Typ: Nur Fehlermeldungen werden angezeigt.

Die Protokolle enthalten Nachrichten wie diese:

Worker: warm shutdown (Main Process)

oder

A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.

Diese Logs zeigen, dass der Pod das Warm-Herunterfahren gestartet hat, weil der verwendete Speicher das Limit überschritten hat und innerhalb einer Stunde entfernt wurde. Die DAG-Ausführung ist jedoch nicht fehlgeschlagen, da sie innerhalb des Kulanzzeitraums für die Beendigung von Kubernetes abgeschlossen wurde, der in dieser Anleitung weiter erläutert wird.

Sehen Sie sich das Ergebnis des zweiten Beispiel-DAG long_running_create_large_txt_file_print_logs an, um das Konzept des Kulanzzeitraums für die Beendigung zu veranschaulichen.

Lösen Sie den zweiten DAG long_running_create_large_txt_file_print_logs aus:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.

  3. Klicken Sie in der Airflow-Weboberfläche auf der Seite DAGs in der Spalte Links für Ihren DAG auf die Schaltfläche Trigger DAG.

  4. Klickbasierter Trigger

  5. Klicken Sie auf der Seite DAGs auf die Aufgabe, die Sie ausgelöst haben, und prüfen Sie anhand der Ausgabelogs, ob der DAG ausgeführt wird.

  6. Warten Sie, bis die DAG-Ausführung long_running_create_large_txt_file_print_logs fehlschlägt. Dies dauert etwa eine Stunde.

Prüfen Sie die Ergebnisse der DAG-Ausführung:

  1. Klicken Sie auf der Seite DAGs auf die DAG-Ausführung long_running_create_large_txt_file_print_logs. Sie sehen, dass die Aufgabe den Status Failed hat und die Ausführungsdauer genau 1 Stunde und 5 Minuten betrug, was kürzer als die Wartezeit der Aufgabe von 1 Stunde und 15 Minuten ist.

  2. Überprüfen Sie die Logs der Aufgabe. Nachdem der DAG die Datei localfile.txt im Container des Airflow-Workers erstellt hat, gibt das Log aus, dass der DAG gewartet hat. Die Ausführungsdauer wird alle 1 Minuten in die Aufgabenlogs eingetragen. In diesem Beispiel gibt der DAG das Log localfile.txt size: aus und die Größe der Datei localfile.txt beträgt 1, 5 GB.

Sobald die in den Container des Airflow-Workers geschriebene Datei das Speicherlimit überschreitet, sollte die DAG-Ausführung fehlschlagen. Die Aufgabe schlägt jedoch nicht sofort fehl und wird weiter ausgeführt, bis die Dauer 1 Stunde und 5 Minuten erreicht ist. Dies liegt daran, dass Kubernetes die Aufgabe nicht sofort abbricht und weiterhin ausgeführt wird, um eine Stunde für die Wiederherstellung zu ermöglichen, was als „Kulanzzeitraum für die Beendigung“ bezeichnet wird. Wenn die Ressourcen eines Knotens ausgehen, beendet Kubernetes den Pod nicht sofort, um eine ordnungsgemäße Beendigung zu ermöglichen, sodass die Auswirkungen auf den Endnutzer minimal sind.

Der Kulanzzeitraum für die Beendigung hilft Nutzern, Dateien nach Aufgabenfehlern wiederherzustellen. Er kann jedoch bei der Diagnose von DAGs zu Verwirrung führen. Wenn das Speicherlimit des Airflow-Workers überschritten wird, hängt der Status der Endaufgabe von der Dauer der DAG-Ausführung ab:

  • Wenn die DAG-Ausführung das Worker-Speicherlimit überschreitet, aber in weniger als einer Stunde abgeschlossen wird, wird die Aufgabe mit dem Status Success abgeschlossen, da sie innerhalb des Kulanzzeitraums für die Beendigung abgeschlossen wurde. Kubernetes beendet jedoch den Pod und die geschriebene Datei wird sofort aus dem Container gelöscht.

  • Wenn der DAG das Worker-Speicherlimit überschreitet und länger als eine Stunde ausgeführt wird, wird der DAG eine Stunde lang ausgeführt und kann das Speicherlimit um Tausende Prozent überschreiten, bevor Kubernetes die Pod- und Airflow-Markierung der Aufgabe als Failed löscht.

Fehlgeschlagenen DAG diagnostizieren

Wenn Sie zum Zeitpunkt des Fehlers mehrere Aufgaben ausgeführt haben, sollten Sie nur eine Aufgabe ausführen und die Ressourcenauslastung in dieser Zeit diagnostizieren, um festzustellen, welche Aufgaben Ressourcenauslastung verursachen und welche Ressourcen erhöht werden müssen.

Sehen Sie sich die Aufgabenlogs des zweiten DAG long_running_create_large_txt_file_print_logs an:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Wechseln Sie zum Tab Logs und klicken Sie dann auf Alle Logs > Airflow-Logs > Worker > Im Log-Explorer ansehen.

  4. Filtern Sie die Logs nach Typ: Nur Fehlermeldungen werden angezeigt.

Die Protokolle enthalten Nachrichten wie diese:

Container storage usage of worker reached 155.7% of the limit.

This likely means that the total size of local files generated by your DAGs is
close to the storage limit of worker.

You may need to decrease the storage usage or increase the worker storage limit
in your Cloud Composer environment configuration.

oder

Pod storage usage of worker reached 140.2% of the limit.
A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.

This eviction likely means that the total size of dags and plugins folders plus
local files generated by your DAGs exceeds the storage limit of worker.

Please decrease the storage usage or increase the worker storage limit in your
Cloud Composer environment configuration.

Diese Meldungen weisen darauf hin, dass Airflow-Logs im Verlauf der Aufgabe Fehler ausgegeben haben, als die Größe der von Ihrem DAG generierten Dateien das Speicherlimit für Worker überschritten und der Kulanzzeitraum für die Beendigung eingeleitet wurde. Während des Kulanzzeitraums für die Beendigung hat die Speichernutzung nicht wieder das Limit erreicht, was zu einer Pod-Bereinigung nach Ablauf des Kulanzzeitraums für die Beendigung führte.

Prüfen Sie das Monitoring des Umgebungszustands und des Ressourcenverbrauchs:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Wechseln Sie zum Tab Monitoring und wählen Sie Übersicht aus.

  4. Suchen Sie im Bereich Umgebungsübersicht nach dem Diagramm Umgebungszustand (Airflow Monitoring-DAG). Es enthält einen roten Bereich, der dem Zeitpunkt entspricht, zu dem die Logs mit dem Drucken von Fehlern beginnen.

  5. Wählen Sie Worker aus und suchen Sie das Diagramm Gesamte Laufwerksnutzung der Worker. Beachten Sie, dass die Zeile Laufwerksnutzung einen Anstieg aufweist und die Zeile Laufwerkslimit zum Zeitpunkt der Ausführung der Aufgabe überschreitet.

Die Zeile „Laufwerksnutzung“ hat einen Spitzenwert und überschreitet die Zeile „Laufwerkslimit“ zum Zeitpunkt, als Ihre Aufgabe ausgeführt wurde
Abbildung 2. Diagramm zur gesamten Laufwerksnutzung der Worker (zum Vergrößern klicken)

Worker-Speicherlimit erhöhen

Weisen Sie zusätzlichen Airflow-Worker-Speicher zu, damit der Beispiel-DAG erfolgreich ist:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Rufen Sie den Tab Umgebungskonfiguration auf.

  4. Suchen Sie die Konfiguration unter Ressourcen > Arbeitslasten und klicken Sie auf Bearbeiten.

  5. Geben Sie im Abschnitt Worker im Feld Speicher das neue Speicherlimit für Airflow-Worker an. In dieser Anleitung legen Sie dafür 2 GB fest.

  6. Speichern Sie die Änderungen und warten Sie einige Minuten, bis die Airflow-Worker neu gestartet werden.

DAG mit neuem Speicherlimit testen

Lösen Sie den DAG long_running_create_large_txt_file_print_logs noch einmal aus und warten Sie 1 Stunde und 15 Minuten, bis er fertig ist.

  1. In den Ausgabelogs der DAG-Ausführung wird Marking task as SUCCESS angezeigt und der Status der Aufgabe lautet Erfolgreich mit einer Dauer von 1 Stunde und 15 Minuten, was der im DAG-Code festgelegten Wartezeit entspricht.

  2. Prüfen Sie auf dem Tab Monitoring im Abschnitt Umgebungsübersicht, ob rote Bereiche zu sehen sind.

  3. Klicken Sie auf den Abschnitt Worker und suchen Sie die Grafik Gesamte Laufwerksnutzung der Worker. Sie sehen, dass die Zeile Laufwerklimit die Änderung des Speicherlimits widerspiegelt und die Zeile Laufwerksnutzung innerhalb des zulässigen Bereichs liegt.

Zusammenfassung

In dieser Anleitung haben Sie den Grund für einen DAG-Fehler diagnostiziert und den Ressourcentyp ermittelt, der Auslastung verursacht. Dazu haben Sie zwei Beispiel-DAGs behoben, die aufgrund von zu wenig Worker-Arbeitsspeicher und -Speicher fehlschlagen. Anschließend haben Sie die DAGs erfolgreich ausgeführt, nachdem Sie Ihren Workern mehr Arbeitsspeicher und Speicherplatz zugewiesen haben. Es wird jedoch empfohlen, Ihre DAGs (Workflows) zu optimieren, um den Verbrauch von Worker-Ressourcen von vornherein zu reduzieren, da es nicht möglich ist, die Ressourcen über einen bestimmten Schwellenwert hinaus zu erhöhen.

Bereinigen

Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder behalten Sie das Projekt bei und löschen Sie die einzelnen Ressourcen.

Projekt löschen

  1. Wechseln Sie in der Google Cloud Console zur Seite Ressourcen verwalten.

    Zur Seite „Ressourcen verwalten“

  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie dann auf Löschen.
  3. Geben Sie im Dialogfeld die Projekt-ID ein und klicken Sie auf Shut down (Beenden), um das Projekt zu löschen.

Einzelne Ressourcen löschen

Wenn Sie mehrere Anleitungen und Kurzanleitungen durcharbeiten möchten, können Sie die Überschreitung von Projektkontingenten verhindern, indem Sie Projekte wiederverwenden.

Löschen Sie die Cloud Composer-Umgebung. Während dieses Vorgangs löschen Sie auch den Bucket der Umgebung.

Nächste Schritte