Best practice per la pipeline Dataflow

Questa pagina descrive le best practice da utilizzare durante lo sviluppo delle pipeline Dataflow. L'utilizzo di queste best practice offre i seguenti vantaggi:

  • Migliora l'osservabilità e le prestazioni della pipeline
  • Maggiore produttività degli sviluppatori
  • Migliora la testbilità delle pipeline

Gli esempi di codice Apache Beam in questa pagina utilizzano Java, ma i contenuti si applicano agli SDK Apache Beam, Python e Go.

Domande da considerare

Durante la progettazione della pipeline, poniti le seguenti domande:

  • Dove sono archiviati i dati di input della tua pipeline? Quanti set di dati di input hai?
  • Che aspetto hanno i tuoi dati?
  • Cosa vuoi fare con i tuoi dati?
  • Dove devono essere inviati i dati di output della tua pipeline?
  • Il job Dataflow utilizza Assured Workloads?

Utilizzare i modelli

Per accelerare lo sviluppo della pipeline, anziché creare una pipeline scrivendo codice Apache Beam, se possibile, utilizza un modello Dataflow. I modelli offrono i seguenti vantaggi:

  • I modelli sono riutilizzabili.
  • I modelli ti consentono di personalizzare ogni job modificando parametri specifici della pipeline.
  • Chiunque fornisci le autorizzazioni può utilizzare il modello per eseguire il deployment della pipeline. Ad esempio, uno sviluppatore può creare un job da un modello e un data scientist dell'organizzazione può eseguire il deployment di quel modello in un secondo momento.

Puoi utilizzare un modello fornito da Google o crearne uno tuo. Alcuni modelli forniti da Google consentono di aggiungere una logica personalizzata come passaggio della pipeline. Ad esempio, il modello Pub/Sub a BigQuery fornisce un parametro per eseguire una funzione JavaScript definita dall'utente (UDF) archiviata in Cloud Storage.

Poiché i modelli forniti da Google sono open source secondo la licenza Apache 2.0, puoi utilizzarli come base per nuove pipeline. I modelli sono utili anche come esempi di codice. Visualizza il codice del modello nel repository di GitHub.

Assured Workloads

Assured Workloads aiuta ad applicare i requisiti di sicurezza e conformità per i clienti di Google Cloud. Ad esempio, Le regioni e l'assistenza nell'UE con controlli di sovranità aiutano a garantire la residenza e le garanzie di sovranità dei dati per i clienti residenti nell'UE. Per fornire queste funzionalità, alcune funzionalità di Dataflow sono limitate o limitate. Se utilizzi Assured Workloads con Dataflow, tutte le risorse a cui accede la tua pipeline devono trovarsi nel progetto o nella cartella Assured Workloads dell'organizzazione. Le risorse includono:

  • Bucket Cloud Storage
  • Set di dati di BigQuery
  • Argomenti e sottoscrizioni Pub/Sub
  • Set di dati Firestore
  • Connettori I/O

In Dataflow, per i job di elaborazione in modalità flusso creati dopo il 7 marzo 2024, tutti i dati utente sono criptati con CMEK.

Per i job di flussi di dati creati prima del 7 marzo 2024, le chiavi dati utilizzate nelle operazioni basate sulle chiavi, come windowing, raggruppamento e unione, non sono protette dalla crittografia CMEK. Per abilitare questa crittografia per i tuoi job, scarica o annulla il job, quindi riavvialo. Per maggiori informazioni, consulta Crittografia degli artefatti dello stato della pipeline.

Condividi i dati tra le pipeline

Non esiste un meccanismo di comunicazione tra pipeline tra pipeline specifico di Dataflow per la condivisione dei dati o del contesto di elaborazione tra le pipeline. Puoi utilizzare uno spazio di archiviazione durevole come Cloud Storage o una cache in memoria come App Engine per condividere i dati tra istanze della pipeline.

Pianifica job

Puoi automatizzare l'esecuzione della pipeline nei seguenti modi:

Best practice per la scrittura del codice della pipeline

Le seguenti sezioni forniscono le best practice da utilizzare quando si creano pipeline scrivendo codice Apache Beam.

Struttura il codice Apache Beam

Per creare pipeline, è prassi comune utilizzare la trasformazione generica Apache Beam con elaborazione parallela di ParDo. Quando applichi una trasformazione ParDo, il codice viene fornito sotto forma di oggetto DoFn. DoFn è una classe dell'SDK Apache Beam che definisce una funzione di elaborazione distribuita.

Puoi considerare il codice di DoFn come piccole entità indipendenti: potenzialmente molte istanze in esecuzione su macchine diverse, senza alcuna conoscenza delle altre. Di conseguenza, consigliamo di creare funzioni pure, che sono ideali per la natura parallela e distribuita degli elementi DoFn. Le funzioni pure hanno le seguenti caratteristiche:

  • Le funzioni pure non dipendono dallo stato nascosto o esterno.
  • Non hanno effetti collaterali osservabili.
  • Sono deterministici.

Il modello a funzione pura non è strettamente rigido. Quando il codice non dipende da cose non garantite dal servizio Dataflow, le informazioni sullo stato o i dati di inizializzazione esterni possono essere validi per DoFn e altri oggetti delle funzioni.

Quando struttura le trasformazioni ParDo e crei gli elementi DoFn, segui queste linee guida:

  • Quando utilizzi l'elaborazione "exactly-once", il servizio Dataflow garantisce che ogni elemento nell'input PCollection venga elaborato da un'istanza DoFn esattamente una volta.
  • Il servizio Dataflow non garantisce quante volte viene richiamato un elemento DoFn.
  • Il servizio Dataflow non garantisce esattamente il modo in cui gli elementi distribuiti vengono raggruppati. Non garantisce se gli elementi vengono elaborati insieme.
  • Il servizio Dataflow non garantisce il numero esatto di istanze DoFn create nel corso di una pipeline.
  • Il servizio Dataflow è a tolleranza di errore e potrebbe riprovare a eseguire il codice più volte se i worker riscontrano problemi.
  • Il servizio Dataflow potrebbe creare copie di backup del tuo codice. Potrebbero verificarsi problemi a causa di effetti collaterali manuali, ad esempio se il codice si basa su file temporanei con nomi non univoci o se ne crea uno.
  • Il servizio Dataflow serializza l'elaborazione degli elementi per ogni istanza DoFn. Il tuo codice non deve essere rigorosamente sicuro per i thread, ma qualsiasi stato condiviso tra più istanze DoFn deve essere sicuro per i thread.

Creare librerie di trasformazioni riutilizzabili

Il modello di programmazione Apache Beam consente di riutilizzare le trasformazioni. Creando una libreria condivisa di trasformazioni comuni, puoi migliorare la riutilizzabilità, la testbilità e la proprietà del codice da parte di diversi team.

Considera i seguenti due esempi di codice Java, che leggono entrambi gli eventi di pagamento. Supponendo che entrambe le pipeline eseguano la stessa elaborazione, possono utilizzare le stesse trasformazioni tramite una libreria condivisa per i passaggi di elaborazione rimanenti.

Il primo esempio proviene da un'origine Pub/Sub illimitata:

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

// Initial read transform
PCollection<PaymentEvent> payments =
    p.apply("Read from topic",
        PubSubIO.readStrings().withTimestampAttribute(...).fromTopic(...))
        .apply("Parse strings into payment events",
            ParDo.of(new ParsePaymentEventFn()));

Il secondo esempio proviene da un'origine di database relazionale limitato:

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options);

PCollection<PaymentEvent> payments =
    p.apply(
        "Read from database table",
        JdbcIO.<PaymentEvent>read()
            .withDataSourceConfiguration(...)
            .withQuery(...)
            .withRowMapper(new RowMapper<PaymentEvent>() {
              ...
            }));

L'implementazione delle best practice per la riusabilità del codice varia a seconda del linguaggio di programmazione e dello strumento di creazione. Ad esempio, se usi Maven, puoi separare il codice di trasformazione in un modulo dedicato. Puoi quindi includere il modulo come sottomodulo in progetti multimodulo più grandi per pipeline diverse, come mostrato nell'esempio di codice che segue:

// Reuse transforms across both pipelines
payments
    .apply("ValidatePayments", new PaymentTransforms.ValidatePayments(...))
    .apply("ProcessPayments", new PaymentTransforms.ProcessPayments(...))
  ...

Per ulteriori informazioni, consulta le seguenti pagine della documentazione di Apache Beam:

Utilizzare code di messaggi non recapitabili per la gestione degli errori

A volte la pipeline non è in grado di elaborare gli elementi. I problemi relativi ai dati sono una causa comune. Ad esempio, un elemento che contiene codice JSON con un formato errato può causare errori di analisi.

Anche se puoi rilevare le eccezioni all'interno del metodo DoFn.ProcessElement, registrare l'errore e rilasciare l'elemento, questo approccio perde i dati e impedisce che i dati vengano ispezionati in un secondo momento per la gestione o la risoluzione dei problemi manuali.

Utilizza invece un pattern chiamato coda di messaggi non recapitabili (coda di messaggi non elaborati). Individua le eccezioni nel metodo DoFn.ProcessElement e registra gli errori. Anziché eliminare l'elemento con errori, utilizza gli output con diramazione per scrivere gli elementi con errori in un oggetto PCollection separato. Questi elementi vengono poi scritti in un data sink per ispezione e gestione in seguito con una trasformazione separata.

Il seguente esempio di codice Java mostra come implementare il pattern della coda dei messaggi non recapitabili.

TupleTag<Output> successTag = new TupleTag<>() {};
TupleTag<Input> deadLetterTag = new TupleTag<>() {};

PCollection<Input> input = /* ... */;

PCollectionTuple outputTuple =
    input.apply(ParDo.of(new DoFn<Input, Output>() {
      @Override
      void processElement(ProcessContext c) {
        try {
          c.output(process(c.element()));
        } catch (Exception e) {
          LOG.severe("Failed to process input {} -- adding to dead-letter file",
              c.element(), e);
          c.sideOutput(deadLetterTag, c.element());
        }
      }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

// Write the dead-letter inputs to a BigQuery table for later analysis
outputTuple.get(deadLetterTag)
    .apply(BigQueryIO.write(...));

// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);
// and continue processing ...

Utilizza Cloud Monitoring per applicare criteri di monitoraggio e avviso diversi per la coda di messaggi non recapitabili della pipeline. Ad esempio, puoi visualizzare il numero e la dimensione degli elementi elaborati dalla trasformazione dei messaggi non recapitabili e configurare gli avvisi per l'attivazione se vengono soddisfatte determinate condizioni di soglia.

Gestire le mutazioni dello schema

Puoi gestire i dati che presentano schemi imprevisti ma validi utilizzando un modello messaggi non recapitabili, che scrive gli elementi con errori in un oggetto PCollection separato. In alcuni casi, potresti voler gestire automaticamente come elementi validi gli elementi che riflettono uno schema mutato. Ad esempio, se lo schema di un elemento riflette una mutazione come l'aggiunta di nuovi campi, puoi adattare lo schema del sink di dati in base alle mutazioni.

La mutazione automatica dello schema si basa sull'approccio di diramazione dell'output utilizzato dal pattern dei messaggi non recapitabili. Tuttavia, in questo caso attiva una trasformazione che modifica lo schema di destinazione ogni volta che vengono riscontrati schemi additivi. Per un esempio di questo approccio, consulta Come gestire gli schemi JSON mutanti in una pipeline di inserimento flussi, con Square Enix sul blog di Google Cloud.

Decidi come unire i database

L'unione dei set di dati è un caso d'uso comune per le pipeline di dati. Puoi utilizzare gli input laterali o la trasformazione CoGroupByKey per eseguire i join nella pipeline. Ognuna ha vantaggi e svantaggi.

Gli input aggiuntivi forniscono un modo flessibile per risolvere problemi comuni di elaborazione dei dati, come l'arricchimento dei dati e le ricerche con chiave. A differenza degli oggetti PCollection, gli input secondari sono modificabili e possono essere determinati in fase di runtime. Ad esempio, i valori in un input laterale potrebbero essere calcolati da un altro ramo della pipeline o determinati chiamando un servizio remoto.

Dataflow supporta gli input laterali memorizzando i dati in modo permanente nell'archiviazione permanente, come in un disco condiviso. Con questa configurazione, l'input laterale completo è disponibile per tutti i worker.

Tuttavia, le dimensioni dell'input laterale possono essere molto grandi e potrebbero non rientrare nella memoria del worker. La lettura da un input laterale di grandi dimensioni può causare problemi di prestazioni se i worker hanno bisogno di leggere costantemente dall'archiviazione permanente.

La trasformazione CoGroupByKey è una trasformazione Apache Beam principale che unisce (unisce) più oggetti PCollection e raggruppa elementi che hanno una chiave comune. A differenza di un input secondario, che rende disponibili per ogni worker tutti i dati di input laterale, CoGroupByKey esegue un'operazione di shuffling (raggruppamento) per distribuire i dati tra i worker. CoGroupByKey è quindi l'ideale quando gli oggetti PCollection che vuoi unire sono molto grandi e non rientrano nella memoria dei worker.

Segui queste linee guida per decidere se utilizzare input collaterali o CoGroupByKey:

  • Utilizza input collaterali quando uno degli oggetti PCollection che stai unendo è sproporzionatamente più piccolo degli altri e l'oggetto PCollection più piccolo si adatta alla memoria dei worker. L'intera memorizzazione nella cache dell'input laterale consente di recuperare gli elementi in modo rapido ed efficiente.
  • Utilizza gli input secondari quando hai un oggetto PCollection che deve essere unito più volte nella pipeline. Anziché utilizzare più trasformazioni CoGroupByKey, crea un singolo input laterale che può essere riutilizzato da più trasformazioni ParDo.
  • Utilizza CoGroupByKey se devi recuperare una proporzione elevata di un oggetto PCollection che supera in modo significativo la memoria del worker.

Per maggiori informazioni, consulta Risolvere i problemi di esaurimento della memoria di Dataflow.

Riduci al minimo le costose operazioni per elemento

Un'istanza DoFn elabora batch di elementi chiamati set, unità di lavoro atomiche composte da zero o più elementi. I singoli elementi vengono quindi elaborati con il metodo DoFn.ProcessElement, che viene eseguito per ogni elemento. Poiché il metodo DoFn.ProcessElement viene chiamato per ogni elemento, qualsiasi operazione, dispendiosa in termini di tempo o di costo dal punto di vista di calcolo, che viene richiamata da quel metodo, viene eseguita per ogni singolo elemento elaborato dal metodo.

Se devi eseguire operazioni costose solo una volta per un batch di elementi, includile nel metodo DoFn.Setup o DoFn.StartBundle anziché nell'elemento DoFn.ProcessElement. Alcuni esempi includono le seguenti operazioni:

  • È in corso l'analisi di un file di configurazione che controlla alcuni aspetti del comportamento dell'istanza DoFn. Richiama questa azione una sola volta, quando l'istanza DoFn viene inizializzata, utilizzando il metodo DoFn.Setup.

  • Creare un'istanza di un client di breve durata che viene riutilizzato in tutti gli elementi di un bundle, ad esempio quando tutti gli elementi del bundle vengono inviati tramite un'unica connessione di rete. Richiama questa azione una volta per bundle utilizzando il metodo DoFn.StartBundle.

Limita le dimensioni dei batch e le chiamate simultanee a servizi esterni

Quando chiami servizi esterni, puoi ridurre l'overhead per chiamata utilizzando la trasformazione GroupIntoBatches. Questa trasformazione crea batch di elementi di una dimensione specificata. Il batch invia gli elementi a un servizio esterno come un payload anziché individualmente.

In combinazione con il raggruppamento, limita il numero massimo di chiamate parallele (in parallelo) al servizio esterno scegliendo le chiavi appropriate per partizionare i dati in entrata. Il numero di partizioni determina il Parallelizzazione massimo. Ad esempio, se a ogni elemento viene assegnata la stessa chiave, una trasformazione downstream per la chiamata al servizio esterno non viene eseguita in parallelo.

Per produrre chiavi per gli elementi, prendi in considerazione uno dei seguenti approcci:

  • Scegli un attributo del set di dati da utilizzare come chiavi di dati, ad esempio gli ID utente.
  • Genera chiavi di dati per suddividere gli elementi in modo casuale in un numero fisso di partizioni, dove il numero di possibili coppie chiave-valore determina il numero di partizioni. Devi creare un numero sufficiente di partizioni per il parallelismo. Ogni partizione deve avere elementi sufficienti affinché la trasformazione GroupIntoBatches sia utile.

Il seguente esempio di codice Java mostra come suddividere casualmente gli elementi in dieci partizioni:

// PII or classified data which needs redaction.
PCollection<String> sensitiveData = ...;

int numPartitions = 10; // Number of parallel batches to create.
PCollection<KV<Long, Iterable<String>>> batchedData =
    sensitiveData
        .apply("Assign data into partitions",
            ParDo.of(new DoFn<String, KV<Long, String>>() {
              Random random = new Random();

              @ProcessElement
              public void assignRandomPartition(ProcessContext context) {
                context.output(
                  KV.of(randomPartitionNumber(), context.element()));
              }
              private static int randomPartitionNumber() {
                return random.nextInt(numPartitions);
              }
            }))
        .apply("Create batches of sensitive data",
            GroupIntoBatches.<Long, String>ofSize(100L));

// Use batched sensitive data to fully utilize Redaction API,
// which has a rate limit but allows large payloads.
batchedData
    .apply("Call Redaction API in batches", callRedactionApiOnBatch());

Identificare i problemi di prestazioni causati da passaggi fusi

Dataflow crea un grafico dei passaggi che rappresenta la pipeline in base alle trasformazioni e ai dati utilizzati per costruirla. Questo grafico è chiamato grafico di esecuzione della pipeline.

Quando esegui il deployment della pipeline, Dataflow può modificare il grafico di esecuzione della pipeline per migliorarne le prestazioni. Ad esempio, Dataflow potrebbe fondere insieme alcune operazioni, un processo noto come ottimizzazione di fusione, per evitare l'impatto sulle prestazioni e sui costi della scrittura di ogni oggetto PCollection intermedio nella tua pipeline.

In alcuni casi, Dataflow potrebbe determinare in modo errato il modo ottimale per fondere le operazioni nella pipeline, il che può limitare la capacità del job di utilizzare tutti i worker disponibili. In questi casi, puoi impedire che le operazioni vengano fusi.

Considera il codice Apache Beam riportato di seguito. Una trasformazione GenerateSequence crea un oggetto PCollection delimitato di piccole dimensioni, che viene poi elaborato ulteriormente da due trasformazioni ParDo downstream.

La trasformazione Find Primes Less-than-N potrebbe essere costosa dal punto di vista del calcolo ed è probabile che venga eseguita lentamente per grandi numeri. Al contrario, la trasformazione Increment Number probabilmente verrà completata rapidamente.

import com.google.common.math.LongMath;
...

public class FusedStepsPipeline {

  final class FindLowerPrimesFn extends DoFn<Long, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Long n = c.element();
      if (n > 1) {
        for (long i = 2; i < n; i++) {
          if (LongMath.isPrime(i)) {
            c.output(Long.toString(i));
          }
        }
      }
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(options);

    PCollection<Long> sequence = p.apply("Generate Sequence",
        GenerateSequence
            .from(0)
            .to(1000000));

    // Pipeline branch 1
    sequence.apply("Find Primes Less-than-N",
        ParDo.of(new FindLowerPrimesFn()));

    // Pipeline branch 2
    sequence.apply("Increment Number",
        MapElements.via(new SimpleFunction<Long, Long>() {
          public Long apply(Long n) {
            return ++n;
          }
        }));

    p.run().waitUntilFinish();
  }
}

Il seguente diagramma mostra una rappresentazione grafica della pipeline nell'interfaccia di monitoraggio di Dataflow.

Rappresentazione del flusso della pipeline nell&#39;interfaccia di Dataflow.

L'interfaccia di monitoraggio di Dataflow mostra che si verifica la stessa velocità di elaborazione lenta per entrambe le trasformazioni, in particolare 13 elementi al secondo. Normalmente, la trasformazione Increment Number elaborerà gli elementi rapidamente, ma sembra essere legata alla stessa velocità di elaborazione di Find Primes Less-than-N.

Il motivo è che Dataflow ha fuso i passaggi in un'unica fase, impedendone l'esecuzione in modo indipendente. Puoi utilizzare il comando gcloud dataflow jobs describe per trovare ulteriori informazioni:

gcloud dataflow jobs describe --full job-id --format json

Nell'output risultante, i passaggi combinati sono descritti nell'oggetto ExecutionStageSummary nell'array ComponentTransform:

...

    "executionPipelineStage": [
      {
        "componentSource": [
          ...
        ],
        "componentTransform": [
          {
            "name": "s1",
            "originalTransform": "Generate Sequence/Read(BoundedCountingSource)",
            "userName": "Generate Sequence/Read(BoundedCountingSource)"
          },
          {
            "name": "s2",
            "originalTransform": "Find Primes Less-than-N",
            "userName": "Find Primes Less-than-N"
          },
          {
            "name": "s3",
            "originalTransform": "Increment Number/Map",
            "userName": "Increment Number/Map"
          }
        ],
        "id": "S01",
        "kind": "PAR_DO_KIND",
        "name": "F0"
      }

...

In questo scenario, poiché la trasformazione Find Primes Less-than-N è il passaggio lento, interrompere la fusione prima di questo passaggio è una strategia appropriata. Un metodo per annullare il raggruppamento dei passaggi consiste nell'inserire una trasformazione e annullare il raggruppamento di GroupByKey prima del passaggio, come mostrato nel seguente esempio di codice Java.

sequence
    .apply("Map Elements", MapElements.via(new SimpleFunction<Long, KV<Long, Void>>() {
      public KV<Long, Void> apply(Long n) {
        return KV.of(n, null);
      }
    }))
    .apply("Group By Key", GroupByKey.<Long, Void>create())
    .apply("Emit Keys", Keys.<Long>create())
    .apply("Find Primes Less-than-N", ParDo.of(new FindLowerPrimesFn()));

Puoi anche combinare questi passaggi di non fusione in una trasformazione composita riutilizzabile.

Dopo aver annullato i passaggi, quando esegui la pipeline, Increment Number viene completato in pochi secondi e la trasformazione, molto più lunga Find Primes Less-than-N, viene eseguita in una fase separata.

Questo esempio applica un'operazione di gruppo e separazione ai passaggi di unfuse. Puoi utilizzare altri approcci per altre circostanze. In questo caso, la gestione dell'output duplicato non è un problema, dato l'output consecutivo della trasformazione GenerateSequence. Gli oggetti KV con chiavi duplicate vengono deduplicati in una singola chiave nella trasformazione del gruppo (GroupByKey) e nella trasformazione di separazione (Keys). Per conservare i duplicati dopo le operazioni di raggruppamento e separazione, crea coppie chiave-valore seguendo questa procedura:

  1. Utilizza una chiave casuale e l'input originale come valore.
  2. Raggruppa utilizzando la chiave casuale.
  3. Invia i valori per ogni chiave come output.

Puoi anche utilizzare una trasformazione Reshuffle per impedire la fusione delle trasformazioni circostanti. Tuttavia, gli effetti collaterali della trasformazione Reshuffle non sono portabili tra diversi runner Apache Beam.

Per ulteriori informazioni sull'ottimizzazione del parallelismo e della fusione, consulta Ciclo di vita della pipeline.

Utilizzare le metriche Apache Beam per raccogliere insight sulle pipeline

Le metriche Apache Beam sono una classe di utilità che produce metriche per generare report sulle proprietà di una pipeline in esecuzione. Quando utilizzi Cloud Monitoring, le metriche Apache Beam sono disponibili come metriche personalizzate di Cloud Monitoring.

L'esempio seguente mostra le metriche Counter di Apache Beam utilizzate in una sottoclasse DoFn.

Il codice di esempio utilizza due contatori. Un contatore monitora gli errori di analisi JSON (malformedCounter), mentre l'altro monitora se il messaggio JSON è valido, ma contiene un payload vuoto (emptyCounter). In Cloud Monitoring, i nomi delle metrica personalizzata sono custom.googleapis.com/dataflow/malformedJson e custom.googleapis.com/dataflow/emptyPayload. Con le metriche personalizzate, puoi creare visualizzazioni e criteri di avviso in Cloud Monitoring.

final TupleTag<String> errorTag = new TupleTag<String>(){};
final TupleTag<MockObject> successTag = new TupleTag<MockObject>(){};

final class ParseEventFn extends DoFn<String, MyObject> {

  private final Counter malformedCounter = Metrics.counter(ParseEventFn.class, "malformedJson");
  private final Counter emptyCounter = Metrics.counter(ParseEventFn.class, "emptyPayload");
  private Gson gsonParser;

  @Setup
  public setup() {
    gsonParser = new Gson();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    try {
      MyObject myObj = gsonParser.fromJson(c.element(), MyObject.class);
      if (myObj.getPayload() != null) {
        //  Output the element if non-empty payload
        c.output(successTag, myObj);
      }
      else {
        // Increment empty payload counter
        emptyCounter.inc();
      }
    }
    catch (JsonParseException e) {
      // Increment malformed JSON counter
      malformedCounter.inc();
      // Output the element to dead-letter queue
      c.output(errorTag, c.element());
    }
  }
}

Scopri di più

Le pagine seguenti forniscono ulteriori informazioni su come strutturare la pipeline, come scegliere le trasformazioni da applicare ai dati e cosa considerare quando scegli i metodi di input e di output della pipeline.

Per ulteriori informazioni sulla creazione del codice utente, consulta i requisiti per le funzioni fornite dall'utente.