Questa pagina illustra le best practice da utilizzare durante lo sviluppo delle pipeline Dataflow. L'utilizzo di queste best practice offre i seguenti vantaggi:
- Migliora l'osservabilità e le prestazioni della pipeline
- Maggiore produttività degli sviluppatori
- Migliora la testabilità della pipeline
Gli esempi di codice Apache Beam in questa pagina utilizzano Java, ma il contenuto si applica alla SDK Apache Beam Java, Python e Go.
Domande da considerare
Quando progetti la tua pipeline, poniti queste domande:
- Dove vengono archiviati i dati di input della pipeline? Quanti set di dati di input che hai?
- Che aspetto hanno i tuoi dati?
- Che cosa vuoi fare con i tuoi dati?
- Dove dovrebbero andare i dati di output della pipeline?
- Il job Dataflow utilizza Assured Workloads?
Utilizzare i modelli
Per accelerare lo sviluppo della pipeline, invece di crearne una scrivendo per il codice Apache Beam, utilizza Modello Dataflow quando possibile. I modelli offrono i seguenti vantaggi:
- I modelli sono riutilizzabili.
- I modelli consentono di personalizzare ogni job modificando parametri specifici della pipeline.
- Chiunque tu conceda le autorizzazioni può utilizzare il modello per eseguire il deployment della pipeline. Ad esempio, uno sviluppatore può creare un job da un modello e un data scientist dell'organizzazione possono eseguire il deployment del modello in un secondo momento.
Puoi utilizzare un modello fornito da Google, oppure crearne uno personalizzato. Alcuni modelli forniti da Google ti consentono di aggiungere la logica personalizzata come passaggio della pipeline. Ad esempio, Modello da Pub/Sub a BigQuery fornisce un parametro per eseguire una funzione JavaScript definita dall'utente in Cloud Storage.
I modelli forniti da Google sono open source con la licenza Apache 2.0, li puoi usare come base per pipeline di dati. I modelli sono utili anche come esempi di codice. Visualizza il modello il codice Repository GitHub.
Assured Workloads
Assured Workloads aiuta ad applicare requisiti di sicurezza e conformità per per i clienti Google Cloud. Ad esempio: Regioni e assistenza nell'UE con controlli di sovranità aiuta ad applicare garanzie di residenza e sovranità dei dati per gli ambienti nell'UE clienti. Per fornire queste funzionalità, alcune funzionalità di Dataflow limitato o limitato. Se utilizzi Assured Workloads con Dataflow, tutte le Le risorse a cui accede la pipeline devono trovarsi nel percorso Progetto o cartella Assured Workloads. Le risorse includono:
- Bucket Cloud Storage
- Set di dati di BigQuery
- Argomenti e sottoscrizioni Pub/Sub
- Set di dati Firestore
- Connettori I/O
In Dataflow, per i job di flussi di dati creati dopo il 7 marzo 2024, vengono criptati con CMEK.
Per i job di flussi di dati creati prima del 7 marzo 2024, le chiavi dei dati vengono utilizzate nelle come windowing, raggruppamento e unione, non sono protette da CMEK la crittografia. Per abilitare questa crittografia per i tuoi job, scarica o annulla il job, quindi riavvialo. Per ulteriori informazioni, vedi Crittografia degli artefatti dello stato della pipeline.
Condividi dati tra pipeline
Non esiste una comunicazione cross-pipeline specifica per Dataflow meccanismo per la condivisione dei dati o l'elaborazione del contesto tra le pipeline. Puoi utilizzare la modalità un'archiviazione durevole come Cloud Storage o una cache in memoria come App Engine e condividere i dati tra istanze della pipeline.
Pianifica job
Puoi automatizzare l'esecuzione della pipeline nei seguenti modi:
- Usa Cloud Scheduler.
- Utilizza l'operatore Dataflow di Apache Airflow, uno dei vari operatori di Google Cloud in un flusso di lavoro di Cloud Composer.
- Esegui processi personalizzati (cron) di job su Compute Engine.
Best practice per la scrittura del codice della pipeline
Le sezioni seguenti forniscono le best practice da utilizzare per la creazione delle pipeline scrivendo il codice Apache Beam.
Struttura il codice Apache Beam
Per creare pipeline, è comune utilizzare il modello
ParDo
con l'elaborazione parallela della trasformazione Apache Beam.
Quando applichi una trasformazione ParDo
, fornisci il codice sotto forma di
Oggetto DoFn
. DoFn
è una classe SDK Apache Beam che definisce un modello
di elaborazione.
Puoi considerare il codice DoFn
come piccole entità indipendenti: possono
potrebbero esserci molte istanze in esecuzione su macchine diverse, ciascuna senza
la conoscenza degli altri. Di conseguenza, consigliamo di creare funzioni pure, che
sono ideali per la natura parallela e distribuita degli elementi DoFn
.
Le funzioni pure hanno le seguenti caratteristiche:
- Le funzioni pure non dipendono dallo stato nascosto o esterno.
- Non hanno effetti collaterali osservabili.
- Sono deterministici.
Il modello di funzione puro non è strettamente rigido. Quando il codice non dipende
a ciò che non sono garantiti dal servizio Dataflow,
informazioni o i dati di inizializzazione esterna possono essere validi per DoFn
e altri
.
Quando strutturando ParDo
trasforma e crea i tuoi elementi DoFn
,
tieni in considerazione le seguenti linee guida:
- Quando utilizzi l'elaborazione "exactly-once",
il servizio Dataflow garantisce che ogni elemento
L'input
PCollection
viene elaborato da un'istanzaDoFn
esattamente una volta. - Il servizio Dataflow non garantisce quante volte un
DoFn
è richiamato. - Il servizio Dataflow non garantisce esattamente il modo in cui vengono raggruppati. Non garantisce quale, se tutti gli elementi vengono elaborati insieme.
- Il servizio Dataflow non garantisce il numero esatto di
DoFn
istanze create nel corso di una pipeline. - Il servizio Dataflow è a tolleranza di errore e potrebbe riprovare il codice più volte se i worker riscontrano problemi.
- Il servizio Dataflow potrebbe creare copie di backup del tuo codice. Potrebbero verificarsi problemi con gli effetti collaterali manuali, ad esempio se il codice si basa oppure crea file temporanei con nomi non univoci.
- Il servizio Dataflow serializza l'elaborazione degli elementi in base al
DoFn
in esecuzione in un'istanza Compute Engine. Il codice non deve essere rigorosamente sicuro tramite thread, ma condiviso tra più istanzeDoFn
deve essere sicuro per thread.
Crea librerie di trasformazioni riutilizzabili
Il modello di programmazione Apache Beam consente di riutilizzare le trasformazioni. Creando una libreria condivisa di trasformazioni comuni, puoi migliorare la riusabilità, la testabilità e la proprietà del codice da parte di vari team.
Considera i due seguenti esempi di codice Java, che leggono entrambi gli eventi di pagamento. Supponendo che entrambe le pipeline eseguano la stessa elaborazione, possono utilizzare lo stesso si trasforma tramite una libreria condivisa per le restanti fasi di elaborazione.
Il primo esempio proviene da un'origine Pub/Sub illimitata:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options)
// Initial read transform
PCollection<PaymentEvent> payments =
p.apply("Read from topic",
PubSubIO.readStrings().withTimestampAttribute(...).fromTopic(...))
.apply("Parse strings into payment events",
ParDo.of(new ParsePaymentEventFn()));
Il secondo esempio proviene da un'origine di database relazionale vincolata:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<PaymentEvent> payments =
p.apply(
"Read from database table",
JdbcIO.<PaymentEvent>read()
.withDataSourceConfiguration(...)
.withQuery(...)
.withRowMapper(new RowMapper<PaymentEvent>() {
...
}));
Il modo in cui implementi le best practice per la riusabilità del codice varia a seconda del linguaggio di programmazione e per lo strumento di creazione. Ad esempio, se utilizzi Maven puoi separare il codice Transformer in un modulo a sé stante. Puoi quindi includere come sottomodulo nella versione più grande progetti multimodulo per diverse pipeline, come mostrato nell'esempio di codice seguente:
// Reuse transforms across both pipelines
payments
.apply("ValidatePayments", new PaymentTransforms.ValidatePayments(...))
.apply("ProcessPayments", new PaymentTransforms.ProcessPayments(...))
...
Per ulteriori informazioni, consulta le seguenti pagine della documentazione di Apache Beam:
- Requisiti per la scrittura del codice utente per le trasformazioni Apache Beam
- Guida di stile per
PTransform
: una guida di stile per gli autori di nuove raccoltePTransform
riutilizzabili
Utilizzare le code di messaggi non recapitabili per la gestione degli errori
A volte la pipeline non è in grado di elaborare gli elementi. Una causa comune sono i problemi relativi ai dati. Ad esempio, un elemento che contiene un file JSON formattato in modo non corretto può causare errori di analisi.
Sebbene sia possibile cogliere le eccezioni all'interno
DoFn.ProcessElement
, registrare l'errore e rilasciare l'elemento, con questo approccio vengono persi i dati
ed evita che i dati vengano ispezionati in un secondo momento per la
gestione manuale o la risoluzione dei problemi.
Utilizza invece un pattern chiamato coda messaggi non recapitabili (coda di messaggi non elaborati).
Rileva le eccezioni nel metodo DoFn.ProcessElement
e registra i dati
errori. Invece di eliminare l'elemento con errori,
usa output con diramazioni per scrivere elementi non riusciti in un file PCollection
separato
. Questi elementi vengono quindi scritti in un data sink per ispezionarli in un secondo momento
e la gestione con una trasformazione separata.
Il seguente esempio di codice Java mostra come implementare la coda di messaggi non recapitabili pattern.
TupleTag<Output> successTag = new TupleTag<>() {};
TupleTag<Input> deadLetterTag = new TupleTag<>() {};
PCollection<Input> input = /* ... */;
PCollectionTuple outputTuple =
input.apply(ParDo.of(new DoFn<Input, Output>() {
@Override
void processElement(ProcessContext c) {
try {
c.output(process(c.element()));
} catch (Exception e) {
LOG.severe("Failed to process input {} -- adding to dead-letter file",
c.element(), e);
c.sideOutput(deadLetterTag, c.element());
}
}).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));
// Write the dead-letter inputs to a BigQuery table for later analysis
outputTuple.get(deadLetterTag)
.apply(BigQueryIO.write(...));
// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);
// and continue processing ...
Utilizza Cloud Monitoring per applicare diversi tipi di monitoraggio i criteri per la coda messaggi non recapitabili della pipeline. Ad esempio, puoi visualizzare il numero e la dimensione degli elementi elaborati dalla trasformazione dei messaggi non recapitabili configurare gli avvisi in modo che vengano attivati se vengono soddisfatte determinate condizioni di soglia.
Gestire le mutazioni dello schema
Puoi gestire i dati che hanno schemi imprevisti ma validi utilizzando un messaggio non recapitabile
che scrive gli elementi con errori in un oggetto PCollection
separato.
In alcuni casi, potresti voler gestire
automaticamente gli elementi
che riflettono uno schema mutato come elementi validi. Ad esempio, se un elemento
riflette una mutazione come l'aggiunta di nuovi campi, puoi adattare
schema del sink di dati per contenere le mutazioni.
La mutazione automatica dello schema si basa sull'approccio della ramificazione-output utilizzato pattern messaggi non recapitabili. Tuttavia, in questo caso attiva una trasformazione che muta lo schema di destinazione ogni volta che vengono incontrati schemi additivi. Ad esempio, di questo approccio, vedi Come gestire gli schemi JSON mutanti in una pipeline di inserimento flussi con Square Enix sul blog di Google Cloud.
Decidi come unire i set di dati
L'unione di set di dati è un caso d'uso comune per le pipeline di dati. Puoi utilizzare la modalità
o la trasformazione CoGroupByKey
per eseguire i join nella pipeline.
Ognuna ha vantaggi e svantaggi.
Input aggiuntivi
Offrire un modo flessibile per risolvere i problemi comuni di elaborazione dei dati, come
l'arricchimento e le ricerche con chiave. A differenza degli oggetti PCollection
, gli input laterali vengono
è modificabile e può essere determinato in fase di runtime. Ad esempio, i valori in un
l'input aggiuntivo potrebbe essere calcolato da un altro ramo della pipeline o determinato
per chiamare un servizio remoto.
Dataflow supporta gli input aggiuntivi facendo in modo che i dati vengano resi persistenti simile a un disco condiviso. Questa configurazione rende l'input aggiuntivo completo disponibili per tutti i lavoratori.
Tuttavia, le dimensioni di input laterale possono essere molto grandi e potrebbero non rientrare nella memoria dei worker. Per lettura da un input di grandi dimensioni può causare problemi di prestazioni se i worker devono legge costantemente dallo spazio di archiviazione permanente.
La CoGroupByKey
e trasformerai
trasformazione Apache Beam core
che unisce (appiattisce) più oggetti PCollection
e raggruppa gli elementi che
hanno una chiave comune. A differenza di un input aggiuntivo, che rende tutti i dati di input aggiuntivi
disponibile per ogni worker, CoGroupByKey
esegue un'operazione di shuffle (raggruppamento)
per distribuire i dati tra i lavoratori. CoGroupByKey
è quindi ideale quando
PCollection
oggetti che vuoi unire sono molto grandi e non rientrano nel worker
la memoria.
Segui queste linee guida per decidere se usare gli input aggiuntivi o
CoGroupByKey
:
- Utilizza gli input laterali quando uno degli oggetti
PCollection
a cui stai unendo è sproporzionatamente più piccolo degli altri e minore L'oggettoPCollection
rientra nella memoria del worker. Memorizzazione nella cache dell'input aggiuntivo interamente in memoria rende veloce ed efficiente il recupero degli elementi. - Utilizza gli input aggiuntivi quando hai un oggetto
PCollection
che deve essere presenti più volte nella pipeline. Anziché utilizzare piùCoGroupByKey
, crea un singolo input aggiuntivo che può essere riutilizzato da più trasformazioniParDo
. - Utilizza
CoGroupByKey
se devi recuperare una grande proporzione di OggettoPCollection
che supera in modo significativo la memoria dei worker.
Per ulteriori informazioni, vedi Risolvi gli errori di esaurimento della memoria di Dataflow.
Riduci al minimo le costose operazioni per elemento
Un'istanza DoFn
elabora batch di elementi chiamati
set,
che sono unità atomiche di lavoro composte da zero o più
elementi. I singoli elementi vengono poi elaborati
DoFn.ProcessElement
, che viene eseguito per ogni elemento. Poiché DoFn.ProcessElement
viene chiamato per ogni elemento, indipendentemente
per le operazioni costose richiamate da quel metodo
per ogni singolo elemento elaborato con il metodo.
Se devi eseguire operazioni costose una sola volta per un gruppo di elementi,
Includi queste operazioni nel metodo DoFn.Setup
o DoFn.StartBundle
invece che nell'elemento DoFn.ProcessElement
. Alcuni esempi includono
le seguenti operazioni:
Analizzare un file di configurazione che controlla alcuni aspetti di
DoFn
il comportamento dell'istanza. Richiama questa azione solo una volta, quando L'istanzaDoFn
viene inizializzata utilizzandoDoFn.Setup
.Creare un'istanza per un client di breve durata che viene riutilizzato in tutti di elementi di un bundle, ad esempio quando tutti gli elementi del bundle inviati tramite un'unica connessione di rete. Richiama questa azione una volta ogni un bundle utilizzando
DoFn.StartBundle
.
Limita le dimensioni dei batch e le chiamate simultanee a servizi esterni
Quando chiami servizi esterni, puoi ridurre le spese generali per chiamata utilizzando il
GroupIntoBatches
e trasformerai automaticamente. Questa trasformazione crea batch di elementi di una dimensione specificata.
Il batch invia elementi a un servizio esterno come un unico payload,
singolarmente.
In combinazione con la modalità batch, limita il numero massimo (in parallelo) al servizio esterno scegliendo le chiavi appropriate per partizionare i dati in entrata. Il numero di partizioni determina il numero massimo caricamento in contemporanea. Ad esempio, se a ogni elemento viene assegnata la stessa chiave, la trasformazione downstream per la chiamata al servizio esterno non viene eseguita parallelo.
Prendi in considerazione uno dei seguenti approcci per produrre chiavi per gli elementi:
- Scegli un attributo del set di dati da utilizzare come chiavi dei dati, ad esempio gli ID utente.
- Genera chiavi di dati per suddividere gli elementi in modo casuale su un numero fisso di
partizioni, dove il numero di possibili valori chiave determina il numero
delle partizioni. Devi creare un numero sufficiente di partizioni per il parallelismo.
Ogni partizione deve avere abbastanza elementi per
GroupIntoBatches
e trasformare le risorse in modo che siano utili.
Il seguente esempio di codice Java mostra come suddividere casualmente gli elementi in dieci partizioni:
// PII or classified data which needs redaction.
PCollection<String> sensitiveData = ...;
int numPartitions = 10; // Number of parallel batches to create.
PCollection<KV<Long, Iterable<String>>> batchedData =
sensitiveData
.apply("Assign data into partitions",
ParDo.of(new DoFn<String, KV<Long, String>>() {
Random random = new Random();
@ProcessElement
public void assignRandomPartition(ProcessContext context) {
context.output(
KV.of(randomPartitionNumber(), context.element()));
}
private static int randomPartitionNumber() {
return random.nextInt(numPartitions);
}
}))
.apply("Create batches of sensitive data",
GroupIntoBatches.<Long, String>ofSize(100L));
// Use batched sensitive data to fully utilize Redaction API,
// which has a rate limit but allows large payloads.
batchedData
.apply("Call Redaction API in batches", callRedactionApiOnBatch());
Identifica i problemi di rendimento causati da passaggi combinati
Dataflow crea un grafico di passaggi che rappresenta la tua pipeline in base alle trasformazioni e ai dati che hai usato per crearla. Questo grafico è chiamato il grafico di esecuzione della pipeline.
Quando esegui il deployment della pipeline, Dataflow potrebbe modificare
il grafico di esecuzione della pipeline per migliorare le prestazioni. Ad esempio, Dataflow
potrebbe fondere alcune operazioni, un processo noto come
ottimizzazione della fusione,
per evitare l'impatto in termini di prestazioni e costi della scrittura di ogni livello intermedio
PCollection
oggetto nella pipeline.
In alcuni casi, Dataflow potrebbe determinare il modo ottimale per fondere le operazioni nella pipeline, il che può limitare la capacità del tuo lavoro di utilizzare tutti i lavoratori disponibili. In questi casi, è possibile impedire l'unione delle operazioni.
Considera il seguente esempio di codice Apache Beam. R
GenerateSequence
la trasformazione crea un piccolo oggetto PCollection
limitato, che viene poi
ed è elaborato da due trasformazioni ParDo
downstream.
La trasformazione Find Primes Less-than-N
potrebbe essere costosa dal punto di vista del calcolo e
che probabilmente risulteranno rallentati per i numeri elevati. Al contrario,
La trasformazione Increment Number
probabilmente viene completata rapidamente.
import com.google.common.math.LongMath;
...
public class FusedStepsPipeline {
final class FindLowerPrimesFn extends DoFn<Long, String> {
@ProcessElement
public void processElement(ProcessContext c) {
Long n = c.element();
if (n > 1) {
for (long i = 2; i < n; i++) {
if (LongMath.isPrime(i)) {
c.output(Long.toString(i));
}
}
}
}
}
public static void main(String[] args) {
Pipeline p = Pipeline.create(options);
PCollection<Long> sequence = p.apply("Generate Sequence",
GenerateSequence
.from(0)
.to(1000000));
// Pipeline branch 1
sequence.apply("Find Primes Less-than-N",
ParDo.of(new FindLowerPrimesFn()));
// Pipeline branch 2
sequence.apply("Increment Number",
MapElements.via(new SimpleFunction<Long, Long>() {
public Long apply(Long n) {
return ++n;
}
}));
p.run().waitUntilFinish();
}
}
Il seguente diagramma mostra una rappresentazione grafica della pipeline in Interfaccia di monitoraggio di Dataflow.
La
Interfaccia di monitoraggio di Dataflow
mostra che si verifica la stessa lenta velocità di elaborazione per entrambe le trasformazioni, in particolare 13
elementi al secondo. Potresti aspettarti che la trasformazione Increment Number
elabori
rapidamente, ma sembra essere legata allo stesso tasso di
in fase di elaborazione come Find Primes Less-than-N
.
Il motivo è che Dataflow ha unito i passaggi in un'unica
che ne impedisce l'esecuzione indipendente. Puoi utilizzare lo
gcloud dataflow jobs describe
per trovare ulteriori informazioni:
gcloud dataflow jobs describe --full job-id --format json
Nell'output risultante, i passaggi combinati sono descritti nel
ExecutionStageSummary
nel
ComponentTransform
array:
...
"executionPipelineStage": [
{
"componentSource": [
...
],
"componentTransform": [
{
"name": "s1",
"originalTransform": "Generate Sequence/Read(BoundedCountingSource)",
"userName": "Generate Sequence/Read(BoundedCountingSource)"
},
{
"name": "s2",
"originalTransform": "Find Primes Less-than-N",
"userName": "Find Primes Less-than-N"
},
{
"name": "s3",
"originalTransform": "Increment Number/Map",
"userName": "Increment Number/Map"
}
],
"id": "S01",
"kind": "PAR_DO_KIND",
"name": "F0"
}
...
In questo scenario, poiché la trasformazione Find Primes Less-than-N
è il passaggio lento,
interrompere la fusione prima di quel passaggio è una strategia adeguata. Un metodo per
per annullare la fusione consiste nell'inserire
GroupByKey
trasforma e separa prima del passaggio, come mostrato nel seguente codice Java
esempio.
sequence
.apply("Map Elements", MapElements.via(new SimpleFunction<Long, KV<Long, Void>>() {
public KV<Long, Void> apply(Long n) {
return KV.of(n, null);
}
}))
.apply("Group By Key", GroupByKey.<Long, Void>create())
.apply("Emit Keys", Keys.<Long>create())
.apply("Find Primes Less-than-N", ParDo.of(new FindLowerPrimesFn()));
Puoi anche combinare questi passaggi di annullamento della fusione in una trasformazione composita riutilizzabile.
Dopo aver annullato i passaggi, quando esegui la pipeline, Increment Number
viene completata in pochi secondi e la versione, molto più lunga
La trasformazione Find Primes Less-than-N
viene eseguita in una fase separata.
In questo esempio viene applicata un'operazione di raggruppamento e di separazione dei passaggi per annullare l'unione dei passaggi.
Per altre circostanze, puoi avvalerti di altri approcci. In questo caso, la gestione
un output duplicato non è un problema, dato l'output consecutivo
Trasformazione GenerateSequence
.
KV
gli oggetti con chiavi duplicate vengono deduplicati in una singola chiave nel gruppo
(GroupByKey
) trasforma
e separa le due
(Keys
)
e trasformerai automaticamente. Per mantenere i duplicati dopo le operazioni di gruppo e di separazione,
per creare coppie chiave-valore seguendo questa procedura:
- Utilizza una chiave casuale e l'input originale come valore.
- Raggruppare utilizzando la chiave casuale.
- Emetti i valori per ogni chiave come output.
Puoi anche utilizzare
Reshuffle
e trasformerai per evitare la fusione delle trasformazioni circostanti. Tuttavia, gli effetti collaterali del
Reshuffle
Transform non è portabile su diversi
Runner Apache Beam.
Per ulteriori informazioni sul parallelismo e l'ottimizzazione della fusione, consulta Ciclo di vita della pipeline.
Usa le metriche Apache Beam per raccogliere insight sulle pipeline
Le metriche Apache Beam sono una classe di utilità che produce metriche per con le proprietà di una pipeline in esecuzione. Quando utilizza Cloud Monitoring, Le metriche Apache Beam sono disponibili come metriche personalizzate di Cloud Monitoring.
L'esempio seguente mostra Apache Beam
Counter
metriche
utilizzata in una sottoclasse DoFn
.
Il codice di esempio utilizza due contatori. Un contatore monitora gli errori di analisi JSON
(malformedCounter
) e l'altro contatore monitora se il messaggio JSON è
valido, ma contiene un payload vuoto (emptyCounter
). In Cloud Monitoring,
i nomi metrica personalizzata sono custom.googleapis.com/dataflow/malformedJson
e
custom.googleapis.com/dataflow/emptyPayload
. Puoi utilizzare le metriche personalizzate
per creare visualizzazioni e criteri di avviso in Cloud Monitoring.
final TupleTag<String> errorTag = new TupleTag<String>(){};
final TupleTag<MockObject> successTag = new TupleTag<MockObject>(){};
final class ParseEventFn extends DoFn<String, MyObject> {
private final Counter malformedCounter = Metrics.counter(ParseEventFn.class, "malformedJson");
private final Counter emptyCounter = Metrics.counter(ParseEventFn.class, "emptyPayload");
private Gson gsonParser;
@Setup
public setup() {
gsonParser = new Gson();
}
@ProcessElement
public void processElement(ProcessContext c) {
try {
MyObject myObj = gsonParser.fromJson(c.element(), MyObject.class);
if (myObj.getPayload() != null) {
// Output the element if non-empty payload
c.output(successTag, myObj);
}
else {
// Increment empty payload counter
emptyCounter.inc();
}
}
catch (JsonParseException e) {
// Increment malformed JSON counter
malformedCounter.inc();
// Output the element to dead-letter queue
c.output(errorTag, c.element());
}
}
}
Scopri di più
Le pagine seguenti forniscono ulteriori informazioni su come strutturare la pipeline, come scegliere quali trasformazioni applicare ai dati e cosa considerare quando scegliendo i metodi di input e output della pipeline.
Per ulteriori informazioni su come creare il tuo codice utente, consulta requisiti per le funzioni fornite dall'utente.