Apache Beam semplifica il flusso di lavoro di arricchimento dei dati fornendo una trasformazione di arricchimento pronta all'uso che puoi aggiungere alla tua pipeline. Questa pagina spiega come utilizzare la trasformazione di arricchimento Apache Beam per arricchire i flussi di dati.
Quando arricchisci i dati, aumenti i dati non elaborati di un'origine aggiungendo dati correlati da una seconda origine. I dati aggiuntivi possono provenire da diverse origini, ad esempio Bigtable o BigQuery. La trasformazione di arricchimento Apache Beam utilizza una ricerca chiave-valore per connettere i dati aggiuntivi ai dati non elaborati.
I seguenti esempi forniscono alcuni casi in cui l'arricchimento dei dati è utile:
- Vuoi creare una pipeline di e-commerce che acquisisca le attività degli utenti da un sito web o da un'app e fornisca suggerimenti personalizzati. La trasformazione incorpora le attività nei dati della pipeline in modo che tu possa fornire suggerimenti personalizzati.
- Disponi di dati utente che vuoi unire a dati geografici per eseguire analisi basate sull'area geografica.
- Vuoi creare una pipeline che raccolga i dati da dispositivi Internet of things (IOT) che inviano eventi di telemetria.
Vantaggi
La trasformazione dell'arricchimento offre i seguenti vantaggi:
- Trasforma i tuoi dati senza che tu debba scrivere codice complesso o gestire le librerie sottostanti.
- Fornisce gestori del codice sorgente integrati.
- Utilizza il gestore
BigTableEnrichmentHandler
per arricchire i dati utilizzando un'origine Bigtable senza trasmettere i dettagli di configurazione. - Utilizza il gestore
VertexAIFeatureStoreEnrichmentHandler
con Vertex AI Feature Store e Pubblicazione online di Bigtable.
- Utilizza il gestore
- Utilizza la limitazione lato client per gestire la limitazione di frequenza delle richieste. Le richieste vengono sottoposte a backup in modo esponenziale con una strategia predefinita per i nuovi tentativi. Puoi configurare la limitazione di frequenza in base al tuo caso d'uso.
Assistenza e limitazioni
La trasformazione di arricchimento prevede i seguenti requisiti:
- Disponibile per le pipeline in modalità batch e flusso.
- Il gestore
BigTableEnrichmentHandler
è disponibile nell'SDK Apache Beam Python 2.54.0 e versioni successive. - Il gestore
VertexAIFeatureStoreEnrichmentHandler
è disponibile nell'SDK Apache Beam Python 2.55.0 e versioni successive. - Quando utilizzi l'SDK Apache Beam Python 2.55.0 e versioni successive, devi installare anche il client Python per Redis.
- I job Dataflow devono utilizzare Runner v2.
Usa la trasformazione dell'arricchimento
Per utilizzare la trasformazione di arricchimento, includi il seguente codice nella pipeline:
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler
bigtable_handler = BigTableEnrichmentHandler(...)
with beam.Pipeline() as p:
output = (p
...
| "Create" >> beam.Create(data)
| "Enrich with Bigtable" >> Enrichment(bigtable_handler)
...
)
Poiché la trasformazione di arricchimento esegue un cross join per impostazione predefinita, progetta il join personalizzato per arricchire i dati di input. Questo design garantisce che il join includa solo i campi specificati.
Nell'esempio seguente, left
è l'elemento di input della trasformazione
di arricchimento e right
sono i dati recuperati da un servizio esterno per quell'elemento
di input.
def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
enriched = {}
enriched['FIELD_NAME'] = left['FIELD_NAME']
...
return beam.Row(**enriched)
Parametri
Per utilizzare la trasformazione di arricchimento, è obbligatorio il parametro EnrichmentHandler
.
Puoi utilizzare un parametro di configurazione anche per specificare una funzione lambda
per una funzione di join, un timeout, una limitazione o un ripetitore (strategia di nuovo tentativo). Sono disponibili i seguenti
parametri di configurazione:
join_fn
: una funzionelambda
che prende i dizionari come input e restituisce una riga arricchita (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]
). La riga arricchita specifica come unire i dati recuperati dall'API. Il valore predefinito è un cross join.timeout
: il numero di secondi di attesa prima che la richiesta venga completata dall'API prima del timeout. Il valore predefinito è 30 secondi.throttler
: specifica il meccanismo di limitazione. L'unica opzione supportata è la limitazione adattiva predefinita lato client.repeater
: specifica la strategia per i nuovi tentativi quando si verificano errori comeTooManyRequests
eTimeoutException
. Il valore predefinito èExponentialBackOffRepeater
.
Passaggi successivi
- Per altri esempi, consulta la pagina relativa alla trasformazione di Enrichment nel catalogo delle trasformazioni di Apache Beam.
- Utilizza Apache Beam e Bigtable per arricchire i dati.
- Utilizza Apache Beam e Vertex AI Feature Store per arricchire i dati.