Arricchisci i flussi di dati

Apache Beam semplifica il flusso di lavoro di arricchimento dei dati fornendo una soluzione "chiavi in mano" una trasformazione dell'arricchimento che puoi aggiungere alla tua pipeline. In questa pagina viene spiegato come per utilizzare la trasformazione di arricchimento Apache Beam per arricchire i dati in modalità flusso.

Quando arricchisci i dati, aumenti i dati non elaborati provenienti da un'origine aggiungendo da una seconda origine. I dati aggiuntivi possono provenire da diverse come Bigtable o BigQuery. L'arricchimento di Apache Beam trasforma usa una ricerca di coppie chiave-valore per collegare i dati aggiuntivi ai dati non elaborati.

Gli esempi seguenti forniscono alcuni casi in cui l'arricchimento dei dati è utile:

  • Vuoi creare una pipeline di e-commerce che acquisisca le attività utente da un sito web o un'app e fornisca consigli personalizzati. La trasformazione incorpora le attività nei dati della pipeline in modo che tu possa fornire i consigli personalizzati.
  • Hai dati utente che vuoi unire a dati geografici per eseguire analisi basate sulla geografia.
  • Vuoi creare una pipeline che raccolga dati dall'Internet of Things (IOT) che inviano eventi di telemetria.

Vantaggi

La trasformazione di arricchimento presenta i seguenti vantaggi:

  • Trasforma i tuoi dati senza che tu debba scrivere codice complesso o gestire librerie sottostanti.
  • Fornisce gestori di origine integrati.
  • Utilizza la limitazione lato client per gestire il limite di frequenza delle richieste. Le richieste vengono sottoposte a backoff esponenziale con una strategia di ripetizione predefinita. Puoi configurare la limitazione di frequenza in base al tuo caso d'uso.

Supporto e limitazioni

La trasformazione di arricchimento presenta i seguenti requisiti:

  • Disponibile per le pipeline batch e in streaming.
  • Il gestore BigTableEnrichmentHandler è disponibile in Apache Beam SDK Python 2.54.0 e versioni successive.
  • Il gestore VertexAIFeatureStoreEnrichmentHandler è disponibile nell'SDK Apache Beam per Python 2.55.0 e versioni successive.
  • Quando utilizzi le versioni 2.55.0 e successive dell'SDK Apache Beam per Python, devi anche installare il client Python per Redis.
  • I job Dataflow devono utilizzare Runner v2.

Utilizzare la trasformazione di arricchimento

Per utilizzare la trasformazione dell'arricchimento, includi il seguente codice della tua pipeline:

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

bigtable_handler = BigTableEnrichmentHandler(...)

with beam.Pipeline() as p:
  output = (p
            ...
            | "Create" >> beam.Create(data)
            | "Enrich with Bigtable" >> Enrichment(bigtable_handler)
            ...
            )

Poiché la trasformazione di arricchimento esegue un cross join per impostazione predefinita, un join personalizzato per arricchire i dati di input. Questa progettazione fa sì che il join includa solo i campi specificati.

Nell'esempio seguente, left è l'elemento di input della trasformazione di arricchimento e right sono i dati recuperati da un servizio esterno per quell'elemento di input.

def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
  enriched = {}
  enriched['FIELD_NAME'] = left['FIELD_NAME']
  ...
  return beam.Row(**enriched)

Parametri

Per utilizzare la trasformazione di arricchimento, è necessario il parametro EnrichmentHandler.

Puoi anche utilizzare un parametro di configurazione per specificare una funzione lambda per una funzione di join, un timeout, un regolatore o un ripetitore (strategia di ripetizione). Le seguenti sono disponibili i seguenti parametri:

  • join_fn: una funzione lambda che prende i dizionari come input e restituisce una riga arricchita (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]). La la riga arricchita specifica come unire i dati recuperati dall'API. Il valore predefinito è un join croce.
  • timeout: il numero di secondi di attesa per il completamento della richiesta da parte dell'API prima del timeout. Il valore predefinito è 30 secondi.
  • throttler: specifica il meccanismo di limitazione. L'unica opzione supportata è la limitazione adattiva lato client.
  • repeater: specifica la strategia di ripetizione quando errori come TooManyRequests e TimeoutException. Il valore predefinito è ExponentialBackOffRepeater.

Passaggi successivi