Fehlerbehebung bei Dataflow-Fehlern aufgrund von Speichermangel

Diese Seite enthält Informationen zur Speichernutzung in Dataflow-Pipelines und zu Schritten zum Untersuchen und Beheben von Problemen mit Dataflow-Fehlern aufgrund fehlenden Speichers.

Informationen zur Dataflow-Speichernutzung

Zur Behebung von Fehlern bei unzureichendem Speicher ist es hilfreich zu verstehen, wie Dataflow-Pipelines Speicher verwenden.

Wenn Dataflow eine Pipeline ausführt, wird die Verarbeitung auf mehrere Compute Engine-VMs (VMs) verteilt, die häufig als Worker bezeichnet werden. Worker verarbeiten Arbeitselemente aus dem Dataflow-Dienst und delegieren die Arbeitselemente an Apache Beam SDK-Prozesse. Ein Apache Beam SDK-Prozess erstellt Instanzen von DoFns. DoFn ist eine Apache Beam SDK-Klasse, die eine verteilte Verarbeitungsfunktion definiert.

Dataflow startet mehrere Threads auf jedem Worker und der Arbeitsspeicher jedes Workers wird für alle Threads freigegeben. Ein Thread ist eine einzelne ausführbare Aufgabe, die in einem größeren Prozess ausgeführt wird. Die Standardanzahl der Threads hängt von mehreren Faktoren ab und variiert zwischen Batch- und Streamingjobs.

Wenn Ihre Pipeline mehr Arbeitsspeicher benötigt, als standardmäßig auf den Workern verfügbar ist, kann es zu Fehlern kommen, wenn der Speicher nicht ausreicht.

Dataflow-Pipelines verwenden primär den Worker-Arbeitsspeicher auf drei Arten:

Operativer Worker-Speicher

Dataflow-Worker benötigen Arbeitsspeicher für ihre Betriebssysteme und Systemprozesse. Die Worker-Speichernutzung ist in der Regel nicht größer als 1 GB. Die Nutzung beträgt normalerweise weniger als 1 GB.

  • Verschiedene Prozesse auf dem Worker verwenden den Arbeitsspeicher, um sicherzustellen, dass die Pipeline ordnungsgemäß funktioniert. Für jeden dieser Prozesse ist möglicherweise ein wenig Arbeitsspeicher für den Vorgang reserviert.
  • Wenn Ihre Pipeline keine Streaming Engine verwendet, nutzen zusätzliche Worker-Prozesse Speicher.

SDK-Prozessspeicher

Apache Beam SDK-Prozesse können Objekte und Daten erstellen, die von Threads innerhalb des Prozesses gemeinsam genutzt werden. Diese Prozesse werden auf dieser Seite als freigegebene SDK-Objekte und -Daten bezeichnet. Die Arbeitsspeichernutzung aus diesen freigegebenen SDK-Objekten und -Daten wird als SDK-Prozessspeicher bezeichnet. Die folgende Liste enthält Beispiele für freigegebene SDK-Objekte und -Daten:

  • Nebeneingaben
  • Modelle für maschinelles Lernen
  • In-Memory-Singleton-Objekte
  • Python-Objekte, die mit dem apache_beam.utils.shared-Modul erstellt wurden
  • Daten, die aus externen Quellen wie Cloud Storage oder BigQuery geladen wurden

Streamingjobs, die keine Streaming Engine-Nebeneingaben im Speicher verwenden. Bei Java- und Go-Pipelines hat jeder Worker eine Kopie der Nebeneingabe. Bei Python-Pipelines enthält jeder Apache Beam SDK-Prozess eine Kopie der Nebeneingabe.

Streamingjobs, die Streaming Engine verwenden, haben eine Größenbeschränkung von 80 MB für Nebeneingaben. Nebeneingaben werden außerhalb des Worker-Arbeitsspeichers gespeichert.

Die Speichernutzung von freigegebenen SDK-Objekten und -Daten steigt linear mit der Anzahl der Apache Beam SDK-Prozesse. In Java- und Go-Pipelines wird ein Apache Beam SDK-Prozess pro Worker gestartet. In Python-Pipelines wird ein Apache Beam SDK-Prozess pro vCPU gestartet. Freigegebene Objekte und Daten des SDK werden über Threads im selben Apache Beam SDK-Prozess hinweg wiederverwendet.

DoFn Arbeitsspeichernutzung

DoFn ist eine Apache Beam SDK-Klasse, die eine verteilte Verarbeitungsfunktion definiert. Jeder Worker kann gleichzeitige DoFn-Instanzen ausführen. In jedem Thread wird eine DoFn-Instanz ausgeführt. Bei der Bewertung der Gesamtspeichernutzung, der Berechnung der Arbeitssatzgröße oder der Größe des Arbeitsspeichers, die eine Anwendung weiterhin benötigt, kann es hilfreich sein. Wenn beispielsweise ein einzelner DoFn maximal 5 MB Arbeitsspeicher verwendet und ein Worker 300 Threads hat, könnte die DoFn-Speicherauslastung einen Spitzenwert von 1,5 GB erreichen, also die Anzahl der Speicherbytes multipliziert mit der Anzahl der Threads. Je nachdem, wie die Worker den Arbeitsspeicher nutzen, kann ein Anstieg der Speichernutzung dazu führen, dass den Workern der Speicher ausgeht.

Es ist schwierig zu schätzen, wie viele Instanzen eines DoFn von Dataflows erstellt werden. Die Anzahl hängt von verschiedenen Faktoren ab, z. B. vom SDK, dem Maschinentyp usw. Darüber hinaus kann das DoFn von mehreren Threads nacheinander verwendet werden. Der Dataflow-Dienst garantiert weder, wie oft ein DoFn aufgerufen wird, noch garantiert er die genaue Anzahl der DoFn-Instanzen, die im Laufe einer Pipeline erstellt werden. Die folgende Tabelle gibt jedoch einen Einblick in die Ebene der Parallelität, die Sie erwarten können, und schätzt eine Obergrenze für die Anzahl der DoFn-Instanzen.

Beam Python SDK

Batch Streaming ohne Streaming Engine Streaming Engine
Parallelismus 1 Prozess pro vCPU

1 Thread pro Prozess

1 Thread pro vCPU

1 Prozess pro vCPU

12 Threads pro Prozess

12 Threads pro vCPU

1 Prozess pro vCPU

12 Threads pro Prozess

12 Threads pro vCPU

Maximale Anzahl gleichzeitiger DoFn-Instanzen (Alle diese Zahlen können sich jederzeit ändern.) 1 DoFn pro Thread

1 DoFn pro vCPU

1 DoFn pro Thread

12 DoFn pro vCPU

1 DoFn pro Thread

12 DoFn pro vCPU

Beam Java/Go SDK

Batch Streaming ohne Streaming Engine Streaming Engine
Parallelismus 1 Prozess pro Worker-VM

1 Thread pro vCPU

1 Prozess pro Worker-VM

300 Threads pro Prozess

300 Threads pro Worker-VM

1 Prozess pro Worker-VM

500 Threads pro Prozess

500 Threads pro Worker-VM

Maximale Anzahl gleichzeitiger DoFn-Instanzen (Alle diese Zahlen können sich jederzeit ändern.) 1 DoFn pro Thread

1 DoFn pro vCPU

1 DoFn pro Thread

300 DoFn pro Worker-VM

1 DoFn pro Thread

500 DoFn pro Worker-VM

Wenn Sie eine mehrsprachige Pipeline haben und mehr als ein Apache Beam SDK auf dem Worker ausgeführt wird, verwendet der Worker einen möglichst niedrigen Grad an Thread-pro-Prozess-Parallelität.

Unterschiede zwischen Java, Go und Python

Prozesse und Arbeitsspeicher werden in Java, Go und Python unterschiedlich verwaltet. Die Vorgehensweise bei der Behebung von Fehlern, bei denen der Speicher nicht ausreicht, hängt daher davon ab, ob Ihre Pipeline Java, Go oder Python verwendet.

Java- und Go-Pipelines

In Java- und Go-Pipelines:

  • Jeder Worker startet einen Apache Beam SDK-Prozess.
  • Freigegebene SDK-Objekte und -Daten, wie Nebeneingaben und Caches, werden unter allen Threads auf dem Worker freigegeben.
  • Der von freigegebenen SDK-Objekten und -Daten verwendete Arbeitsspeicher wird normalerweise nicht basierend auf der Anzahl der vCPUs auf dem Worker skaliert.

Python-Pipelines

In Python-Pipelines:

  • Jeder Worker startet einen Apache Beam SDK-Prozess pro vCPU.
  • Freigegebene Objekte und Daten des SDK, wie Nebeneingaben und Caches, werden von allen Threads in jedem Apache Beam SDK-Prozess gemeinsam genutzt.
  • Die Gesamtzahl der Threads auf dem Worker wird linear anhand der Anzahl der vCPUs skaliert. Folglich wächst der von gemeinsam genutzten SDK-Objekten und -Daten verwendete Arbeitsspeicher linear mit der Anzahl der vCPUs.
  • Threads, die die Arbeit ausführen, werden auf Prozesse verteilt. Neue Arbeitseinheiten werden entweder einem Prozess ohne Arbeitselemente oder dem Prozess mit den wenigsten gerade zugewiesenen Arbeitselementen zugewiesen.

Speichermangelfehler finden

Verwenden Sie eine der folgenden Methoden, um festzustellen, ob Ihrer Pipeline der Speicher ausgeht.

Wenn Ihr Job eine hohe Speichernutzung oder Fehler aufgrund fehlenden Speichers hat, folgen Sie den Empfehlungen auf dieser Seite, um die Speichernutzung zu optimieren oder die verfügbare Speichermenge zu erhöhen.

Nicht genügend Arbeitsspeicherfehler beheben

Änderungen an der Dataflow-Pipeline können zu Speicherfehlern führen oder die Arbeitsspeichernutzung reduzieren. Mögliche Änderungen beinhalten die folgenden Aktionen:

Das folgende Diagramm zeigt den Workflow zur Dataflow-Fehlerbehebung, der auf dieser Seite beschrieben wird.

Diagramm, das den Workflow zur Fehlerbehebung zeigt.

Pipeline optimieren

Mehrere Pipelinevorgänge können zu Speicherfehlern führen. Dieser Abschnitt bietet Optionen zur Reduzierung der Speichernutzung Ihrer Pipeline. Verwenden Sie Cloud Profiler, um die Pipelineleistung zu überwachen und um die Pipelinephasen zu identifizieren, die den meisten Arbeitsspeicher belegen.

Mit den folgenden Best Practices können Sie Ihre Pipeline optimieren:

Integrierte E/A-Connectors von Apache Beam zum Lesen von Dateien verwenden

Öffnen Sie keine großen Dateien in einem DoFn. Verwenden Sie zum Lesen von Dateien integrierte E/A-Connectors von Apache Beam. In einem DoFn geöffnete Dateien müssen in den Speicher passen. Da mehrere DoFn-Instanzen gleichzeitig ausgeführt werden, können große Dateien, die in DoFn geöffnet sind, zu wenig Arbeitsspeicher führen.

Vorgänge bei der Verwendung von GroupByKey PTransforms neu gestalten

Wenn Sie eine GroupByKey PTransform in Dataflow verwenden, werden die resultierenden Werte pro Schlüssel und pro Fenster in einem einzigen Thread verarbeitet. Da diese Daten als Stream vom Dataflow-Back-End-Dienst an die Worker übergeben werden, müssen sie nicht in den Worker-Arbeitsspeicher passen. Wenn die Werte jedoch im Speicher erfasst werden, kann die Verarbeitungslogik zu einem Fehler aufgrund fehlenden Arbeitsspeichers führen.

Wenn Sie beispielsweise einen Schlüssel haben, der Daten für ein Fenster enthält, und die Schlüsselwerte einem In-Memory-Objekt wie einer Liste hinzufügen, können Fehler aufgrund fehlenden Arbeitsspeichers auftreten. In diesem Szenario hat der Worker möglicherweise nicht genügend Speicherkapazität, um alle Objekte zu speichern.

Weitere Informationen zu GroupByKey PTransforms finden Sie in der Apache Beam-Dokumentation zu Python GroupByKey und Java GroupByKey.

Die folgende Liste enthält Vorschläge zum Entwerfen Ihrer Pipeline, um den Speicherverbrauch bei Verwendung von GroupByKey PTransforms zu minimieren.

  • Um die Datenmenge pro Schlüssel und pro Fenster zu reduzieren, sollten Sie Schlüssel mit vielen Werten vermeiden, die auch als „heiße“ Schlüssel bezeichnet werden.
  • Verwenden Sie eine kleinere Fenstergröße, um die pro Fenster erfasste Datenmenge zu reduzieren.
  • Wenn Sie Werte eines Schlüssels in einem Fenster zur Berechnung einer Zahl verwenden, nutzen Sie eine Combine-Transformation. Führen Sie die Berechnung nach der Erfassung der Werte nicht in einer einzelnen DoFn-Instanz durch.
  • Filtern Sie Werte oder Duplikate vor der Verarbeitung. Weitere Informationen finden Sie in der Transformationsdokumentation zu Python Filter und Java Filter.

Eingehende Daten aus externen Quellen reduzieren

Wenn Sie zur externen Datenanreicherung eine externe API oder eine Datenbank aufrufen, müssen die zurückgegebenen Daten in den Worker-Arbeitsspeicher passen. Wenn Sie Aufrufe in Batches zusammenfassen, wird die Verwendung einer GroupIntoBatches-Transformation empfohlen. Wenn nicht genügend Arbeitsspeicherfehler auftreten, reduzieren Sie die Batchgröße. Weitere Informationen zum Gruppieren in Batches finden Sie in der Transformationsdokumentation zu Python GroupIntoBatches und Java GroupIntoBatches.

Objekte in Threads freigeben

Das Freigeben eines speicherinternen Datenobjekts auf DoFn-Instanzen kann den Speicherplatz und die Zugriffseffizienz verbessern. Datenobjekte, die in einer beliebigen DoFn-Methode erstellt wurden, einschließlich Setup, StartBundle, Process, FinishBundle und Teardown, werden für jede DoFn aufgerufen. In Dataflow kann jeder Worker mehrere DoFn-Instanzen haben. Für eine effizientere Speichernutzung übergeben Sie ein Datenobjekt als Singleton, um es für mehrere DoFns freizugeben. Weitere Informationen finden Sie im Blogpost Cache-Wiederverwendung in DoFns.

Speichereffiziente Elementdarstellungen verwenden

Prüfen Sie, ob Sie Darstellungen für PCollection-Elemente verwenden können, die weniger Arbeitsspeicher verwenden. Wenn Sie Codierer in Ihrer Pipeline verwenden, sollten Sie nicht nur codierte, sondern auch decodierte PCollection-Elementdarstellungen berücksichtigen. Sparse-Matrizen können häufig von dieser Art von Optimierung profitieren.

Größe der Nebeneingaben verringern

Wenn Ihre DoFns Nebeneingaben verwenden, reduzieren Sie die Größe der Nebeneingabe. Für Nebeneingaben, bei denen es sich um Sammlungen von Elementen handelt, sollten Sie iterierbare Ansichten wie AsIterable oder AsMultimap anstelle von Ansichten verwenden, die die gesamte Nebeneingabe auf einmal materialisieren, wie AsList.

Mehr Arbeitsspeicher verfügbar machen

Wenn Sie den verfügbaren Arbeitsspeicher erhöhen möchten, können Sie die Gesamtmenge des für Worker verfügbaren Arbeitsspeichers erhöhen, ohne den Umfang des pro Thread verfügbaren Arbeitsspeichers zu ändern. Alternativ können Sie die Größe des Arbeitsspeichers erhöhen, der pro Thread verfügbar ist. Wenn Sie den Arbeitsspeicher pro Thread erhöhen, erhöhen Sie auch den Gesamtspeicher auf dem Worker.

Sie können den Arbeitsspeicher pro Thread auf vier Arten erhöhen:

Maschinentyp mit mehr Arbeitsspeicher pro vCPU verwenden.

Verwenden Sie eine der folgenden Methoden, um einen Worker mit mehr Arbeitsspeicher pro vCPU auszuwählen.

  • Verwenden Sie einen Maschinentyp mit großem Arbeitsspeicher in der Maschinenfamilie für allgemeine Zwecke. Maschinentypen mit großem Speicher haben einen höheren Arbeitsspeicher pro vCPU als Standardmaschinentypen. Bei einem Maschinentyp mit großem Speicher wird der für jeden Worker verfügbare Arbeitsspeicher erhöht und der pro Thread verfügbare Speicher, da die Anzahl der vCPUs gleich bleibt. Daher kann die Verwendung eines Maschinentyps mit großem Speicher eine kostengünstige Möglichkeit sein, einen Worker mit mehr Arbeitsspeicher pro vCPU auszuwählen.
  • Für mehr Flexibilität bei der Angabe der Anzahl der vCPUs und der Größe des Arbeitsspeichers können Sie Folgendes verwenden: Sie können einen benutzerdefinierten Maschinentyp verwenden. Mit benutzerdefinierten Maschinentypen können Sie den Speicher um 256-MB-Schritte erhöhen. Diese Maschinentypen unterscheiden sich preislich von den standardmäßigen Maschinentypen.
  • Einige Maschinenfamilien bieten Ihnen die Möglichkeit, benutzerdefinierte Maschinentypen für erweiterten Speicher zu verwenden. Erweiterter Speicher ermöglicht ein höheres Verhältnis von Arbeitsspeicher pro vCPU. Die Kosten sind höher.

Verwenden Sie die folgende Pipelineoption, um Worker-Typen festzulegen. Weitere Informationen finden Sie unter Pipelineoptionen festlegen und Pipelineoptionen.

Java

Verwenden Sie die Pipelineoption --workerMachineType.

Python

Verwenden Sie die Pipelineoption --machine_type.

Einfach loslegen (Go)

Verwenden Sie die Pipelineoption --worker_machine_type.

Nutzen Sie einen Maschinentyp mit mehr vCPUs.

Diese Option wird nur für Java- und Go-Streamingpipelines empfohlen. Maschinentypen mit mehr vCPUs haben mehr Gesamtspeicher, da der Arbeitsspeicher linear mit der Anzahl der vCPUs skaliert wird. Ein Maschinentyp n1-standard-4 mit vier vCPUs hat beispielsweise 15 GB Arbeitsspeicher. Der Maschinentyp n1-standard-8 mit acht vCPUs hat 30 GB Arbeitsspeicher. Weitere Informationen zu vordefinierten Maschinentypen finden Sie unter Maschinenfamilie für allgemeine Zwecke.

Die Verwendung von Workern mit einer höheren Anzahl von vCPUs kann die Kosten Ihrer Pipeline erheblich erhöhen. Sie können jedoch horizontales Autoscaling verwenden, um die Gesamtzahl der Worker zu reduzieren, sodass die Parallelität gleich bleibt. Wenn Sie beispielsweise 50 Worker haben, die einen Maschinentyp n1-standard-4 verwenden, und Sie zu einem Maschinentyp n1-standard-8 wechseln, können Sie die horizontale Autoskalierung verwenden und die maximale Anzahl der Worker so einstellen, dass die Gesamtzahl der Worker in Ihrer Pipeline auf etwa 25 reduziert wird. Diese Konfiguration führt zu einer Pipeline mit ähnlichen Kosten.

Verwenden Sie die folgende Pipelineoption, um die maximale Anzahl der Worker festzulegen.

Java

Verwenden Sie die Pipelineoption --maxNumWorkers.

Weitere Informationen finden Sie unter Pipelineoptionen.

Einfach loslegen (Go)

Verwenden Sie die Pipelineoption --max_num_workers.

Weitere Informationen finden Sie unter Pipelineoptionen.

Diese Methode wird für Python-Pipelines nicht empfohlen. Wenn Sie das Python SDK verwenden und zu einem Worker mit einer höheren Anzahl von vCPUs wechseln, erhöhen Sie nicht nur den Speicher, sondern auch die Anzahl der Apache Beam SDK-Prozesse. Der Maschinentyp n1-standard-4 hat beispielsweise denselben Arbeitsspeicher pro Thread wie der Maschinentyp n1-standard-8 für Python-Pipelines. Daher wird bei Python-Pipelines empfohlen, einen Maschinentyp mit großem Speicher zu verwenden, die Anzahl der Threads zu reduzieren oder nur einen Apache Beam SDK-Prozess zu verwenden.

Reduzieren Sie die Anzahl der Threads.

Wenn die Verwendung eines Maschinentyps mit hohem Arbeitsspeicher Ihr Problem nicht löst, erhöhen Sie den pro Thread verfügbaren Arbeitsspeicher, indem Sie die maximale Anzahl der Threads, die DoFn-Instanzen ausführen, reduzieren. Diese Änderung reduziert die Parallelität. Verwenden Sie die folgende Pipelineoption, um die Anzahl der Apache Beam SDK-Threads zu reduzieren, die DoFn-Instanzen ausführen.

Java

Verwenden Sie die Pipelineoption --numberOfWorkerHarnessThreads.

Weitere Informationen finden Sie unter Pipelineoptionen.

Python

Verwenden Sie die Pipelineoption --number_of_worker_harness_threads.

Weitere Informationen finden Sie unter Pipelineoptionen.

Einfach loslegen (Go)

Verwenden Sie die Pipelineoption --number_of_worker_harness_threads.

Weitere Informationen finden Sie unter Pipelineoptionen.

Um die Anzahl der Threads für Java- und Go-Batchpipelines zu reduzieren, setzen Sie den Wert des Flags auf eine Zahl, die kleiner als die Anzahl der vCPUs auf dem Worker ist. Setzen Sie bei Streaming-Pipelines den Wert des Flags auf eine Zahl, die kleiner ist als die Anzahl der Threads pro Apache Beam SDK-Prozess. Informationen zur Schätzung von Threads pro Prozess finden Sie in der Tabelle im Abschnitt DoFn Speichernutzung auf dieser Seite.

Diese Anpassung ist nicht verfügbar für Python-Pipelines, die mit dem Apache Beam SDK 2.20.0 oder früher laufen, oder für Python-Pipelines, die nicht Runner v2 verwenden.

Nur einen Apache Beam SDK-Prozess verwenden

Bei Python-Streamingpipelines und Python-Pipelines, die Runner v2 verwenden, können Sie erzwingen, dass Dataflow nur einen Apache Beam SDK-Prozess pro Worker startet. Bevor Sie diese Option ausprobieren können, versuchen Sie zuerst, das Problem mit den anderen Methoden zu beheben. Verwenden Sie die folgende Pipeline-Option, um Dataflow-Worker-VMs so zu konfigurieren, dass nur ein containerisierter Python-Prozess gestartet wird:

--experiments=no_use_multiple_sdk_containers

Bei dieser Konfiguration erstellen Python-Pipelines einen Apache Beam SDK-Prozess pro Worker. Diese Konfiguration verhindert, dass die freigegebenen Objekte und Daten für jeden Apache Beam SDK-Prozess mehrmals repliziert werden. Allerdings wird dadurch die effiziente Nutzung der für den Worker verfügbaren Rechenressourcen begrenzt.

Wenn Sie die Anzahl der Apache Beam SDK-Prozesse auf einen Prozess reduzieren, wird die Gesamtzahl der Threads, die auf dem Worker gestartet werden, nicht unbedingt verringert. Darüber hinaus kann das Ausführen aller Threads in einem einzelnen Apache Beam SDK-Prozess zu einer langsamen Verarbeitung führen oder dazu führen, dass die Pipeline hängen bleibt. Daher müssen Sie möglicherweise die Anzahl der Threads reduzieren, wie im Abschnitt Anzahl der Threads reduzieren auf dieser Seite beschrieben wird.

Sie können auch erzwingen, dass Worker nur einen Apache Beam SDK-Prozess verwenden, indem Sie einen Maschinentyp mit nur einer vCPU verwenden.