Dynamische Thread-Skalierung

Die dynamische Threadskalierung ist Teil der vertikalen Skalierungsfunktionen von Dataflow. Sie ergänzt das horizontale Autoscaling-Feature von Dataflow und passt die Anzahl der parallelen Aufgaben (auch Bundles genannt) an, die jeder Dataflow-Worker ausführt. Ziel ist es, die Gesamteffizienz Ihrer Dataflow-Pipeline zu erhöhen.

Wenn Dataflow eine Pipeline ausführt, wird die Verarbeitung auf mehrere Compute Engine-VMs (VMs) verteilt, die auch als Worker bezeichnet werden. Ein Thread ist eine einzelne ausführbare Aufgabe, die in einem größeren Prozess ausgeführt wird. Dataflow startet auf jedem Worker mehrere Threads.

Wenn die dynamische Thread-Skalierung aktiviert ist, wählt der Dataflow-Dienst automatisch die entsprechende Anzahl von Threads aus, die auf jedem Dataflow-Worker ausgeführt werden sollen. Da jeder Thread eine Aufgabe ausführt, kann durch Erhöhen der Anzahl der Threads mehr Aufgaben parallel auf einem Worker ausgeführt werden. Wenn Sie dieses Feature mit dem horizontalen Autoscaling-Feature verwenden, bleibt die Gesamtzahl der von der Pipeline verwendeten Threads gleich, aber weniger Worker werden verwendet.

Die dynamische Thread-Skalierung verwendet einen Algorithmus, um zu ermitteln, wie viele Threads jeder Worker auf Grundlage der Ressourcenauslastungssignalen benötigt, die während der Pipelineausführung generiert werden. Weitere Informationen finden Sie im Abschnitt Funktionsweise auf dieser Seite.

Vorteile

Die dynamische Thread-Skalierung hat folgende potenzielle Vorteile.

  • Ermöglicht Dataflow-Workern, Daten effizienter zu verarbeiten, indem die CPU- und Speicherauslastung pro Worker verbessert wird.
  • Verbessert die parallele Verarbeitung, da die Anzahl der Worker-Threads angepasst wird, die zum Ausführen paralleler Aufgaben während der Pipelineausführung verfügbar sind.
  • Sie verringert die Anzahl der Worker, die zum Verarbeiten großer Datasets erforderlich sind. Dadurch können die Kosten sinken.

Unterstützung und Einschränkungen

  • Die dynamische Thread-Skalierung ist für Pipelines verfügbar, die die Java, Python und Go SDKs verwenden.
  • Der Dataflow-Job muss Runner v2 verwenden.
  • Es werden nur Batchpipelines unterstützt.
  • CPU- oder speicherintensive Pipelines profitieren möglicherweise nicht von der dynamischen Thread-Skalierung.
  • Die dynamische Thread-Skalierung reduziert nicht die Zeit, die ein Dataflow-Job benötigt.

Funktionsweise

Die dynamische Thread-Skalierung verwendet Autotuning-Prinzipien, um die Thread-Anzahl für jeden Worker im Dataflow-Worker-Pool dynamisch hoch- oder herunterzuskalieren. Die Anzahl der Threads wird unabhängig auf jedem Worker skaliert. Jeder Thread führt eine Aufgabe aus. Wenn Sie die Anzahl der Threads erhöhen, können mehr Aufgaben parallel auf einem Worker ausgeführt werden. Wenn Aufgaben abgeschlossen sind und die Threads nicht mehr benötigt werden, wird die Anzahl der Threads herunterskaliert. Ein Algorithmus bestimmt, wie viele Threads die einzelnen Worker erfordern.

Die Anzahl der Threads auf einem Worker wird auf maximal zwei Threads pro vCPU skaliert, wenn die beiden folgenden Bedingungen erfüllt sind:

  • Die Speicherauslastung des Workers beträgt weniger als 50 %.
  • Die CPU-Auslastung auf dem Worker liegt unter 65 %.

Die Anzahl der Threads auf einem Worker wird auf mindestens einen Thread pro vCPU skaliert, wenn die folgende Bedingung erfüllt ist:

  • Die Speicherauslastung des Workers beträgt mehr als 70 %.

Die Speicher- und CPU-Auslastung des Jobs finden Sie auf dem Tab Jobmesswerte der Dataflow-Weboberfläche.

Damit die Empfehlungen gültig sind, wartet Dataflow auf die Stabilisierung der Ressourcennutzung, bevor Empfehlungen an Worker gesendet werden. Zum Beispiel könnte die Speicher- und CPU-Auslastung im Bereich der Skalierung liegen, aber da die Ressourcenauslastung weiter wächst, sendet Dataflow keine Empfehlung. Nachdem sich die Ressourcennutzung stabilisiert hat, sendet Dataflow eine Empfehlung.

Wenn ein OOM-Fehler (Out of Memory) auftritt, wird die Threadskalierung automatisch deaktiviert und die Pipeline wird mit einem Thread pro vCPU ausgeführt.

Dynamische Thread-Skalierung aktivieren

Verwenden Sie die folgende Dataflow-Dienstoption, um die dynamische Thread-Skalierung zu aktivieren.

Java

--dataflowServiceOptions=enable_dynamic_thread_scaling

Python

--dataflow_service_options=enable_dynamic_thread_scaling

Go

--dataflow_service_options=enable_dynamic_thread_scaling

Wenn die dynamische Threadskalierung aktiviert ist, können Sie auch die anfängliche und maximale Anzahl an Workern festlegen, die während der Ausführung für Ihre Pipeline verfügbar sind. Weitere Informationen finden Sie unter Pipelineoptionen.

Prüfen, ob die dynamische Thread-Skalierung aktiviert ist

Wenn die dynamische Threadskalierung aktiviert ist, wird in Ihren Worker-Logdateien die folgende Meldung angezeigt:

Enabling thread vertical scaling feature in worker.

Verwenden Sie zum Aufrufen der Worker-Logdateien im Log-Explorer den Bereich Abfrage, um Folgendes auszuführen: filtern Sie die Logs nach Logname. Verwenden Sie in Ihrem Filter den folgenden Lognamen:

projects/PROJECT_ID/logs/dataflow.googleapis.com%2Fharness

Die empfohlene Anzahl von Threads finden Sie in den Worker-Logdateien. Die folgende Meldung enthält die empfohlene Anzahl von Threads:

worker_thread_scaling_report_response { recommended_thread_count: NUMBER }

Wenn die Ressourcennutzung nicht im Skalierungsbereich liegt, entspricht der angezeigte Wert der Anzahl der vCPUs auf dem Worker.

Sie können auch die Google Cloud Console verwenden, um zu sehen, ob die dynamische Thread-Skalierung aktiviert ist. Wenn es aktiviert ist, werden im Dataflow Jobinformationen in der Seitenleiste DataflowServiceOptions Zeile der Pipelineoptionen Abschnittenable_dynamic_thread_scaling angezeigt.

Fehlerbehebung

In diesem Abschnitt finden Sie eine Anleitung zur Behebung häufiger Probleme im Zusammenhang mit der dynamischen Threadskalierung.

Leistungsverschlechterung mit aktivierter dynamischer Thread-Skalierung

Das Erhöhen der Thread-Anzahl kann in den folgenden Fällen zu Leistungsproblemen führen:

  • Wenn mehrere Prozesse versuchen, dieselbe Ressource zu verwenden, kann ein Prozess die Ressource verwenden, während andere warten müssen. Dies wird als Ressourcenkonflikt bezeichnet. Wenn Ressourcenkonflikte auftreten, kann die Pipelineleistung sinken.
  • Wenn Fehler aufgrund fehlenden Speichers auftreten, ist die dynamische Thread-Skalierung deaktiviert. In einigen Fällen können Fehler aufgrund fehlenden Arbeitsspeichers dazu führen, dass die Pipeline fehlschlägt.

Prüfen Sie, ob die Anzahl der Threads erhöht wurde. Informationen zum Prüfen der empfohlenen Thread-Anzahl finden Sie auf dieser Seite unter Prüfen, ob die Thread-Skalierung aktiviert ist.

Wenn die Threadskalierung aktiviert ist, geben Sie beim Ausführen Ihrer Pipeline nicht die Option für die dynamische Threadskalierung an.

Unified Worker ... sowohl aktiviert als auch deaktiviert

Nachdem Sie die dynamische Threadskalierung aktiviert haben, schlägt Ihr Job möglicherweise mit dem folgenden Fehler fehl:

The workflow could not be created. Causes: (ID): Unified worker misconfigured by user and was both enabled and disabled.

Dieser Fehler tritt auf, wenn Runner v2 explizit deaktiviert ist.

Aktivieren Sie Runner v2, um dieses Problem zu beheben. Weitere Informationen finden Sie im Abschnitt Dataflow Runner v2 aktivieren auf der Seite "Dataflow Runner V2 verwenden".

SDK aktualisieren

Nachdem Sie die dynamische Threadskalierung aktiviert haben, schlägt Ihr Job möglicherweise mit dem folgenden Fehler fehl:

Java

Dataflow Runner v2 requires the Apache Beam Java SDK version 2.29.0 or higher. Please upgrade your SDK and resubmit your job.

Python

Dataflow Runner v2 requires the Apache Beam SDK, version 2.21.0 or higher. Please upgrade your SDK and resubmit your job.

Dieser Fehler tritt auf, wenn Runner v2 nicht aktiviert werden kann, da die SDK-Version dies nicht unterstützt.

Verwenden Sie eine SDK-Version, die Runner v2 unterstützt, um dieses Problem zu beheben.

Die vertikale Skalierungsthreadfunktion kann nicht aktiviert werden

Nachdem Sie die dynamische Threadskalierung aktiviert haben, schlägt Ihr Job möglicherweise mit dem folgenden Fehler fehl:

The workflow could not be created. Causes: (ID): Thread vertical scaling feature can not be enabled while number_of_worker_harness_threads is specified.

Dieser Fehler tritt auf, wenn die Pipeline mit der Pipelineoption numberOfWorkerHarnessThreads oder number_of_worker_harness_threads explizit die Anzahl der Threads pro Worker festlegt.

Entfernen Sie die Pipelineoption numberOfWorkerHarnessThreads oder number_of_worker_harness_threads aus der Pipeline, um dieses Problem zu beheben.