Arricchisci i flussi di dati

Apache Beam semplifica il flusso di lavoro di arricchimento dei dati fornendo una trasformazione di arricchimento pronta all'uso che puoi aggiungere alla tua pipeline. Questa pagina spiega come utilizzare la trasformazione di arricchimento Apache Beam per arricchire i flussi di dati.

Quando arricchisci i dati, aumenti i dati non elaborati di un'origine aggiungendo dati correlati da una seconda origine. I dati aggiuntivi possono provenire da diverse origini, ad esempio Bigtable o BigQuery. La trasformazione di arricchimento Apache Beam utilizza una ricerca chiave-valore per connettere i dati aggiuntivi ai dati non elaborati.

I seguenti esempi forniscono alcuni casi in cui l'arricchimento dei dati è utile:

  • Vuoi creare una pipeline di e-commerce che acquisisca le attività degli utenti da un sito web o da un'app e fornisca suggerimenti personalizzati. La trasformazione incorpora le attività nei dati della pipeline in modo che tu possa fornire suggerimenti personalizzati.
  • Disponi di dati utente che vuoi unire a dati geografici per eseguire analisi basate sull'area geografica.
  • Vuoi creare una pipeline che raccolga i dati da dispositivi Internet of things (IOT) che inviano eventi di telemetria.

Vantaggi

La trasformazione dell'arricchimento offre i seguenti vantaggi:

  • Trasforma i tuoi dati senza che tu debba scrivere codice complesso o gestire le librerie sottostanti.
  • Fornisce gestori del codice sorgente integrati.
  • Utilizza la limitazione lato client per gestire la limitazione di frequenza delle richieste. Le richieste vengono sottoposte a backup in modo esponenziale con una strategia predefinita per i nuovi tentativi. Puoi configurare la limitazione di frequenza in base al tuo caso d'uso.

Assistenza e limitazioni

La trasformazione di arricchimento prevede i seguenti requisiti:

  • Disponibile per le pipeline in modalità batch e flusso.
  • Il gestore BigTableEnrichmentHandler è disponibile nell'SDK Apache Beam Python 2.54.0 e versioni successive.
  • Il gestore VertexAIFeatureStoreEnrichmentHandler è disponibile nell'SDK Apache Beam Python 2.55.0 e versioni successive.
  • Quando utilizzi l'SDK Apache Beam Python 2.55.0 e versioni successive, devi installare anche il client Python per Redis.
  • I job Dataflow devono utilizzare Runner v2.

Usa la trasformazione dell'arricchimento

Per utilizzare la trasformazione di arricchimento, includi il seguente codice nella pipeline:

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

bigtable_handler = BigTableEnrichmentHandler(...)

with beam.Pipeline() as p:
  output = (p
            ...
            | "Create" >> beam.Create(data)
            | "Enrich with Bigtable" >> Enrichment(bigtable_handler)
            ...
            )

Poiché la trasformazione di arricchimento esegue un cross join per impostazione predefinita, progetta il join personalizzato per arricchire i dati di input. Questo design garantisce che il join includa solo i campi specificati.

Nell'esempio seguente, left è l'elemento di input della trasformazione di arricchimento e right sono i dati recuperati da un servizio esterno per quell'elemento di input.

def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
  enriched = {}
  enriched['FIELD_NAME'] = left['FIELD_NAME']
  ...
  return beam.Row(**enriched)

Parametri

Per utilizzare la trasformazione di arricchimento, è obbligatorio il parametro EnrichmentHandler.

Puoi utilizzare un parametro di configurazione anche per specificare una funzione lambda per una funzione di join, un timeout, una limitazione o un ripetitore (strategia di nuovo tentativo). Sono disponibili i seguenti parametri di configurazione:

  • join_fn: una funzione lambda che prende i dizionari come input e restituisce una riga arricchita (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]). La riga arricchita specifica come unire i dati recuperati dall'API. Il valore predefinito è un cross join.
  • timeout: il numero di secondi di attesa prima che la richiesta venga completata dall'API prima del timeout. Il valore predefinito è 30 secondi.
  • throttler: specifica il meccanismo di limitazione. L'unica opzione supportata è la limitazione adattiva predefinita lato client.
  • repeater: specifica la strategia per i nuovi tentativi quando si verificano errori come TooManyRequests e TimeoutException. Il valore predefinito è ExponentialBackOffRepeater.

Passaggi successivi