Genau einmal in Dataflow

Dataflow unterstützt die genau einmalige Verarbeitung von Datensätzen. Auf dieser Seite wird erläutert, wie Dataflow eine genau einmalige Verarbeitung implementiert und gleichzeitig eine niedrige Latenz gewährleistet.

Überblick

Batchpipelines verwenden immer eine genau einmalige Verarbeitung. Streamingpipelines verwenden standardmäßig die genau einmalige Verarbeitung, können aber auch die mindestens einmalige Verarbeitung verwenden.

Die genau einmalige Verarbeitung bietet Garantien für die Ergebnisse der Verarbeitung von Datensätzen, einschließlich der Ergebnisse aus jeder Pipelinephase. Insbesondere gewährleistet Dataflow für jeden Datensatz, der von einer Quelle an die Pipeline oder in einer Phase aus einer vorherigen Phase ankommt, Folgendes:

  • Der Datensatz wird verarbeitet und geht nicht verloren.
  • Alle Verarbeitungsergebnisse, die in der Pipeline verbleiben, werden höchstens einmal angezeigt.

Mit anderen Worten: Datensätze werden mindestens einmal verarbeitet und die Ergebnisse werden genau einmal übergeben.

Die genau einmalige Verarbeitung sorgt dafür, dass die Ergebnisse genau sind, ohne doppelte Datensätze in der Ausgabe. Dataflow ist so optimiert, dass die Latenz minimiert wird, während die "Genau einmal"-Semantik beibehalten wird. Die genau einmalige Verarbeitung verursacht jedoch weiterhin Kosten für die Deduplizierung. Für Anwendungsfälle, in denen doppelte Datensätze toleriert werden, können Sie häufig die Kosten senken und die Latenz verbessern, indem Sie den "Mindestens einmal"-Modus aktivieren. Weitere Informationen zur Auswahl zwischen genau einmaligem und mindestens einmaligen Streaming finden Sie unter Pipeline-Streamingmodus festlegen.

Verspätete Daten

Die genau einmalige Verarbeitung gewährleistet die Genauigkeit der Pipeline: Wenn die Pipeline einen Datensatz verarbeitet, sorgt Dataflow dafür, dass der Datensatz in der Ausgabe angezeigt wird und der Datensatz nicht dupliziert wird. “

In einer Streamingpipeline kann die genau einmalige Verarbeitung jedoch nicht garantieren, dass die Ergebnisse vollständig sind, da Datensätze spät eingehen können. Angenommen, Ihre Pipeline führt eine Aggregation über ein Zeitfenster wie Count aus. Bei der genau einmaligen Verarbeitung ist das Ergebnis für die Datensätze, die zeitnah im Fenster eingehen, korrekt, aber späte Datensätze können gelöscht werden.

Im Allgemeinen gibt es keine Möglichkeit, die Vollständigkeit in einer Streaming-Pipeline zu garantieren, da theoretische Datensätze willkürlich spät eintreffen können. In dem begrenzenden Fall müssen Sie endlos warten, bis Sie ein Ergebnis erhalten. In der Praxis können Sie mit Apache Beam den Schwellenwert für das Löschen von verspäteten Daten und den Zeitpunkt konfigurieren, zu dem aggregierte Ergebnisse ausgegeben werden sollen. Weitere Informationen finden Sie in der Apache Beam-Dokumentation unter Wasserzeichen und verspätete Daten.

Nebeneffekte

Es kann nicht garantiert werden, dass Nebeneffekte eine „Genau einmal“-Semantik haben. Dazu gehört das Schreiben der Ausgabe in einen externen Speicher, es sei denn, die Senke implementiert auch eine "Genau einmal"-Semantik.

Insbesondere garantiert Dataflow nicht, dass jeder Datensatz jede Transformation genau einmal durchläuft. Aufgrund von Wiederholungsversuchen oder Worker-Fehlern kann Dataflow einen Datensatz mehrmals oder sogar gleichzeitig auf mehreren Workern durch eine Transformation senden.

Im Rahmen der genau einmaligen Verarbeitung dedupliziert Dataflow die Ausgaben. Wenn der Code in einer Transformation jedoch Nebeneffekte hat, können diese Effekte mehrmals auftreten. Wenn beispielsweise eine Transformation einen Remotedienstaufruf durchführt, kann dieser Aufruf für denselben Datensatz mehrmals erfolgen. Nebeneffekte können in manchen Situationen sogar zu Datenverlust führen. Angenommen, eine Transformation liest eine Datei, um die Ausgabe zu erzeugen, und löscht die Datei dann sofort, ohne auf das Committen der Ausgabe zu warten. Wenn beim Commit des Ergebnisses ein Fehler auftritt, wiederholt Dataflow die Transformation, aber diese kann die gelöschte Datei jetzt nicht lesen.

Logging

Die Logausgabe der Verarbeitung gibt an, dass die Verarbeitung erfolgt ist, gibt jedoch nicht an, ob ein Commit für die Daten durchgeführt wurde. Deshalb können Logdateien darauf hinweisen, dass die Daten mehrmals verarbeitet wurden, obwohl die Ergebnisse der verarbeiteten Daten nur einmal im nichtflüchtigen Speicher übergeben werden. Darüber hinaus spiegeln Logs nicht immer verarbeitete und übergebene Daten wider. Logs können aufgrund von Drosselung oder aufgrund anderer Probleme mit dem Logging-Dienst gelöscht werden.

Genau einmaliges Streaming

In diesem Abschnitt wird erläutert, wie Dataflow eine genau einmalige Verarbeitung für Streamingjobs implementiert, einschließlich der Art und Weise, wie Dataflow Komplexitäten wie nicht deterministische Verarbeitung, verspätete Daten und benutzerdefinierten Code verwaltet.

Dataflow-Streaming-Shuffle

Streaming-Dataflow-Jobs werden auf vielen verschiedenen Workern parallel ausgeführt, indem jedem Worker Arbeitsbereiche zugewiesen werden. Obwohl sich Zuweisungen im Laufe der Zeit als Reaktion auf Worker-Fehler, Autoscaling oder andere Ereignisse ändern können, sind nach jeder GroupByKey-Transformation alle Datensätze mit demselben Schlüssel auf demselben Worker verarbeitet. Die Transformation GroupByKey wird häufig von zusammengesetzten Transformationen wie Count, FileIO usw. verwendet. Damit Datensätze für einen bestimmten Schlüssel immer auf demselben Worker enden, verteilt Dataflow-Worker Daten mithilfe von Remote-Prozeduraufrufen (RPCs) zwischen ihnen.

Damit Datensätze nicht während des Shuffles verloren gehen, verwendet Dataflow eine vorgelagerte Sicherung. Bei der vorgelagerten Sicherung wiederholt der Worker, der die Datensätze sendet, RPCs, bis er eine positive Bestätigung des Datensatzes empfängt. Die Nebeneffekte der Verarbeitung des Datensatzes werden nachgelagert auf nichtflüchtigen Speicher übertragen. Wenn der Worker, der die Datensätze sendet, nicht mehr verfügbar ist, wiederholt Dataflow die RPCs, wodurch jeder Datensatz mindestens einmal zugestellt wird.

Da durch diese Wiederholungsversuche Duplikate entstehen können, wird jede Nachricht mit einer eindeutigen ID gekennzeichnet. Jeder Empfänger speichert einen Katalog aller IDs, die bereits erfasst und verarbeitet wurden. Wenn ein Datensatz empfangen wird, sucht Dataflow nach seiner ID im Katalog. Wird die ID gefunden, wurde der Datensatz bereits empfangen und übergeben und wird als Duplikat gelöscht. Damit die Datensatz-IDs stabil sind, wird jede Ausgabe von Schritt zu Schritt bis zum Speicher geprüft. Wenn dieselbe Nachricht aufgrund wiederholter RPC-Aufrufe mehrmals gesendet wird, wird die Nachricht nur einmal an den Speicher übergeben.

Niedrige Latenz sicherstellen

Für eine genau einmalige Verarbeitung muss E/A reduziert werden, insbesondere durch Verhindern der E/A bei jedem Datensatz. Um dieses Ziel zu erreichen, verwendet Dataflow Bloom-Filter und die automatische Speicherbereinigung.

Bloom-Filter

Bloom-Filter sind kompakte Datenstrukturen, die schnelle Prüfungen der Mitgliedschaft ermöglichen. In Dataflow behält jeder Worker einen Bloom-Filter jeder erkannten ID bei. Wenn eine neue Datensatz-ID eingeht, sucht der Worker die ID im Filter. Wenn der Filter „false“ zurückgibt, ist dieser Eintrag kein Duplikat und der Worker sucht die ID im stabilen Speicher nicht.

Dataflow speichert eine Reihe von rollierenden Bloom-Filtern nach Zeitgruppe. Wenn ein Datensatz eingeht, wählt Dataflow den entsprechenden zu prüfenden Filter basierend auf dem Systemzeitstempel aus. Dieser Schritt verhindert, dass die Bloom-Filter gesättigt sind, wenn Filter automatisch bereinigt werden, und begrenzt die Datenmenge, die beim Start gescannt werden muss.

Automatische Speicherbereinigung

Damit der Speicher nicht mit Datensatz-IDs gefüllt wird, verwendet Dataflow die automatische Speicherbereinigung, um alte Datensätze zu entfernen. Dataflow verwendet den Systemzeitstempel, um ein Wasserzeichen für die automatische Speicherbereinigung zu berechnen.

Dieses Wasserzeichen basiert auf der Menge der physischen Zeit, die in einer bestimmten Phase aufgewendet wird. Sie enthält daher auch Informationen darüber, welche Teile der Pipeline langsam sind. Diese Metadaten sind die Grundlage für den Systemverzögerungsmesswert, der auf der Dataflow-Monitoring-Oberfläche angezeigt wird.

Wenn ein Datensatz mit einem Zeitstempel eingeht, der älter als das Wasserzeichen ist, und wenn die IDs für diese Zeit bereits automatisch bereinigt wurden, wird der Eintrag ignoriert. Da das niedrige Wasserzeichen, das die automatische Speicherbereinigung auslöst, erst voranschreitet, wenn Datensätze zugestellt werden, sind diese spät ankommenden Datensätze Duplikate.

Nicht deterministische Quellen

Dataflow verwendet das Apache Beam SDK, um Daten in Pipelines zu lesen. Wenn die Verarbeitung fehlschlägt, kann Dataflow Lesevorgänge aus einer Quelle wiederholen. In diesem Fall muss Dataflow sicherstellen, dass jeder von einer Quelle erzeugte eindeutige Datensatz genau einmal aufgezeichnet wird. Bei deterministischen Quellen wie Pub/Sub Lite oder Kafka werden Datensätze basierend auf einem aufgezeichneten Offset gelesen, sodass der Schritt nicht erforderlich ist.

Da Dataflow Datensatz-IDs nicht automatisch zuweisen kann, müssen nicht deterministische Quellen Dataflow mitteilen, welche Datensatz-IDs vorhanden sind, um Duplikate zu vermeiden. Wenn eine Quelle für jeden Datensatz eindeutige IDs bereitstellt, verwendet der Connector ein Shuffle in der Pipeline, um Duplikate zu entfernen. Datensätze mit derselben ID werden herausgefiltert. Ein Beispiel dafür, wie Dataflow eine genau einmalige Verarbeitung implementiert, wenn Pub/Sub als Quelle verwendet wird, finden Sie im Abschnitt Effiziente Deduplizierung auf der Seite „Streaming mit Pub/Sub“.

Wenn Sie benutzerdefinierte DoFns als Teil Ihrer Pipeline ausführen, garantiert Dataflow nicht, dass dieser Code nur einmal pro Datensatz ausgeführt wird. Um eine mindestens einmalige Verarbeitung bei Worker-Ausfällen zu gewährleisten, kann Dataflow einen bestimmten Datensatz mehrmals ausführen oder denselben Datensatz gleichzeitig auf mehreren Workern ausführen. Wenn Sie in Ihre Pipeline Code einfügen, mit dem beispielsweise ein externer Dienst kontaktiert wird, können die Aktionen für einen bestimmten Datensatz mehr als einmal ausgeführt werden.

Verwenden Sie die Prüfpunktausführung, um die nicht deterministische Verarbeitung effektiv deterministisch zu machen. Wenn Sie Prüfpunktausführung verwenden, wird jede Ausgabe einer Transformation mit ihrer eindeutigen ID in den stabilen Speicher übertragen, bevor sie an die nächste Phase gesendet wird. Die Wiederholung der Shuffle-Zustellung von Dataflow leitet die Prüfpunktausführung weiter. Obwohl Ihr Code mehrmals ausgeführt werden kann, sorgt Dataflow dafür, dass nur die Ausgabe dieser Ausführungen gespeichert wird. Dataflow verwendet einen konsistenten Speicher, der verhindert, dass Duplikate in den stabilen Speicher geschrieben werden.

Genau einmalige Ausgabe

Das Apache Beam SDK enthält integrierte Senken, die darauf ausgelegt sind, dass keine Duplikate erzeugt werden. Verwenden Sie nach Möglichkeit eine dieser integrierten Senken.

Wenn Sie eine eigene Senke schreiben müssen, empfiehlt es sich, das Funktionsobjekt idempotent zu machen, damit es und die Wiederholungsversuche so oft wie nötig ausführen, ohne unbeabsichtigte Nebenwirkungen zu verursachen. Dennoch sind einige Komponenten der Transformation, die die Funktionalität der Senke implementieren, oft nicht deterministisch und können sich bei einer Wiederholung ändern.

Beispielsweise kann bei der Fensteraggregation eine Reihe von Datensätzen im Fenster nicht deterministisch sein. Genauer gesagt kann das Fenster versuchen, mit den Elementen „e0”, „e1” und „e2” auszulösen. Der Worker kann abstürzen, bevor er die Fensterverarbeitung festschreibt, aber nicht, bevor diese Elemente als Nebeneffekt gesendet werden. Wenn der Worker neu gestartet wird, wird das Fenster noch einmal ausgelöst und ein spätes Element e3 erscheint. Da dieses Element ankommt, bevor das Fenster übergeben wird, wird es nicht als späte Daten gezählt, sodass DoFn mit den Elementen e0, e1, e2 und e3 noch einmal aufgerufen wird. Diese Elemente werden dann an den Nebeneffektvorgang gesendet. Idempotenz ist in diesem Szenario nicht hilfreich, da jedes Mal verschiedene logische Datensätze gesendet werden.

Verwenden Sie die integrierte Reshuffle-Transformation, um Nicht-Bestimmtheit in Dataflow zu beheben. Wenn Dataflow Daten nach dem Zufallsprinzip umverteilt, werden die Daten dauerhaft geschrieben, sodass alle nicht deterministisch generierten Elemente stabil sind, wenn Vorgänge nach dem Shuffle wiederholt werden. Die Verwendung der Reshuffle-Transformation garantiert, dass nur eine Version der DoFn-Ausgabe eine Shuffle-Grenze überschreiten kann. Das folgende Muster sorgt dafür, dass der Nebenwirkungsvorgang immer einen deterministischen Datensatz empfängt:

c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))))
 .apply(GroupByKey.<..>.create())
 .apply(new PrepareOutputData())
 .apply(Reshuffle.<..>of())
 .apply(WriteToSideEffect());

Damit der Dataflow-Runner weiß, dass Elemente vor der Ausführung eines DoFn stabil sein müssen, fügen Sie dem DoFn die Annotation RequiresStableInput hinzu.

Weitere Informationen