L'applicazione di esempio per l'e-commerce illustra le best practice per l'utilizzo di Dataflow al fine di implementare l'analisi dei flussi di dati e l'AI in tempo reale. L'esempio contiene pattern di attività che mostrano il modo migliore per svolgere attività di programmazione Java. Queste attività sono in genere necessarie per creare applicazioni di e-commerce.
L'applicazione contiene i seguenti pattern di attività Java:
- Utilizzare gli schemi Apache Beam per lavorare con i dati strutturati
- Utilizzare JsonToRow per convertire i dati JSON
- Utilizzare il generatore di codice
AutoValue
per generare POJO (Semplici oggetti Java) - Mettere in coda i dati non elaborabili per ulteriori analisi
- Applicare le trasformazioni di convalida dei dati in modo seriale
- Utilizzare
DoFn.StartBundle
per creare micro-batch delle chiamate ai servizi esterni - Utilizza un pattern di input laterale appropriato
Utilizzare gli schemi Apache Beam per lavorare con i dati strutturati
Puoi utilizzare gli schemi di Apache Beam per semplificare l'elaborazione dei dati strutturati.
La conversione degli oggetti in Righe consente di produrre codice Java molto chiaro, che semplifica l'esercizio di creazione di grafi diretti aciclici (DAG). Puoi anche fare riferimento alle proprietà dell'oggetto come campi nelle istruzioni di analisi create, anziché dover chiamare i metodi.
Esempio
Utilizzare JsonToRow per convertire i dati JSON
L'elaborazione di stringhe JSON in Dataflow è un'esigenza comune. Ad esempio, le stringhe JSON vengono elaborate durante il flusso di informazioni sui flussi di clic acquisite dalle applicazioni web. Per elaborare le stringhe JSON, devi convertirle in Righe o Oggetti Java semplici (POJO) durante l'elaborazione della pipeline.
Puoi utilizzare la trasformazione integrata di Apache Beam JsonToRow per convertire le stringhe JSON in righe. Tuttavia, se vuoi una coda per l'elaborazione dei messaggi non riusciti, devi crearla separatamente, consulta Mettere in coda i dati non elaborabili per ulteriori analisi.
Se devi convertire una stringa JSON in un POJO utilizzando AutoValue, registra uno schema per il tipo utilizzando l'annotazione @DefaultSchema(AutoValueSchema.class)
, quindi utilizza la classe di utilità Converti. Il codice risultante è simile al seguente:
PCollection<String> json = ...
PCollection<MyUserType> = json
.apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
.apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))
Per ulteriori informazioni, inclusi i diversi tipi di Java da cui puoi dedurre gli schemi, consulta la sezione Creazione di schemi.
Se JsonToRow non funziona con i tuoi dati, Gson è un'alternativa. Gson è piuttosto rilassato nell'elaborazione predefinita dei dati, il che potrebbe richiedere un'ulteriore convalida nel processo di conversione dei dati.
Esempi
Utilizza il generatore di codice AutoValue
per generare POJO
Gli schemi di Apache Beam
sono spesso il modo migliore per rappresentare gli oggetti in una pipeline, grazie al modo in cui
consentono di lavorare con i dati strutturati. Tuttavia, a volte è necessario un plain vecchio oggetto Java (POJO), ad esempio per gestire oggetti chiave-valore o gestire lo stato degli oggetti.
La creazione manuale dei POJO richiede override del codice per i metodi equals()
e hashcode()
, che possono richiedere molto tempo e causare errori. Gli override errati potrebbero causare un comportamento incoerente dell'applicazione o una perdita di dati.
Per generare POJO, utilizza lo strumento per la creazione delle classi di AutoValue
. Questa opzione garantisce l'utilizzo degli override necessari e consente di evitare potenziali errori.
AutoValue
è molto utilizzato nel codebase Apache Beam, pertanto la familiarità con questo generatore di classi è utile se vuoi sviluppare pipeline Apache Beam su Dataflow utilizzando Java.
Puoi anche AutoValue
con gli schemi Apache Beam se aggiungi un'annotazione @DefaultSchema(AutoValueSchema.class)
. Per ulteriori informazioni, consulta la sezione Creazione di schemi.
Per ulteriori informazioni su AutoValue
,
consulta Perché AutoValue?
e la documentazione AutoValue
.
Esempio
Mettere in coda i dati non elaborabili per ulteriori analisi
Nei sistemi di produzione, è importante gestire i dati problematici. Se possibile, convalidi e correggi i dati in-stream. Quando la correzione non è possibile, registra il valore in una coda di messaggi non elaborati, a volte chiamata coda di messaggi non recapitabili, per un'analisi successiva. Di solito i problemi si verificano quando si convertono dati da un formato a un altro, ad esempio durante la conversione di stringhe JSON in Righe.
Per risolvere questo problema, utilizza una trasformazione a più output per trasferire gli elementi contenenti i dati non elaborati a un'altra PCollection per ulteriori analisi. Questa è un'operazione comune che potresti voler utilizzare in molte posizioni di una pipeline. Cerca di fare in modo che la trasformazione sia abbastanza generica da poter essere usata in più luoghi. Per prima cosa, crea un oggetto errore per aggregare le proprietà comuni, tra cui i dati originali. Poi, crea una trasformazione sink con più opzioni per la destinazione.
Esempi
Applica le trasformazioni di convalida dei dati in modo seriale
Spesso i dati raccolti da sistemi esterni devono essere puliti. Strutturare la pipeline in modo che possa correggere i dati problematici in-stream quando possibile. Invia i dati a una coda per ulteriori analisi quando necessario.
Poiché un singolo messaggio potrebbe essere interessato da più problemi che devono essere corretti, pianifica il grafo diretto aciclico (DAG) necessario. Se un elemento contiene dati con più difetti, devi assicurarti che l'elemento passi attraverso le trasformazioni appropriate.
Ad esempio, immagina un elemento con i seguenti valori, nessuno dei quali deve essere nullo:
{"itemA": null,"itemB": null}
Assicurati che l'elemento passi attraverso trasformazioni che risolvono entrambi i potenziali problemi:
badElements.apply(fixItemA).apply(fixItemB)
La pipeline potrebbe includere più passaggi seriali, ma fusion aiuta a ridurre al minimo l'overhead di elaborazione introdotto.
Esempio
Utilizza DoFn.StartBundle
per creare micro-batch delle chiamate ai servizi esterni
Potrebbe essere necessario richiamare API esterne come parte della pipeline. Poiché una pipeline distribuisce il lavoro tra molte risorse di computing, effettuare una singola chiamata per ogni elemento che passa attraverso il sistema può sovraccaricare un endpoint di servizio esterno. Questo problema è particolarmente comune se non hai applicato alcuna funzione di riduzione.
Per evitare questo problema, esegui chiamate collettive a sistemi esterni.
Puoi effettuare chiamate in batch utilizzando una trasformazione GroupByKey
o
l'API Apache Beam Timer. Tuttavia, entrambi questi approcci richiedono lo shuffling, il che introduce un certo overhead per l'elaborazione e la necessità di un numero magico per determinare lo spazio delle chiavi.
Utilizza invece gli elementi StartBundle
e FinishBundle
del ciclo di vita per raggruppare i dati. Con queste opzioni, non è necessario shuffling.
Uno svantaggio minore di questa opzione è che le dimensioni dei pacchetti vengono determinate in modo dinamico dall'implementazione dell'elemento runner in base a ciò che sta succedendo attualmente all'interno della pipeline e ai suoi worker. In modalità stream, i bundle sono spesso di dimensioni ridotte. Il raggruppamento di Dataflow è influenzato da fattori di backend come l'utilizzo della suddivisione in segmenti, la quantità di dati disponibili per una determinata chiave e la velocità effettiva della pipeline.
Esempio
EventItemCorrectionService.java
Utilizza un pattern di input laterale appropriato per l'arricchimento dei dati
Nelle applicazioni di analisi dei flussi di dati, i dati sono spesso arricchiti con informazioni aggiuntive che potrebbero essere utili per ulteriori analisi. Ad esempio, se hai l'ID negozio per una transazione, potresti aggiungere informazioni sulla posizione del negozio. Queste informazioni aggiuntive vengono spesso aggiunte prendendo un elemento e inserendo le informazioni da una tabella di ricerca.
Per le tabelle di ricerca che cambiano lentamente e di dimensioni inferiori,
portare la tabella nella pipeline come una classe singleton che
implementa l'interfaccia Map<K,V>
funziona bene. Questa opzione ti consente di evitare che ogni elemento effettui una chiamata API per la sua ricerca. Dopo aver incluso una copia di una tabella nella pipeline, devi aggiornarla periodicamente per mantenerla aggiornata.
Per gestire gli input laterali che si aggiornano lentamente, utilizza i pattern di input laterale di Apache Beam.
Memorizzazione nella cache
Gli input collaterali vengono caricati in memoria e pertanto vengono memorizzati automaticamente nella cache.
Puoi impostare le dimensioni della cache utilizzando l'opzione --setWorkerCacheMb
.
Puoi condividere la cache tra DoFn
istanze e utilizzare trigger esterni per aggiornarla.
Esempio
SlowMovingStoreLocationDimension.java