Pattern di attività Java

L'applicazione di esempio per l'e-commerce illustra le best practice per l'utilizzo di Dataflow al fine di implementare l'analisi dei flussi di dati e l'AI in tempo reale. L'esempio contiene pattern di attività che mostrano il modo migliore per svolgere attività di programmazione Java. Queste attività sono in genere necessarie per creare applicazioni di e-commerce.

L'applicazione contiene i seguenti pattern di attività Java:

Utilizzare gli schemi Apache Beam per lavorare con i dati strutturati

Puoi utilizzare gli schemi di Apache Beam per semplificare l'elaborazione dei dati strutturati.

La conversione degli oggetti in Righe consente di produrre codice Java molto chiaro, che semplifica l'esercizio di creazione di grafi diretti aciclici (DAG). Puoi anche fare riferimento alle proprietà dell'oggetto come campi nelle istruzioni di analisi create, anziché dover chiamare i metodi.

Esempio

CountViewsPerProduct.java

Utilizzare JsonToRow per convertire i dati JSON

L'elaborazione di stringhe JSON in Dataflow è un'esigenza comune. Ad esempio, le stringhe JSON vengono elaborate durante il flusso di informazioni sui flussi di clic acquisite dalle applicazioni web. Per elaborare le stringhe JSON, devi convertirle in Righe o Oggetti Java semplici (POJO) durante l'elaborazione della pipeline.

Puoi utilizzare la trasformazione integrata di Apache Beam JsonToRow per convertire le stringhe JSON in righe. Tuttavia, se vuoi una coda per l'elaborazione dei messaggi non riusciti, devi crearla separatamente, consulta Mettere in coda i dati non elaborabili per ulteriori analisi.

Se devi convertire una stringa JSON in un POJO utilizzando AutoValue, registra uno schema per il tipo utilizzando l'annotazione @DefaultSchema(AutoValueSchema.class), quindi utilizza la classe di utilità Converti. Il codice risultante è simile al seguente:

PCollection<String> json = ...

PCollection<MyUserType>  = json
  .apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
  .apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))

Per ulteriori informazioni, inclusi i diversi tipi di Java da cui puoi dedurre gli schemi, consulta la sezione Creazione di schemi.

Se JsonToRow non funziona con i tuoi dati, Gson è un'alternativa. Gson è piuttosto rilassato nell'elaborazione predefinita dei dati, il che potrebbe richiedere un'ulteriore convalida nel processo di conversione dei dati.

Esempi

Utilizza il generatore di codice AutoValue per generare POJO

Gli schemi di Apache Beam sono spesso il modo migliore per rappresentare gli oggetti in una pipeline, grazie al modo in cui consentono di lavorare con i dati strutturati. Tuttavia, a volte è necessario un plain vecchio oggetto Java (POJO), ad esempio per gestire oggetti chiave-valore o gestire lo stato degli oggetti. La creazione manuale dei POJO richiede override del codice per i metodi equals() e hashcode(), che possono richiedere molto tempo e causare errori. Gli override errati potrebbero causare un comportamento incoerente dell'applicazione o una perdita di dati.

Per generare POJO, utilizza lo strumento per la creazione delle classi di AutoValue. Questa opzione garantisce l'utilizzo degli override necessari e consente di evitare potenziali errori. AutoValue è molto utilizzato nel codebase Apache Beam, pertanto la familiarità con questo generatore di classi è utile se vuoi sviluppare pipeline Apache Beam su Dataflow utilizzando Java.

Puoi anche AutoValue con gli schemi Apache Beam se aggiungi un'annotazione @DefaultSchema(AutoValueSchema.class). Per ulteriori informazioni, consulta la sezione Creazione di schemi.

Per ulteriori informazioni su AutoValue, consulta Perché AutoValue? e la documentazione AutoValue.

Esempio

Clickstream.java

Mettere in coda i dati non elaborabili per ulteriori analisi

Nei sistemi di produzione, è importante gestire i dati problematici. Se possibile, convalidi e correggi i dati in-stream. Quando la correzione non è possibile, registra il valore in una coda di messaggi non elaborati, a volte chiamata coda di messaggi non recapitabili, per un'analisi successiva. Di solito i problemi si verificano quando si convertono dati da un formato a un altro, ad esempio durante la conversione di stringhe JSON in Righe.

Per risolvere questo problema, utilizza una trasformazione a più output per trasferire gli elementi contenenti i dati non elaborati a un'altra PCollection per ulteriori analisi. Questa è un'operazione comune che potresti voler utilizzare in molte posizioni di una pipeline. Cerca di fare in modo che la trasformazione sia abbastanza generica da poter essere usata in più luoghi. Per prima cosa, crea un oggetto errore per aggregare le proprietà comuni, tra cui i dati originali. Poi, crea una trasformazione sink con più opzioni per la destinazione.

Esempi

Applica le trasformazioni di convalida dei dati in modo seriale

Spesso i dati raccolti da sistemi esterni devono essere puliti. Strutturare la pipeline in modo che possa correggere i dati problematici in-stream quando possibile. Invia i dati a una coda per ulteriori analisi quando necessario.

Poiché un singolo messaggio potrebbe essere interessato da più problemi che devono essere corretti, pianifica il grafo diretto aciclico (DAG) necessario. Se un elemento contiene dati con più difetti, devi assicurarti che l'elemento passi attraverso le trasformazioni appropriate.

Ad esempio, immagina un elemento con i seguenti valori, nessuno dei quali deve essere nullo:

{"itemA": null,"itemB": null}

Assicurati che l'elemento passi attraverso trasformazioni che risolvono entrambi i potenziali problemi:

badElements.apply(fixItemA).apply(fixItemB)

La pipeline potrebbe includere più passaggi seriali, ma fusion aiuta a ridurre al minimo l'overhead di elaborazione introdotto.

Esempio

ValidateAndCorrectCSEvt.java

Utilizza DoFn.StartBundle per creare micro-batch delle chiamate ai servizi esterni

Potrebbe essere necessario richiamare API esterne come parte della pipeline. Poiché una pipeline distribuisce il lavoro tra molte risorse di computing, effettuare una singola chiamata per ogni elemento che passa attraverso il sistema può sovraccaricare un endpoint di servizio esterno. Questo problema è particolarmente comune se non hai applicato alcuna funzione di riduzione.

Per evitare questo problema, esegui chiamate collettive a sistemi esterni.

Puoi effettuare chiamate in batch utilizzando una trasformazione GroupByKey o l'API Apache Beam Timer. Tuttavia, entrambi questi approcci richiedono lo shuffling, il che introduce un certo overhead per l'elaborazione e la necessità di un numero magico per determinare lo spazio delle chiavi.

Utilizza invece gli elementi StartBundle e FinishBundle del ciclo di vita per raggruppare i dati. Con queste opzioni, non è necessario shuffling.

Uno svantaggio minore di questa opzione è che le dimensioni dei pacchetti vengono determinate in modo dinamico dall'implementazione dell'elemento runner in base a ciò che sta succedendo attualmente all'interno della pipeline e ai suoi worker. In modalità stream, i bundle sono spesso di dimensioni ridotte. Il raggruppamento di Dataflow è influenzato da fattori di backend come l'utilizzo della suddivisione in segmenti, la quantità di dati disponibili per una determinata chiave e la velocità effettiva della pipeline.

Esempio

EventItemCorrectionService.java

Utilizza un pattern di input laterale appropriato per l'arricchimento dei dati

Nelle applicazioni di analisi dei flussi di dati, i dati sono spesso arricchiti con informazioni aggiuntive che potrebbero essere utili per ulteriori analisi. Ad esempio, se hai l'ID negozio per una transazione, potresti aggiungere informazioni sulla posizione del negozio. Queste informazioni aggiuntive vengono spesso aggiunte prendendo un elemento e inserendo le informazioni da una tabella di ricerca.

Per le tabelle di ricerca che cambiano lentamente e di dimensioni inferiori, portare la tabella nella pipeline come una classe singleton che implementa l'interfaccia Map<K,V> funziona bene. Questa opzione ti consente di evitare che ogni elemento effettui una chiamata API per la sua ricerca. Dopo aver incluso una copia di una tabella nella pipeline, devi aggiornarla periodicamente per mantenerla aggiornata.

Per gestire gli input laterali che si aggiornano lentamente, utilizza i pattern di input laterale di Apache Beam.

Memorizzazione nella cache

Gli input collaterali vengono caricati in memoria e pertanto vengono memorizzati automaticamente nella cache.

Puoi impostare le dimensioni della cache utilizzando l'opzione --setWorkerCacheMb.

Puoi condividere la cache tra DoFn istanze e utilizzare trigger esterni per aggiornarla.

Esempio

SlowMovingStoreLocationDimension.java