Google Cloud Platform

Handling Invalid Inputs in Dataflow

ParDos... and don’ts: handling invalid inputs in Dataflow using Side Outputs as a “Dead Letter” file

Unstructured data can also be ill-structured. In many cases, we don't want a single bad input element to spoil our ability to process the rest of the collection. Since a ParDo can have more than a single input and output, we can build a variety of structural patterns within a Dataflow pipeline.

In this post, I’ll illustrate how to use a multi-output ParDo (i.e., a side-output) to automatically save all invalid inputs to a dead-letter file for later analysis. Our goal is to keep the pipeline (batch or streaming) running, and have enough data after execution to see what elements were problematic.

If the failure is within the processing code of a DoFn, one way to handle this is to catch the exception, log an error, and then drop the input. The rest of the elements in the pipeline will be processed successfully, so progress can be made as normal. But just logging the elements isn’t ideal because it doesn’t provide an easy way to see these malformed inputs and reprocess them later.
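Stripped of the Dataflow API, the catch-log-and-drop approach looks roughly like this plain-Java sketch (`parseRecord` and the sample inputs are hypothetical stand-ins for real per-element logic):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

public class DropOnFailure {
  private static final Logger LOG = Logger.getLogger(DropOnFailure.class.getName());

  // Hypothetical per-element processing step that rejects malformed input.
  static int parseRecord(String raw) {
    return Integer.parseInt(raw.trim());
  }

  static List<Integer> processAll(List<String> inputs) {
    List<Integer> outputs = new ArrayList<>();
    for (String raw : inputs) {
      try {
        outputs.add(parseRecord(raw));
      } catch (Exception e) {
        // The malformed element is logged and then silently dropped;
        // there is no convenient way to find or reprocess it later.
        LOG.severe("Failed to process input: " + raw);
      }
    }
    return outputs;
  }

  public static void main(String[] args) {
    System.out.println(processAll(List.of("1", "2", "oops", "4"))); // [1, 2, 4]
  }
}
```

The good elements still flow through, but the only trace of `"oops"` is a log line.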

A better approach is to write all of the failing inputs to a dead-letter file for later analysis and reprocessing. We can use a side output in Dataflow to accomplish this goal. For example:


The branch for processing the successful results could contain a complete pipeline. Implementing this requires introducing TupleTags for each of the outputs. Then we output any failing elements to the "deadLetter" TupleTag:

final TupleTag<Output> successTag = new TupleTag<Output>() {};
final TupleTag<Input> deadLetterTag = new TupleTag<Input>() {};
PCollection<Input> input = /* … */;
PCollectionTuple outputTuple = input.apply(ParDo.of(new DoFn<Input, Output>() {
  @Override
  public void processElement(ProcessContext c) {
    try {
      // process(...) stands in for the per-element processing logic.
      c.output(process(c.element()));
    } catch (Exception e) {
      LOG.error("Failed to process input {} -- adding to dead letter file",
          c.element(), e);
      c.sideOutput(deadLetterTag, c.element());
    }
  }
}).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

// Write the dead letter inputs to a BigQuery table for later analysis

// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);
// ... and continue processing as desired.
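The two-output shape of that DoFn can also be illustrated outside of Dataflow in plain Java (a toy sketch; `parseRecord` again stands in for hypothetical per-element logic):

```java
import java.util.ArrayList;
import java.util.List;

public class DeadLetterDemo {
  // Hypothetical per-element processing step that rejects malformed input.
  static int parseRecord(String raw) {
    return Integer.parseInt(raw.trim());
  }

  // Successes go to one output; the raw form of each failing element goes
  // to the dead-letter output instead of being dropped.
  static void process(List<String> inputs, List<Integer> success, List<String> deadLetter) {
    for (String raw : inputs) {
      try {
        success.add(parseRecord(raw));
      } catch (Exception e) {
        deadLetter.add(raw);
      }
    }
  }

  public static void main(String[] args) {
    List<Integer> success = new ArrayList<>();
    List<String> deadLetter = new ArrayList<>();
    process(List.of("1", "2", "oops", "4"), success, deadLetter);
    System.out.println(success);    // [1, 2, 4]
    System.out.println(deadLetter); // [oops]
  }
}
```

Everything written to the dead-letter output survives the run, so the malformed elements can be inspected and replayed once the underlying problem is fixed.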

This pattern applies equally well to batch and streaming pipelines, since we're simply using combinators from the Unified Model. Today we saw how to use a multi-output ParDo to improve our handling of malformed input elements in a Dataflow pipeline. See the Dataflow documentation, and keep visiting us for future blog posts with more ParDos... and don'ts!