Google Cloud Big Data and Machine Learning Blog

Friday, January 29, 2016

ParDos . . . and don’ts: handling invalid inputs in Dataflow using Side Outputs as a “Dead Letter” file

Unstructured data is often ill-structured, and in many cases we don't want a single bad input element to spoil our ability to process the rest of the collection. Because a ParDo can be used with more than one input and output, we can build a variety of structural patterns within a Dataflow pipeline.

In this post, I’ll illustrate how to use a multi-output ParDo (i.e., a side-output) to automatically save all invalid inputs to a dead-letter file for later analysis. Our goal is to keep the pipeline (batch or streaming) running, and have enough data after execution to see what elements were problematic.

If the failure is within the processing code of a DoFn, one way to handle this is to catch the exception, log an error, and then drop the input. The rest of the elements in the pipeline will be processed successfully, so progress can be made as normal. But just logging the elements isn’t ideal because it doesn’t provide an easy way to see these malformed inputs and reprocess them later.
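
A minimal sketch of that catch-and-drop approach might look like the following, assuming the Dataflow SDK's DoFn and an SLF4J logger; the DropInvalidFn name, the Input and Output types, and the process() helper are placeholders for your own element types and parsing logic:

import com.google.cloud.dataflow.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch of the "catch, log, and drop" approach: failing elements are only
// recorded in the worker logs, so they are hard to find and reprocess later.
class DropInvalidFn extends DoFn<Input, Output> {
  private static final Logger LOG = LoggerFactory.getLogger(DropInvalidFn.class);

  @Override
  public void processElement(ProcessContext c) {
    try {
      c.output(process(c.element()));   // process() is the real parsing logic
    } catch (Exception e) {
      // The element is dropped here; the pipeline keeps making progress.
      LOG.error("Failed to process input {}", c.element(), e);
    }
  }
}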

A better way to solve this is to have a dead-letter file where all of the failing inputs are written for later analysis and reprocessing. We can use a side output in Dataflow to accomplish this goal: the ParDo emits successfully processed elements on its main output and routes failing inputs to a separate dead-letter output.

The branch that processes the successful results could contain a complete pipeline of its own. Implementing this requires introducing a TupleTag for each of the outputs; we then emit any failing elements to the "deadLetter" TupleTag:

final TupleTag<Output> successTag = new TupleTag<Output>() {};
final TupleTag<Input> deadLetterTag = new TupleTag<Input>() {};
PCollection<Input> input = /* … */;
PCollectionTuple outputTuple = input.apply(ParDo.of(new DoFn<Input, Output>() {
  @Override
  public void processElement(ProcessContext c) {
    try {
      // process() holds the parsing logic that may throw on bad input
      c.output(process(c.element()));
    } catch (Exception e) {
      LOG.error("Failed to process input {} -- adding to dead letter file",
          c.element(), e);
      c.sideOutput(deadLetterTag, c.element());
    }
  }
}).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

// Write the dead letter inputs to a BigQuery table for later analysis
outputTuple.get(deadLetterTag)
  .apply(BigQueryIO.Write.to(...));

// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);

// and continue processing as desired ...
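
To make the dead-letter branch concrete, one option is to convert each failing element into a BigQuery TableRow that captures the raw input before writing it. The sketch below continues the snippet above; the table name, schema, field name, and the trailing success-branch placeholder are illustrative assumptions rather than part of the original pipeline, and the BigQuery model classes (TableRow, TableSchema, TableFieldSchema) plus java.util.Arrays are assumed to be imported:

// Hypothetical sketch: format each dead-letter element as a TableRow so it
// can be stored in BigQuery for later analysis and reprocessing.
TableSchema deadLetterSchema = new TableSchema().setFields(Arrays.asList(
    new TableFieldSchema().setName("raw_input").setType("STRING")));

outputTuple.get(deadLetterTag)
    .apply(ParDo.of(new DoFn<Input, TableRow>() {
      @Override
      public void processElement(ProcessContext c) {
        // Keep the raw element so it can be inspected and reprocessed later
        c.output(new TableRow().set("raw_input", c.element().toString()));
      }
    }))
    .apply(BigQueryIO.Write
        .to("my-project:my_dataset.dead_letter")  // illustrative table name
        .withSchema(deadLetterSchema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

// The success branch continues like any other pipeline, for example:
success.apply(/* further transforms over the Output elements */);

Storing the raw input (optionally along with an error message and timestamp) makes it straightforward to re-run just the failing elements once the underlying issue is fixed.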

This pattern applies equally well to batch and streaming pipelines, since we're simply composing transforms from Dataflow's unified model. Today we saw how to use a multi-output ParDo to improve the handling of malformed input elements in a Dataflow pipeline. See the Dataflow documentation for more detail, and keep an eye out for future posts with more ParDos . . . and don'ts!

Posted by Ben Chambers, Software Engineer
