Apache Beam semplifica il flusso di lavoro di arricchimento dei dati fornendo una soluzione "chiavi in mano" una trasformazione dell'arricchimento che puoi aggiungere alla tua pipeline. In questa pagina viene spiegato come per utilizzare la trasformazione di arricchimento Apache Beam per arricchire i dati in modalità flusso.
Quando arricchisci i dati, aumenti i dati non elaborati provenienti da un'origine aggiungendo da una seconda origine. I dati aggiuntivi possono provenire da diverse come Bigtable o BigQuery. L'arricchimento di Apache Beam trasforma usa una ricerca di coppie chiave-valore per collegare i dati aggiuntivi ai dati non elaborati.
Gli esempi seguenti forniscono alcuni casi in cui l'arricchimento dei dati è utile:
- Vuoi creare una pipeline di e-commerce che acquisisca le attività utente da un sito web o un'app e fornisca consigli personalizzati. La trasformazione incorpora le attività nei dati della pipeline in modo che tu possa fornire i consigli personalizzati.
- Hai dati utente che vuoi unire a dati geografici per eseguire analisi basate sulla geografia.
- Vuoi creare una pipeline che raccolga dati dall'Internet of Things (IOT) che inviano eventi di telemetria.
Vantaggi
La trasformazione di arricchimento presenta i seguenti vantaggi:
- Trasforma i tuoi dati senza che tu debba scrivere codice complesso o gestire librerie sottostanti.
- Fornisce gestori di origine integrati.
- Utilizza la
Gestore
BigTableEnrichmentHandler
per arricchire i dati utilizzando una Origine Bigtable senza trasmettere dettagli di configurazione. - Utilizza la
Gestore
VertexAIFeatureStoreEnrichmentHandler
con Vertex AI Feature Store e Pubblicazione online di Bigtable.
- Utilizza la
Gestore
- Utilizza la limitazione lato client per gestire il limite di frequenza delle richieste. Le richieste vengono sottoposte a backoff esponenziale con una strategia di ripetizione predefinita. Puoi configurare la limitazione di frequenza in base al tuo caso d'uso.
Supporto e limitazioni
La trasformazione di arricchimento presenta i seguenti requisiti:
- Disponibile per le pipeline batch e in streaming.
- Il gestore
BigTableEnrichmentHandler
è disponibile in Apache Beam SDK Python 2.54.0 e versioni successive. - Il gestore
VertexAIFeatureStoreEnrichmentHandler
è disponibile nell'SDK Apache Beam per Python 2.55.0 e versioni successive. - Quando utilizzi le versioni 2.55.0 e successive dell'SDK Apache Beam per Python, devi anche installare il client Python per Redis.
- I job Dataflow devono utilizzare Runner v2.
Utilizzare la trasformazione di arricchimento
Per utilizzare la trasformazione dell'arricchimento, includi il seguente codice della tua pipeline:
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler
bigtable_handler = BigTableEnrichmentHandler(...)
with beam.Pipeline() as p:
output = (p
...
| "Create" >> beam.Create(data)
| "Enrich with Bigtable" >> Enrichment(bigtable_handler)
...
)
Poiché la trasformazione di arricchimento esegue un cross join per impostazione predefinita, un join personalizzato per arricchire i dati di input. Questa progettazione fa sì che il join includa solo i campi specificati.
Nell'esempio seguente, left
è l'elemento di input della trasformazione di arricchimento e right
sono i dati recuperati da un servizio esterno per quell'elemento di input.
def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
enriched = {}
enriched['FIELD_NAME'] = left['FIELD_NAME']
...
return beam.Row(**enriched)
Parametri
Per utilizzare la trasformazione di arricchimento, è necessario il parametro EnrichmentHandler
.
Puoi anche utilizzare un parametro di configurazione per specificare una funzione lambda
per una funzione di join, un timeout, un regolatore o un ripetitore (strategia di ripetizione). Le seguenti
sono disponibili i seguenti parametri:
join_fn
: una funzionelambda
che prende i dizionari come input e restituisce una riga arricchita (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]
). La la riga arricchita specifica come unire i dati recuperati dall'API. Il valore predefinito è un join croce.timeout
: il numero di secondi di attesa per il completamento della richiesta da parte dell'API prima del timeout. Il valore predefinito è 30 secondi.throttler
: specifica il meccanismo di limitazione. L'unica opzione supportata è la limitazione adattiva lato client.repeater
: specifica la strategia di ripetizione quando errori comeTooManyRequests
eTimeoutException
. Il valore predefinito èExponentialBackOffRepeater
.
Passaggi successivi
- Per altri esempi, consulta Trasformazione di arricchimento nel catalogo delle trasformazioni di Apache Beam.
- Utilizza Apache Beam e Bigtable per arricchire i dati.
- Utilizza Apache Beam e Vertex AI Feature Store per arricchire i dati.