Fehlerbehebung bei Dataflow-Fehlern aufgrund von Speichermangel

Auf dieser Seite wird beschrieben, wie Sie Fehler aufgrund fehlenden Speichers in Dataflow ermitteln und beheben.

Speichermangelfehler finden

Verwenden Sie eine der folgenden Methoden, um festzustellen, ob Ihrer Pipeline der Speicher ausgeht.

  • Rufen Sie auf der Seite Jobdetails im Bereich Protokolle den Tab Diagnose auf. Auf diesem Tab werden Fehler im Zusammenhang mit Speicherproblemen und die Häufigkeit des Auftretens dieser Fehler angezeigt.
  • Verwenden Sie in der Dataflow-Monitoring-Oberfläche das Diagramm Speicherauslastung, um die Kapazität und Nutzung des Workers zu überwachen.
  • Wählen Sie auf der Seite Jobdetails im Bereich Protokolle die Option Worker-Protokolle aus, um Arbeitsspeicherfehler in den Worker-Protokollen zu finden.
  • Fehler aufgrund fehlenden Arbeitsspeichers können auch in Systemprotokollen auftreten. Rufen Sie dazu den Log-Explorer auf und verwenden Sie die folgende Abfrage:

    resource.type="dataflow_step"
    resource.labels.job_id="JOB_ID"
    "out of memory" OR "OutOfMemory" OR "Shutting down JVM"
    

    Ersetzen Sie JOB_ID durch die ID Ihrer Jobs.

  • Für Java-Jobs meldet der Java-Speichermonitor regelmäßig Messwerte für die automatische Speicherbereinigung. Wenn der Anteil der CPU-Zeit, der für die automatische Speicherbereinigung genutzt wird, einen Grenzwert von 50% über einen längeren Zeitraum hinweg überschreitet, schlägt das SDK-Nutzung fehl. Möglicherweise wird ein Fehler wie der folgende angezeigt:

    Shutting down JVM after 8 consecutive periods of measured GC thrashing. Memory is used/total/max = ...
    

    Dieser Fehler kann auftreten, wenn der physische Speicher noch verfügbar ist. Er weist in der Regel darauf hin, dass die Arbeitsspeichernutzung der Pipeline ineffizient ist. Optimieren Sie Ihre Pipeline, um dieses Problem zu beheben.

    Der Java-Speichermonitor wird über die MemoryMonitorOptions-Oberfläche konfiguriert.

Wenn Ihr Job eine hohe Speichernutzung oder Fehler aufgrund fehlenden Speichers hat, folgen Sie den Empfehlungen auf dieser Seite, um die Speichernutzung zu optimieren oder die verfügbare Speichermenge zu erhöhen.

Nicht genügend Arbeitsspeicherfehler beheben

Änderungen an der Dataflow-Pipeline können Probleme mit fehlendem Arbeitsspeicher auflösen oder die Arbeitsspeichernutzung reduzieren. Mögliche Änderungen beinhalten die folgenden Aktionen:

Das folgende Diagramm zeigt den Workflow zur Dataflow-Fehlerbehebung, der auf dieser Seite beschrieben wird.

Diagramm, das den Workflow zur Fehlerbehebung zeigt.

Sie können die folgenden Maßnahmen ergreifen:

  • Optimieren Sie nach Möglichkeit Ihre Pipeline, um die Speichernutzung zu reduzieren.
  • Wenn es sich um einen Batchjob handelt, versuchen Sie Folgendes:
    1. Verwenden Sie einen Maschinentyp mit mehr Arbeitsspeicher pro vCPU.
    2. Reduzieren Sie die Anzahl der Threads auf weniger als die Anzahl der vCPUs pro Worker.
    3. Verwenden Sie einen benutzerdefinierten Maschinentyp mit mehr Arbeitsspeicher pro vCPU.
  • Wenn es sich um einen Streamingjob handelt, der Python verwendet, reduzieren Sie die Anzahl der Threads auf weniger als 12.
  • Wenn es sich um einen Streaming-Job handelt, der Java oder Go verwendet, versuchen Sie Folgendes:
    1. Reduzieren Sie die Anzahl der Threads auf weniger als 500 für Runner v2-Jobs oder auf weniger als 300 für Jobs, für die Runner v2 nicht verwendet wird.
    2. Verwenden Sie einen Maschinentyp mit mehr Arbeitsspeicher.

Pipeline optimieren

Mehrere Pipelinevorgänge können zu Speicherfehlern führen. Dieser Abschnitt bietet Optionen zur Reduzierung der Speichernutzung Ihrer Pipeline. Verwenden Sie Cloud Profiler, um die Pipelineleistung zu überwachen und um die Pipelinephasen zu identifizieren, die den meisten Arbeitsspeicher belegen.

Mit den folgenden Best Practices können Sie Ihre Pipeline optimieren:

Integrierte E/A-Connectors von Apache Beam zum Lesen von Dateien verwenden

Öffnen Sie keine großen Dateien in einer DoFn. Verwenden Sie zum Lesen von Dateien integrierte E/A-Connectors von Apache Beam. Dateien, die in einer DoFn geöffnet werden, müssen in den Arbeitsspeicher passen. Da mehrere DoFn-Instanzen gleichzeitig ausgeführt werden, können große Dateien, die in DoFn geöffnet sind, zu wenig Arbeitsspeicher führen.

Vorgänge bei der Verwendung von GroupByKey PTransforms neu gestalten

Wenn Sie eine GroupByKey PTransform in Dataflow verwenden, werden die resultierenden Werte pro Schlüssel und pro Fenster in einem einzigen Thread verarbeitet. Da diese Daten als Stream vom Dataflow-Back-End-Dienst an die Worker übergeben werden, müssen sie nicht in den Worker-Arbeitsspeicher passen. Sind die Werte jedoch im Arbeitsspeicher gesammelt, kann die Verarbeitungslogik Fehler aufgrund fehlenden Speichers verursachen.

Wenn Sie beispielsweise einen Schlüssel haben, der Daten für ein Fenster enthält, und Sie die Schlüsselwerte zu einem speicherinternen Objekt wie einer Liste hinzufügen, kann es zu Fehlern aufgrund von fehlendem Speicherplatz kommen. In diesem Szenario verfügt der Worker möglicherweise nicht über genügend Arbeitsspeicherkapazität, um alle Objekte zu enthalten.

Weitere Informationen zu GroupByKey PTransforms finden Sie in der Apache Beam-Dokumentation zu Python GroupByKey und Java GroupByKey.

Die folgende Liste enthält Vorschläge zum Entwerfen Ihrer Pipeline, um den Speicherverbrauch bei Verwendung von GroupByKey PTransforms zu minimieren.

  • Um die Datenmenge pro Schlüssel und pro Fenster zu reduzieren, sollten Sie Schlüssel mit vielen Werten vermeiden, die auch als „heiße“ Schlüssel bezeichnet werden.
  • Wenn Sie die Menge der pro Fenster erfassten Daten reduzieren möchten, verwenden Sie eine kleinere Fenstergröße.
  • Wenn Sie Werte eines Schlüssels in einem Fenster verwenden, um eine Zahl zu berechnen, verwenden Sie eine Combine-Transformation. Führen Sie die Berechnung nicht in einer einzelnen DoFn-Instanz durch, nachdem Sie die Werte erfasst haben.
  • Filtern Sie Werte oder Duplikate vor der Verarbeitung. Weitere Informationen finden Sie in der Transformationsdokumentation zu Python Filter und Java Filter.

Eingehende Daten aus externen Quellen reduzieren

Wenn Sie eine externe API oder Datenbank zur Datenanreicherung aufrufen, müssen die zurückgegebenen Daten in den Arbeitsspeicher passen. Wenn Sie Batch-Aufrufe verwenden, wird die Verwendung einer GroupIntoBatches-Transformation empfohlen. Wenn Fehler aufgrund fehlenden Speichers auftreten, verringern Sie die Batchgröße. Weitere Informationen zum Gruppieren in Batches finden Sie in der Transformationsdokumentation zu Python GroupIntoBatches und Java GroupIntoBatches.

Objekte in Threads freigeben

Das Freigeben eines speicherinternen Datenobjekts auf DoFn-Instanzen kann den Speicherplatz und die Zugriffseffizienz verbessern. Datenobjekte, die in einer beliebigen DoFn-Methode erstellt wurden, einschließlich Setup, StartBundle, Process, FinishBundle und Teardown, werden für jede DoFn aufgerufen. In Dataflow kann jeder Worker mehrere DoFn-Instanzen haben. Für eine effizientere Speichernutzung übergeben Sie ein Datenobjekt als Singleton, um es für mehrere DoFns freizugeben. Weitere Informationen finden Sie im Blogpost Cache-Wiederverwendung in DoFns.

Speichereffiziente Elementdarstellungen verwenden

Prüfen Sie, ob Sie Darstellungen für PCollection-Elemente verwenden können, die weniger Arbeitsspeicher verwenden. Wenn Sie Codierer in Ihrer Pipeline verwenden, sollten Sie nicht nur codierte, sondern auch decodierte PCollection-Elementdarstellungen berücksichtigen. Sparse-Matrizen können häufig von dieser Art von Optimierung profitieren.

Größe der Nebeneingaben verringern

Wenn Ihre DoFns Nebeneingaben verwenden, reduzieren Sie die Größe der Nebeneingabe. Für Nebeneingaben, bei denen es sich um Sammlungen von Elementen handelt, sollten Sie iterierbare Ansichten wie AsIterable oder AsMultimap anstelle von Ansichten verwenden, die die gesamte Nebeneingabe auf einmal materialisieren, wie AsList.

Reduzieren Sie die Anzahl der Threads.

Sie können den pro Thread verfügbaren Arbeitsspeicher erhöhen, indem Sie die maximale Anzahl der Threads reduzieren, die DoFn-Instanzen ausführen. Dadurch wird die Parallelität reduziert, aber für jede DoFn steht mehr Arbeitsspeicher zur Verfügung.

In der folgenden Tabelle sehen Sie die Standardanzahl der Threads, die von Dataflow erstellt werden:

Jobtyp Python SDK Java/Go SDKs
Batch 1 Thread pro vCPU 1 Thread pro vCPU
Streaming mit Runner v2 12 Threads pro vCPU 500 Threads pro Worker-VM
Streaming ohne Runner v2 12 Threads pro vCPU 300 Threads pro Worker-VM

Um die Anzahl der Apache Beam SDK-Threads zu reduzieren, legen Sie die folgende Pipelineoption fest:

Verwenden Sie die Pipelineoption --numberOfWorkerHarnessThreads.

Verwenden Sie die Pipelineoption --number_of_worker_harness_threads.

Verwenden Sie die Pipelineoption --number_of_worker_harness_threads.

Legen Sie für Batchjobs einen Wert fest, der kleiner als die Anzahl der vCPUs ist.

Reduzieren Sie den Wert für Streamingjobs zuerst auf die Hälfte des Standardwerts. Wenn das Problem dadurch nicht behoben wird, reduzieren Sie den Wert weiter um die Hälfte und beobachten Sie die Ergebnisse bei jedem Schritt. Wenn Sie beispielsweise Python verwenden, versuchen Sie es mit den Werten 6, 3 und 1.

Maschinentyp mit mehr Arbeitsspeicher pro vCPU verwenden.

Verwenden Sie eine der folgenden Methoden, um einen Worker mit mehr Arbeitsspeicher pro vCPU auszuwählen.

  • Verwenden Sie einen Maschinentyp mit großem Arbeitsspeicher in der Maschinenfamilie für allgemeine Zwecke. Maschinentypen mit großem Speicher haben einen höheren Arbeitsspeicher pro vCPU als Standardmaschinentypen. Bei einem Maschinentyp mit großem Speicher wird der für jeden Worker verfügbare Arbeitsspeicher erhöht und der pro Thread verfügbare Speicher, da die Anzahl der vCPUs gleich bleibt. Daher kann die Verwendung eines Maschinentyps mit großem Speicher eine kostengünstige Möglichkeit sein, einen Worker mit mehr Arbeitsspeicher pro vCPU auszuwählen.
  • Für mehr Flexibilität bei der Angabe der Anzahl der vCPUs und der Größe des Arbeitsspeichers können Sie Folgendes verwenden: Sie können einen benutzerdefinierten Maschinentyp verwenden. Mit benutzerdefinierten Maschinentypen können Sie den Speicher um 256-MB-Schritte erhöhen. Diese Maschinentypen unterscheiden sich preislich von den standardmäßigen Maschinentypen.
  • Bei einigen Maschinenfamilien können Sie Erweiterter Arbeits-Speicher-benutzerdefinierten Maschinentypen verwenden. Erweiterter Arbeitsspeicher ermöglicht ein höheres Arbeitsspeicher-zu-vCPU-Verhältnis. Die Kosten sind höher.

Verwenden Sie die folgende Pipelineoption, um Worker-Typen festzulegen. Weitere Informationen finden Sie unter Pipelineoptionen festlegen und Pipelineoptionen.

Verwenden Sie die Pipelineoption --workerMachineType.

Verwenden Sie die Pipelineoption --machine_type.

Verwenden Sie die Pipelineoption --worker_machine_type.

Dataflow-Speichernutzung

Zur Behebung von Fehlern bei unzureichendem Speicher ist es hilfreich zu verstehen, wie Dataflow-Pipelines Speicher verwenden.

Wenn Dataflow eine Pipeline ausführt, wird die Verarbeitung auf mehrere Compute Engine-VMs (VMs) verteilt, die häufig als Worker bezeichnet werden. Worker verarbeiten Arbeitselemente aus dem Dataflow-Dienst und delegieren die Arbeitselemente an Apache Beam SDK-Prozesse. Ein Apache Beam SDK-Prozess erstellt Instanzen von DoFns. DoFn ist eine Apache Beam SDK-Klasse, die eine verteilte Verarbeitungsfunktion definiert.

Dataflow startet mehrere Threads auf jedem Worker und der Arbeitsspeicher eines jeden Workers ist für alle Threads freigegeben. Ein Thread ist eine einzelne ausführbare Aufgabe, die in einem größeren Prozess ausgeführt wird. Die Standardanzahl der Threads hängt von mehreren Faktoren ab und variiert zwischen Batch- und Streamingjobs.

Wenn Ihre Pipeline mehr Arbeitsspeicher benötigt, als standardmäßig auf den Workern verfügbar ist, kann es zu Fehlern kommen, wenn der Speicher nicht ausreicht.

Dataflow-Pipelines verwenden primär den Worker-Arbeitsspeicher auf drei Arten:

Operativer Worker-Speicher

Dataflow-Worker benötigen Arbeitsspeicher für ihre Betriebssysteme und System-Prozesse. Die Worker-Speichernutzung ist in der Regel nicht größer als 1 GB. Die Nutzung beträgt normalerweise weniger als 1 GB.

  • Verschiedene Prozesse auf dem Worker nutzen Arbeitsspeicher, um dafür zu sorgen, dass Ihre Pipeline ordnungsgemäß funktioniert. Für jeden dieser Prozesse ist möglicherweise ein wenig Arbeitsspeicher für den Vorgang reserviert.
  • Wenn Ihre Pipeline keine Streaming Engine verwendet, nutzen zusätzliche Worker-Prozesse Speicher.

SDK-Prozessspeicher

Apache Beam SDK-Prozesse können Objekte und Daten erstellen, die von Threads innerhalb des Prozesses gemeinsam genutzt werden. Diese Prozesse werden auf dieser Seite als freigegebene SDK-Objekte und -Daten bezeichnet. Die Arbeitsspeichernutzung aus diesen freigegebenen SDK-Objekten und -Daten wird als SDK-Prozessspeicher bezeichnet. Die folgende Liste enthält Beispiele für freigegebene SDK-Objekte und -Daten:

  • Nebeneingaben
  • Modelle für maschinelles Lernen
  • In-Memory-Singleton-Objekte
  • Python-Objekte, die mit dem apache_beam.utils.shared-Modul erstellt wurden
  • Daten, die aus externen Quellen wie Cloud Storage oder BigQuery geladen wurden

Streamingjobs, die keine Streaming Engine-Nebeneingaben im Speicher verwenden. Bei Java- und Go-Pipelines hat jeder Worker eine Kopie der Nebeneingabe. Bei Python-Pipelines enthält jeder Apache Beam SDK-Prozess eine Kopie der Nebeneingabe.

Streamingjobs, die Streaming Engine verwenden, haben eine Größenbeschränkung von 80 MB für Nebeneingaben. Nebeneingaben werden außerhalb des Worker-Arbeitsspeichers gespeichert.

Die Speichernutzung von freigegebenen SDK-Objekten und -Daten steigt linear mit der Anzahl der Apache Beam SDK-Prozesse. In Java- und Go-Pipelines wird ein Apache Beam SDK-Prozess gestartet pro Worker. In Python-Pipelines wird ein Apache Beam SDK-Prozess pro vCPU gestartet. Freigegebene Objekte und Daten des SDK werden von Threads innerhalb desselben Apache Beam SDK-Prozesses wiederverwendet.

DoFn Arbeitsspeichernutzung

DoFn ist eine Apache Beam SDK-Klasse, die eine verteilte Verarbeitungsfunktion definiert. Jeder Worker kann gleichzeitige DoFn-Instanzen ausführen. In jedem Thread wird eine DoFn-Instanz ausgeführt. Bei der Bewertung der Gesamtspeichernutzung, der Berechnung der Arbeitssatzgröße oder der Größe des Arbeitsspeichers, die eine Anwendung weiterhin benötigt, kann es hilfreich sein. Wenn beispielsweise ein einzelner DoFn maximal 5 MB Arbeitsspeicher verwendet und ein Worker 300 Threads hat, könnte die DoFn-Speicherauslastung einen Spitzenwert von 1,5 GB erreichen, also die Anzahl der Speicherbytes multipliziert mit der Anzahl der Threads. Je nachdem, wie die Worker den Arbeitsspeicher nutzen, kann ein Anstieg der Speichernutzung dazu führen, dass den Workern der Speicher ausgeht.

Es lässt sich nur schwer schätzen, wie viele Instanzen von DoFn von Dataflow erstellt wird. Die Anzahl hängt von verschiedenen Faktoren ab, z. B. vom SDK, dem Maschinentyp usw. Darüber hinaus kann das DoFn von mehreren Threads hintereinander verwendet werden. Der Dataflow-Dienst garantiert weder, wie oft ein DoFn aufgerufen wird, noch garantiert er die genaue Anzahl der DoFn-Instanzen, die im Laufe einer Pipeline erstellt werden. Die folgende Tabelle gibt jedoch einen Einblick in die Ebene der Parallelität, die Sie erwarten können, und schätzt eine Obergrenze für die Anzahl der DoFn-Instanzen.

Batch Streaming ohne Streaming Engine Streaming Engine
Parallelität 1 Prozess pro vCPU

1 Thread pro Prozess

1 Thread pro vCPU

1 Prozess pro vCPU

12 Threads pro Prozess

12 Threads pro vCPU

1 Prozess pro vCPU

12 Threads pro Prozess

12 Threads pro vCPU

Maximale Anzahl gleichzeitiger DoFn-Instanzen (Alle diese Zahlen können sich jederzeit ändern.) 1 DoFn pro Thread

1 DoFn pro vCPU

1 DoFn pro Thread

12 DoFn pro vCPU

1 DoFn pro Thread

12 DoFn pro vCPU

Batch Streaming Appliance und Streaming Engine ohne Runner v2 Streaming Engine mit Runner v2
Parallelität 1 Prozess pro Worker-VM

1 Thread pro vCPU

1 Prozess pro Worker-VM

300 Threads pro Prozess

300 Threads pro Worker-VM

1 Prozess pro Worker-VM

500 Threads pro Prozess

500 Threads pro Worker-VM

Maximale Anzahl gleichzeitiger DoFn-Instanzen (Alle diese Zahlen können sich jederzeit ändern.) 1 DoFn pro Thread

1 DoFn pro vCPU

1 DoFn pro Thread

300 DoFn pro Worker-VM

1 DoFn pro Thread

500 DoFn pro Worker-VM

Wenn Sie beispielsweise das Python SDK mit einem n1-standard-2-Dataflow-Worker verwenden, gilt Folgendes:

  • Batchjobs: Dataflow startet einen Prozess pro vCPU (in diesem Fall zwei). Jeder Prozess verwendet einen Thread und jeder Thread erstellt eine DoFn-Instanz.
  • Streamingjobs mit Streaming Engine: Dataflow startet einen Prozess pro vCPU (insgesamt zwei). Jeder Prozess kann jedoch bis zu 12 Threads mit jeweils einer eigenen DoFn-Instanz erstellen.

Wenn Sie komplexe Pipelines entwerfen, ist es wichtig, den DoFn-Lebenszyklus zu verstehen. Ihre DoFn-Funktionen müssen serialisierbar sein und das Elementargument darf nicht direkt darin geändert werden.

Wenn Sie eine mehrsprachige Pipeline haben und mehr als ein Apache Beam SDK auf dem Worker ausgeführt wird, verwendet der Worker einen möglichst niedrigen Grad an Thread-pro-Prozess-Parallelität.

Unterschiede zwischen Java, Go und Python

Prozesse und Arbeitsspeicher werden in Java, Go und Python unterschiedlich verwaltet. Die Vorgehensweise bei der Behebung von Fehlern, bei denen der Speicher nicht ausreicht, hängt daher davon ab, ob Ihre Pipeline Java, Go oder Python verwendet.

Java- und Go-Pipelines

In Java- und Go-Pipelines:

  • Jeder Worker startet einen Apache Beam SDK-Prozess.
  • Freigegebene SDK-Objekte und -Daten, wie Nebeneingaben und Caches, werden unter allen Threads auf dem Worker freigegeben.
  • Der von freigegebenen SDK-Objekten und -Daten verwendete Arbeitsspeicher wird normalerweise nicht basierend auf der Anzahl der vCPUs auf dem Worker skaliert.

Python-Pipelines

In Python-Pipelines:

  • Jeder Worker startet einen Apache Beam SDK-Prozess pro vCPU.
  • Freigegebene Objekte und Daten des SDK, wie Nebeneingaben und Caches, werden von allen Threads in jedem Apache Beam SDK-Prozess gemeinsam genutzt.
  • Die Gesamtzahl der Threads auf dem Worker wird linear anhand der Anzahl der vCPUs skaliert. Folglich wächst der von gemeinsam genutzten SDK-Objekten und -Daten verwendete Arbeitsspeicher linear mit der Anzahl der vCPUs.
  • Die Threads, die die Arbeit ausführen, werden auf Prozesse verteilt. Neue Arbeitseinheiten werden entweder einem Prozess ohne Arbeitselemente oder dem Prozess mit den wenigsten gerade zugewiesenen Arbeitselementen zugewiesen.