Java-Aufgabenmuster

Die E-Commerce-Beispielanwendung zeigt Best Practices für die Verwendung von Dataflow zur Implementierung von Streamingdatenanalysen und Echtzeit-KI. Das Beispiel enthält Aufgabenmuster, die die beste Möglichkeit darstellen, Java-Programmieraufgaben auszuführen. Diese Aufgaben sind häufig erforderlich, um E-Commerce-Anwendungen zu erstellen.

Die Anwendung enthält die folgenden Java-Aufgabenmuster:

Über Apache Beam-Schemas mit strukturierten Daten arbeiten

Sie können Apache Beam-Schemas verwenden, um die Verarbeitung strukturierter Daten zu vereinfachen.

Beim Umwandeln von Objekten in Zeilen können Sie sehr sauberen Java-Code erzeugen. Dies erleichtert die Erstellung von einem gerichteten azyklischen Graph (DAG). Sie können auch auf Objekteigenschaften als Felder in den von Ihnen erstellten Analyseanweisungen verweisen, anstatt Methoden aufrufen zu müssen.

Beispiel

CountViewsPerProduct.java

JSON-Daten mithilfe von JsonToRow konvertieren

Es ist üblich, JSON-Strings in Dataflow zu verarbeiten. Beispielsweise werden JSON-Strings beim Streamen von Clickstream-Informationen verarbeitet, die von Webanwendungen erfasst wurden. Zur Verarbeitung von JSON-Strings müssen Sie diese während der Pipelineverarbeitung entweder in Zeilen oder alte Java-Objekte (POJOs) konvertieren.

Sie können die in Apache Beam integrierte Transformation JsonToRow verwenden, um JSON-Strings in Zeilen zu konvertieren. Wenn Sie jedoch eine Warteschlange für die Verarbeitung nicht erfolgreicher Nachrichten abrufen möchten, müssen Sie diese separat erstellen. Weitere Informationen finden Sie unter Nicht verarbeitbare Daten zur weiteren Analyse in einer Warteschlange bereitstellen.

Wenn Sie mithilfe von AutoValue einen JSON-String in einen POJO konvertieren möchten, registrieren Sie mithilfe der Annotation @DefaultSchema(AutoValueSchema.class) ein Schema für den Typ und verwenden Sie dann die Konvertierungs-Dienstprogrammklasse. Sie erhalten dann Code ähnlich dem folgenden:

PCollection<String> json = ...

PCollection<MyUserType>  = json
  .apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
  .apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))

Weitere Informationen, einschließlich der verschiedenen Java-Typen, von denen Sie Schemas ableiten können, finden Sie unter Schemas erstellen.

Wenn JsonToRow mit Ihren Daten nicht funktioniert, ist Gson eine Alternative. Gson geht bei der Standardverarbeitung von Daten relativ locker vor, sodass Sie möglicherweise eine zusätzliche Validierung in den Prozess zur Datenkonvertierung integrieren müssen.

Beispiele

POJOs mithilfe des AutoValue-Codegenerators generieren

Apache Beam-Schemas sind oft die beste Methode, um Objekte in einer Pipeline darzustellen, da sie Ihnen die Arbeit mit strukturierten Daten ermöglichen. Gelegentlich ist jedoch ein einfaches altes Java-Objekt (POJO) erforderlich, z. B. beim Umgang mit Schlüssel/Wert-Paarobjekten oder dem Objektstatus. Bei der manuellen Erstellung von POJOs müssen Sie Überschreibungen für die Methoden equals() und hashcode() codieren, was zeitaufwendig und fehleranfällig sein kann. Falsche Überschreibungen können zu einem inkonsistenten Verhalten der Anwendung oder zu Datenverlust führen.

Verwenden Sie den Klassen-Builder AutoValue, um POJOs zu generieren. Mit dieser Option wird sichergestellt, dass die erforderlichen Überschreibungen verwendet werden und potenzielle Fehler vermieden werden. AutoValue wird viel innerhalb der Apache Beam-Codebasis verwendet. Wenn Sie Apache Beam-Pipelines mithilfe von Java auf Dataflow entwickeln möchten, ist es deshalb hilfreich, mit diesem Klassen-Builder vertraut zu sein.

Sie können AutoValue auch mit Apache Beam-Schemas verwenden, wenn Sie eine @DefaultSchema(AutoValueSchema.class)-Annotation hinzufügen. Weitere Informationen finden Sie unter Schemas erstellen.

Weitere Informationen zu AutoValue finden Sie unter Warum AutoValue? und AutoValue-Dokumente.

Beispiel

Clickstream.java

Nicht verarbeitbare Daten zur weiteren Analyse in die Warteschlange stellen

In Produktionssystemen ist es wichtig, mit problematischen Daten richtig umzugehen. Validieren und korrigieren Sie Daten nach Möglichkeit als Stream. Wenn jedoch keine Korrektur möglich ist, protokollieren Sie den Wert zur späteren Analyse in einer unverarbeiteten Nachrichtenwarteschlange, manchmal auch als Warteschlange für unzustellbare Nachrichten bezeichnet. Probleme treten häufig auf, wenn Sie Daten von einem Format in ein anderes konvertieren, z. B. wenn Sie JSON-Strings in Zeilen konvertieren.

Um dieses Problem zu beheben, verwenden Sie eine Transformation mit mehreren Ausgaben, um die Elemente, die die nicht verarbeiteten Daten enthalten, zur weiteren Analyse in eine andere PCollection zu transferieren. Diese Verarbeitung ist ein gängiger Vorgang, den Sie möglicherweise an vielen Stellen in einer Pipeline verwenden möchten. Die Transformation sollte so allgemein gestaltet sein, dass sie an verschiedenen Stellen verwendet werden kann. Erstellen Sie zuerst ein Fehlerobjekt, um gängige Attribute, einschließlich der Originaldaten, zusammenzufassen. Erstellen Sie als Nächstes eine Senkentransformation mit mehreren Optionen für das Ziel.

Beispiele

Transformationen der Datenvalidierung seriell anwenden

Daten, die von externen Systemen erhoben werden, müssen oft bereinigt werden. Strukturieren Sie Ihre Pipeline so, dass problematische Daten nach Möglichkeit als Stream korrigiert werden können. Senden Sie die Daten bei Bedarf in eine Warteschlange zur weiteren Analyse.

Da eine einzelne Nachricht durch mehrere Probleme beeinträchtigt werden kann, die behoben werden müssen, planen Sie den erforderlichen gerichteten azyklischen Graphen (Directed Acyclic Graph, DAG) ein. Wenn ein Element Daten mit mehreren Fehlern enthält, müssen Sie dafür sorgen, dass das Element die entsprechenden Transformationen durchläuft.

Stellen Sie sich beispielsweise ein Element mit den folgenden Werten vor, von denen keiner null sein sollte:

{"itemA": null,"itemB": null}

Achten Sie darauf, dass das Element Transformationen durchläuft, durch die beide potenziellen Probleme behoben werden:

badElements.apply(fixItemA).apply(fixItemB)

Die Pipeline umfasst möglicherweise mehrere serielle Schritte, aber durch Fusion kann der Verarbeitungsaufwand minimiert werden.

Beispiel

ValidateAndCorrectCSEvt.java

DoFn.StartBundle für Micro-Batch-Aufrufe von externen Diensten verwenden

Möglicherweise müssen Sie externe APIs als Teil Ihrer Pipeline aufrufen. Da eine Pipeline Arbeit auf viele Computingressourcen verteilt, kann ein externer Dienstendpunkt überlastet werden, wenn für jedes Element im System ein einzelner Aufruf ausgeführt wird. Dieses Problem tritt besonders häufig auf, wenn keine Funktionen zur Reduzierung angewendet wurden.

Um dieses Problem zu vermeiden, führen Sie Batch-Aufrufe an externe Systeme durch.

Sie können Batchaufrufe mit einer GroupByKey-Transformation oder der Apache Beam Timer API ausführen. Beide Ansätze erfordern jedoch Shuffling. Hierdurch entsteht allerdings ein gewisser Verarbeitungsaufwand. Außerdem ist eine magische Zahl zur Bestimmung des Schlüsselbereichs erforderlich.

Verwenden Sie stattdessen die Lebenszykluselemente StartBundle und FinishBundle, um Ihre Daten im Batch zu verarbeiten. Bei diesen Optionen ist kein Shuffling erforderlich.

Ein kleiner Nachteil dieser Option ist, dass Bundle-Größen dynamisch durch die Implementierung des Runners bestimmt werden. Dabei hängt die Größe davon ab, was derzeit in der Pipeline und dessen Workern geschieht. Im Stream-Modus sind Bundles oft klein. Die Dataflow-Bündelung wird von Backend-Faktoren wie der Fragmentierung der Nutzung, der Menge von verfügbaren Daten für einen bestimmten Schlüssel und dem Durchsatz der Pipeline beeinflusst.

Beispiel

EventItemCorrectionService.java

Geeignetes Nebeneingabemuster für die Datenanreicherung verwenden

Bei Anwendungen für Streaminganalysen werden Daten häufig mit zusätzlichen Informationen angereichert, die für eine weitere Analyse nützlich sein könnten. Wenn Sie beispielsweise die storeId für eine Transaktion haben, können Sie Informationen zum Geschäftsstandort hinzufügen. Diese zusätzlichen Informationen werden häufig hinzugefügt, indem ein Element aus einer Suchtabelle hinzugefügt wird.

Bei Suchtabellen, die sich sowohl langsam ändern als auch kleiner sind, funktioniert das Verschieben der Tabelle als Singleton-Klasse, die die Map<K,V>-Schnittstelle in die Pipeline implementiert. Durch diese Option vermeiden Sie, dass jedes Element einen API-Aufruf für die Suche durchführt. Nachdem Sie eine Kopie einer Tabelle in die Pipeline aufgenommen haben, müssen Sie diese regelmäßig aktualisieren, damit sie aktuell bleibt.

Zur Verarbeitung von sich langsam aktualisierenden Nebeneingaben verwenden Sie die Nebeneingabemuster von Apache Beam.

Caching

Nebeneingaben werden in den Speicher geladen und werden daher automatisch im Cache gespeichert.

Sie können die Größe des Caches mit der Option --setWorkerCacheMb festlegen.

Sie können den Cache auf DoFn Instanzen freigeben und externe Trigger zum Aktualisieren des Caches verwenden.

Beispiel

SlowMovingStoreLocationDimension.java