DAG-Probleme mit unzureichendem Arbeitsspeicher und unzureichendem Speicherplatz beheben

Cloud Composer 1 Cloud Composer 2

In dieser Anleitung werden Schritte beschrieben, mit denen Sie einen fehlgeschlagenen Airflow-DAG in Cloud Composer beheben und mithilfe von Logs und Umgebungsmonitoring Probleme mit Worker-Ressourcen diagnostizieren können, z. B. unzureichender Worker-Arbeitsspeicher oder nicht ausreichender Speicherplatz.

Einleitung

In dieser Anleitung geht es um ressourcenbezogene Probleme, um Möglichkeiten zum Debuggen eines DAG zu demonstrieren.

Ein Mangel an zugewiesenen Worker-Ressourcen führt zu DAG-Fehlern. Wenn eine Airflow-Aufgabe nicht mehr genügend Arbeitsspeicher oder Speicherplatz hat, wird möglicherweise eine Airflow-Ausnahme angezeigt, z. B.:

WARNING airflow.exceptions.AirflowException: Task received SIGTERM signal
INFO - Marking task as FAILED.

oder

Task exited with return code Negsignal.SIGKILL

In solchen Fällen besteht die allgemeine Empfehlung, die Airflow-Worker-Ressourcen zu erhöhen oder die Anzahl der Aufgaben pro Worker zu reduzieren. Da Airflow-Ausnahmen jedoch generisch sein können, kann es schwierig sein, die jeweilige Ressource zu identifizieren, die das Problem verursacht.

In dieser Anleitung wird erläutert, wie Sie den Grund für einen DAG-Fehler diagnostizieren und den Ressourcentyp, der Probleme verursacht, ermitteln können. Dazu müssen Sie zwei Beispiel-DAGs debuggen, die aufgrund eines fehlenden Worker-Arbeitsspeichers und -Speichers fehlschlagen.

Lernziele

  • Führen Sie Beispiel-DAGs aus, die aus folgenden Gründen fehlschlagen:

    • Fehlender Worker-Arbeitsspeicher
    • Zu wenig Speicherplatz für Worker
  • Ursachen des Fehlers diagnostizieren

  • Zugewiesene Worker-Ressourcen erhöhen

  • DAGs mit neuen Ressourcenlimits testen

Kosten

In dieser Anleitung werden die folgenden kostenpflichtigen Komponenten von Google Cloud verwendet:

Nach Abschluss dieser Anleitung können Sie weitere Kosten durch Löschen von erstellten Ressourcen vermeiden. Weitere Informationen finden Sie im Hilfeartikel Bereinigen.

Hinweise

In diesem Abschnitt werden Aktionen beschrieben, die vor dem Start der Anleitung erforderlich sind.

Projekt erstellen und konfigurieren

Für diese Anleitung benötigen Sie ein Google Cloud-Projekt. So konfigurieren Sie das Projekt:

  1. Wählen oder erstellen Sie ein Projekt in der Google Cloud Console:

    Zur Projektauswahl

  2. Die Abrechnung für Ihr Projekt muss aktiviert sein. Hier erfahren Sie, wie Sie prüfen, ob die Abrechnung für ein Projekt aktiviert ist.

  3. Der Nutzer des Google Cloud-Projekts benötigt die folgenden Rollen, um die erforderlichen Ressourcen zu erstellen:

    • Administrator für Umgebung und Speicherobjekte (roles/composer.environmentAndStorageObjectAdmin)
    • Compute-Administrator (roles/compute.admin)
    • Monitoring-Bearbeiter (roles/monitoring.editor)

Die APIs für Ihr Projekt aktivieren

Cloud Composer API aktivieren.

Aktivieren Sie die API

Cloud Composer-Umgebung erstellen

Cloud Composer 2-Umgebung erstellen

Beim Erstellen der Umgebung gewähren Sie dem Composer-Dienst-Agent-Konto die Rolle Dienst-Agent-Erweiterung für Cloud Composer v2 API (roles/composer.ServiceAgentV2Ext). Cloud Composer verwendet dieses Konto, um Vorgänge in Ihrem Google Cloud-Projekt auszuführen.

Worker-Ressourcenlimits prüfen

Prüfen Sie die Limits für Airflow-Worker-Ressourcen in Ihrer Umgebung:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Rufen Sie den Tab Umgebungskonfiguration auf.

  4. Rufen Sie Ressourcen > Arbeitslastkonfiguration > Worker auf.

  5. Prüfen Sie, ob die Werte 0,5 vCPUs, 1,875 GB Arbeitsspeicher und 1 GB Speicher sind. Dies sind die Limits für Airflow-Worker-Ressourcen, mit denen Sie in den nächsten Schritten dieser Anleitung arbeiten.

Beispiel: Probleme mit unzureichendem Arbeitsspeicher diagnostizieren

Laden Sie den folgenden Beispiel-DAG in die Umgebung hoch, die Sie in den vorherigen Schritten erstellt haben. In dieser Anleitung heißt dieser DAG create_list_with_many_strings.

Dieser DAG enthält eine Aufgabe, die die folgenden Schritte ausführt:

  1. Erstellt eine leere Liste s.
  2. Führt einen Zyklus aus, um den String More an die Liste anzufügen.
  3. Gibt an, wie viel Speicher die Liste verbraucht, und wartet bei jedem einminütigen Durchlauf 1 Sekunde.
import time

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import sys
from datetime import timedelta

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(
    'create_list_with_many_strings',
    default_args=default_args,
    schedule_interval=None)


def consume():
    s = []
    for i in range(120):
        for j in range(1000000):
            s.append("More")
        print(f"i={i}; size={sys.getsizeof(s) / (1000**3)}GB")
        time.sleep(1)


t1 = PythonOperator(
    task_id='task0',
    python_callable=consume,
    dag=dag,
    depends_on_past=False,
    retries=0
)

Beispiel-DAG auslösen

Lösen Sie den Beispiel-DAG create_list_with_many_strings aus:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.

  3. Klicken Sie in der Airflow-Weboberfläche auf der Seite DAGs in der Spalte Links für Ihren DAG auf die Schaltfläche Trigger DAG.

  4. Klickbasierter Trigger

  5. Klicken Sie auf der Seite DAGs auf die Aufgabe, die Sie ausgelöst haben, und überprüfen Sie die Ausgabelogs, um sicherzustellen, dass der DAG gestartet wurde.

Während die Aufgabe ausgeführt wird, geben die Ausgabelogs die vom DAG verwendete Arbeitsspeichergröße in GB aus.

Nach einigen Minuten schlägt die Aufgabe fehl, da sie das Arbeitsspeicherlimit des Airflow-Workers von 1,875 GB überschreitet.

Fehlgeschlagenen DAG diagnostizieren

Wenn Sie zum Zeitpunkt des Fehlers mehrere Aufgaben ausgeführt haben, sollten Sie nur eine Aufgabe ausführen und den Ressourcendruck während dieser Zeit diagnostizieren, um festzustellen, welche Aufgaben zu einem Ressourcendruck führen und welche Ressourcen erhöht werden müssen.

Airflow-Tasklogs prüfen

Die Aufgabe aus dem DAG create_list_with_many_strings hat den Status Failed.

Prüfen Sie die Logs der Aufgabe. Sie sehen den folgenden Logeintrag:

```none
{local_task_job.py:102} INFO - Task exited with return code
Negsignal.SIGKILL
```

`Netsignal.SIGKILL` might be an indication of your task using more memory
than the Airflow worker is allocated. The system sends
the `Negsignal.SIGKILL` signal to avoid further memory consumption.

Arbeitslasten überprüfen

Prüfen Sie die Arbeitslasten, um sicherzustellen, dass die Last der Aufgabe nicht dazu führt, dass der Knoten, auf dem der Pod ausgeführt wird, das Limit für die Arbeitsspeichernutzung überschreitet:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Rufen Sie den Tab Umgebungskonfiguration auf.

  4. Klicken Sie unter Ressourcen > GKE-Cluster > Arbeitslasten auf Clusterarbeitslasten ansehen.

  5. Prüfen Sie, ob einige der Arbeitslast-Pods einen Status ähnlich dem folgenden haben:

    Error with exit code 137 and 1 more issue.
    ContainerStatusUnknown with exit code 137 and 1 more issue
    

    Exit code 137 bedeutet, dass ein Container oder Pod versucht, mehr Arbeitsspeicher als zulässig zu verwenden. Der Prozess wird beendet, um die Arbeitsspeichernutzung zu verhindern.

Monitoring des Umgebungsstatus und des Ressourcenverbrauchs prüfen

Prüfen Sie das Monitoring des Umgebungsstatus und des Ressourcenverbrauchs:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Wechseln Sie zum Tab Monitoring und wählen Sie Übersicht aus.

  4. Suchen Sie im Bereich Umgebungsübersicht nach dem Diagramm Umgebungszustand (Airflow-Monitoring-DAG). Er enthält einen roten Bereich, der der Zeit entspricht, zu der der Fehlerdruck von Logs begann.

  5. Wählen Sie Worker aus und suchen Sie das Diagramm Gesamtarbeitsspeichernutzung der Worker. Achten Sie darauf, dass die Zeile Arbeitsspeichernutzung zu dem Zeitpunkt, als die Aufgabe ausgeführt wurde, einen Spitzenwert aufweist.

Die Zeile „Arbeitsspeichernutzung“ weist einen Spitzenwert auf, als die Aufgabe ausgeführt wurde
Abbildung 1. Diagramm zur Arbeitsspeichernutzung der gesamten Worker (zum Vergrößern klicken)

Auch wenn die Linie für die Arbeitsspeichernutzung in der Grafik das Limit nicht erreicht, müssen Sie bei der Diagnose der Fehlerursachen nur den zuweisbaren Arbeitsspeicher berücksichtigen. Die Linie Arbeitsspeicherlimit im Diagramm stellt den insgesamt verfügbaren Arbeitsspeicher (einschließlich der von GKE reservierten Kapazität) dar.

In diesem Beispiel ist das Arbeitsspeicherlimit für Worker auf 1, 875 GB festgelegt. GKE reserviert 25% der ersten 4 GiB Arbeitsspeicher. GKE reserviert außerdem einen zusätzlichen Schwellenwert für die Entfernung: 100 MiB Arbeitsspeicher auf jedem Knoten für die Kubelet-Bereinigung.

Der zuweisbare Arbeitsspeicher wird so berechnet:

ALLOCATABLE = CAPACITY - RESERVED - EVICTION-THRESHOLD

Wenn das Arbeitsspeicherlimit bei 1,875 GB liegt, beträgt der tatsächlich zuweisbare Arbeitsspeicher:

1.75 GiB (1.875GB) - 0.44 (25% GiB reserved) - 0.1 = 1.21 GiB (~1.3 GB).

Wenn Sie dieses tatsächliche Limit an das Diagramm zur Arbeitsspeichernutzung anhängen, sehen Sie, dass die Spitze der Arbeitsspeichernutzung der Aufgabe das tatsächliche Arbeitsspeicherlimit erreicht. Außerdem können Sie daraus ableiten, dass die Aufgabe aufgrund von zu wenig Worker-Arbeitsspeicher fehlgeschlagen ist.

Erhöhen Sie das Worker-Arbeitsspeicherlimit

Weisen Sie zusätzlichen Worker-Arbeitsspeicher zu, damit der Beispiel-DAG erfolgreich ist:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Rufen Sie den Tab Umgebungskonfiguration auf.

  4. Suchen Sie die Konfiguration unter Ressourcen > Arbeitslasten und klicken Sie auf Bearbeiten.

  5. Geben Sie im Abschnitt Worker im Feld Arbeitsspeicher das neue Arbeitsspeicherlimit für Airflow-Worker an. In dieser Anleitung werden 3 GB verwendet.

  6. Speichern Sie die Änderungen und warten Sie einige Minuten, bis die Airflow-Worker neu gestartet werden.

DAG mit dem neuen Arbeitsspeicherlimit testen

Lösen Sie den DAG create_list_with_many_strings noch einmal aus und warten Sie, bis er beendet ist.

  1. In den Ausgabelogs Ihrer DAG-Ausführung wird Marking task as SUCCESS angezeigt und der Status der Aufgabe gibt Erfolgreich an.

  2. Prüfen Sie auf dem Tab Monitoring den Abschnitt Umgebungsübersicht und achten Sie darauf, dass keine roten Bereiche vorhanden sind.

  3. Klicken Sie auf den Abschnitt Worker und suchen Sie das Diagramm Gesamtarbeitsspeichernutzung der Worker. Sie werden feststellen, dass die Zeile Arbeitsspeicherlimit die Änderung des Arbeitsspeicherlimits widerspiegelt und die Zeile Arbeitsspeichernutzung weit unter dem tatsächlich zuweisbaren Arbeitsspeicherlimit liegt.

Beispiel: Probleme aufgrund von unzureichendem Speicherplatz diagnostizieren

In diesem Schritt laden Sie zwei DAGs hoch, die große Dateien erstellen. Der erste DAG erstellt eine große Datei. Der zweite DAG erstellt eine große Datei und imitiert einen lang andauernden Vorgang.

Die Größe der Datei in beiden DAGs überschreitet das standardmäßige Speicherlimit von Airflow-Worker von 1 GB. Der zweite DAG hat jedoch eine zusätzliche Warteaufgabe, um die Dauer künstlich zu verlängern.

In den nächsten Schritten untersuchen Sie das unterschiedliche Verhalten beider DAGs.

DAG hochladen, mit dem eine große Datei erstellt wird

Laden Sie den folgenden Beispiel-DAG in die Umgebung hoch, die Sie in den vorherigen Schritten erstellt haben. In dieser Anleitung heißt dieser DAG create_large_txt_file_print_logs.

Dieser DAG enthält eine Aufgabe, die die folgenden Schritte ausführt:

  1. Schreibt eine localfile.txt-Datei mit 1,5 GB in den Airflow-Worker-Speicher.
  2. Gibt die Größe der erstellten Datei mithilfe des Python-Moduls os aus.
  3. Gibt die Dauer der DAG-Ausführung jede Minute aus.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(
    'create_large_txt_file_print_logs',
    default_args=default_args,
    schedule_interval=None)


def consume():
    size = 1000**2  # bytes in 1 MB
    amount = 100

    def create_file():
        print(f"Start creating a huge file")
        with open("localfile.txt", "ab") as f:
            for j in range(15):
                f.write(os.urandom(amount) * size)
        print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")

    create_file()
    print("Success!")


t1 = PythonOperator(
    task_id='create_huge_file',
    python_callable=consume,
    dag=dag,
    depends_on_past=False,
    retries=0)

DAG hochladen, mit dem in einem Vorgang mit langer Ausführungszeit eine große Datei erstellt wird

Laden Sie den zweiten Beispiel-DAG in Ihre Umgebung hoch, um einen lang andauernden DAG zu imitieren und die Auswirkungen der Aufgabendauer auf den Endzustand zu untersuchen. In dieser Anleitung heißt dieser DAG long_running_create_large_txt_file_print_logs.

Dieser DAG enthält eine Aufgabe, die die folgenden Schritte ausführt:

  1. Schreibt eine localfile.txt-Datei mit 1,5 GB in den Airflow-Worker-Speicher.
  2. Gibt die Größe der erstellten Datei mithilfe des Python-Moduls os aus.
  3. Wartet 1 Stunde und 15 Minuten, um einige Zeit nachzuahmen, die für Vorgänge mit der Datei benötigt wird, z. B. das Lesen aus der Datei.
  4. Gibt die Dauer der DAG-Ausführung jede Minute aus.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(
    'long_running_create_large_txt_file_print_logs',
    default_args=default_args,
    schedule_interval=None)


def consume():
    size = 1000**2  # bytes in 1 MB
    amount = 100

    def create_file():
        print(f"Start creating a huge file")
        with open("localfile.txt", "ab") as f:
            for j in range(15):
                f.write(os.urandom(amount) * size)
        print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")

    create_file()
    for k in range(75):
        time.sleep(60)
        print(f"{k+1} minute")

    print("Success!")


t1 = PythonOperator(
    task_id='create_huge_file',
    python_callable=consume,
    dag=dag,
    depends_on_past=False,
    retries=0)

Beispiel-DAGs auslösen

Lösen Sie den ersten DAG create_large_txt_file_print_logs aus:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.

  3. Klicken Sie in der Airflow-Weboberfläche auf der Seite DAGs in der Spalte Links für Ihren DAG auf die Schaltfläche Trigger DAG.

  4. Klickbasierter Trigger

  5. Klicken Sie auf der Seite DAGs auf die Aufgabe, die Sie ausgelöst haben, und überprüfen Sie die Ausgabelogs, um sicherzustellen, dass der DAG gestartet wurde.

  6. Warten Sie, bis die Aufgabe abgeschlossen ist, die Sie mit dem DAG create_large_txt_file_print_logs erstellt haben. Dies kann einige Minuten dauern.

  7. Klicken Sie auf der Seite DAGs auf die DAG-Ausführung. Sie sehen, dass die Aufgabe den Status Success hat, obwohl das Speicherlimit überschritten wurde.

Prüfen Sie die Airflow-Logs für die Aufgabe:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

    1. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

    2. Wechseln Sie zum Tab Logs und dann zu Alle Logs > Airflow-Logs > Worker > In Log-Explorer ansehen.

    3. Logs nach Typ filtern: Es werden nur Error-Meldungen angezeigt.

Die Logs enthalten Nachrichten wie diese:

Worker: warm shutdown (Main Process)

oder

A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.

Diese Logs zeigen, dass der Pod den Prozess "Warm Herunterfahren" gestartet hat, da der verwendete Speicher das Limit überschritten und in 1 Stunde entfernt wurde. Die DAG-Ausführung ist jedoch nicht fehlgeschlagen, da sie innerhalb des Kulanzzeitraums für die Beendigung von Kubernetes abgeschlossen wurde, der in dieser Anleitung genauer erläutert wird.

Das Konzept des Kulanzzeitraums für die Beendigung wird am Ergebnis des zweiten Beispiel-DAG long_running_create_large_txt_file_print_logs veranschaulicht.

Lösen Sie den zweiten DAG long_running_create_large_txt_file_print_logs aus:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Spalte Airflow-Webserver auf den Link Airflow für Ihre Umgebung.

  3. Klicken Sie in der Airflow-Weboberfläche auf der Seite DAGs in der Spalte Links für Ihren DAG auf die Schaltfläche Trigger DAG.

  4. Klickbasierter Trigger

  5. Klicken Sie auf der Seite DAGs auf die Aufgabe, die Sie ausgelöst haben, und überprüfen Sie die Ausgabelogs, um sicherzustellen, dass der DAG gestartet wurde.

  6. Warten Sie, bis die DAG-Ausführung long_running_create_large_txt_file_print_logs fehlschlägt. Das dauert etwa eine Stunde.

Überprüfen Sie die Ergebnisse der DAG-Ausführung:

  1. Klicken Sie auf der Seite DAGs auf die DAG-Ausführung long_running_create_large_txt_file_print_logs. Sie sehen, dass die Aufgabe den Status Failed hat und dass die Ausführungsdauer genau 1 Stunde und 5 Minuten betrug, was weniger als die Wartezeit der Aufgabe von 1 Stunde und 15 Minuten ist.

  2. Prüfen Sie die Logs der Aufgabe. Nachdem der DAG die Datei localfile.txt im Container des Airflow-Workers erstellt hat, gibt das Log aus, dass der DAG wartet, und die Ausführungsdauer wird jede Minute in den Aufgabenlogs ausgegeben. In diesem Beispiel gibt der DAG das Log localfile.txt size: aus und die Größe der Datei localfile.txt beträgt 1, 5 GB.

Sobald die in den Container des Airflow-Workers geschriebene Datei das Speicherlimit überschreitet, sollte die DAG-Ausführung fehlschlagen. Die Aufgabe schlägt jedoch nicht sofort fehl und wird weiter ausgeführt, bis ihre Dauer 1 Stunde und 5 Minuten erreicht ist. Dies liegt daran, dass Kubernetes die Aufgabe nicht sofort beendet und weiter ausgeführt wird, um eine Stunde für die Wiederherstellung zu ermöglichen. Dies wird als Kulanzzeitraum für die Beendigung bezeichnet. Wenn ein Knoten keine Ressourcen mehr hat, beendet Kubernetes den Pod nicht sofort, um die Beendigung ordnungsgemäß zu verarbeiten, sodass die Auswirkungen auf den Endnutzer minimal sind.

Der Kulanzzeitraum für die Beendigung hilft Nutzern, Dateien nach Aufgabenfehlern wiederherzustellen. Dies kann jedoch bei der Diagnose von DAGs zu Verwirrung führen. Wenn das Speicherlimit des Airflow-Workers überschritten wird, hängt der Status des Endvorgangs von der Dauer der DAG-Ausführung ab:

  • Wenn die DAG-Ausführung das Worker-Speicherlimit überschreitet, aber in weniger als einer Stunde abgeschlossen ist, wird die Aufgabe mit dem Status Success abgeschlossen, da sie innerhalb des Kulanzzeitraums für die Beendigung abgeschlossen wurde. Kubernetes beendet jedoch den Pod und die geschriebene Datei wird sofort aus dem Container gelöscht.

  • Wenn der DAG das Worker-Speicherlimit überschreitet und länger als eine Stunde ausgeführt wird, wird der DAG noch eine Stunde lang ausgeführt. Er kann das Speicherlimit um Tausende Prozent überschreiten, bevor Kubernetes den Pod entfernt und Airflow die Aufgabe als Failed markiert.

Fehlgeschlagenen DAG diagnostizieren

Wenn Sie zum Zeitpunkt des Fehlers mehrere Aufgaben ausgeführt haben, sollten Sie nur eine Aufgabe ausführen und den Ressourcendruck während dieser Zeit diagnostizieren, um festzustellen, welche Aufgaben zu einem Ressourcendruck führen und welche Ressourcen erhöht werden müssen.

Überprüfen Sie die Aufgabenlogs des zweiten DAG long_running_create_large_txt_file_print_logs:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Wechseln Sie zum Tab Logs und dann zu Alle Logs > Airflow-Logs > Worker > In Log-Explorer ansehen.

  4. Logs nach Typ filtern: Es werden nur Error-Meldungen angezeigt.

Die Logs enthalten Nachrichten wie diese:

Container storage usage of worker reached 155.7% of the limit.

This likely means that the total size of local files generated by your DAGs is
close to the storage limit of worker.

You may need to decrease the storage usage or increase the worker storage limit
in your Cloud Composer environment configuration.

oder

Pod storage usage of worker reached 140.2% of the limit.
A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.

This eviction likely means that the total size of dags and plugins folders plus
local files generated by your DAGs exceeds the storage limit of worker.

Please decrease the storage usage or increase the worker storage limit in your
Cloud Composer environment configuration.

Diese Meldungen zeigen an, dass Airflow-Logs im Laufe der Aufgabe mit der Ausgabe von Fehlern begonnen haben, wenn die Größe der von Ihrem DAG generierten Dateien das Worker-Speicherlimit überschritten hat und der Kulanzzeitraum für die Beendigung begonnen hat. Während des Kulanzzeitraums für die Beendigung stieg die Speichernutzung nicht wieder auf das Limit, was dazu führte, dass der Pod nach Ablauf des Kulanzzeitraums für die Beendigung entfernt wurde.

Prüfen Sie das Monitoring des Umgebungsstatus und des Ressourcenverbrauchs:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Wechseln Sie zum Tab Monitoring und wählen Sie Übersicht aus.

  4. Suchen Sie im Bereich Umgebungsübersicht nach dem Diagramm Umgebungszustand (Airflow-Monitoring-DAG). Er enthält einen roten Bereich, der der Zeit entspricht, zu der der Fehlerdruck von Logs begann.

  5. Wählen Sie Worker aus und suchen Sie das Diagramm Gesamte Laufwerksnutzung der Worker. Achten Sie darauf, dass die Zeile Laufwerksnutzung eine Spitze hat und die Zeile Laufwerkslimit bei der Ausführung Ihrer Aufgabe überschreitet.

Die Zeile für die Laufwerksnutzung hat eine Spitze und überschreitet die Linie für das Laufwerkslimit bei der Ausführung Ihrer Aufgabe
Abbildung 2. Diagramm zur gesamten Laufwerksnutzung der Worker (zum Vergrößern klicken)

Worker-Speicherlimit erhöhen

Weisen Sie zusätzlichen Airflow-Worker-Speicher zu, damit der Beispiel-DAG erfolgreich ist:

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen“

  2. Klicken Sie in der Liste der Umgebungen auf den Namen Ihrer Umgebung. Die Seite Umgebungsdetails wird geöffnet.

  3. Rufen Sie den Tab Umgebungskonfiguration auf.

  4. Suchen Sie die Konfiguration unter Ressourcen > Arbeitslasten und klicken Sie auf Bearbeiten.

  5. Geben Sie im Abschnitt Worker im Feld Speicher das neue Speicherlimit für Airflow-Worker an. Legen Sie dafür in dieser Anleitung 2 GB fest.

  6. Speichern Sie die Änderungen und warten Sie einige Minuten, bis die Airflow-Worker neu gestartet werden.

DAG mit dem neuen Speicherlimit testen

Lösen Sie den DAG long_running_create_large_txt_file_print_logs noch einmal aus und warten Sie 1 Stunde und 15 Minuten, bis er abgeschlossen ist.

  1. In den Ausgabelogs Ihrer DAG-Ausführung wird Marking task as SUCCESS angezeigt. Der Status der Aufgabe gibt Erfolgreich mit einer Dauer von 1 Stunde und 15 Minuten an, was der im DAG-Code festgelegten Wartezeit entspricht.

  2. Prüfen Sie auf dem Tab Monitoring den Abschnitt Umgebungsübersicht und achten Sie darauf, dass keine roten Bereiche vorhanden sind.

  3. Klicken Sie auf den Abschnitt Worker und suchen Sie das Diagramm Laufwerksnutzung für Worker insgesamt. Sie werden feststellen, dass die Zeile Laufwerkslimit die Änderung des Speicherlimits widerspiegelt und die Zeile Laufwerknutzung innerhalb des zulässigen Bereichs liegt.

Zusammenfassung

In dieser Anleitung haben Sie den Grund für einen DAG-Fehler ermittelt und den Ressourcentyp, der die Belastung verursacht, identifiziert. Dazu haben Sie zwei Beispiel-DAGs behoben, die aufgrund von fehlenden Worker-Arbeitsspeichern und -Speicher fehlschlagen. Nachdem Sie den Workern mehr Arbeitsspeicher und Speicher zugewiesen haben, haben Sie die DAGs erfolgreich ausgeführt. Es wird jedoch empfohlen, Ihre DAGs (Workflows) zu optimieren, um die Worker-Ressourcennutzung von vornherein zu reduzieren, da es nicht möglich ist, die Ressourcen über einen bestimmten Grenzwert zu erhöhen.

Bereinigen

Damit Ihrem Google Cloud-Konto die in dieser Anleitung verwendeten Ressourcen nicht in Rechnung gestellt werden, löschen Sie entweder das Projekt, das die Ressourcen enthält, oder behalten Sie das Projekt bei und löschen Sie die einzelnen Ressourcen.

Projekt löschen

  1. Wechseln Sie in der Google Cloud Console zur Seite Ressourcen verwalten.

    Zur Seite „Ressourcen verwalten“

  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie dann auf Löschen.
  3. Geben Sie im Dialogfeld die Projekt-ID ein und klicken Sie auf Shut down (Beenden), um das Projekt zu löschen.

Einzelne Ressourcen löschen

Wenn Sie mehrere Anleitungen und Kurzanleitungen durcharbeiten möchten, können Sie die Überschreitung von Projektkontingenten verhindern, indem Sie Projekte wiederverwenden.

Löschen Sie die Cloud Composer-Umgebung. Während dieses Vorgangs löschen Sie auch den Bucket der Umgebung.

Nächste Schritte