Pattern di attività Java

L'applicazione di esempio per l'e-commerce illustra le best practice per l'utilizzo di Dataflow per implementare i flussi di dati analisi e AI in tempo reale. L'esempio contiene pattern di attività che mostrano il modo migliore per svolgere attività di programmazione Java. Queste attività sono comunemente necessarie per per la creazione di applicazioni di e-commerce.

L'applicazione contiene i seguenti pattern di attività Java:

Utilizzare gli schemi Apache Beam per lavorare con i dati strutturati

Puoi utilizzare gli schemi Apache Beam per semplificare l'elaborazione dei dati strutturati.

La conversione degli oggetti in Righe ti consente di produrre codice Java molto pulito, il che semplifica l'esercizio di creazione del grafo aciclico diretto (DAG). Puoi anche fare riferimento alle proprietà degli oggetti come campi all'interno delle istruzioni di analisi che crei, anziché dover chiamare metodi.

Esempio

CountViewsPerProduct.java

Utilizzare JsonToRow per convertire i dati JSON

L'elaborazione delle stringhe JSON in Dataflow è un'esigenza comune. Ad esempio, JSON le stringhe vengono elaborate durante il flusso di informazioni dei flussi di clic acquisite dal web diverse applicazioni. Per elaborare le stringhe JSON, devi convertirle in Righe oppure oggetti Java semplici (POJO) durante l'elaborazione della pipeline.

Puoi utilizzare la trasformazione integrata di Apache Beam JsonToRow per convertire le stringhe JSON in righe. Tuttavia, se vuoi per l'elaborazione dei messaggi non riusciti, devi creare separatamente, consulta Coda di dati non elaborabili per ulteriori analisi.

Se devi convertire una stringa JSON in POJO utilizzando AutoValue, registrare uno schema per il tipo utilizzando @DefaultSchema(AutoValueSchema.class), quindi utilizza l'annotazione Converti in una classe di utenza. Il codice risultante è simile al seguente:

PCollection<String> json = ...

PCollection<MyUserType>  = json
  .apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
  .apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))

Per ulteriori informazioni, incluso quali tipi di Java puoi dedurre schemi da, vedi Creazione di schemi.

Se JsonToRow non funziona con i tuoi dati, Gson è un'alternativa. Gson è piuttosto rilassato nell'elaborazione predefinita dei dati, il che potrebbe richiedono una maggiore convalida nel processo di conversione dei dati.

Esempi

Utilizza il generatore di codice AutoValue per generare POJO

Schemi Apache Beam sono spesso il modo migliore per rappresentare gli oggetti in una pipeline, per via del loro ti consentono di lavorare con i dati strutturati. A volte, tuttavia, Oggetto Java semplice (POJO) come quando si gestiscono gli oggetti chiave-valore o lo stato degli oggetti. La creazione manuale di POJO richiede la codifica di sostituzioni per i metodi equals() e hashcode(), che può essere dispendiosa in termini di tempo e soggetta a errori. Sostituzioni errate potrebbero comportare un comportamento incoerente dell'applicazione o la perdita di dati.

Per generare POJO, utilizza il generatore di classi AutoValue. Questa opzione garantisce che vengano utilizzate le sostituzioni necessarie consente di evitare potenziali errori. AutoValue è molto usato all'interno del codebase Apache Beam, quindi la familiarità con lo strumento per la creazione di corsi è utile se vuoi sviluppare Pipeline Apache Beam su Dataflow con Java.

Puoi anche AutoValue con gli schemi Apache Beam se aggiungi un Annotazione @DefaultSchema(AutoValueSchema.class). Per ulteriori informazioni, vedi Creazione di schemi.

Per saperne di più su AutoValue, consulta Perché AutoValue? e la documentazione di AutoValue.

Esempio

Clickstream.java

Inserire in coda i dati non elaborabili per ulteriori analisi

Nei sistemi di produzione, è importante gestire i dati problematici. Se possibile, convalidi e correggi i dati in-stream. Durante la correzione non è possibile, registrare il valore in una coda di messaggi non elaborati, a volte chiamata messaggi non recapitabili per analisi successive. I problemi si verificano di solito durante la conversione dei dati da un formato all'altro, ad esempio durante la conversione di stringhe JSON in Righe.

Per risolvere questo problema, utilizza una trasformazione con output multiplo per inviare elementi contenenti i dati non elaborati in un altro PCollection per ulteriori analisi. Questa elaborazione è un'operazione comune che potresti volere utilizzare in molti punti di una pipeline. Prova a rendere la trasformazione generica abbastanza da poter essere usata in più luoghi. Innanzitutto, crea un oggetto di errore aggrega le proprietà comuni, inclusi i dati originali. A questo punto, crea una trasformazione in cui sono disponibili più opzioni per la destinazione.

Esempi

Applica le trasformazioni di convalida dei dati in modo seriale

I dati raccolti da sistemi esterni spesso devono essere puliti. Strutturazione della pipeline in modo da poter correggere i dati problematici in-stream, se possibile. Invia i dati a una coda per ulteriori analisi quando necessario.

Poiché un singolo messaggio potrebbe presentare più problemi che devono essere corretti, pianificare le grafo diretto aciclico (DAG). Se un elemento contiene dati con più difetti, devi assicurarti che sia che passa attraverso le trasformazioni appropriate.

Ad esempio, immagina un elemento con i seguenti valori, nessuno dei quali deve essere null:

{"itemA": null,"itemB": null}

Assicurati che l'elemento venga sottoposto a trasformazioni che correggono entrambi i potenziali problemi:

badElements.apply(fixItemA).apply(fixItemB)

La pipeline potrebbe avere più passaggi seriali, fusione aiuta a ridurre al minimo l'overhead di elaborazione introdotto.

Esempio

ValidateAndCorrectCSEvt.java

Usa DoFn.StartBundle per eseguire micro-batch delle chiamate a servizi esterni

Potresti dover invocare API esterne all'interno della pipeline. Poiché una pipeline distribuisce il lavoro su molte risorse di calcolo, effettuare una singola chiamata per ogni elemento che attraversa il sistema può sovraccaricare un endpoint di servizio esterno. Questo problema è particolarmente comune quando non hai qualsiasi funzione di riduzione.

Per evitare questo problema, esegui le chiamate in batch a sistemi esterni.

Puoi effettuare chiamate in batch utilizzando una trasformazione GroupByKey o utilizzando l'API Apache Beam Timer. Tuttavia, entrambi gli approcci richiedono shuffling, che introduce un certo overhead per l'elaborazione e la necessità numero magico per determinare lo spazio delle chiavi.

Utilizza invece StartBundle e FinishBundle del ciclo di vita per raggruppare i dati. Con queste opzioni, non è necessario eseguire lo shuffling.

Un piccolo svantaggio di questa opzione è che le dimensioni dei pacchetti vengono determinate in modo dinamico del runner in base a ciò che accade attualmente all'interno della pipeline e dei relativi worker. In modalità stream, i bundle sono spesso di dimensioni ridotte dimensioni. Il raggruppamento dei flussi di dati è influenzato da fattori di backend come l'utilizzo dello sharding, la quantità di dati disponibili per una determinata chiave e la velocità effettiva della pipeline.

Esempio

EventItemCorrectionService.java

Utilizza un pattern di input aggiuntivi appropriato per l'arricchimento dei dati

Nelle applicazioni di analisi dei flussi di dati, i dati sono spesso arricchiti con informazioni che potrebbero essere utili per ulteriori analisi. Ad esempio, se disponi l'ID negozio per una transazione, potresti aggiungere informazioni sul negozio in ogni località. Queste informazioni aggiuntive vengono spesso aggiunte prendendo e recuperare informazioni da una tabella di ricerca.

Per le tabelle di ricerca che cambiano lentamente e hanno dimensioni inferiori, portando la tabella nella pipeline come una classe singleton implementa l'interfaccia Map<K,V> funziona bene. Questa opzione ti consente di evitare ogni elemento esegue una chiamata API per la sua ricerca. Dopo aver incluso una copia di una tabella devi aggiornarlo periodicamente per mantenerlo sempre al passo.

Per gestire gli input lato aggiornamento lento, utilizza Apache Beam Pattern di input aggiuntivi.

Memorizzazione nella cache

Gli input aggiuntivi vengono caricati in memoria e quindi memorizzati nella cache automaticamente.

Puoi impostare le dimensioni della cache utilizzando l'opzione --setWorkerCacheMb.

Puoi condividere la cache tra DoFn istanze e utilizzare trigger esterni per aggiornarla.

Esempio

SlowMovingStoreLocationDimension.java