Java-Aufgabenmuster

Die E-Commerce-Beispielanwendung zeigt Best Practices für die Verwendung von Dataflow zur Implementierung von Streamingdatenanalysen und Echtzeit-KI. Das Beispiel enthält Aufgabenmuster, die die beste Möglichkeit zum Ausführen von Java-Programmieraufgaben zeigen. Diese Aufgaben werden häufig zum Erstellen von E-Commerce-Anwendungen benötigt.

Die Anwendung enthält die folgenden Java-Aufgabenmuster:

Über Apache Beam-Schemas mit strukturierten Daten arbeiten

Sie können Apache Beam-Schemas verwenden, um die Verarbeitung strukturierter Daten zu vereinfachen.

Beim Umwandeln von Objekten in Zeilen können Sie sehr sauberen Java-Code erzeugen. Dies erleichtert die Erstellung von einem gerichteten azyklischen Graph (DAG). Sie können auch auf Objekteigenschaften als Felder in den von Ihnen erstellten Analyseanweisungen verweisen, anstatt Methoden aufrufen zu müssen.

Beispiel

CountViewsPerProduct.java.

JSON-Daten mithilfe von JsonToRow konvertieren

Es ist üblich, JSON-Strings in Dataflow zu verarbeiten. Beispielsweise werden JSON-Strings beim Streamen von Clickstream-Informationen verarbeitet, die von Webanwendungen erfasst wurden. Zur Verarbeitung von JSON-Strings müssen Sie diese während der Pipelineverarbeitung entweder in Zeilen oder alte Java-Objekte (POJOs) konvertieren.

Sie können die in Apache Beam integrierte Transformation JsonToRow verwenden, um JSON-Strings in Zeilen zu konvertieren. Wenn Sie jedoch eine Warteschlange für die Verarbeitung nicht erfolgreicher Nachrichten abrufen möchten, müssen Sie diese separat erstellen. Weitere Informationen finden Sie unter Nicht verarbeitbare Daten zur weiteren Analyse in einer Warteschlange bereitstellen.

Wenn Sie mithilfe von AutoValue einen JSON-String in einen POJO konvertieren möchten, registrieren Sie mithilfe der Annotation @DefaultSchema(AutoValueSchema.class) ein Schema für den Typ und verwenden Sie dann die Konvertierungs-Dienstprogrammklasse. Sie erhalten dann Code ähnlich dem folgenden:

PCollection<String> json = ...

PCollection<MyUserType>  = json
  .apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
  .apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))

Weitere Informationen, einschließlich der verschiedenen Java-Typen, von denen Sie Schemas ableiten können, finden Sie unter Schemas erstellen.

Wenn JsonToRow mit Ihren Daten nicht funktioniert, ist Gson eine Alternative. Gson geht bei der Standardverarbeitung von Daten relativ locker vor, sodass Sie möglicherweise eine zusätzliche Validierung in den Prozess zur Datenkonvertierung integrieren müssen.

Beispiele

POJOs mithilfe des AutoValue-Codegenerators generieren

Apache Beam-Schemas sind oft die beste Methode, um Objekte in einer Pipeline darzustellen, da sie Ihnen die Arbeit mit strukturierten Daten ermöglichen. Gelegentlich ist jedoch ein einfaches altes Java-Objekt (POJO) erforderlich, z. B. beim Umgang mit Schlüssel/Wert-Paarobjekten oder dem Objektstatus. Bei der manuellen Erstellung von POJOs müssen Sie Überschreibungen für die Methoden equals() und hashcode() codieren, was zeitaufwendig und fehleranfällig sein kann. Falsche Überschreibungen können zu einem inkonsistenten Verhalten der Anwendung oder zu Datenverlust führen.

Verwenden Sie den Klassen-Builder AutoValue, um POJOs zu generieren. Mit dieser Option wird sichergestellt, dass die erforderlichen Überschreibungen verwendet werden und potenzielle Fehler vermieden werden. AutoValue wird in der Apache Beam-Codebasis stark genutzt. Wenn Sie Apache Beam-Pipelines mithilfe von Java entwickeln möchten, ist es deshalb hilfreich, mit diesem Klassen-Builder vertraut zu sein.

Sie können AutoValue auch mit Apache Beam-Schemas verwenden, wenn Sie eine @DefaultSchema(AutoValueSchema.class)-Annotation hinzufügen. Weitere Informationen finden Sie unter Schemas erstellen.

Weitere Informationen zu AutoValue finden Sie unter WarumAutoValue? und AutoValue Dokumente.

Beispiel

Clickstream.java

Nicht verarbeitbare Daten zur weiteren Analyse in die Warteschlange stellen

In Produktionssystemen ist es wichtig, mit problematischen Daten richtig umzugehen. Wenn möglich, validieren und korrigieren Sie Daten im Stream. Wenn eine Korrektur nicht möglich ist, protokollieren Sie den Wert für eine spätere Analyse in einer Warteschlange für nicht verarbeitete Nachrichten, die auch als Warteschlange für unzustellbare Nachrichten bezeichnet wird. Probleme treten häufig auf, wenn Sie Daten von einem Format in ein anderes konvertieren, z. B. wenn Sie JSON-Strings in Zeilen konvertieren.

Um dieses Problem zu beheben, verwenden Sie eine Transformation mit mehreren Ausgaben, um die Elemente, die die nicht verarbeiteten Daten enthalten, zur weiteren Analyse in eine andere PCollection zu transferieren. Diese Verarbeitung ist ein gängiger Vorgang, den Sie möglicherweise an vielen Stellen in einer Pipeline verwenden möchten. Versuchen Sie, die Transformation allgemein genug zu gestalten, um sie an mehreren Stellen zu verwenden. Erstellen Sie zuerst ein Fehlerobjekt, um gängige Attribute, einschließlich der Originaldaten, zusammenzufassen. Erstellen Sie als Nächstes eine Senkentransformation mit mehreren Optionen für das Ziel.

Beispiele

Transformationen der Datenvalidierung seriell anwenden

Daten, die von externen Systemen erhoben werden, müssen oft bereinigt werden. Strukturieren Sie Ihre Pipeline so, dass problematische Daten nach Möglichkeit als Stream korrigiert werden können. Senden Sie die Daten bei Bedarf an eine Warteschlange für weitere Analysen.

Da eine einzelne Nachricht durch mehrere Probleme beeinträchtigt werden kann, die behoben werden müssen, planen Sie den erforderlichen gerichteten azyklischen Graphen (Directed Acyclic Graph, DAG) ein. Wenn ein Element Daten mit mehreren Fehlern enthält, müssen Sie dafür sorgen, dass das Element die entsprechenden Transformationen durchläuft.

Stellen Sie sich beispielsweise ein Element mit den folgenden Werten vor, von denen keiner null sein sollte:

{"itemA": null,"itemB": null}

Achten Sie darauf, dass das Element Transformationen durchläuft, durch die beide potenziellen Probleme behoben werden:

badElements.apply(fixItemA).apply(fixItemB)

Die Pipeline umfasst möglicherweise mehrere serielle Schritte, aber durch Fusion kann der Verarbeitungsaufwand minimiert werden.

Beispiel

ValidateAndCorrectCSEvt.java

DoFn.StartBundle für Micro-Batch-Aufrufe von externen Diensten verwenden

Möglicherweise müssen Sie externe APIs als Teil Ihrer Pipeline aufrufen. Da eine Pipeline die Arbeit auf viele Computing-Ressourcen verteilt, kann ein einzelner Aufruf für jedes Element, das durch das System fließt, einen externen Dienstendpunkt überfordern. Dieses Problem tritt besonders häufig auf, wenn Sie keine Reduzierungsfunktionen angewendet haben.

Um dieses Problem zu vermeiden, führen Sie Batch-Aufrufe an externe Systeme durch.

Sie können Batchaufrufe mit einer GroupByKey-Transformation oder der Apache Beam Timer API ausführen. Beide Ansätze erfordern jedoch Shuffling. Hierdurch entsteht allerdings ein gewisser Verarbeitungsaufwand. Außerdem ist eine magische Zahl zur Bestimmung des Schlüsselbereichs erforderlich.

Verwenden Sie stattdessen die Lebenszykluselemente StartBundle und FinishBundle, um Ihre Daten im Batch zu verarbeiten. Bei diesen Optionen ist kein Shuffling erforderlich.

Ein kleiner Nachteil dieser Option ist, dass Bundle-Größen dynamisch durch die Implementierung des Runners bestimmt werden. Dabei hängt die Größe davon ab, was derzeit in der Pipeline und dessen Workern geschieht. Im Stream-Modus sind Bundles oft klein. Die Dataflow-Bündelung wird von Backend-Faktoren wie der Fragmentierung der Nutzung, der Menge von verfügbaren Daten für einen bestimmten Schlüssel und dem Durchsatz der Pipeline beeinflusst.

Beispiel

EventItemCorrectionService.java

Geeignetes Nebeneingabemuster für die Datenanreicherung verwenden

Bei Anwendungen für Streaminganalysen werden Daten häufig mit zusätzlichen Informationen angereichert, die für eine weitere Analyse nützlich sein könnten. Wenn Sie beispielsweise die storeId für eine Transaktion haben, können Sie Informationen zum Geschäftsstandort hinzufügen. Diese zusätzlichen Informationen werden häufig hinzugefügt, indem ein Element aus einer Suchtabelle hinzugefügt wird.

Bei Suchtabellen, die sich sowohl langsam ändern als auch kleiner sind, funktioniert das Verschieben der Tabelle als Singleton-Klasse, die die Map<K,V>-Schnittstelle in die Pipeline implementiert. Durch diese Option vermeiden Sie, dass jedes Element einen API-Aufruf für die Suche durchführt. Nachdem Sie eine Kopie einer Tabelle in die Pipeline aufgenommen haben, müssen Sie diese regelmäßig aktualisieren, damit sie aktuell bleibt.

Zur Verarbeitung von sich langsam aktualisierenden Nebeneingaben verwenden Sie die Nebeneingabemuster von Apache Beam.

Caching

Nebeneingaben werden in den Speicher geladen und werden daher automatisch im Cache gespeichert.

Sie können die Größe des Caches mit der Option --setWorkerCacheMb festlegen.

Sie können den Cache auf DoFn Instanzen freigeben und externe Trigger zum Aktualisieren des Caches verwenden.

Beispiel

SlowMovingStoreLocationDimension.java