Pattern di attività Java

L'applicazione di esempio di e-commerce dimostra le best practice per l'utilizzo di Dataflow per implementare l'analisi dei flussi di dati e l'AI in tempo reale. L'esempio contiene pattern di attività che mostrano il modo migliore per svolgere attività di programmazione Java. Queste attività sono di solito necessarie per creare applicazioni di e-commerce.

L'applicazione contiene i seguenti pattern di attività Java:

Utilizzare gli schemi Apache Beam per lavorare con i dati strutturati

Puoi utilizzare gli schemi Apache Beam per semplificare l'elaborazione dei dati strutturati.

La conversione degli oggetti in Righe ti consente di produrre codice Java molto pulito, il che semplifica l'esercizio di creazione del grafo aciclico diretto (DAG). Puoi anche fare riferimento alle proprietà degli oggetti come campi all'interno delle istruzioni di analisi che crei, anziché dover chiamare metodi.

Esempio

CountViewsPerProduct.java

Utilizzare JsonToRow per convertire i dati JSON

L'elaborazione di stringhe JSON in Dataflow è una necessità comune. Ad esempio, le stringhe JSON vengono elaborate durante lo streaming delle informazioni sui clickstream acquisite dalle applicazioni web. Per elaborare le stringhe JSON, devi convertirle in Righe o POJO (Plain Old Java Object) durante l'elaborazione della pipeline.

Puoi utilizzare la trasformazione JsonToRow integrata di Apache Beam per convertire le stringhe JSON in righe. Tuttavia, se vuoi una coda per l'elaborazione dei messaggi non riusciti, devi crearla separatamente. Consulta Inserimento in coda di dati non elaborabili per ulteriori analisi.

Se devi convertire una stringa JSON in un POJO utilizzando AutoValue, registra uno schema per il tipo utilizzando l'annotazione @DefaultSchema(AutoValueSchema.class), quindi utilizza la classe di utilità Convert. Il codice risultante è simile al seguente:

PCollection<String> json = ...

PCollection<MyUserType>  = json
  .apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
  .apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))

Per ulteriori informazioni, inclusi i diversi tipi di Java da cui puoi dedurre gli schemi, consulta la sezione Creare schemi.

Se JsonToRow non funziona con i tuoi dati, Gson è un'alternativa. Gson è abbastanza rilassato nell'elaborazione predefinita dei dati, il che potrebbe richiedere di implementare una maggiore convalida nel processo di conversione dei dati.

Esempi

Utilizza il generatore di codice AutoValue per generare POJO

Gli schemi Apache Beam sono spesso il modo migliore per rappresentare gli oggetti in una pipeline, per il modo in cui ti consentono di lavorare con i dati strutturati. Tuttavia, a volte è necessario un oggetto Java semplice (POJO), ad esempio quando si ha a che fare con oggetti chiave-valore o con la gestione dello stato dell'oggetto. La creazione manuale di POJO richiede la codifica di sostituzioni per i metodi equals() e hashcode(), che può richiedere molto tempo e essere soggetta a errori. Sostituzioni errate potrebbero comportare un comportamento incoerente dell'applicazione o la perdita di dati.

Per generare POJO, utilizza il generatore di classi AutoValue. Questa opzione garantisce che vengano utilizzate le sostituzioni necessarie e consente di evitare potenziali errori. AutoValue è ampiamente utilizzato nel codice sorgente di Apache Beam, quindi la familiarità con questo generatore di classi è utile se vuoi sviluppare pipeline Apache Beam su Dataflow utilizzando Java.

Puoi anche AutoValue con gli schemi Apache Beam se aggiungi un'annotazione @DefaultSchema(AutoValueSchema.class). Per ulteriori informazioni, consulta la sezione Creare schemi.

Per saperne di più su AutoValue, consulta Perché AutoValue? e la documentazione di AutoValue.

Esempio

Clickstream.java

Inserire in coda i dati non elaborabili per ulteriori analisi

Nei sistemi di produzione, è importante gestire i dati problematici. Se possibile, convalida e correggi i dati in tempo reale. Quando la correzione non è possibile, registra il valore in una coda di messaggi non elaborati, a volte chiamata coda di lettere morte, per un'analisi successiva. I problemi si verificano di solito durante la conversione dei dati da un formato all'altro, ad esempio durante la conversione di stringhe JSON in Righe.

Per risolvere il problema, utilizza una trasformazione con più output per trasferire gli elementi contenenti i dati non elaborati in un'altra PCollection per ulteriori analisi. Questa elaborazione è un'operazione comune che potresti volere utilizzare in molti punti di una pipeline. Cerca di rendere la trasformazione sufficientemente generica da poter essere utilizzata in più punti. Per prima cosa, crea un oggetto errore per eseguire il wrapping delle proprietà comuni, inclusi i dati originali. A questo punto, crea una trasformazione di destinazione con più opzioni per la destinazione.

Esempi

Applicazione in serie di trasformazioni della convalida dei dati

I dati raccolti da sistemi esterni spesso richiedono la pulizia. Struttura la pipeline in modo che possa correggere i dati problematici in-stream, se possibile. Invia i dati a una coda per ulteriori analisi se necessario.

Poiché un singolo messaggio potrebbe presentare più problemi che richiedono correzione, pianifica il grafo aciclico diretto (DAG) necessario. Se un elemento contiene dati con più difetti, devi assicurarti che scorra attraverso le trasformazioni appropriate.

Ad esempio, immagina un elemento con i seguenti valori, nessuno dei quali deve essere null:

{"itemA": null,"itemB": null}

Assicurati che l'elemento venga sottoposto a trasformazioni che correggono entrambi i potenziali problemi:

badElements.apply(fixItemA).apply(fixItemB)

La pipeline potrebbe avere più passaggi in serie, ma la fusione consente di ridurre al minimo il sovraccarico di elaborazione introdotto.

Esempio

ValidateAndCorrectCSEvt.java

Utilizzare DoFn.StartBundle per raggruppare in micro-batch le chiamate ai servizi esterni

Potresti dover invocare API esterne all'interno della pipeline. Poiché una pipeline distribuisce il lavoro su molte risorse di calcolo, effettuare una singola chiamata per ogni elemento che attraversa il sistema può sovraccaricare un endpoint di servizio esterno. Questo problema è particolarmente comune quando non hai applicato funzioni di riduzione.

Per evitare questo problema, raggruppa le chiamate ai sistemi esterni.

Puoi raggruppare le chiamate utilizzando una trasformazione GroupByKey o l'API Timer di Apache Beam. Tuttavia, entrambi questi approcci richiedono il rimescolamento, che introduce un sovraccarico di elaborazione e la necessità di un numero magico per determinare lo spazio delle chiavi.

Utilizza invece gli elementi di ciclo di vita StartBundle e FinishBundle per raggruppare i dati. Con queste opzioni non è necessario mescolare i brani.

Un piccolo svantaggio di questa opzione è che le dimensioni dei bundle vengono determinate dinamicamente dall'implementazione del runner in base a ciò che sta accadendo all'interno della pipeline e dei relativi worker. In modalità stream, i bundle sono spesso di piccole dimensioni. Il raggruppamento Dataflow è influenzato da fattori di backend come l'utilizzo dello sharding, la quantità di dati disponibili per una determinata chiave e la velocità effettiva della pipeline.

Esempio

EventItemCorrectionService.java

Utilizza un pattern di input aggiuntivi appropriato per l'arricchimento dei dati

Nelle applicazioni di analisi dei flussi di dati, i dati vengono spesso arricchiti con informazioni aggiuntive che potrebbero essere utili per ulteriori analisi. Ad esempio, se hai l'ID negozio per una transazione, ti consigliamo di aggiungere informazioni sulla sede del negozio. Queste informazioni aggiuntive vengono spesso aggiunte prendendo un elemento e inserendo le informazioni da una tabella di ricerca.

Per le tabelle di ricerca che cambiano lentamente e sono di dimensioni ridotte, inserire la tabella nella pipeline come classe singleton che implementa l'interfaccia Map<K,V> funziona bene. Questa opzione ti consente di evitare che ogni elemento effettui una chiamata all'API per la relativa ricerca. Dopo aver incluso una copia di una tabella nella pipeline, devi aggiornarla periodicamente per mantenerla aggiornata.

Per gestire gli input laterali con aggiornamento lento, utilizza i pattern di input laterali di Apache Beam.

Memorizzazione nella cache

Gli input aggiuntivi vengono caricati in memoria e quindi memorizzati nella cache automaticamente.

Puoi impostare le dimensioni della cache utilizzando l'opzione --setWorkerCacheMb.

Puoi condividere la cache tra più istanze DoFn e utilizzare attivatori esterni per aggiornarla.

Esempio

SlowMovingStoreLocationDimension.java