Apache Beam vereinfacht den Datenanreicherungs-Workflow, indem es eine schlüsselfertige Anreicherungstransformation bietet, die Sie Ihrer Pipeline hinzufügen können. Auf dieser Seite erfahren Sie, wie Sie die Apache Beam-Anreicherungstransformation verwenden, um Ihre Streamingdaten anzureichern.
Wenn Sie Daten anreichern, ergänzen Sie die Rohdaten aus einer Quelle durch zugehörige Daten aus einer zweiten Quelle. Die zusätzlichen Daten können aus einer Vielzahl von Quellen stammen, z. B. aus Bigtable oder BigQuery. Die Apache Beam-Anreicherungstransformation verwendet einen Schlüssel/Wert-Paar-Lookup, um die zusätzlichen Daten mit den Rohdaten zu verbinden.
Die folgenden Beispiele zeigen, in welchen Fällen die Datenanreicherung nützlich ist:
- Sie möchten eine E-Commerce-Pipeline erstellen, die Nutzeraktivitäten auf einer Website oder in einer App erfasst und personalisierte Empfehlungen liefert. Die Transformation fügt die Aktivitäten in Ihre Pipeline-Daten ein, damit Sie die personalisierten Empfehlungen geben können.
- Sie haben Nutzerdaten, die Sie mit geografischen Daten zusammenführen möchten, um geografische Analysen durchzuführen.
- Sie möchten eine Pipeline erstellen, die Daten von IOT-Geräten (Internet of Things) erfasst, die Telemetrieereignisse senden.
Vorteile
Für die Anreicherungstransformation gelten folgende Vorteile:
- Transformiert Daten, ohne dass Sie komplexen Code schreiben oder die zugrunde liegenden Bibliotheken verwalten müssen.
- Stellt integrierte Quell-Handler bereit.
- Verwenden Sie den Handler
BigTableEnrichmentHandler
, um Ihre Daten mithilfe einer Bigtable-Quelle anzureichern, ohne Konfigurationsdetails zu übergeben. - Verwenden Sie den Handler
VertexAIFeatureStoreEnrichmentHandler
mit Vertex AI Feature Store und Bigtable-Onlinebereitstellung.
- Verwenden Sie den Handler
- Verwendet clientseitige Drosselung, um die Ratenbegrenzung der Anfragen zu verwalten. Die Anfragen werden mit einer Standardwiederholstrategie exponentiell verzögert. Sie können die Ratenbegrenzung an Ihren Anwendungsfall anpassen.
Unterstützung und Einschränkungen
Für die Anreicherungstransformation gelten folgende Anforderungen:
- Verfügbar für Batch- und Streamingpipelines.
- Der
BigTableEnrichmentHandler
-Handler ist in der Apache Beam Python SDK-Version 2.54.0 und höher verfügbar. - Der
VertexAIFeatureStoreEnrichmentHandler
-Handler ist in der Apache Beam Python SDK-Version 2.55.0 und höher verfügbar. - Wenn Sie die Apache Beam Python SDK-Version 2.55.0 und höher verwenden, müssen Sie auch den Python-Client für Redis installieren.
- Dataflow-Jobs müssen Runner v2 verwenden.
Datenanreicherungs-Transformation verwenden
Wenn Sie die Datenanreicherung verwenden möchten, fügen Sie den folgenden Code in Ihre Pipeline ein:
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler
bigtable_handler = BigTableEnrichmentHandler(...)
with beam.Pipeline() as p:
output = (p
...
| "Create" >> beam.Create(data)
| "Enrich with Bigtable" >> Enrichment(bigtable_handler)
...
)
Da die Datenanreicherungstransformation standardmäßig einen Kreuzverweis ausführt, müssen Sie den benutzerdefinierten Join so gestalten, dass die Eingabedaten angereichert werden. So wird sichergestellt, dass die Zusammenführung nur die angegebenen Felder enthält.
Im folgenden Beispiel ist left
das Eingabeelement der Datenanreicherungstransformation und right
sind Daten, die für dieses Eingabeelement aus einem externen Dienst abgerufen wurden.
def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
enriched = {}
enriched['FIELD_NAME'] = left['FIELD_NAME']
...
return beam.Row(**enriched)
Parameter
Für die Verwendung der Datenanreicherung ist der Parameter EnrichmentHandler
erforderlich.
Sie können auch einen Konfigurationsparameter verwenden, um eine lambda
-Funktion für eine Join-Funktion, ein Zeitlimit, einen Drosselmechanismus oder einen Repeater (Wiederholstrategie) anzugeben. Folgende Konfigurationsparameter sind verfügbar:
join_fn
: Einelambda
-Funktion, die Wörterbücher als Eingabe nimmt und eine angereicherte Zeile (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]
) zurückgibt. In der angereicherten Zeile wird angegeben, wie die Daten aus der API zusammengeführt werden sollen. Standardmäßig ist ein Kreuzungszusammenhang festgelegt.timeout
: Die Anzahl der Sekunden, die gewartet werden soll, bis die Anfrage von der API abgeschlossen wurde, bevor eine Zeitüberschreitung auftritt. Standardmäßig 30 Sekunden.throttler
: Gibt den Drosselungsmechanismus an. Die einzige unterstützte Option ist die standardmäßige clientseitige adaptive Drosselung.repeater
: Gibt die Wiederholungsstrategie an, wenn Fehler wieTooManyRequests
undTimeoutException
auftreten. Die Standardeinstellung istExponentialBackOffRepeater
.
Nächste Schritte
- Weitere Beispiele finden Sie unter Anreicherungstransformation im Apache Beam-Transformationskatalog.
- Mit Apache Beam und Bigtable Daten anreichern
- Mit Apache Beam und Vertex AI Feature Store Daten anreichern