When you create or output pipeline data, you'll need to specify how the elements in your PCollections are encoded to and decoded from byte strings. Byte strings are used for intermediate storage as well as for reading from sources and writing to sinks.
The Dataflow SDKs use objects called coders to describe how the elements of a given PCollection should be encoded and decoded.
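As a rough illustration of what a coder does, the following SDK-independent sketch turns a value into a byte string and back again. The names SimpleCoder, Utf8StringCoder, and CoderRoundTrip are hypothetical and exist only for this example; the Dataflow SDK's own Coder type has a different API that is not reproduced here.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical interface for illustration only; not the SDK's Coder type.
interface SimpleCoder<T> {
    byte[] encode(T value);

    T decode(byte[] bytes);
}

// Encodes a String as its UTF-8 bytes and decodes those bytes back.
class Utf8StringCoder implements SimpleCoder<String> {
    @Override
    public byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

class CoderRoundTrip {
    public static void main(String[] args) {
        SimpleCoder<String> coder = new Utf8StringCoder();
        byte[] encoded = coder.encode("hello");
        String decoded = coder.decode(encoded);
        System.out.println(encoded.length + " bytes, decoded: " + decoded);
    }
}
```

The important property is the round trip: decoding the bytes produced by encode must give back an equal value.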
Using Coders
You typically need to specify a coder when reading data into your pipeline from an external source (or creating pipeline data from local data), and also when you output pipeline data to an external sink. See Pipeline I/O for more information.
In the Dataflow SDK for Java, the type Coder provides the methods required for encoding and decoding data. The Dataflow SDK for Java provides a number of Coder subclasses that work with a variety of standard Java types, such as Integer, Long, Double, String, BigQuery TableRow, and more. You can find all of the available Coder subclasses in the package com.google.cloud.dataflow.sdk.coders.
When you read data into a pipeline, the coder indicates how to interpret the input data as a language-specific type, such as Integer or String. Likewise, the coder indicates how the language-specific types in your pipeline should be written into byte strings for an output data sink, or to materialize intermediate data in your pipeline.
The Dataflow SDKs set a coder for every PCollection in a pipeline, including those generated as output from a transform. Most of the time, the Dataflow SDKs can automatically infer the correct coder for an output PCollection. See Inferring a Coder for more information.
Note that coders do not necessarily have a 1:1 relationship with types. For example, the Integer type can have multiple valid coders, and input and output data can use different Integer coders. A transform might have Integer-typed input data that uses BigEndianIntegerCoder, and Integer-typed output data that uses VarIntCoder.
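To see why one type can have several valid coders, consider the sketch below, which encodes the same int in two ways: a fixed-width 4-byte big-endian form and a variable-length form. The class IntegerEncodings and its methods are hypothetical and use only the Java standard library; they are meant to show the idea and are not guaranteed to be byte-compatible with the SDK's BigEndianIntegerCoder or VarIntCoder.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical helper class for illustration; not part of the Dataflow SDK.
class IntegerEncodings {
    // Fixed-width encoding: always 4 bytes, most significant byte first.
    // ByteBuffer uses big-endian byte order by default.
    static byte[] encodeBigEndian(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    static int decodeBigEndian(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    // Variable-length encoding: 7 payload bits per byte, with the high bit
    // set on every byte except the last. Small non-negative values take
    // fewer bytes than the fixed-width form.
    static byte[] encodeVarInt(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int remaining = value;
        while ((remaining & ~0x7F) != 0) {
            out.write((remaining & 0x7F) | 0x80);
            remaining >>>= 7;
        }
        out.write(remaining);
        return out.toByteArray();
    }

    static int decodeVarInt(byte[] bytes) {
        int result = 0;
        int shift = 0;
        for (byte b : bytes) {
            result |= (b & 0x7F) << shift;
            shift += 7;
        }
        return result;
    }

    public static void main(String[] args) {
        int value = 300;
        System.out.println("big-endian bytes: " + encodeBigEndian(value).length);
        System.out.println("varint bytes: " + encodeVarInt(value).length);
    }
}
```

Both encodings decode back to the same Integer, so either is a valid choice; they differ only in the bytes written, which is why the reader and the writer of a given byte string must agree on the coder.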
You can explicitly set a Coder when inputting or outputting a PCollection. You set the Coder by calling the method .withCoder when you apply your pipeline's Read or Write transform.
Typically, you set the Coder when the coder for a PCollection cannot be automatically inferred, or when you want to use a different coder than your pipeline's default. The following example code reads a set of numbers from a text file, and sets a Coder of type TextualIntegerCoder for the resulting PCollection:
PCollection<Integer> numbers =
    p.begin()
     .apply(TextIO.Read.named("ReadNumbers")
         .from("gs://my_bucket/path/to/numbers-*.txt")
         .withCoder(TextualIntegerCoder.of()));
You can set the coder for an existing PCollection by using the method PCollection.setCoder. Note that you cannot call setCoder on a PCollection that has been finalized (e.g. by calling .apply on it).
You can get the coder for an existing PCollection by using the method getCoder. This method will fail with an IllegalStateException if a coder has not been set and cannot be inferred for the given PCollection.
Coder Inference and Default Coders
The Dataflow SDKs require a coder for every PCollection in your pipeline. Most of the time, however, you do not need to explicitly specify a coder; this is typically the case for an intermediate PCollection produced by a transform in the middle of your pipeline. In such cases, the Dataflow SDKs can infer an appropriate coder from the inputs and outputs of the transform used to produce the PCollection.
Each Pipeline object has a CoderRegistry. The CoderRegistry represents a mapping of Java types to the default coders that the pipeline should use for PCollections of each type.
By default, the Dataflow SDK for Java automatically infers the Coder for the elements of an output PCollection using the type parameter from the transform's function object, such as DoFn. In the case of ParDo, for example, a DoFn<Integer, String> function object accepts an input element of type Integer and produces an output element of type String. In such a case, the Dataflow SDK for Java will automatically infer the default Coder for the output PCollection<String> (in the default pipeline CoderRegistry, this is StringUtf8Coder).
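The idea of reading an output type from a function object's type parameters can be sketched with plain Java reflection. In the example below, SimpleFn is a hypothetical stand-in for a function object such as DoFn, and OutputTypeInference is an invented helper; the SDK's actual inference mechanism is not described here and may work differently.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for a function object such as DoFn<InputT, OutputT>.
abstract class SimpleFn<InputT, OutputT> {
    abstract OutputT apply(InputT input);
}

class OutputTypeInference {
    // Reads the second type argument (the output type) that a direct
    // subclass of SimpleFn supplied when it extended SimpleFn.
    static Type outputTypeOf(SimpleFn<?, ?> fn) {
        ParameterizedType superType =
            (ParameterizedType) fn.getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[1];
    }

    public static void main(String[] args) {
        SimpleFn<Integer, String> toText = new SimpleFn<Integer, String>() {
            @Override
            String apply(Integer input) {
                return "value=" + input;
            }
        };
        // The anonymous subclass records its type arguments, so the
        // output type can still be read at run time.
        System.out.println(outputTypeOf(toText));
    }
}
```

Once the output type (here String) is known, it can be used as a key to look up a default coder for that type. This sketch only handles direct subclasses with concrete type arguments; anything more general is out of its scope.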
Default Coders and the CoderRegistry
Each Pipeline object has a CoderRegistry object, which maps language types to the default coder the pipeline should use for those types. You can use the CoderRegistry yourself to look up the default coder for a given type, or to register a new default coder for a given type.
CoderRegistry contains a default mapping of Coders to standard Java types for any Pipeline you create using the Dataflow SDK for Java. The following table shows the standard mapping:
Java Type | Default Coder |
---|---|
Double | DoubleCoder |
Instant | InstantCoder |
Integer | VarIntCoder |
Iterable | IterableCoder |
KV | KvCoder |
List | ListCoder |
Map | MapCoder |
Long | VarLongCoder |
String | StringUtf8Coder |
TableRow | TableRowJsonCoder |
Void | VoidCoder |
byte[] | ByteArrayCoder |
TimestampedValue | TimestampedValueCoder |
You can use the method CoderRegistry.registerStandardCoders to set default mappings for any given CoderRegistry.
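Conceptually, a registry of this kind is a map from a Java type to the coder used by default for that type. The sketch below is an assumption-laden illustration: MiniCoderRegistry is a hypothetical class, it stores coder names as strings to stay self-contained, and it copies only three rows of the table above. It is not the SDK's CoderRegistry, and its error handling is invented for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry for illustration; not the SDK's CoderRegistry.
class MiniCoderRegistry {
    // Maps a Java type to the name of the coder used by default for it.
    private final Map<Class<?>, String> defaults = new HashMap<>();

    MiniCoderRegistry() {
        // A few entries copied from the standard mapping table.
        defaults.put(Integer.class, "VarIntCoder");
        defaults.put(Long.class, "VarLongCoder");
        defaults.put(String.class, "StringUtf8Coder");
    }

    // Registers or replaces the default coder for a type.
    void registerCoder(Class<?> type, String coderName) {
        defaults.put(type, coderName);
    }

    // Looks up the default coder for a type, failing if none is registered.
    String getDefaultCoder(Class<?> type) {
        String coderName = defaults.get(type);
        if (coderName == null) {
            throw new IllegalArgumentException(
                "No default coder registered for " + type.getName());
        }
        return coderName;
    }

    public static void main(String[] args) {
        MiniCoderRegistry registry = new MiniCoderRegistry();
        System.out.println(registry.getDefaultCoder(Integer.class));
        registry.registerCoder(Integer.class, "BigEndianIntegerCoder");
        System.out.println(registry.getDefaultCoder(Integer.class));
    }
}
```

Because each registry instance holds its own map, changing a default in one registry does not affect another, which matches the per-pipeline scope of the mapping.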
Looking Up a Default Coder
You can use the method CoderRegistry.getDefaultCoder to determine the default Coder for a Java type. You can access the CoderRegistry for a given Pipeline by using the method Pipeline.getCoderRegistry. This allows you to determine (or set) the default Coder for a Java type on a per-pipeline basis; for example, "for this pipeline, verify that Integer values are encoded using BigEndianIntegerCoder."
Setting the Default Coder for a Type
To set the default Coder for a Java type for a particular pipeline, you obtain and modify the pipeline's CoderRegistry. You use the method Pipeline.getCoderRegistry to get the CoderRegistry object, and then use the method CoderRegistry.registerCoder to register a new Coder for the target Java type.
The following example code demonstrates how to set a default Coder, in this case BigEndianIntegerCoder, for Integer values for a pipeline.
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

CoderRegistry cr = p.getCoderRegistry();
cr.registerCoder(Integer.class, BigEndianIntegerCoder.class);
Annotating a Custom Data Type with a Default Coder
If your pipeline program defines a custom data type, you can use the @DefaultCoder annotation to specify the coder to use with that type. For example, let's say you have a custom data type for which you want to use AvroCoder. You can use the @DefaultCoder annotation as follows:
@DefaultCoder(AvroCoder.class)
public class MyCustomDataType {
  ...
}
If you've created a custom coder to match your data type, and you want to use the @DefaultCoder annotation, your coder class must implement a static of(Class<T>) factory method that returns a Coder<T>.
public class MyCustomCoder implements Coder {
  // A static generic method must declare its own type parameter <T>.
  public static <T> Coder<T> of(Class<T> clazz) {...}
  ...
}

@DefaultCoder(MyCustomCoder.class)
public class MyCustomDataType {
  ...
}
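One plausible reason for requiring a static of(Class<T>) factory method is that an annotation can name only a coder class, not a coder instance, so something must later build the instance from the annotated type. The sketch below shows that pattern with plain Java reflection. Every name in it (UseCoder, DescribingCoder, Reading, AnnotationLookup) is hypothetical, and it does not reproduce how the SDK actually processes @DefaultCoder.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical annotation standing in for @DefaultCoder.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface UseCoder {
    Class<?> value();
}

// Hypothetical coder class exposing a static of(Class<T>) factory method.
class DescribingCoder<T> {
    private final Class<T> type;

    private DescribingCoder(Class<T> type) {
        this.type = type;
    }

    public static <T> DescribingCoder<T> of(Class<T> type) {
        return new DescribingCoder<>(type);
    }

    String describe() {
        return "coder for " + type.getSimpleName();
    }
}

// Hypothetical custom data type annotated with its coder class.
@UseCoder(DescribingCoder.class)
class Reading {
    int value;
}

class AnnotationLookup {
    // Reads the coder class from the annotation and calls its static
    // of(Class) factory method, passing the annotated data type.
    static Object coderFor(Class<?> dataType) {
        UseCoder annotation = dataType.getAnnotation(UseCoder.class);
        if (annotation == null) {
            throw new IllegalArgumentException(
                dataType.getName() + " has no @UseCoder annotation");
        }
        try {
            Method factory = annotation.value().getMethod("of", Class.class);
            return factory.invoke(null, dataType);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Coder class lacks a usable of(Class) method", e);
        }
    }

    public static void main(String[] args) {
        DescribingCoder<?> coder = (DescribingCoder<?>) coderFor(Reading.class);
        System.out.println(coder.describe());
    }
}
```

If the coder class had no public static of(Class) method, the reflective lookup in this sketch would fail at run time, which illustrates why such a factory method is a precondition for annotation-driven coder selection.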