L'applicazione di esempio di e-commerce dimostra le best practice per l'utilizzo di Dataflow per implementare l'analisi dei flussi di dati e l'AI in tempo reale. L'esempio contiene pattern di attività che mostrano il modo migliore per svolgere attività di programmazione Java. Queste attività sono di solito necessarie per creare applicazioni di e-commerce.
L'applicazione contiene i seguenti pattern di attività Java:
- Utilizzare gli schemi Apache Beam per lavorare con i dati strutturati
- Utilizzare JsonToRow per convertire i dati JSON
- Utilizzare il generatore di codice
AutoValue
per generare POJO (Plain Old Java Object) - Inserire in coda i dati non elaborabili per ulteriori analisi
- Applicare in serie le trasformazioni della convalida dei dati
- Utilizzare
DoFn.StartBundle
per raggruppare in micro-batch le chiamate ai servizi esterni - Utilizzare un pattern di input aggiuntivi appropriato
Utilizzare gli schemi Apache Beam per lavorare con i dati strutturati
Puoi utilizzare gli schemi Apache Beam per semplificare l'elaborazione dei dati strutturati.
La conversione degli oggetti in Righe ti consente di produrre codice Java molto pulito, il che semplifica l'esercizio di creazione del grafo aciclico diretto (DAG). Puoi anche fare riferimento alle proprietà degli oggetti come campi all'interno delle istruzioni di analisi che crei, anziché dover chiamare metodi.
Esempio
Utilizzare JsonToRow per convertire i dati JSON
L'elaborazione di stringhe JSON in Dataflow è una necessità comune. Ad esempio, le stringhe JSON vengono elaborate durante lo streaming delle informazioni sui clickstream acquisite dalle applicazioni web. Per elaborare le stringhe JSON, devi convertirle in Righe o POJO (Plain Old Java Object) durante l'elaborazione della pipeline.
Puoi utilizzare la trasformazione JsonToRow integrata di Apache Beam per convertire le stringhe JSON in righe. Tuttavia, se vuoi una coda per l'elaborazione dei messaggi non riusciti, devi crearla separatamente. Consulta Inserimento in coda di dati non elaborabili per ulteriori analisi.
Se devi convertire una stringa JSON in un POJO utilizzando AutoValue, registra uno schema per il tipo utilizzando l'annotazione @DefaultSchema(AutoValueSchema.class)
, quindi utilizza la classe di utilità Convert. Il codice risultante è simile al seguente:
PCollection<String> json = ...
PCollection<MyUserType> = json
.apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
.apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))
Per ulteriori informazioni, inclusi i diversi tipi di Java da cui puoi dedurre gli schemi, consulta la sezione Creare schemi.
Se JsonToRow non funziona con i tuoi dati, Gson è un'alternativa. Gson è abbastanza rilassato nell'elaborazione predefinita dei dati, il che potrebbe richiedere di implementare una maggiore convalida nel processo di conversione dei dati.
Esempi
Utilizza il generatore di codice AutoValue
per generare POJO
Gli schemi Apache Beam
sono spesso il modo migliore per rappresentare gli oggetti in una pipeline, per il modo in cui
ti consentono di lavorare con i dati strutturati. Tuttavia, a volte è necessario un
oggetto Java semplice (POJO), ad esempio quando si ha a che fare con oggetti chiave-valore o con la gestione dello stato dell'oggetto.
La creazione manuale di POJO richiede la codifica di sostituzioni per i metodi equals()
e
hashcode()
, che può richiedere molto tempo e essere soggetta a errori. Sostituzioni errate
potrebbero comportare un comportamento incoerente dell'applicazione o la perdita di dati.
Per generare POJO, utilizza il generatore di classi AutoValue
. Questa opzione garantisce che vengano utilizzate le sostituzioni necessarie e consente di evitare potenziali errori.
AutoValue
è ampiamente utilizzato nel codice sorgente di Apache Beam, quindi la familiarità con questo generatore di classi è utile se vuoi sviluppare pipeline Apache Beam su Dataflow utilizzando Java.
Puoi anche AutoValue
con gli schemi Apache Beam se aggiungi un'annotazione @DefaultSchema(AutoValueSchema.class)
. Per ulteriori informazioni, consulta la sezione Creare schemi.
Per saperne di più su AutoValue
, consulta Perché AutoValue?
e la documentazione di AutoValue
.
Esempio
Inserire in coda i dati non elaborabili per ulteriori analisi
Nei sistemi di produzione, è importante gestire i dati problematici. Se possibile, convalida e correggi i dati in tempo reale. Quando la correzione non è possibile, registra il valore in una coda di messaggi non elaborati, a volte chiamata coda di lettere morte, per un'analisi successiva. I problemi si verificano di solito durante la conversione dei dati da un formato all'altro, ad esempio durante la conversione di stringhe JSON in Righe.
Per risolvere il problema, utilizza una trasformazione con più output per trasferire gli elementi contenenti i dati non elaborati in un'altra PCollection per ulteriori analisi. Questa elaborazione è un'operazione comune che potresti volere utilizzare in molti punti di una pipeline. Cerca di rendere la trasformazione sufficientemente generica da poter essere utilizzata in più punti. Per prima cosa, crea un oggetto errore per eseguire il wrapping delle proprietà comuni, inclusi i dati originali. A questo punto, crea una trasformazione di destinazione con più opzioni per la destinazione.
Esempi
Applicazione in serie di trasformazioni della convalida dei dati
I dati raccolti da sistemi esterni spesso richiedono la pulizia. Struttura la pipeline in modo che possa correggere i dati problematici in-stream, se possibile. Invia i dati a una coda per ulteriori analisi se necessario.
Poiché un singolo messaggio potrebbe presentare più problemi che richiedono correzione, pianifica il grafo aciclico diretto (DAG) necessario. Se un elemento contiene dati con più difetti, devi assicurarti che scorra attraverso le trasformazioni appropriate.
Ad esempio, immagina un elemento con i seguenti valori, nessuno dei quali deve essere null:
{"itemA": null,"itemB": null}
Assicurati che l'elemento venga sottoposto a trasformazioni che correggono entrambi i potenziali problemi:
badElements.apply(fixItemA).apply(fixItemB)
La pipeline potrebbe avere più passaggi in serie, ma la fusione consente di ridurre al minimo il sovraccarico di elaborazione introdotto.
Esempio
Utilizzare DoFn.StartBundle
per raggruppare in micro-batch le chiamate ai servizi esterni
Potresti dover invocare API esterne all'interno della pipeline. Poiché una pipeline distribuisce il lavoro su molte risorse di calcolo, effettuare una singola chiamata per ogni elemento che attraversa il sistema può sovraccaricare un endpoint di servizio esterno. Questo problema è particolarmente comune quando non hai applicato funzioni di riduzione.
Per evitare questo problema, raggruppa le chiamate ai sistemi esterni.
Puoi raggruppare le chiamate utilizzando una trasformazione GroupByKey
o l'API Timer di Apache Beam. Tuttavia, entrambi questi approcci richiedono il rimescolamento, che introduce un sovraccarico di elaborazione e la necessità di un numero magico per determinare lo spazio delle chiavi.
Utilizza invece gli elementi di ciclo di vita StartBundle
e FinishBundle
per raggruppare i dati. Con queste opzioni non è necessario mescolare i brani.
Un piccolo svantaggio di questa opzione è che le dimensioni dei bundle vengono determinate dinamicamente dall'implementazione del runner in base a ciò che sta accadendo all'interno della pipeline e dei relativi worker. In modalità stream, i bundle sono spesso di piccole dimensioni. Il raggruppamento Dataflow è influenzato da fattori di backend come l'utilizzo dello sharding, la quantità di dati disponibili per una determinata chiave e la velocità effettiva della pipeline.
Esempio
EventItemCorrectionService.java
Utilizza un pattern di input aggiuntivi appropriato per l'arricchimento dei dati
Nelle applicazioni di analisi dei flussi di dati, i dati vengono spesso arricchiti con informazioni aggiuntive che potrebbero essere utili per ulteriori analisi. Ad esempio, se hai l'ID negozio per una transazione, ti consigliamo di aggiungere informazioni sulla sede del negozio. Queste informazioni aggiuntive vengono spesso aggiunte prendendo un elemento e inserendo le informazioni da una tabella di ricerca.
Per le tabelle di ricerca che cambiano lentamente e sono di dimensioni ridotte, inserire la tabella nella pipeline come classe singleton che implementa l'interfaccia Map<K,V>
funziona bene. Questa opzione ti consente di evitare che ogni elemento effettui una chiamata all'API per la relativa ricerca. Dopo aver incluso una copia di una tabella
nella pipeline, devi aggiornarla periodicamente per mantenerla aggiornata.
Per gestire gli input laterali con aggiornamento lento, utilizza i pattern di input laterali di Apache Beam.
Memorizzazione nella cache
Gli input aggiuntivi vengono caricati in memoria e quindi memorizzati nella cache automaticamente.
Puoi impostare le dimensioni della cache utilizzando l'opzione --setWorkerCacheMb
.
Puoi condividere la cache tra più istanze DoFn
e utilizzare attivatori esterni per aggiornarla.
Esempio
SlowMovingStoreLocationDimension.java