Streamingdaten anreichern

Apache Beam vereinfacht den Workflow zur Datenanreicherung. Dazu wird eine sofort einsatzbereite Anreicherungstransformation bereitgestellt, die Sie Ihrer Pipeline hinzufügen können. Auf dieser Seite wird erläutert, wie Sie die Apache Beam-Anreicherungs-Transformation verwenden, um Ihre Streamingdaten anzureichern.

Wenn Sie Daten anreichern, erweitern Sie die Rohdaten aus einer Quelle, indem Sie zugehörige Daten aus einer zweiten Quelle hinzufügen. Die zusätzlichen Daten können aus einer Vielzahl von Quellen wie Bigtable oder BigQuery stammen. Die Apache Beam-Anreicherungstransformation verwendet einen Schlüssel/Wert-Paar-Lookup, um die zusätzlichen Daten mit den Rohdaten zu verbinden.

Die folgenden Beispiele zeigen einige Fälle, in denen die Datenanreicherung hilfreich ist:

  • Sie möchten eine E-Commerce-Pipeline erstellen, die Nutzeraktivitäten von einer Website oder Anwendung erfasst und benutzerdefinierte Empfehlungen bereitstellt. Die Transformation bindet die Aktivitäten in Ihre Pipelinedaten ein, damit Sie die benutzerdefinierten Empfehlungen bereitstellen können.
  • Sie haben Nutzerdaten, die Sie mit geografischen Daten zusammenführen möchten, um geografische Analysen durchzuführen.
  • Sie möchten eine Pipeline erstellen, die Daten von IOT-Geräten (Internet of Things) erfasst, die Telemetrieereignisse senden.

Vorteile

Die Anreicherungstransformation hat folgende Vorteile:

  • Transformiert Daten, ohne dass Sie komplexen Code schreiben oder die zugrunde liegenden Bibliotheken verwalten müssen.
  • Stellt integrierte Quell-Handler bereit.
  • Verwendet clientseitige Drosselung, um die Ratenbegrenzung der Anfragen zu verwalten. Die Anfragen werden mit einer Standardstrategie für Wiederholungsversuche exponentiell abgesichert. Sie können die Ratenbegrenzung entsprechend Ihrem Anwendungsfall konfigurieren.

Unterstützung und Einschränkungen

Für die Anreicherungstransformation gelten folgende Anforderungen:

  • Verfügbar für Batch- und Streamingpipelines.
  • Der BigTableEnrichmentHandler-Handler ist in der Apache Beam Python SDK-Version 2.54.0 und höher verfügbar.
  • Der VertexAIFeatureStoreEnrichmentHandler-Handler ist in der Apache Beam Python SDK-Version 2.55.0 und höher verfügbar.
  • Wenn Sie die Apache Beam Python SDK-Version 2.55.0 und höher verwenden, müssen Sie auch den Python-Client für Redis installieren.
  • Dataflow-Jobs müssen Runner v2 verwenden.

Anreicherungstransformation verwenden

Fügen Sie folgenden Code in Ihre Pipeline ein, um die Anreicherungstransformation zu verwenden:

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

bigtable_handler = BigTableEnrichmentHandler(...)

with beam.Pipeline() as p:
  output = (p
            ...
            | "Create" >> beam.Create(data)
            | "Enrich with Bigtable" >> Enrichment(bigtable_handler)
            ...
            )

Da die Anreicherungstransformation standardmäßig einen Cross Join ausführt, sollten Sie den benutzerdefinierten Join zur Anreicherung der Eingabedaten entwerfen. Dieses Design sorgt dafür, dass der Join nur die angegebenen Felder enthält.

Im folgenden Beispiel ist left das Eingabeelement der Anreicherungstransformation und right die Daten, die von einem externen Dienst für dieses Eingabeelement abgerufen wurden.

def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
  enriched = {}
  enriched['FIELD_NAME'] = left['FIELD_NAME']
  ...
  return beam.Row(**enriched)

Parameter

Zur Verwendung der Anreicherungstransformation ist der Parameter EnrichmentHandler erforderlich.

Sie können auch einen Konfigurationsparameter verwenden, um eine lambda-Funktion für eine Join-Funktion, ein Zeitlimit, einen Drosseler oder einen Wiederholer (Wiederholungsstrategie) anzugeben. Die folgenden Konfigurationsparameter sind verfügbar:

  • join_fn: Eine lambda-Funktion, die Wörterbücher als Eingabe verwendet und eine angereicherte Zeile zurückgibt (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]). Die angereicherte Zeile gibt an, wie die von der API abgerufenen Daten zusammengeführt werden. Die Standardeinstellung ist ein Cross Join.
  • timeout: Die Anzahl der Sekunden, die gewartet wird, bis die Anfrage von der API abgeschlossen wird, bevor eine Zeitüberschreitung auftritt. Der Standardwert ist 30 Sekunden.
  • throttler: Gibt den Drosselungsmechanismus an. Die einzige unterstützte Option ist die standardmäßige clientseitige adaptive Drosselung.
  • repeater: Gibt die Wiederholungsstrategie an, wenn Fehler wie TooManyRequests und TimeoutException auftreten. Die Standardeinstellung ist ExponentialBackOffRepeater.

Nächste Schritte