Streamingdaten anreichern

Apache Beam vereinfacht den Datenanreicherungs-Workflow, indem es eine schlüsselfertige Anreicherungstransformation bietet, die Sie Ihrer Pipeline hinzufügen können. Auf dieser Seite erfahren Sie, wie Sie die Apache Beam-Anreicherungstransformation verwenden, um Ihre Streamingdaten anzureichern.

Wenn Sie Daten anreichern, ergänzen Sie die Rohdaten aus einer Quelle durch zugehörige Daten aus einer zweiten Quelle. Die zusätzlichen Daten können aus einer Vielzahl von Quellen stammen, z. B. aus Bigtable oder BigQuery. Die Apache Beam-Anreicherungstransformation verwendet einen Schlüssel/Wert-Paar-Lookup, um die zusätzlichen Daten mit den Rohdaten zu verbinden.

Die folgenden Beispiele zeigen, in welchen Fällen die Datenanreicherung nützlich ist:

  • Sie möchten eine E-Commerce-Pipeline erstellen, die Nutzeraktivitäten auf einer Website oder in einer App erfasst und personalisierte Empfehlungen liefert. Die Transformation fügt die Aktivitäten in Ihre Pipeline-Daten ein, damit Sie die personalisierten Empfehlungen geben können.
  • Sie haben Nutzerdaten, die Sie mit geografischen Daten zusammenführen möchten, um geografische Analysen durchzuführen.
  • Sie möchten eine Pipeline erstellen, die Daten von IOT-Geräten (Internet of Things) erfasst, die Telemetrieereignisse senden.

Vorteile

Für die Anreicherungstransformation gelten folgende Vorteile:

  • Transformiert Daten, ohne dass Sie komplexen Code schreiben oder die zugrunde liegenden Bibliotheken verwalten müssen.
  • Stellt integrierte Quell-Handler bereit.
  • Verwendet clientseitige Drosselung, um die Ratenbegrenzung der Anfragen zu verwalten. Die Anfragen werden mit einer Standardwiederholstrategie exponentiell verzögert. Sie können die Ratenbegrenzung an Ihren Anwendungsfall anpassen.

Unterstützung und Einschränkungen

Für die Anreicherungstransformation gelten folgende Anforderungen:

  • Verfügbar für Batch- und Streamingpipelines.
  • Der BigTableEnrichmentHandler-Handler ist in der Apache Beam Python SDK-Version 2.54.0 und höher verfügbar.
  • Der VertexAIFeatureStoreEnrichmentHandler-Handler ist in der Apache Beam Python SDK-Version 2.55.0 und höher verfügbar.
  • Wenn Sie die Apache Beam Python SDK-Version 2.55.0 und höher verwenden, müssen Sie auch den Python-Client für Redis installieren.
  • Dataflow-Jobs müssen Runner v2 verwenden.

Datenanreicherungs-Transformation verwenden

Wenn Sie die Datenanreicherung verwenden möchten, fügen Sie den folgenden Code in Ihre Pipeline ein:

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

bigtable_handler = BigTableEnrichmentHandler(...)

with beam.Pipeline() as p:
  output = (p
            ...
            | "Create" >> beam.Create(data)
            | "Enrich with Bigtable" >> Enrichment(bigtable_handler)
            ...
            )

Da die Datenanreicherungstransformation standardmäßig einen Kreuzverweis ausführt, müssen Sie den benutzerdefinierten Join so gestalten, dass die Eingabedaten angereichert werden. So wird sichergestellt, dass die Zusammenführung nur die angegebenen Felder enthält.

Im folgenden Beispiel ist left das Eingabeelement der Datenanreicherungstransformation und right sind Daten, die für dieses Eingabeelement aus einem externen Dienst abgerufen wurden.

def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
  enriched = {}
  enriched['FIELD_NAME'] = left['FIELD_NAME']
  ...
  return beam.Row(**enriched)

Parameter

Für die Verwendung der Datenanreicherung ist der Parameter EnrichmentHandler erforderlich.

Sie können auch einen Konfigurationsparameter verwenden, um eine lambda-Funktion für eine Join-Funktion, ein Zeitlimit, einen Drosselmechanismus oder einen Repeater (Wiederholstrategie) anzugeben. Folgende Konfigurationsparameter sind verfügbar:

  • join_fn: Eine lambda-Funktion, die Wörterbücher als Eingabe nimmt und eine angereicherte Zeile (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]) zurückgibt. In der angereicherten Zeile wird angegeben, wie die Daten aus der API zusammengeführt werden sollen. Standardmäßig ist ein Kreuzungszusammenhang festgelegt.
  • timeout: Die Anzahl der Sekunden, die gewartet werden soll, bis die Anfrage von der API abgeschlossen wurde, bevor eine Zeitüberschreitung auftritt. Standardmäßig 30 Sekunden.
  • throttler: Gibt den Drosselungsmechanismus an. Die einzige unterstützte Option ist die standardmäßige clientseitige adaptive Drosselung.
  • repeater: Gibt die Wiederholungsstrategie an, wenn Fehler wie TooManyRequests und TimeoutException auftreten. Die Standardeinstellung ist ExponentialBackOffRepeater.

Nächste Schritte