Data Encoding

When you create or output pipeline data, you'll need to specify how the elements in your PCollections are encoded and decoded to and from byte strings. Byte strings are used for intermediate storage, as well as for reading from sources and writing to sinks. The Dataflow SDKs use objects called coders to describe how the elements of a given PCollection should be encoded and decoded.

Using Coders

You typically need to specify a coder when reading data into your pipeline from an external source (or creating pipeline data from local data), and also when you output pipeline data to an external sink. See Pipeline I/O for more information.

Java

In the Dataflow SDK for Java, the type Coder provides the methods required for encoding and decoding data. The Dataflow SDK for Java provides a number of Coder subclasses that work with a variety of standard Java types, such as Integer, Long, Double, and String, as well as BigQuery's TableRow. You can find all of the available Coder subclasses in the package com.google.cloud.dataflow.sdk.coders.

When you read data into a pipeline, the coder indicates how to interpret the input data into a language-specific type, such as Integer or String. Likewise, the coder indicates how the language-specific types in your pipeline should be written into byte strings for an output data sink, or to materialize intermediate data in your pipeline.

The Dataflow SDKs set a coder for every PCollection in a pipeline, including those generated as output from a transform. Most of the time, the Dataflow SDKs can automatically infer the correct coder for an output PCollection. See Inferring a Coder for more information.

Note that coders do not necessarily have a 1:1 relationship with types. For example, the Integer type can have multiple valid coders, and input and output data can use different Integer coders. A transform might have Integer-typed input data that uses BigEndianIntegerCoder, and Integer-typed output data that uses VarIntCoder.
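The difference between two Integer coders is concrete at the byte level. The following sketch uses only the JDK (no Dataflow classes) to mimic the two layouts: a fixed-width big-endian encoding like the one BigEndianIntegerCoder uses, and a 7-bits-per-byte varint encoding like the one VarIntCoder is based on. The class and method names here are illustrative, not SDK APIs.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

public class IntEncodings {
    // Fixed-width layout: every int occupies exactly 4 bytes,
    // most significant byte first (the big-endian scheme).
    static byte[] bigEndian(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // Variable-length layout: 7 payload bits per byte, with the high
    // bit set on every byte except the last (a varint scheme).
    static byte[] varInt(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long v = value & 0xFFFFFFFFL;   // treat the int as unsigned
        while (v >= 0x80) {
            out.write((int) (v & 0x7F) | 0x80);
            v >>>= 7;
        }
        out.write((int) v);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // The same Integer value produces different byte strings
        // under the two encodings: 4 bytes vs. 2 bytes for 300.
        System.out.println(bigEndian(300).length); // 4
        System.out.println(varInt(300).length);    // 2
    }
}
```

Because small values dominate many datasets, a varint layout usually produces shorter byte strings, while a fixed-width layout preserves numeric ordering of the encoded bytes.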

Java

You can explicitly set a Coder when inputting or outputting a PCollection. You set the Coder by calling the method .withCoder when you apply your pipeline's Read or Write transform.

Typically, you set the Coder when the coder for a PCollection cannot be automatically inferred, or when you want to use a different coder than your pipeline's default. The following example code reads a set of numbers from a text file, and sets a Coder of type TextualIntegerCoder for the resulting PCollection:

  PCollection<Integer> numbers =
     p.begin()
      .apply(TextIO.Read.named("ReadNumbers")
                        .from("gs://my_bucket/path/to/numbers-*.txt")
                        .withCoder(TextualIntegerCoder.of()));

You can set the coder for an existing PCollection by using the method PCollection.setCoder. Note that you cannot call setCoder on a PCollection that has been finalized (e.g. by calling .apply on it).

You can get the coder for an existing PCollection by using the method getCoder. This method will fail with an IllegalStateException if a coder has not been set and cannot be inferred for the given PCollection.

Coder Inference and Default Coders

The Dataflow SDKs require a coder for every PCollection in your pipeline. Most of the time, however, you do not need to explicitly specify a coder, such as for an intermediate PCollection produced by a transform in the middle of your pipeline. In such cases, the Dataflow SDKs can infer an appropriate coder from the inputs and outputs of the transform used to produce the PCollection.

Java

Each Pipeline object has a CoderRegistry. The CoderRegistry represents a mapping of Java types to the default coders that the pipeline should use for PCollections of each type.

By default, the Dataflow SDK for Java automatically infers the Coder for the elements of an output PCollection using the type parameter from the transform's function object, such as DoFn. In the case of ParDo, for example, a DoFn<Integer, String> function object accepts an input element of type Integer and produces an output element of type String. In such a case, the Dataflow SDK for Java will automatically infer the default Coder for the output PCollection<String> (in the default pipeline CoderRegistry, this is StringUtf8Coder).
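This inference rests on ordinary Java reflection over the function object's generic supertype. The following plain-JDK sketch shows the kind of inspection involved; Fn and MyFn are hypothetical stand-ins for DoFn and a user-defined subclass, not SDK classes.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class InferenceSketch {
    // Stand-in for a function-object base class like DoFn<InputT, OutputT>.
    static abstract class Fn<InputT, OutputT> {}

    // A concrete function object, analogous to DoFn<Integer, String>.
    static class MyFn extends Fn<Integer, String> {}

    // Recover the output type parameter of an Fn subclass by reflecting
    // on its generic superclass -- the same kind of inspection that
    // coder inference performs on a DoFn.
    static Type outputType(Class<? extends Fn<?, ?>> fnClass) {
        ParameterizedType superType =
            (ParameterizedType) fnClass.getGenericSuperclass();
        return superType.getActualTypeArguments()[1];
    }

    public static void main(String[] args) {
        // Once the output type is known to be String, a registry can
        // map it to a default coder (StringUtf8Coder in the SDK).
        System.out.println(outputType(MyFn.class));
    }
}
```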

Default Coders and the CoderRegistry

Each Pipeline object has a CoderRegistry object, which maps language types to the default coder the pipeline should use for those types. You can use the CoderRegistry yourself to look up the default coder for a given type, or to register a new default coder for a given type.

Java

CoderRegistry contains a default mapping of Coders to standard Java types for any Pipeline you create using the Dataflow SDK for Java. The following table shows the standard mapping:

Java Type         Default Coder
Double            DoubleCoder
Instant           InstantCoder
Integer           VarIntCoder
Iterable          IterableCoder
KV                KvCoder
List              ListCoder
Long              VarLongCoder
Map               MapCoder
String            StringUtf8Coder
TableRow          TableRowJsonCoder
TimestampedValue  TimestampedValueCoder
Void              VoidCoder
byte[]            ByteArrayCoder

You can use the method CoderRegistry.registerStandardCoders to set default mappings for any given CoderRegistry.
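Conceptually, the registry behaves like a map from element type to coder, where registering a coder for a type replaces its default. The following plain-JDK sketch models that behavior with coder names as strings; RegistrySketch and registerCoder are illustrative stand-ins, not the SDK's CoderRegistry API.

```java
import java.util.HashMap;
import java.util.Map;

public class RegistrySketch {
    // A registry is, conceptually, a mapping from element type to
    // default coder; these entries mirror part of the table above.
    static final Map<Class<?>, String> registry = new HashMap<>();
    static {
        registry.put(Integer.class, "VarIntCoder");
        registry.put(String.class, "StringUtf8Coder");
        registry.put(Double.class, "DoubleCoder");
    }

    // Analogous to registering a coder for a type: the new mapping
    // replaces the previous default for that type.
    static void registerCoder(Class<?> type, String coderName) {
        registry.put(type, coderName);
    }

    public static void main(String[] args) {
        System.out.println(registry.get(Integer.class)); // VarIntCoder
        registerCoder(Integer.class, "BigEndianIntegerCoder");
        System.out.println(registry.get(Integer.class)); // BigEndianIntegerCoder
    }
}
```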

Looking Up a Default Coder

Java

You can use the method CoderRegistry.getDefaultCoder to determine the default Coder for a Java type. You can access the CoderRegistry for a given Pipeline by using the method Pipeline.getCoderRegistry. This allows you to determine (or set) the default Coder for a Java type on a per-pipeline basis: i.e. "for this pipeline, verify that Integer values are encoded using BigEndianIntegerCoder."

Setting the Default Coder for a Type

Java

To set the default Coder for a Java type for a particular pipeline, you obtain and modify the pipeline's CoderRegistry. You use the method Pipeline.getCoderRegistry to get the CoderRegistry object, and then use the method CoderRegistry.registerCoder to register a new Coder for the target Java type.

The following example code demonstrates how to set a default Coder, in this case BigEndianIntegerCoder, for Integer values for a pipeline.

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  CoderRegistry cr = p.getCoderRegistry();
  cr.registerCoder(Integer.class, BigEndianIntegerCoder.class);

Annotating a Custom Data Type with a Default Coder

Java

If your pipeline program defines a custom data type, you can use the @DefaultCoder annotation to specify the coder to use with that type. For example, let's say you have a custom data type for which you want to use AvroCoder. You can use the @DefaultCoder annotation as follows:

  @DefaultCoder(AvroCoder.class)
  public class MyCustomDataType {
    ...
  }

If you've created a custom coder to match your data type, and you want to use the @DefaultCoder annotation, your coder class must implement a static Coder.of(Class<T>) factory method.

  public class MyCustomCoder<T> implements Coder<T> {
    public static <T> Coder<T> of(Class<T> clazz) {...}
    ...
  }

  @DefaultCoder(MyCustomCoder.class)
  public class MyCustomDataType {
    ...
  }
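The mechanism behind @DefaultCoder is a runtime-retained annotation that records the coder class, which can then be read back reflectively. The following self-contained sketch models that: DefaultCoder, AvroCoder, and MyCustomDataType here are local stand-ins defined in the sketch itself, not the SDK's classes.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class AnnotationSketch {
    // Stand-in for the SDK's @DefaultCoder: a class-level annotation
    // retained at runtime that records which coder class to use.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface DefaultCoder {
        Class<?> value();
    }

    static class AvroCoder {}  // placeholder coder class for the sketch

    @DefaultCoder(AvroCoder.class)
    static class MyCustomDataType {}

    public static void main(String[] args) {
        // At runtime, the annotation can be read back to select a coder
        // for the annotated type.
        DefaultCoder ann =
            MyCustomDataType.class.getAnnotation(DefaultCoder.class);
        System.out.println(ann.value().getSimpleName()); // AvroCoder
    }
}
```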
