L'applicazione di esempio per l'e-commerce illustra le best practice per l'utilizzo di Dataflow per implementare i flussi di dati analisi e AI in tempo reale. L'esempio contiene pattern di attività che mostrano il modo migliore per svolgere attività di programmazione Java. Queste attività sono comunemente necessarie per per la creazione di applicazioni di e-commerce.
L'applicazione contiene i seguenti pattern di attività Java:
- Utilizzare gli schemi Apache Beam per lavorare con i dati strutturati
- Utilizzare JsonToRow per convertire i dati JSON
- Utilizza il generatore di codice
AutoValue
per generare POJO (Plain Old Java Object) - Mettere in coda i dati non elaborabili per ulteriori analisi
- Applicare le trasformazioni della convalida dei dati in serie
- Utilizza
DoFn.StartBundle
per eseguire micro-batch delle chiamate a servizi esterni - Utilizzare un pattern di input aggiuntivi appropriato
Utilizzare gli schemi Apache Beam per lavorare con i dati strutturati
Puoi utilizzare gli schemi Apache Beam per semplificare l'elaborazione dei dati strutturati.
La conversione degli oggetti in Righe ti consente di produrre codice Java molto pulito, il che semplifica l'esercizio di creazione del grafo aciclico diretto (DAG). Puoi anche fare riferimento alle proprietà degli oggetti come campi all'interno delle istruzioni di analisi che crei, anziché dover chiamare metodi.
Esempio
Utilizzare JsonToRow per convertire i dati JSON
L'elaborazione delle stringhe JSON in Dataflow è un'esigenza comune. Ad esempio, JSON le stringhe vengono elaborate durante il flusso di informazioni dei flussi di clic acquisite dal web diverse applicazioni. Per elaborare le stringhe JSON, devi convertirle in Righe oppure oggetti Java semplici (POJO) durante l'elaborazione della pipeline.
Puoi utilizzare la trasformazione integrata di Apache Beam JsonToRow per convertire le stringhe JSON in righe. Tuttavia, se vuoi per l'elaborazione dei messaggi non riusciti, devi creare separatamente, consulta Coda di dati non elaborabili per ulteriori analisi.
Se devi convertire una stringa JSON in POJO utilizzando AutoValue,
registrare uno schema per il tipo utilizzando
@DefaultSchema(AutoValueSchema.class)
, quindi utilizza l'annotazione
Converti
in una classe di utenza. Il codice risultante è simile al seguente:
PCollection<String> json = ...
PCollection<MyUserType> = json
.apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
.apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))
Per ulteriori informazioni, incluso quali tipi di Java puoi dedurre schemi da, vedi Creazione di schemi.
Se JsonToRow non funziona con i tuoi dati, Gson è un'alternativa. Gson è piuttosto rilassato nell'elaborazione predefinita dei dati, il che potrebbe richiedono una maggiore convalida nel processo di conversione dei dati.
Esempi
Utilizza il generatore di codice AutoValue
per generare POJO
Schemi Apache Beam
sono spesso il modo migliore per rappresentare gli oggetti in una pipeline, per via del loro
ti consentono di lavorare con i dati strutturati. A volte, tuttavia,
Oggetto Java semplice (POJO)
come quando si gestiscono gli oggetti chiave-valore o lo stato degli oggetti.
La creazione manuale di POJO richiede la codifica di sostituzioni per i metodi equals()
e
hashcode()
, che può essere dispendiosa in termini di tempo e soggetta a errori. Sostituzioni errate
potrebbero comportare un comportamento incoerente dell'applicazione o la perdita di dati.
Per generare POJO, utilizza il generatore di classi AutoValue
. Questa opzione garantisce che vengano utilizzate le sostituzioni necessarie
consente di evitare potenziali errori.
AutoValue
è molto usato all'interno del codebase Apache Beam,
quindi la familiarità con lo strumento per la creazione di corsi è utile se vuoi sviluppare
Pipeline Apache Beam su Dataflow con Java.
Puoi anche AutoValue
con gli schemi Apache Beam se aggiungi un
Annotazione @DefaultSchema(AutoValueSchema.class)
. Per ulteriori informazioni, vedi
Creazione di schemi.
Per saperne di più su AutoValue
, consulta Perché AutoValue?
e la documentazione di AutoValue
.
Esempio
Inserire in coda i dati non elaborabili per ulteriori analisi
Nei sistemi di produzione, è importante gestire i dati problematici. Se possibile, convalidi e correggi i dati in-stream. Durante la correzione non è possibile, registrare il valore in una coda di messaggi non elaborati, a volte chiamata messaggi non recapitabili per analisi successive. I problemi si verificano di solito durante la conversione dei dati da un formato all'altro, ad esempio durante la conversione di stringhe JSON in Righe.
Per risolvere questo problema, utilizza una trasformazione con output multiplo per inviare elementi contenenti i dati non elaborati in un altro PCollection per ulteriori analisi. Questa elaborazione è un'operazione comune che potresti volere utilizzare in molti punti di una pipeline. Prova a rendere la trasformazione generica abbastanza da poter essere usata in più luoghi. Innanzitutto, crea un oggetto di errore aggrega le proprietà comuni, inclusi i dati originali. A questo punto, crea una trasformazione in cui sono disponibili più opzioni per la destinazione.
Esempi
Applica le trasformazioni di convalida dei dati in modo seriale
I dati raccolti da sistemi esterni spesso devono essere puliti. Strutturazione della pipeline in modo da poter correggere i dati problematici in-stream, se possibile. Invia i dati a una coda per ulteriori analisi quando necessario.
Poiché un singolo messaggio potrebbe presentare più problemi che devono essere corretti, pianificare le grafo diretto aciclico (DAG). Se un elemento contiene dati con più difetti, devi assicurarti che sia che passa attraverso le trasformazioni appropriate.
Ad esempio, immagina un elemento con i seguenti valori, nessuno dei quali deve essere null:
{"itemA": null,"itemB": null}
Assicurati che l'elemento venga sottoposto a trasformazioni che correggono entrambi i potenziali problemi:
badElements.apply(fixItemA).apply(fixItemB)
La pipeline potrebbe avere più passaggi seriali, fusione aiuta a ridurre al minimo l'overhead di elaborazione introdotto.
Esempio
Usa DoFn.StartBundle
per eseguire micro-batch delle chiamate a servizi esterni
Potresti dover invocare API esterne all'interno della pipeline. Poiché una pipeline distribuisce il lavoro su molte risorse di calcolo, effettuare una singola chiamata per ogni elemento che attraversa il sistema può sovraccaricare un endpoint di servizio esterno. Questo problema è particolarmente comune quando non hai qualsiasi funzione di riduzione.
Per evitare questo problema, esegui le chiamate in batch a sistemi esterni.
Puoi effettuare chiamate in batch utilizzando una trasformazione GroupByKey
o utilizzando
l'API Apache Beam Timer. Tuttavia, entrambi gli approcci richiedono
shuffling, che
introduce un certo overhead per l'elaborazione e la necessità
numero magico
per determinare lo spazio delle chiavi.
Utilizza invece
StartBundle
e
FinishBundle
del ciclo di vita per raggruppare i dati. Con queste opzioni, non è necessario eseguire lo shuffling.
Un piccolo svantaggio di questa opzione è che le dimensioni dei pacchetti vengono determinate in modo dinamico del runner in base a ciò che accade attualmente all'interno della pipeline e dei relativi worker. In modalità stream, i bundle sono spesso di dimensioni ridotte dimensioni. Il raggruppamento dei flussi di dati è influenzato da fattori di backend come l'utilizzo dello sharding, la quantità di dati disponibili per una determinata chiave e la velocità effettiva della pipeline.
Esempio
EventItemCorrectionService.java
Utilizza un pattern di input aggiuntivi appropriato per l'arricchimento dei dati
Nelle applicazioni di analisi dei flussi di dati, i dati sono spesso arricchiti con informazioni che potrebbero essere utili per ulteriori analisi. Ad esempio, se disponi l'ID negozio per una transazione, potresti aggiungere informazioni sul negozio in ogni località. Queste informazioni aggiuntive vengono spesso aggiunte prendendo e recuperare informazioni da una tabella di ricerca.
Per le tabelle di ricerca che cambiano lentamente e hanno dimensioni inferiori,
portando la tabella nella pipeline come una classe singleton
implementa l'interfaccia Map<K,V>
funziona bene. Questa opzione ti consente di evitare
ogni elemento esegue una chiamata API per la sua ricerca. Dopo aver incluso una copia di una tabella
devi aggiornarlo periodicamente per mantenerlo sempre al passo.
Per gestire gli input lato aggiornamento lento, utilizza Apache Beam Pattern di input aggiuntivi.
Memorizzazione nella cache
Gli input aggiuntivi vengono caricati in memoria e quindi memorizzati nella cache automaticamente.
Puoi impostare le dimensioni della cache utilizzando l'opzione --setWorkerCacheMb
.
Puoi condividere la cache tra DoFn
istanze e utilizzare trigger esterni per aggiornarla.
Esempio
SlowMovingStoreLocationDimension.java