Pipeline bereitstellen

In diesem Dokument wird ausführlich erläutert, wie Sie mit Dataflow eine Pipeline bereitstellen und ausführen. Außerdem werden Themen für Fortgeschrittene wie die Optimierung und das Load-Balancing behandelt. Eine detaillierte Anleitung zum Erstellen und Bereitstellen Ihrer ersten Pipeline finden Sie in den Dataflow-Kurzanleitungen für Java, Python oder Vorlagen.

Nachdem Sie Ihre Apache Beam-Pipeline erstellt und getestet haben, können Sie sie mithilfe des verwalteten Dataflow-Dienstes bereitstellen und ausführen. Ihr Pipelinecode wird im Dataflow-Dienst zu einem Dataflow-Job.

Der Dataflow-Dienst bietet eine vollständige Verwaltung von Google Cloud-Diensten wie Compute Engine und Cloud Storage, um Ihren Dataflow-Job auszuführen, automatisch hochzufahren und die erforderlichen Ressourcen abzubauen. Der Dataflow-Dienst bietet Einblicke in Ihren Job mit Tools wie der Dataflow-Monitoring-Oberfläche und der Dataflow-Befehlszeile.

Sie können bestimmte Aspekte hinsichtlich der Ausführung des Jobs durch den Dataflow-Dienst steuern, indem Sie im Pipelinecode Ausführungsparameter festlegen. Die Ausführungsparameter geben beispielsweise an, ob die Schritte Ihrer Pipeline auf virtuellen Worker-Maschinen, im Dataflow-Dienst-Back-End oder lokal ausgeführt werden.

Neben der Verwaltung von Google Cloud-Ressourcen führt der Dataflow-Dienst viele Aspekte der verteilten parallelen Verarbeitung automatisch aus und optimiert sie. Dazu gehören:

  • Parallelisierung und Verteilung: Dataflow partitioniert die Daten automatisch und verteilt den Worker-Code zur parallelen Verarbeitung auf Compute Engine-Instanzen.
  • Optimierung Dataflow erstellt mithilfe des Pipelinecodes eine Ausführungsgrafik mit den PCollections der Pipeline. Die Grafik wird außerdem für eine möglichst effiziente Leistung und Ressourcennutzung transformiert und optimiert. Dataflow optimiert darüber hinaus automatisch potenziell kostspielige Vorgänge wie etwa Datenzusammenfassungen.
  • Automatische Feinabstimmungsfunktionen: Der Dataflow-Dienst umfasst mehrere Funktionen zur sofortigen Anpassung der Ressourcenzuweisung und Datenpartitionierung, wie beispielsweise Autoscaling und den dynamischen Work-Ausgleich. Diese Funktionen ermöglichen eine möglichst schnelle und effiziente Ausführung des Jobs durch den Dataflow-Dienst.

Pipelinelebenszyklus: Vom Pipelinecode zum Dataflow-Job

Wenn Sie Ihre Dataflow-Pipeline ausführen, erstellt Dataflow eine Ausführungsgrafik aus dem Code, der das Pipeline-Objekt erstellt, einschließlich aller Transformationen und der zugehörigen Verarbeitungsfunktionen wie DoFns. Dies ist die Grafikerstellungsphase. Sie wird lokal auf dem Computer ausgeführt, auf dem die Pipeline ausgeführt wird.

Während die Grafik erstellt wird, führt Apache Beam den Code lokal vom Haupteinstiegspunkt des Pipelinecodes aus, stoppt bei den Aufrufen einer Quelle, einer Senke oder eines Transformationsschritts und wandelt diese Aufrufe in Knoten der Grafik um. Aufgrund dessen wird ein Stück Code, das sich im Einstiegspunkt einer Pipeline (Java-Methode main() oder die oberste Ebene eines Python-Skripts) befindet, lokal auf dem Rechner ausgeführt, auf dem auch die Pipeline ausgeführt wird, während derselbe Code auf den Dataflow-Workern ausgeführt wird, wenn er in einer Methode eines DoFn-Objekts deklariert wird.

Außerdem prüft Apache Beam während der Grafikerstellung, ob alle Ressourcen, auf die die Pipeline verweist (wie Cloud Storage-Buckets, BigQuery-Tabellen und Pub/Sub-Themen und -Abos), tatsächlich vorhanden und zugänglich sind. Die Prüfung erfolgt über Standard-API-Aufrufe der jeweiligen Dienste. Daher ist es wichtig, dass das zum Ausführen einer Pipeline verwendete Nutzerkonto eine funktionierende Verbindung zu den erforderlichen Diensten und die Berechtigung zum Aufrufen von deren APIs hat. Bevor die Pipeline an den Dataflow-Dienst gesendet wird, prüft Apache Beam auch, ob weitere Fehler vorliegen, und sorgt dafür, dass die Pipelinegrafik keine ungültigen Vorgänge enthält.

Die Ausführungsgrafik wird dann in das JSON-Format übersetzt und an den Dataflow-Dienstendpunkt übertragen.

Hinweis: Die Grafik wird auch erstellt, wenn Sie die Pipeline lokal ausführen. Es erfolgt jedoch keine Übersetzung in das JSON-Format und keine Übertragung an den Dienst. Die Grafik wird stattdessen lokal auf demselben Computer ausgeführt, auf dem Sie Dataflow gestartet haben. Weitere Details finden Sie in der Dokumentation zur Konfiguration für die lokale Ausführung.

Der Dataflow-Dienst validiert dann die JSON-Ausführungsgrafik. Durch die Validierung wird die Grafik zu einem Job im Dataflow-Dienst. Sie können den Job, die Ausführungsgrafik, den Status und die Loginformationen auf der Dataflow-Monitoring-Oberfläche aufrufen.

Java: SDK 2.x

Der Dataflow-Dienst sendet eine Antwort an den Computer, auf dem Sie das Dataflow-Programm ausführen. Diese Antwort wird im Objekt DataflowPipelineJob gekapselt, das die jobId des Dataflow-Jobs enthält. Sie können jobId für Monitoring, Verfolgung und Fehlerbehebung Ihres Jobs mithilfe der Dataflow-Monitoring-Oberfläche und der Dataflow-Befehlszeile verwenden. Weitere Informationen finden Sie in der API-Referenz zu "DataflowPipelineJob".

Python

Der Dataflow-Dienst sendet eine Antwort an den Computer, auf dem Sie das Dataflow-Programm ausführen. Diese Antwort wird im Objekt DataflowPipelineResult gekapselt, das die job_id des Dataflow-Jobs enthält. Sie können job_id für Monitoring, Verfolgung und Fehlerbehebung Ihres Jobs mithilfe der Dataflow-Monitoring-Oberfläche und der Dataflow-Befehlszeile verwenden.

Java: SDK 1.x

Ausführungsgrafik

Dataflow erstellt eine Grafik der Schritte, die Ihre Pipeline darstellt, anhand der Transformationen und Daten, die Sie bei der Erstellung des Pipeline-Objekts verwendet haben. Dies ist die Ausführungsgrafik der Pipeline.

Das in den Apache Beam SDKs enthaltene WordCount-Beispiel enthält eine Reihe von Transformationen zum Lesen, Extrahieren, Zählen, Formatieren und Schreiben der einzelnen Wörter in einer Textsammlung. Außerdem wird zu jedem Wort angegeben, wie oft es vorkommt. Das folgende Diagramm zeigt, wie die Transformationen in der WordCount-Pipeline zu einer Ausführungsgrafik erweitert werden:

Grafik: Die Transformationen im Beispielprogramm WordCount werden in einer Ausführungsgrafik mit den vom Cloud Dataflow-Dienst auszuführenden Schritten gezeigt.
Abbildung 1: Ausführungsgrafik des Beispiels WordCount

Die Ausführungsgrafik weicht oft von der Reihenfolge ab, in der Sie die Transformationen beim Erstellen der Pipeline angegeben haben. Der Grund dafür ist, dass der Dataflow-Dienst verschiedene Optimierungen und Zusammenlegungen in der Ausführungsgrafik vornimmt, bevor diese in verwalteten Cloudressourcen ausgeführt wird. Der Dataflow-Dienst berücksichtigt beim Ausführen der Pipeline Datenabhängigkeiten. Die dazwischenliegenden Schritte ohne Datenabhängigkeiten können jedoch in beliebiger Reihenfolge ausgeführt werden.

Wenn Sie den Job auf der Dataflow-Monitoring-Oberfläche auswählen, sehen Sie die von Dataflow für die Pipeline erstellte, nicht optimierte Ausführungsgrafik. Weitere Informationen zum Aufrufen von Jobs finden Sie unter Dataflow-Monitoring-Oberfläche verwenden.

Parallelisierung und Verteilung

Der Dataflow-Dienst parallelisiert und verteilt die Verarbeitungslogik der Pipeline automatisch an die Worker, die Sie für die Ausführung des Jobs zugewiesen haben. In Dataflow werden die Abstraktionen im Programmiermodell zur Darstellung paralleler Prozessfunktionen verwendet. Beispiel: Die ParDo-Transformationen führen zu einer automatischen Verteilung des Verarbeitungscodes. Der Code wird durch DoFn-Elemente dargestellt. So lassen sich mehrere Worker parallel ausführen.

Nutzercode strukturieren

Sie können sich den DoFn-Code als kleine, unabhängige Entitäten vorstellen. Es besteht die Möglichkeit, zahlreiche Instanzen auf unterschiedlichen Maschinen auszuführen, ohne dass diesen die Existenz der anderen Instanzen bekannt ist. Daher eignen sich reine Funktionen optimal als Code für die parallele und verteilte Ausführungsart von DoFn-Objekten. Reine Funktionen sind Funktionen, die nicht von einem verborgenen oder externen Status abhängen, keine erkennbaren Nebenwirkungen haben und deterministisch sind.

Das reine Funktionsmodell ist jedoch nicht völlig starr. Statusinformationen oder externe Initialisierungsdaten können für DoFn und andere Funktionsobjekte gültig sein. Wichtig ist nur, dass der Code nicht von Faktoren abhängt, die der Cloud Dataflow-Dienst nicht garantieren kann. Beachten Sie beim Strukturieren der ParDo-Transformationen und beim Erstellen der DoFn-Objekte folgende Richtlinien:

  • Der Dataflow-Dienst sorgt dafür, dass alle Elemente in der Eingabe-PCollection von einer Instanz des Objekts DoFn genau einmal verarbeitet werden.
  • Der Dataflow-Dienst garantiert nicht, wie oft ein DoFn-Objekt aufgerufen wird.
  • Der Dataflow-Dienst gibt keine genaue Garantie dafür, wie die verteilten Elemente gruppiert werden. Es wird somit nicht garantiert, ob und welche Elemente zusammen verarbeitet werden.
  • Der Dataflow-Dienst garantiert nicht die genaue Anzahl von DoFn-Instanzen, die im Verlauf einer Pipeline erstellt werden.
  • Der Dataflow-Dienst toleriert Fehler. Bei Worker-Problemen kann die Ausführung des Codes mehrfach wiederholt werden. Der Dataflow-Dienst kann Sicherungskopien des Codes erstellen. Außerdem können Probleme mit manuellen Nebeneffekten auftreten, etwa wenn der Code auf temporären Dateien mit nicht eindeutigen Namen basiert oder solche Dateien erstellt.
  • Der Dataflow-Dienst serialisiert die Elementverarbeitung pro DoFn-Instanz. Der Code braucht nicht zwingend threadsicher zu sein. Wenn ein Status jedoch von mehreren DoFn-Instanzen gemeinsam genutzt wird, muss er threadsicher sein.

Weitere Informationen zum Erstellen von Nutzercode finden Sie im Abschnitt zu den Anforderungen für Funktionen, die von Nutzern bereitgestellt werden in der Dokumentation zum Programmiermodell.

Verarbeitung von Fehlern und Ausnahmen

Die Pipeline kann während der Datenverarbeitung Ausnahmen ausgeben. Einige dieser Fehler sind temporär, wie etwa vorübergehende Schwierigkeiten beim Zugriff auf einen externen Dienst. Andere treten hingegen permanent auf. Hierzu zählen beispielsweise durch beschädigte oder nicht parsingfähige Eingabedaten verursachte Fehler oder während der Berechnung ausgegebene Nullzeiger.

Dataflow verarbeitet Elemente in beliebigen Gruppierungen. Sollte für eines der Elemente in der Gruppierung ein Fehler ausgegeben werden, wird die gesamte Gruppierung noch einmal verarbeitet. Im Batchmodus wird die Verarbeitung von Gruppierungen mit einem fehlerhaften Element viermal wiederholt. Wenn eine Gruppierung viermal fehlgeschlagen ist, fällt die gesamte Pipeline aus. Im Streamingmodus wird die Verarbeitung einer Gruppierung mit einem fehlerhaften Element unendlich oft wiederholt. Dies kann zur permanenten Blockierung der Pipeline führen.

Fehler bei Start-Workern, z. B. die Nichtinstallation von Paketen auf den Workern, sind temporär, was zu unbeschränkten Wiederholungen führt und eine permanente Blockierung der Pipeline verursachen kann.

Zusammenführung optimieren

Nachdem die JSON-Form der Ausführungsgrafik Ihrer Pipeline validiert wurde, kann der Dataflow-Dienst die Grafik optimieren. Dies kann das Zusammenführen mehrerer Schritte oder Transformationen in der Pipeline-Ausführungsgrafik zu einem Schritt beinhalten. Durch das Zusammenführen von Schritten muss der Dataflow-Dienst nicht jede dazwischenliegende PCollection in der Pipeline erfassen, was im Hinblick auf den Speicher- und Verarbeitungsaufwand kostspielig werden kann.

Während alle beim Erstellen der Pipeline angegebenen Transformationen für den Dienst ausgeführt werden, kann die Ausführung in unterschiedlicher Reihenfolge oder als Teil einer umfangreicheren zusammengeführten Transformation erfolgen. Dies soll eine möglichst effektive Ausführung der Pipeline gewährleisten. Dabei berücksichtigt der Dataflow-Dienst Datenabhängigkeiten zwischen den Schritten in der Ausführungsgrafik. Ansonsten können die Schritte jedoch in beliebiger Reihenfolge ausgeführt werden.

Beispiel einer Zusammenführung

Das folgende Diagramm zeigt, wie der Dataflow-Dienst die Ausführungsgrafik des WordCount-Beispiels aus dem Apache Beam SDK for Java optimieren und zusammenführen kann, um die Ausführung effizienter zu gestalten:

Grafik: Die für das Beispielprogramm WordCount optimierte Ausführungsgrafik mit vom Cloud Dataflow-Dienst zusammengeführten Schritten
Abbildung 2: Für das Beispiel WordCount optimierte Ausführungsgrafik

Zusammenführung verhindern

Es gibt ein paar Fälle in der Pipeline, in denen Sie optimierende Zusammenführungen durch den Dataflow-Dienst verhindern sollten. Es kommt mitunter vor, dass der Dataflow-Dienst keine optimalen Zusammenführungen von Vorgängen in der Pipeline ermitteln kann. Dadurch wird möglicherweise die Fähigkeit des Dataflow-Dienstes eingeschränkt, alle verfügbaren Worker zu verwenden.

Ein Fall, bei dem die Zusammenführung eine optimale Worker-Nutzung durch Dataflow beeinträchtigen kann, ist beispielsweise eine ParDo mit "hohem Fan-Out". Bei einem solchen Vorgang kann die Eingabesammlung relativ wenig Elemente enthalten, der ParDo jedoch eine Ausgabe mit hundert- oder tausendmal so vielen Elementen erzeugen, gefolgt von einem anderen ParDo. Wenn der Dataflow-Dienst diese ParDo-Vorgänge zusammenführt, ist die Parallelität in diesem Schritt auf maximal die Anzahl der Elemente in der Eingabesammlung beschränkt, obwohl die dazwischenliegende PCollection viel mehr Elemente enthält.

Zur Verhinderung einer solchen Zusammenführung können Sie der Pipeline einen Vorgang hinzufügen, der den Dataflow-Dienst zwingt, die dazwischenliegende PCollection zu erfassen. Führen Sie gegebenenfalls einen der folgenden Vorgänge aus:

  • Fügen Sie einen GroupByKey ein und heben Sie die Gruppierung nach dem ersten ParDo auf. Der Dataflow-Dienst führt keine ParDo-Vorgänge über eine Zusammenfassung hinweg aus.
  • Sie können die dazwischenliegende PCollection als Nebeneingabe für ein anderes ParDo-Objekt übergeben. Nebeneingaben werden vom Dataflow-Dienst immer erfasst.
  • Sie können einen Reshuffle-Schritt einfügen. Reshuffle verhindert eine Zusammenführung, erstellt Prüfpunkte für die Daten und führt eine Deduplizierung von Datensätzen durch. Dataflow unterstützt Reshuffle, obwohl es in der Apache Beam-Dokumentation als verworfen gekennzeichnet ist.

Optimierung kombinieren

Zusammenführungen stellen bei umfangreichen Datenverarbeitungen ein wichtiges Konzept dar. Dabei werden konzeptionell völlig unterschiedliche Daten vereint. Dies ist für Korrelationen extrem nützlich. Das Programmiermodell von Dataflow stellt die Aggregationsvorgänge in Form der Transformationen GroupByKey, CoGroupByKey und Combine dar.

Bei den Zusammenführungen durch Dataflow werden Daten des gesamten Datensatzes kombiniert. Hierzu zählen auch auf mehrere Worker verteilte Daten. Während der Zusammenführung ist es häufig am effizientesten, vor einer instanzübergreifenden Kombination so viele Daten wie möglich lokal zusammenzuführen. Wenn Sie eine Transformation vom Typ GroupByKey oder eine andere Aggregationstransformation anwenden, werden die Daten vom Dataflow-Dienst automatisch teilweise lokal kombiniert, bevor die Hauptgruppierung erfolgt.

Bei der Durchführung einer teilweisen oder mehrstufigen Kombination trifft der Dataflow-Dienst abhängig davon, ob die Pipeline mit Batch- oder Streamingdaten arbeitet, unterschiedliche Entscheidungen. Bei eingeschränkten Daten priorisiert der Dienst eine effiziente Vorgehensweise und kombiniert möglichst viel lokal. Bei uneingeschränkten Daten bevorzugt der Dienst eine geringere Latenz und führt möglicherweise keine teilweise Kombination durch, da dies die Latenz erhöhen kann.

Autotuning-Features

Der Dataflow-Dienst enthält mehrere Autotuning-Features, mit denen Sie den Dataflow-Job während der Ausführung weiter dynamisch optimieren können. Zu diesen Features zählen Autoscaling und der dynamische Work-Ausgleich.

Autoscaling

Wenn das Autoscaling aktiviert ist, wählt der Dataflow-Dienst automatisch die zum Ausführen des Jobs erforderliche Anzahl von Worker-Instanzen. Der Dataflow-Dienst kann während der Laufzeit auch je nach Job dynamisch mehr oder weniger Worker erneut zuweisen. Bestimmte Teile der Pipeline sind möglicherweise rechenintensiver als andere. Außerdem kann der Dataflow-Dienst während dieser Jobphasen automatisch zusätzliche Worker erstellen – und deaktivieren, wenn sie nicht mehr benötigt werden.

Java: SDK 2.x

Autoscaling ist standardmäßig für alle Dataflow-Batchjobs und Streamingjobs mit Streaming Engine aktiviert. Zum Deaktivieren von Autoscaling können Sie das Flag --autoscalingAlgorithm=NONE explizit angeben, wenn Sie die Pipeline ausführen. In diesem Fall legt der Dataflow-Dienst die Anzahl der Worker auf Basis der Option --numWorkers fest, deren Standardwert 3 ist.

Bei aktiviertem Autoscaling lässt der Dataflow-Dienst nicht zu, dass Nutzer die genaue Anzahl der Worker-Instanzen festlegen, die dem Job zugeordnet werden. Sie können die Anzahl der Worker jedoch begrenzen. Dazu müssen Sie beim Ausführen der Pipeline die Option --maxNumWorkers angeben.

Bei Batchjobs ist das Flag --maxNumWorkers optional. Die Standardeinstellung ist 1000. Für Streamingjobs mit Streaming Engine ist das Flag --maxNumWorkers optional. Der Standardwert ist 100. Für Streamingjobs ohne Streaming Engine ist das Flag --maxNumWorkers erforderlich.

Python

Autoscaling ist standardmäßig für alle Dataflow-Batchjobs aktiviert, die mit dem Apache Beam SDK for Python ab Version 0.5.1 erstellt wurden. Zum Deaktivieren von Autoscaling können Sie das Flag --autoscaling_algorithm=NONE explizit angeben, wenn Sie die Pipeline ausführen. In diesem Fall legt der Dataflow-Dienst die Anzahl der Worker entsprechend der Option --num_workers fest, deren Standardwert 3 ist.

Bei aktiviertem Autoscaling lässt der Dataflow-Dienst nicht zu, dass Nutzer die genaue Anzahl der Worker-Instanzen festlegen, die dem Job zugeordnet werden. Sie können die Anzahl der Worker jedoch begrenzen. Dazu müssen Sie beim Ausführen der Pipeline die Option --max_num_workers angeben.

Bei Batchjobs ist das Flag --max_num_workers optional. Die Standardeinstellung ist 1000. Für Streamingjobs mit Streaming Engine ist das Flag --max_num_workers optional. Der Standardwert ist 100. Für Streamingjobs ohne Streaming Engine ist das Flag --max_num_workers erforderlich.

Java: SDK 1.x

Dataflow skaliert auf der Grundlage der Parallelität einer Pipeline. Die Parallelität einer Pipeline ist eine Schätzung der Anzahl der Threads, die für die effizienteste Verarbeitung von Daten zu einem bestimmten Zeitpunkt erforderlich sind.

Batch-Autoscaling

Bei Batchpipelines wählt Dataflow die Anzahl der Worker automatisch auf Basis des geschätzten gesamten Arbeitsaufwands der einzelnen Phasen Ihrer Pipeline aus. Dieser ist abhängig von der Eingabegröße und dem aktuellen Durchsatz. Dataflow wertet die Arbeitsmenge entsprechend des Fortschritts der Ausführung alle 30 Sekunden neu aus und skaliert die Anzahl der Worker dynamisch nach oben oder unten, wenn der geschätzte gesamte Arbeitsaufwand zu- bzw. abnimmt.

Wenn eine der folgenden Bedingungen eintritt, behält Dataflow die Anzahl der Worker entweder bei oder verringert sie, um inaktive Ressourcen zu sparen:

  • Die durchschnittliche CPU-Auslastung pro Worker liegt bei unter 5 %.
  • Die Parallelität ist aufgrund von nicht parallelisierbarer Arbeit beschränkt, z. B. nicht aufteilbare Daten wie komprimierte Dateien oder von E/A-Modulen verarbeitete Daten, die nicht aufgeteilt werden können.
  • Die Anzahl der parallelen Vorgänge ist festgelegt, z. B. das Schreiben von Daten in vorhandene Dateien in einem Cloud Storage-Ziel.
  • Wenn die Pipeline eine von Ihnen implementierte benutzerdefinierte Datenquelle verwendet, können Sie verschiedene Methoden implementieren, um weiter Informationen zum Autoscaling-Algorithmus des Dataflow-Dienstes zu erhalten und nach Möglichkeit die Leistung zu steigern:

    Java: SDK 2.x

    • Implementieren Sie in der Unterklasse BoundedSource die Methode getEstimatedSizeBytes. Der Dataflow-Dienst berechnet mithilfe von getEstimatedSizeBytes die Anzahl von Workern, die am Anfang für die Pipeline verwendet werden soll.
    • Implementieren Sie in der Unterklasse BoundedReader die Methode getFractionConsumed. Der Dataflow-Dienst verfolgt mithilfe von getFractionConsumed den Lesefortschritt und ermittelt die richtige Anzahl von Workern, die während eines Lesevorgangs verwendet werden soll.

    Python

    • Implementieren Sie in der Unterklasse BoundedSource die Methode estimate_size. Der Dataflow-Dienst berechnet mithilfe von estimate_size die Anzahl von Workern, die am Anfang für die Pipeline verwendet werden soll.
    • Implementieren Sie in der Unterklasse RangeTracker die Methode fraction_consumed. Der Dataflow-Dienst verfolgt mithilfe von fraction_consumed den Lesefortschritt und ermittelt die richtige Anzahl von Workern, die während eines Lesevorgangs verwendet werden soll.

    Java: SDK 1.x

    Streaming-Autoscaling

    Mithilfe des Streaming-Autoscalings kann der Dataflow-Dienst die Anzahl der zum Ausführen der Streamingpipeline verwendeten Worker bei Änderungen der Last und Ressourcennutzung anpassen. Streaming-Autoscaling ist eine kostenlose Funktion, die zu Kosteneinsparungen bei den für die Ausführung von Streamingpipelines verwendeten Ressourcen führt.

  • Hochskalieren: Wenn eine Streamingpipeline für einige Minuten Rückstände hat und die Worker durchschnittlich mehr als 20 % ihrer CPUs beanspruchen, wird Dataflow hochskaliert. Dataflow versucht, den Rückstand unter Berücksichtigung des aktuellen Durchsatzes pro Worker innerhalb von etwa 150 Sekunden nach dem Hochskalieren zu löschen.
  • Herunterskalieren: Wenn der Rückstand einer Streamingpipeline weniger als 10 Sekunden beträgt und die Worker für einen Zeitraum von einigen Minuten durchschnittlich weniger als 75 % der CPUs verwenden, wird Dataflow herunterskaliert. Nach dem Herunterskalieren verwenden die Worker durchschnittlich 75 % ihrer CPUs. Bei Streamingjobs, die Streaming Engine nicht verwenden, kann die CPU-Auslastung von 75 % manchmal aufgrund der Laufwerkverteilung nicht erreicht werden und es wird ein niedrigeres Ziel verwendet.
  • Keine Skalierung: Wenn es keinen Rückstand gibt, aber die CPU-Auslastung mindestens 75 % beträgt, wird die Pipeline nicht herunterskaliert. Wenn es einen Rückstand gibt, die CPU-Auslastung jedoch unter 20 % liegt, wird die Pipeline nicht hochskaliert.
  • Vorausschauende Skalierung: Die Streaming-Engine verwendet auch ein Verfahren für Vorausschauendes Autoscaling, das auf dem Backlog-Timer basiert. Im Dataflow-Modell werden die unbegrenzten Daten in einer Streaming-Pipeline in Fenster aufgeteilt, die nach Zeitstempeln gruppiert sind. Am Ende eines Fensters werden Timer für jeden in diesem Fenster verarbeiteten Schlüssel ausgelöst. Das Auslösen eines Timers zeigt an, dass das Fenster für einen bestimmten Schlüssel abgelaufen ist. Die Streaming-Engine kann den Timer-Backlog messen, was bedeutet, dass sie vorhersagen kann, wie viele Timer am Ende eines Fensters ausgelöst werden sollen. Durch die Nutzung des Timer-Backlogs als Signal kann Dataflow die zukünftige Zeit durch Schätzung der Verarbeitungsmenge schätzen, die bei zukünftigen Timern ausgeführt werden muss. Basierend auf der geschätzten zukünftigen Arbeitslast wird Dataflow automatisch im Voraus skaliert, um die erwartete Nachfrage zu erfüllen.
  • Ohne Autoscaling wählen Sie durch Angabe von numWorkers oder num_workers zum Ausführen der Pipeline eine feste Anzahl von Workern aus. Da die Eingabelast im Verlauf der Zeit variiert, kann diese Zahl zu hoch oder zu niedrig werden. Wenn Sie zu viele Worker bereitstellen, generieren Sie unnötige Zusatzkosten. Das Bereitstellen von zu wenigen Workern führt bei der Datenverarbeitung hingegen zu einer höheren Latenz. Durch Autoscaling werden Ressourcen nur bei Bedarf verwendet.

    Autoscaling von Streamingpipelines hat das Ziel, Rückstände zu minimieren und gleichzeitig die Worker-Auslastung und den Durchsatz zu maximieren. Außerdem sollte schnell auf Lastspitzen reagiert werden können. Wenn Sie das Autoscaling aktivieren, müssen Sie nicht zwischen der Bereitstellung für Spitzenlasten und aktuellen Ergebnissen wählen. Worker werden als CPU-Auslastung hinzugefügt und Rückstände steigen oder werden eliminiert, sobald diese Messwerte sinken. Sie bezahlen auf diese Weise nur, was Sie benötigen. Der Job wird außerdem so effektiv wie möglich verarbeitet.

    Java: SDK 2.x

    Benutzerdefinierte unbegrenzte Quellen

    Wenn die Pipeline eine benutzerdefinierte unbegrenzte Quelle verwendet, muss die Quelle den Dataflow-Dienst über Rückstände informieren. Ein Rückstand ist eine Schätzung der Eingaben in Byte, die noch nicht durch die Quelle verarbeitet wurden. Damit der Dienst über Rückstände informiert wird, müssen Sie in Ihrer UnboundedReader-Klasse eine der folgenden Methoden einbinden:

    • getSplitBacklogBytes(): Rückstand für die aktuelle Teilung der Quelle; der Dienst aggregiert den Rückstand für alle Teilungen.
    • getTotalBacklogBytes(): Der globale Rückstand für alle Teilungen; in einigen Fällen ist der Rückstand nicht für jede Teilung verfügbar und kann nur für alle Teilungen berechnet werden. Nur die erste Teilung (Teilungs-ID "0") muss den Gesamtrückstand bereitstellen.
    Das Apache Beam-Repository enthält mehrere Beispiele für benutzerdefinierte Quellen, die die Klasse UnboundedReader implementieren.
    Streaming-Autoscaling aktivieren

    Für Streamingjobs mit Streaming Engine ist Autoscaling standardmäßig aktiviert.

    Wenn Sie Autoscaling für Jobs ohne Streaming Engine aktivieren möchten, legen Sie die Ausführungsparameter beim Start der Pipeline fest:

    --autoscalingAlgorithm=THROUGHPUT_BASED
    --maxNumWorkers=N
    

    Bei Streamingjobs ohne Streaming Engine beträgt die Mindestanzahl von Workern 1/15 des Werts --maxNumWorkers und zwar aufgerundet.

    Streamingpipelines werden mit einem festen Pool nichtflüchtiger Speicher bereitgestellt, deren Anzahl --maxNumWorkers entspricht. Dies ist beim Festlegen von --maxNumWorkers zu berücksichtigen. Stellen Sie außerdem sicher, dass der Wert eine ausreichende Anzahl von Laufwerken für die Pipeline umfasst.

    Nutzung und Preise

    Die Compute Engine-Nutzung richtet sich nach der durchschnittlichen Anzahl von Workern. Die Nutzung nichtflüchtiger Speicher basiert hingegen auf dem exakten Wert von --maxNumWorkers. Nichtflüchtige Speicher werden so neu verteilt, dass jeder Worker mit der gleichen Anzahl von Laufwerken verbunden ist.

    Im obigen Beispiel mit --maxNumWorkers=15 werden Ihnen zwischen 1 und 15 Compute Engine-Instanzen und genau 15 nichtflüchtige Speicher in Rechnung gestellt.

    Python

    Streaming-Autoscaling aktivieren

    Legen Sie beim Start der Pipeline die folgenden Ausführungsparameter fest, um das Autoscaling zu aktivieren:

    --autoscaling_algorithm=THROUGHPUT_BASED
    --max_num_workers=N
    

    Bei Streamingjobs ohne Streaming Engine beträgt die Mindestanzahl von Workern 1/15 des Werts --maxNumWorkers und zwar aufgerundet.

    Streamingpipelines werden mit einem festen Pool nichtflüchtiger Speicher bereitgestellt, deren Anzahl --maxNumWorkers entspricht. Dies ist beim Festlegen von --maxNumWorkers zu berücksichtigen. Stellen Sie außerdem sicher, dass der Wert eine ausreichende Anzahl von Laufwerken für die Pipeline umfasst.

    Nutzung und Preise

    Die Compute Engine-Nutzung richtet sich nach der durchschnittlichen Anzahl von Workern. Die Nutzung nichtflüchtiger Speicher basiert hingegen auf dem exakten Wert von --max_num_workers. Nichtflüchtige Speicher werden so neu verteilt, dass jeder Worker mit der gleichen Anzahl von Laufwerken verbunden ist.

    Im obigen Beispiel mit --max_num_workers=15 werden Ihnen zwischen 1 und 15 Compute Engine-Instanzen und genau 15 nichtflüchtige Speicher in Rechnung gestellt.

    Java: SDK 1.x

    Streamingpipeline manuell skalieren

    Bis das Autoscaling im Streamingmodus generell verfügbar ist, können Sie die Anzahl der Worker, die die Streamingpipeline ausführen, manuell mit der Aktualisierungsfunktion von Dataflow skalieren.

    Java: SDK 2.x

    Wenn Sie die Streamingpipeline während der Ausführung skalieren möchten, müssen Sie beim Start der Pipeline unbedingt folgende Ausführungsparameter festlegen:

    • Legen Sie --maxNumWorkers auf die maximale Anzahl von Workern fest, die für die Pipeline verfügbar sein sollen.
    • Legen Sie --numWorkers auf die anfängliche Anzahl von Workern fest, die die Pipeline zu Beginn der Ausführung verwenden soll.

    Sobald die Pipeline ausgeführt wird, können Sie sie aktualisieren und mit dem Parameter--numWorkers eine neue Anzahl von Workern festlegen. Der neue Wert für --numWorkers muss zwischen N und --maxNumWorkers liegen, wobei N gleich --maxNumWorkers/15 ist.

    Durch Aktualisieren der Pipeline wird der ausgeführte Job durch einen neuen Job mit der neuen Anzahl von Workern ersetzt. Alle mit dem vorherigen Job verbundenen Statusinformationen bleiben jedoch erhalten.

    Python

    Wenn Sie die Streamingpipeline während der Ausführung skalieren möchten, müssen Sie beim Start der Pipeline unbedingt folgende Ausführungsparameter festlegen:

    • Legen Sie --max_num_workers auf die maximale Anzahl von Workern fest, die für die Pipeline verfügbar sein sollen.
    • Legen Sie --num_workers auf die anfängliche Anzahl von Workern fest, die die Pipeline zu Beginn der Ausführung verwenden soll.

    Sobald die Pipeline ausgeführt wird, können Sie sie aktualisieren und mit dem Parameter--num_workers eine neue Anzahl von Workern festlegen. Der neue Wert für --num_workers muss zwischen N und --max_num_workers liegen, wobei N gleich --max_num_workers/15 ist.

    Durch Aktualisieren der Pipeline wird der ausgeführte Job durch einen neuen Job mit der neuen Anzahl von Workern ersetzt. Alle mit dem vorherigen Job verbundenen Statusinformationen bleiben jedoch erhalten.

    Java: SDK 1.x

    Dynamischer Work-Ausgleich

    Mit dem dynamischen Work-Ausgleich kann der Dataflow-Dienst Work dynamisch entsprechend den gegebenen Laufzeitbedingungen neu partitionieren. Diese Bedingungen können Folgendes umfassen:

    • Ungleichgewichte bei Work-Zuweisungen
    • Längere Verarbeitungszeiten der Worker als erwartet
    • Kürzere Verarbeitungszeiten der Worker als erwartet

    Der Dataflow-Dienst erkennt diese Bedingungen automatisch und kann Work nicht verwendeten oder nicht ausgelasteten Workern dynamisch neu zuweisen. Dadurch verringert sich die gesamte Verarbeitungszeit des Jobs.

    Beschränkungen

    Der dynamische Work-Ausgleich erfolgt nur, wenn der Dataflow-Dienst einige Eingabedaten parallel verarbeitet. Dies ist der Fall, wenn Daten aus einer externen Eingabequelle gelesen werden oder eine erfasste dazwischenliegende PCollection bzw. das Ergebnis einer Aggregation wie GroupByKey verwendet wird. Wenn eine größere Anzahl von Schritten im Job zusammengeführt wird, enthält der Job weniger dazwischenliegende PCollection-Vorgänge und der dynamische Work-Ausgleich beschränkt sich auf die Anzahl der Elemente in der materialisierten Quellsammlung PCollection. Wenn Sie gewährleisten möchten, dass der dynamische Work-Ausgleich auf eine bestimmte PCollection in der Pipeline angewendet werden kann, damit die dynamische Parallelität gewahrt bleibt, können Sie auf verschiedene Methoden zurückgreifen, um die Zusammenführung zu verhindern.

    Der dynamische Work-Ausgleich kann keine Daten neu parallelisieren, die feiner als ein einzelnes Dataset sind. Wenn die Daten einzelne Datensätze enthalten, die bei der Verarbeitung erhebliche Verzögerungen verursachen, kann sich die Jobausführung dennoch verzögern. Der Grund dafür ist, dass Dataflow einen einzelnen aktiven Datensatz nicht auf mehrere Worker verteilen kann.

    Java: SDK 2.x

    Wenn Sie für die Endausgabe der Pipeline eine feste Anzahl von Shards festgelegt haben, beispielsweise durch Schreiben der Daten mithilfe von TextIO.Write.withNumShards, wird die Parallelisierung entsprechend der ausgewählten Shard-Anzahl eingeschränkt.

    Python

    Wenn Sie eine feste Anzahl von Shards für die endgültige Ausgabe Ihrer Pipeline festgelegt haben, z. B. durch Schreiben von Daten mit beam.io.WriteToText(..., num_shards=...), begrenzt Dataflow die Parallelisierung anhand der Anzahl der ausgewählten Shards.

    Java: SDK 1.x

    Die Einschränkung aufgrund der festen Shard-Anzahl ist voraussichtlich nur temporär. Sie kann sich in zukünftigen Releases des Dataflow-Dienstes ändern.

    Benutzerdefinierte Datenquellen verwenden

    Java: SDK 2.x

    Wenn die Pipeline eine von Ihnen bereitgestellte benutzerdefinierte Datenquelle verwendet, müssen Sie die Methode splitAtFraction implementieren, damit die Quelle den dynamischen Work-Ausgleich unterstützt.

    Python

    Wenn die Pipeline eine von Ihnen bereitgestellte benutzerdefinierte Datenquelle verwendet, müssen Sie für RangeTracker try_claim, try_split, position_at_fraction und fraction_consumed implementieren, damit die Quelle den dynamischen Work-Ausgleich unterstützt.

    Weitere Informationen finden Sie in der API-Referenz zu "RangeTracker".

    Java: SDK 1.x

    Ressourcen nutzen und verwalten

    Der Dataflow-Dienst verwaltet Ressourcen in Google Cloud auf Jobbasis vollständig. Dies umfasst das Erstellen und Beenden von Compute Engine-Instanzen, die auch Worker oder VMs genannt werden, sowie den Zugriff auf die Cloud Storage-Buckets Ihres Projekts für E/A-Vorgänge und das Staging von temporären Dateien. Wenn die Pipeline jedoch mit Google Cloud-Datenspeichertechnologien wie BigQuery und Cloud Pub/Sub interagiert, müssen Sie die Ressourcen und Kontingente für diese Dienste verwalten.

    Dataflow verwendet einen vom Nutzer bereitgestellten Standort in Cloud Storage speziell für das Staging von Dateien. Sie können diesen Speicherort steuern und sollten darauf achten, dass er so lange erhalten bleibt, wie Jobs daraus lesen. Sie können einen Staging-Speicherort für mehrere Jobausführungen verwenden, da das SDK-interne Caching die Startzeit der Jobs verkürzen kann.

    Jobs

    Sie können bis zu 25 gleichzeitige Dataflow-Jobs pro Google Cloud-Projekt ausführen. Diese Beschränkung kann jedoch aufgehoben werden, indem Sie den Google Cloud-Support kontaktieren. Weitere Informationen finden Sie unter Kontingente.

    Der Dataflow-Dienst ist derzeit auf die Verarbeitung von JSON-Jobanfragen mit maximal 20 MB beschränkt. Die Größe der Jobanfrage ist mit der JSON-Darstellung der Pipeline verbunden. Eine größere Pipeline führt zu einer größeren Anfrage.

    Führen Sie die Pipeline zum Schätzen der Größe der JSON-Anfrage mit der folgenden Option aus:

    Java: SDK 2.x

    --dataflowJobFile=< path to output file >
    

    Python

    --dataflow_job_file=< path to output file >
    

    Java: SDK 1.x

    Mit diesem Befehl wird eine JSON-Darstellung des Jobs in eine Datei geschrieben. Die Größe der Anfrage kann anhand der Größe der serialisierten Datei geschätzt werden. Aufgrund von zusätzlichen Informationen in der Anfrage ist die tatsächliche Datei etwas größer.

    Weitere Informationen finden Sie auf der Seite zur Fehlerbehebung unter 413 Anfrageentität zu groß/Die Größe der serialisierten JSON-Darstellung der Pipeline überschreitet das zulässige Limit.

    Die Grafikgröße Ihres Jobs darf 10 MB außerdem nicht überschreiten. Weitere Informationen finden Sie auf der Seite zur Fehlerbehebung unter "Die Jobgrafik ist zu groß. Versuchen Sie es mit einer kleineren Jobgrafik noch einmal oder teilen Sie den Job in zwei oder mehr kleinere Jobs auf."

    Worker

    Der Dataflow-Dienst erlaubt aktuell maximal 1.000 Compute Engine-Instanzen pro Job. Bei Batchjobs ist der Standardmaschinentyp n1-standard-1. Bei Streamingjobs ist der Standardmaschinentyp für Streaming Engine-fähige Jobs n1-standard-2 und der Standardmaschinentyp für nicht-Streaming Engine-fähige Jobs n1-standard-4. Daher kann der Dataflow-Dienst beim Verwenden der Standardmaschinentypen pro Job bis zu 4.000 Kerne zuweisen. Wenn Sie für Ihren Job mehr Kerne benötigen, können Sie einen größeren Maschinentyp auswählen.

    Sie können jede der verfügbaren Compute Engine-Maschinentypfamilien sowie benutzerdefinierte Maschinentypen verwenden. Die besten Ergebnisse erzielen Sie mit n1-Maschinentypen. Maschinentypen mit gemeinsam genutztem Kern, beispielsweise Worker der Serien f1 und g1, werden im Rahmen des Service Level Agreements von Dataflow nicht unterstützt.

    In Dataflow wird nach der Anzahl der vCPUs und der Größe des Arbeitsspeichers (in GB) der Worker abgerechnet. Die Abrechnung erfolgt unabhängig von der Maschinentypfamilie. Sie können beim Erstellen der Pipeline einen Maschinentyp für die Pipeline angeben, indem Sie den entsprechenden Ausführungsparameter festlegen.

    Java: SDK 2.x

    Nutzen Sie zum Ändern des Maschinentyps die Option --workerMachineType.

    Python

    Nutzen Sie zum Ändern des Maschinentyps die Option --worker_machine_type.

    Java: SDK 1.x

    Ressourcenkontingent

    Der Dataflow-Dienst prüft, ob das Compute Engine-Ressourcenkontingent Ihres Google Cloud-Projekts zur Ausführung des Jobs ausreicht, um den Job zu starten und auf die maximale Anzahl von Worker-Instanzen zu skalieren. Ohne ein ausreichendes Ressourcenkontingent kann der Job nicht gestartet werden.

    Die Autoscaling-Funktion von Dataflow ist auf das für Ihr Projekt verfügbare Compute Engine-Kontingent beschränkt. Wenn der Job beim Start über ein ausreichendes Kontingent verfügt, aber ein anderer Job das restliche verfügbare Kontingent des Projekts verwendet, kann der erste Job zwar ausgeführt aber nicht vollständig skaliert werden.

    Der Dataflow-Dienst verwaltet jedoch keine Kontingenterhöhungen für Jobs, die das Ressourcenkontingent des Projekts übersteigen. Es liegt in Ihrer Verantwortung, jegliche zusätzlich erforderlichen Ressourcenkontingente anzufordern. Sie können hierfür die Google Cloud Console verwenden.

    Nichtflüchtige Speicherressourcen

    Der Dataflow-Dienst ist derzeit zum Ausführen eines Streamingjobs pro Worker-Instanz auf 15 nichtflüchtige Speicher beschränkt. Jeder nichtflüchtige Speicher ist lokal einer einzelnen virtuellen Compute Engine-Maschine zugeordnet. Der Job kann nicht mehr Worker als nichtflüchtige Speicher haben. Die Mindestzuweisung sieht ein Verhältnis von 1:1 zwischen Workern und Laufwerken vor.

    Bei Jobs, die auf Worker-VMs ausgeführt werden, beträgt die Standardgröße der nichtflüchtigen Speicher 250 GB im Batchmodus und 400 GB im Streamingmodus. Jobs, die Streaming Engine oder Dataflow Shuffle verwenden, werden im Dataflow-Dienst-Back-End ausgeführt und verwenden kleinere Laufwerke.

    Orte

    Der Dataflow-Dienst stellt Compute Engine-Ressourcen standardmäßig in der Zone us-central1-f der Region us-central1 bereit. Sie können diese Einstellung durch Angabe des Parameters --region überschreiben. Wenn Sie eine bestimmte Zone für Ihre Ressourcen verwenden müssen, geben Sie beim Erstellen der Pipeline den Parameter --zone an. Wir empfehlen jedoch, nur die Region ohne Zone anzugeben. Dadurch kann der Dataflow-Dienst automatisch die beste Zone innerhalb der Region anhand der bei der Joberstellungsanfrage verfügbaren Zonenkapazität auswählen. Weitere Informationen finden Sie in der Dokumentation zu regionalen Endpunkten.

    Streaming Engine

    Momentan führt der Dataflow-Pipeline-Runner die Schritte der Streamingpipeline vollständig auf Worker-VMs aus und beansprucht CPU-Leistung, Arbeitsspeicher und nichtflüchtigen Speicherplatz auf dem Worker. Streaming Engine von Dataflow verschiebt die Pipelineausführung aus den Worker-VMs in das Dataflow-Dienst-Back-End.

    Vorteile von Streaming Engine

    Das Streaming Engine-Modell bietet folgende Vorteile:

    • Reduzierung der verbrauchten CPU-, Arbeitsspeicher- und nichtflüchtigen Speicherressourcen auf den Worker-VMs Streaming Engine funktioniert am besten mit kleineren Worker-Maschinentypen (n1-standard-2 statt n1-standard-4) und benötigt nichtflüchtigen Speicher nur für ein kleines Worker-Bootlaufwerk, was die Ressourcen- und Kontingentnutzung reduziert.
    • Reaktionsschnelleres Autoscaling bei Schwankungen im eingehenden Datenvolumen: Streaming Engine ermöglicht eine reibungslosere und detailliertere Skalierung von Workern.
    • Bessere Unterstützung, da Sie Ihre Pipelines nicht neu bereitstellen müssen, um Dienstaktualisierungen anzuwenden.

    Die Reduzierung der Worker-Ressourcen ist größtenteils auf die Übertragung der Arbeit an den Dataflow-Dienst zurückzuführen. Daher ist die Nutzung von Streaming Engine mit einer Gebühr verbunden. Allerdings sind die Gesamtkosten für Dataflow-Pipelines mit Streaming Engine voraussichtlich ungefähr gleich hoch wie für Dataflow-Pipelines, die diese Option nicht verwenden.

    Streaming Engine verwenden

    Diese Funktion ist in allen Regionen verfügbar, in denen Dataflow unterstützt wird. Informationen zu den verfügbaren Speicherorten finden Sie unter Dataflow-Standorte.

    Java: SDK 2.x

    Geben Sie folgenden Parameter an, um Streaming Engine für Ihre Streamingpipelines zu verwenden:

    • --enableStreamingEngine, wenn Sie das Apache Beam SDK for Java ab Version 2.11.0 verwenden.
    • --experiments=enable_streaming_engine, wenn Sie das Apache Beam SDK for Java Version 2.10.0 verwenden.

    Wenn Sie Dataflow Streaming Engine für Ihre Pipeline verwenden, geben Sie den Parameter --zone nicht an. Legen Sie stattdessen den Parameter --region fest und geben Sie als Wert eine der Regionen an, in denen Streaming Engine derzeit verfügbar ist. Dataflow wählt die Zone in der von Ihnen angegebenen Region automatisch aus. Wenn Sie den Parameter --zone angeben und ihn auf eine Zone außerhalb der verfügbaren Regionen festlegen, meldet Dataflow einen Fehler.

    Da Streaming Engine am besten mit kleineren Worker-Maschinentypen funktioniert, empfiehlt sich die Angabe von --workerMachineType=n1-standard-2. Sie können auch --diskSizeGb=30 festlegen, da Streaming Engine nur für das Worker-Boot-Image und lokale Logs Speicherplatz benötigt. Diese Werte sind die Standardwerte.

    Python

    Streaming Engine ist bei neuen Dataflow-Streamingpipelines standardmäßig aktiviert, wenn die folgenden Bedingungen erfüllt sind:

    Wenn Sie Streaming Engine in Ihrer Python-Streamingpipeline deaktivieren möchten, geben Sie folgenden Parameter an:

    --experiments=disable_streaming_engine

    Wenn Sie Python 2 verwenden, müssen Sie Streaming Engine dennoch aktivieren, indem Sie den folgenden Parameter angeben:

    --enable_streaming_engine

    Wenn Sie Dataflow Streaming Engine in Ihrer Pipeline verwenden, geben Sie nicht den Parameter --zone an. Legen Sie stattdessen den Parameter --region fest und geben Sie als Wert eine der Regionen an, in denen Streaming Engine derzeit verfügbar ist. Dataflow wählt die Zone in der von Ihnen angegebenen Region automatisch aus. Wenn Sie den Parameter --zone angeben und ihn auf eine Zone außerhalb der verfügbaren Regionen festlegen, meldet Dataflow einen Fehler.

    Da Streaming Engine am besten mit kleineren Worker-Maschinentypen funktioniert, empfiehlt sich die Angabe von --machine_type=n1-standard-2. Sie können auch --disk_size_gb=30 festlegen, da Streaming Engine nur für das Worker-Boot-Image und lokale Logs Speicherplatz benötigt. Diese Werte sind die Standardwerte.

    Java: SDK 1.x

    Dataflow Shuffle

    Dataflow Shuffle ist der Basisvorgang hinter Cloud Dataflow-Transformationen wie GroupByKey, CoGroupByKey und Combine. Beim Shuffle-Vorgang von Dataflow werden Daten nach Schlüsseln auf skalierte, effiziente und fehlertolerante Weise partitioniert und gruppiert. Derzeit verwendet Dataflow eine Shuffle-Implementierung, die vollständig auf virtuellen Worker-Maschinen ausgeführt wird und CPU-Leistung, Arbeitsspeicher sowie nichtflüchtigen Speicherplatz auf dem Worker beansprucht. Das dienstbasierte Feature Dataflow Shuffle, das nur für Batchpipelines verfügbar ist, verschiebt den Shuffle-Vorgang aus den Worker-VMs in das Dataflow-Dienst-Back-End.

    Batchjobs verwenden standardmäßig Dataflow Shuffle.

    Vorteile von Dataflow Shuffle

    Der dienstbasierte Dataflow-Shuffle hat folgende Vorteile:

    • Schnellere Ausführungszeit von Batchpipelines für die meisten Pipeline-Jobtypen
    • Reduzierung der verbrauchten CPU-, Arbeitsspeicher- und nichtflüchtigen Speicherressourcen auf den Worker-VMs
    • Besseres Autoscaling, da VMs keine Shuffle-Daten mehr enthalten und daher früher verkleinert werden können.
    • Bessere Fehlertoleranz: Eine fehlerhafte VM mit Dataflow Shuffle-Daten hat nicht zur Folge, dass der gesamte Job fehlschlägt, wie es der Fall wäre, wenn dieses Feature nicht verwendet würde.

    Der Großteil der Reduzierung der Worker-Ressourcen ist auf die Übertragung des Shuffle-Aufwands auf den Dataflow-Dienst zurückzuführen. Daher ist die Nutzung von Dataflow Shuffle kostenpflichtig. Die Gesamtkosten für Dataflow-Pipelines, die die dienstbasierte Dataflow-Implementierung verwenden, sind jedoch wahrscheinlich kleiner als oder gleich den Kosten von Dataflow-Pipelines, die diese Option nicht verwenden.

    Bei den meisten Pipeline-Jobtypen wird erwartet, dass Dataflow Shuffle schneller ausgeführt wird als die Shuffle-Implementierung, die auf Worker-VMs ausgeführt wird. Die Ausführungszeiten sind jedoch von Ausführung zu Ausführung unterschiedlich. Wenn Sie eine Pipeline mit wichtigen Fristen ausführen, empfiehlt es sich, genügend zeitlichen Puffer vor der Frist einzuplanen. Ziehen Sie außerdem in Betracht, ein höheres Shuffle-Kontingent anzufordern.

    Hinweise zu Laufwerken

    Wenn Sie das dienstbasierte Feature "Dataflow Shuffle" verwenden, müssen Sie keine großen nichtflüchtigen Speicher an Ihre Worker-VMs anhängen. In Dataflow wird automatisch ein kleines Bootlaufwerk mit 25 GB angehängt. Aufgrund dieser geringen Laufwerkgröße sind jedoch folgende wichtige Aspekte bei Verwendung von Dataflow Shuffle zu beachten:

    • Eine Worker-VM verwendet einen Teil des 25 GB Speicherplatzes für das Betriebssystem, Binärdateien, Logs und Container. Jobs, die eine erhebliche Menge an Speicherplatz beanspruchen und die verbleibende Speicherkapazität überschreiten, schlagen möglicherweise fehl, wenn Sie Dataflow Shuffle verwenden.
    • Jobs, die viel Laufwerks-E/A-Leistung beanspruchen, werden möglicherweise aufgrund der geringen Laufwerksleistung langsamer. Weitere Informationen zu Leistungsunterschieden zwischen Laufwerkgrößen finden Sie auf der Seite zur Leistung nichtflüchtiger Speicher von Compute Engine.

    Wenn einer dieser Aspekte auf Ihren Job zutrifft, können Sie mit den Pipelineoptionen eine größere Laufwerkgröße festlegen.

    Dataflow Shuffle verwenden

    Diese Funktion ist in allen Regionen verfügbar, in denen Dataflow unterstützt wird. Informationen zu den verfügbaren Speicherorten finden Sie unter Dataflow-Standorte. Wenn Sie Dataflow Shuffle verwenden, müssen die Worker auch in derselben Region wie der regionale Endpunkt bereitgestellt werden.

    Java: SDK 2.x

    Batchjobs verwenden standardmäßig Dataflow Shuffle. Wenn Sie die Verwendung von Dataflow Shuffle deaktivieren möchten, geben Sie die folgende Pipelineoption an:
    --experiments=shuffle_mode=appliance.

    Wenn Sie Dataflow Shuffle für Ihre Pipeline verwenden, geben Sie nicht die Pipelineoptionen zone an. Legen Sie stattdessen den Parameter region fest und geben Sie als Wert eine der Regionen an, in denen Shuffle derzeit verfügbar ist. Dataflow wählt automatisch die Zone in der von Ihnen angegebenen Region aus. Wenn Sie die Pipelineoption zone angeben und sie auf eine Zone außerhalb der verfügbaren Regionen festlegen, meldet Dataflow einen Fehler. Wenn Sie eine inkompatible Kombination aus region und zone festlegen, kann Ihr Job Dataflow Shuffle nicht verwenden.

    Python

    Batchjobs verwenden standardmäßig Dataflow Shuffle. Wenn Sie die Verwendung von Dataflow Shuffle deaktivieren möchten, geben Sie die folgende Pipelineoption an:
    --experiments=shuffle_mode=appliance.

    Wenn Sie Dataflow Shuffle für Ihre Pipeline verwenden, geben Sie nicht die Pipelineoptionen zone an. Legen Sie stattdessen den Parameter region fest und geben Sie als Wert eine der Regionen an, in denen Shuffle derzeit verfügbar ist. Dataflow wählt automatisch die Zone in der von Ihnen angegebenen Region aus. Wenn Sie die Pipelineoption zone angeben und sie auf eine Zone außerhalb der verfügbaren Regionen festlegen, meldet Dataflow einen Fehler. Wenn Sie eine inkompatible Kombination aus region und zone festlegen, kann Ihr Job Dataflow Shuffle nicht verwenden.

    Java: SDK 1.x

    Flexible Ressourcenplanung für Dataflow

    Dataflow FlexRS reduziert die Kosten für die Batchverarbeitung. Dazu werden erweiterte Planungsverfahren, der Dataflow Shuffle-Dienst sowie eine Kombination aus VM-Instanzen auf Abruf und normalen VMs verwendet. Durch das parallele Ausführen von VMs auf Abruf und normalen VMs wird die Nutzerfreundlichkeit verbessert, wenn Compute Engine bei einem Systemereignis Instanzen von VMs auf Abruf beendet. FlexRS gewährleistet, dass die Pipeline weiter verarbeitet wird und keine geleistete Arbeit verloren geht, wenn VMs auf Abruf von Compute Engine vorzeitig beendet werden. Weitere Informationen zu FlexRS finden Sie unter Flexible Ressourcenplanung in Dataflow verwenden.

    Dataflow Runner v2

    Der aktuelle Dataflow-Runner für die Produktion verwendet beim Ausführen von Apache Beam-Pipelines sprachspezifische Worker. Zum Verbessern der Skalierbarkeit, Allgemeingültigkeit, Erweiterbarkeit und Effizienz wird der Dataflow-Runner auf eine stärker dienstbasierte Architektur umgestellt. Zu diesen Änderungen gehört auch eine effizientere und portablere Worker-Architektur, die mit dem Shuffle-Dienst und Streaming Engine zusammengefasst wurde.

    Der neue Dataflow-Runner „Dataflow Runner v2“ ist jetzt der Standard für Python-Streaming-Pipelines (Version 2.21.0 oder höher) und wird im Februar 2021 standardmäßig für alle neuen Python-Batchpipelines (Version 2.21.0 oder höher) eingeführt. Sie müssen keine Änderungen am Pipeline-Code vornehmen, um diese neue Architektur nutzen zu können.

    Vorteile von Dataflow Runner v2

    Neue Features sind nur in Dataflow Runner v2 verfügbar, angefangen mit Python-Streaming- und -Batchpipelines. Darüber hinaus kann die verbesserte Effizienz der Architektur von Dataflow Runner v2 zu Leistungsverbesserungen bei Ihren Dataflow-Jobs führen.

    Möglicherweise verringert sich Ihr Rechnungsbetrag, wenn Sie Dataflow Runner v2 verwenden. Das Abrechnungsmodell für Dataflow Runner v2 steht jedoch noch nicht endgültig fest. Daher kann es sein, dass sich der Rechnungsbetrag wieder auf das ungefähre aktuelle Niveau erhöht, wenn der neue Runner für alle Pipelines aktiviert wurde.

    Mit dem Dataflow Runner v2 können Sie auch einen Python-Container erstellen, wodurch sich die VM-Startzeiten und die Autoscaling-Leistung verbessern lassen. Aktivieren Sie zum Testen dieses experimentellen Features die Cloud Build API in Ihrem Projekt und reichen Sie die Pipeline mit dem folgenden Parameter ein:
    --prebuild_sdk_container_engine=cloud_build.

    Sobald Ihr Job abgeschlossen oder gestoppt wurde, können Sie das vordefinierte Image aus Container Registry entfernen. Die Image-URL finden Sie in der Dataflow-Monitoring-UI unter Pipeline-Optionen.

    Dataflow Runner v2 verwenden

    Dataflow Runner v2 ist in Regionen mit regionalen Dataflow-Endpunkten verfügbar. Während dies eingeführt wird, können Sie, wenn Sie Dataflow Runner v2 ausprobieren möchten, das Folgende verwenden:

    Java: SDK 2.x

    Dataflow Runner v2 ist derzeit nicht für Java verfügbar.

    Python

    Dataflow Runner v2 erfordert Streaming Engine für Streaming-Jobs und Dataflow Shuffle für Batchjobs. Geben Sie den folgenden Parameter an, um diese für die entsprechenden Jobs zu aktivieren:
    --experiments=use_runner_v2

    Debugging von Dataflow Runner v2-Jobs

    Wenn Sie mit Dataflow Runner v2 Jobs debuggen möchten, sollten Sie die Standardschritte zum Debuggen ausführen. Beachten Sie jedoch Folgendes, wenn Sie Dataflow Runner v2 verwenden:

    • Dataflow Runner v2-Jobs führen auf der Worker-VM zwei Arten von Prozessen aus: einen SDK-Prozess und den Runner-Nutzungsprozess. Abhängig von der Pipeline und dem VM-Typ können ein oder mehrere SDK-Prozesse existieren, aber es gibt nur einen Runner-Nutzungsprozess pro VM.
    • Mit SDK-Prozessen werden Nutzercode und andere sprachspezifische Funktionen ausgeführt, während alles andere vom Runner-Nutzungsprozess verwaltet wird.
    • Der Runner-Nutzungsprozess wartet, bis alle SDK-Prozesse eine Verbindung zu ihm hergestellt haben, bevor er von Dataflow Arbeit anfordert.
    • Jobs können sich verzögern, wenn die Worker-VM beim Starten des SDK-Prozesses Abhängigkeiten herunterlädt und installiert. Wenn bei einem SDK-Prozess Probleme auftreten, z. B. beim Starten oder beim Installieren von Bibliotheken, meldet der Worker einen fehlerhaften Status. Wenn die Startzeiten erhöht sind, aktivieren Sie die Cloud Build API in Ihrem Projekt und senden Sie die Pipeline mit dem folgenden Parameter:
      --prebuild_sdk_container_engine=cloud_build.
    • Worker-VM-Logs, die über den Log-Explorer oder die Dataflow-Monitoring-Oberfläche verfügbar sind, schließen Logs aus dem Runner-Nutzungsprozess sowie Logs aus den SDK-Prozessen ein.
    • Zur Diagnose von Problemen mit Ihrem Nutzercode müssen Sie die Worker-Logs aus den SDK-Prozessen prüfen. Wenn Sie in den Runner-Nutzungslogs Fehler finden, wenden Sie sich an den Support, um sie zu melden.