The Dataflow SDKs provide an extensible API that you can use to create custom data sources and sinks. You'll need to create a custom data source or sink if you want your pipeline to read data from (or write data to) a data source or sink for which the Dataflow SDKs do not provide native support.
You create a custom source by extending the Dataflow SDK's abstract Source subclasses such as BoundedSource or UnboundedSource. You create a custom sink by extending the Dataflow SDK's abstract Sink base class. You can use the extensible API to create custom sources that read either bounded (batch) or unbounded (streaming) data, and to create sinks that write bounded data only.
Dataflow plans to add support for custom sinks that write unbounded data in a future release.
Basic Code Requirements for Custom Sources and Sinks
The Dataflow service uses the classes you provide to read and/or write data using multiple worker instances in parallel. As such, the code you provide for Source and Sink subclasses must meet some basic requirements:
Serializability
Your Source or Sink subclass, whether bounded or unbounded, must be Serializable. The Dataflow service may create multiple instances of your Source or Sink subclass to be sent to multiple remote workers to facilitate reading or writing in parallel.
Immutability
Your Source or Sink subclass must be effectively immutable. All private fields must be declared final, and all private variables of collection type must be effectively immutable. If your class has setter methods, those methods must return an independent copy of the object with the relevant field modified.
You should only use mutable state in your Source or Sink subclass if you are using lazy evaluation of expensive computations that you need to implement the source; in that case, you must declare all mutable instance variables transient.
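For example, a configuration object held by a Source or Sink subclass might look like the following minimal sketch; the class and field names are hypothetical, and the pattern, not the specifics, is what matters:
Java
import java.io.Serializable;

// Hypothetical configuration for a custom source or sink. It is Serializable
// and effectively immutable: every field is final, and the setter-style
// method returns an independent copy rather than mutating this instance.
public final class MySourceConfig implements Serializable {
  private final String filePattern;
  private final int maxRecords;

  public MySourceConfig(String filePattern, int maxRecords) {
    this.filePattern = filePattern;
    this.maxRecords = maxRecords;
  }

  // "Setter" that preserves immutability by returning a modified copy.
  public MySourceConfig withMaxRecords(int newMaxRecords) {
    return new MySourceConfig(filePattern, newMaxRecords);
  }

  public String getFilePattern() { return filePattern; }
  public int getMaxRecords() { return maxRecords; }
}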
Thread-Safety
If you build your custom source to work with Dataflow's Dynamic Work Rebalancing feature, it is critical that you make your code thread-safe. The Dataflow SDK for Java provides a helper class to make this easier. See Using Your BoundedSource with Dynamic Work Rebalancing below for more details.
Testability
It is critical to exhaustively unit-test all of your Source and Sink subclasses, especially if you build your classes to work with advanced features such as Dataflow's Dynamic Work Rebalancing. A minor implementation error can lead to data corruption or data loss (such as skipping or duplicating records) that can be hard to detect.
To assist in testing sources, the Dataflow SDK provides the SourceTestUtils class. SourceTestUtils contains utilities for automatically verifying some of the properties of your BoundedSource implementation. You can use SourceTestUtils to increase your implementation's test coverage using a wide range of inputs with relatively few lines of code.
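As a rough sketch of how such a test might look (using the hypothetical MyCountingSource sketched later on this page; the SourceTestUtils method names shown here are assumptions based on the 1.x Java SDK, so check the class's Javadoc for your SDK version):
Java
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.testing.SourceTestUtils;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;

public class MyCountingSourceTest {
  @Test
  public void testReadingAndSplitting() throws Exception {
    PipelineOptions options = PipelineOptionsFactory.create();
    MyCountingSource source = new MyCountingSource(0, 1000);

    // Read every record directly from the source, without running a pipeline.
    List<Long> actual = SourceTestUtils.readFromSource(source, options);
    Assert.assertEquals(1000, actual.size());

    // Exhaustively verify that splitting the source at every fraction and
    // position yields consistent, non-overlapping results.
    SourceTestUtils.assertSplitAtFractionExhaustive(source, options);
  }
}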
Creating a Custom Source
To create a custom data source for your pipeline, you'll need to provide the format-specific logic that tells the Dataflow service how to read data from your input source, and how to split your data source into multiple parts so that multiple worker instances can read your data in parallel. If you're creating a custom data source that reads unbounded data, you'll need to provide additional logic for managing your source's watermark and optional checkpointing.
You supply the logic for your custom source by creating the following classes:
- A subclass of BoundedSource if you want to read a finite (batch) data set, or a subclass of UnboundedSource if you want to read an infinite (streaming) data set. These subclasses describe the data you want to read, including the data's location and parameters (such as how much data to read).
- A subclass of the Dataflow SDK class Source.Reader. Each Source must have an associated Reader that captures all the state involved in reading from that Source. This can include things like file handles, RPC connections, and other parameters that depend on the specific requirements of the data format you want to read. The Reader class hierarchy mirrors the Source hierarchy. If you're extending BoundedSource, you'll need to provide an associated BoundedReader; if you're extending UnboundedSource, you'll need to provide an associated UnboundedReader.
Implementing the Source Subclass
You'll need to create a subclass of either BoundedSource or UnboundedSource, depending on whether your data is a finite batch or an infinite stream. In either case, your Source subclass must override the abstract methods in the superclass. When using your custom data source, the Dataflow service uses these methods to estimate the size of your data set and to split it up for parallel reading.
Your Source subclass should also manage basic information about your data source, such as the location. For example, the example Source implementation in Dataflow's DatastoreIO class takes as arguments the host, datasetID, and query used to obtain data from Datastore.
BoundedSource
BoundedSource represents a finite data set from which the Dataflow service may read, possibly in parallel. BoundedSource contains a set of abstract methods that the service uses to split the data set for reading by multiple remote workers.
To implement a BoundedSource, your subclass must override the following abstract methods:
- splitIntoBundles: The Dataflow service uses this method to split your finite data into bundles of a given size.
- getEstimatedSizeBytes: The Dataflow service uses this method to estimate the total size of your data, in bytes.
- producesSortedKeys: A method to tell the Dataflow service whether your source produces key/value pairs in sorted order. If your source doesn't produce key/value pairs, your implementation of this method must return false.
- createReader: Creates the associated BoundedReader for this BoundedSource.
You can see a model of how to implement BoundedSource and the required abstract methods in the Dataflow SDK's example implementation of DatastoreIO.
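For an additional, self-contained illustration of the overall shape, the hypothetical source below produces the numbers in a given range. The signatures follow the 1.x Java SDK but are simplified, and the overrides of validate and getDefaultOutputCoder come from the base Source class rather than the list above; treat this as a sketch, not a definitive implementation:
Java
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// A hypothetical BoundedSource that produces the numbers in [startIndex, endIndex).
public class MyCountingSource extends BoundedSource<Long> {
  private static final long BYTES_PER_RECORD = 8;

  private final long startIndex;
  private final long endIndex;

  public MyCountingSource(long startIndex, long endIndex) {
    this.startIndex = startIndex;
    this.endIndex = endIndex;
  }

  public long getStartIndex() { return startIndex; }
  public long getEndIndex() { return endIndex; }

  @Override
  public List<? extends BoundedSource<Long>> splitIntoBundles(
      long desiredBundleSizeBytes, PipelineOptions options) {
    // Carve the index range into bundles of roughly the requested size.
    long recordsPerBundle = Math.max(1, desiredBundleSizeBytes / BYTES_PER_RECORD);
    List<MyCountingSource> bundles = new ArrayList<>();
    for (long start = startIndex; start < endIndex; start += recordsPerBundle) {
      bundles.add(
          new MyCountingSource(start, Math.min(start + recordsPerBundle, endIndex)));
    }
    return bundles;
  }

  @Override
  public long getEstimatedSizeBytes(PipelineOptions options) {
    return (endIndex - startIndex) * BYTES_PER_RECORD;
  }

  @Override
  public boolean producesSortedKeys(PipelineOptions options) {
    return false;  // This source does not produce key/value pairs.
  }

  @Override
  public BoundedReader<Long> createReader(PipelineOptions options) throws IOException {
    return new MyCountingReader(this);  // Sketched in the Reader section below.
  }

  @Override
  public void validate() {
    if (startIndex < 0 || endIndex < startIndex) {
      throw new IllegalArgumentException("Invalid range: " + startIndex + "-" + endIndex);
    }
  }

  @Override
  public Coder<Long> getDefaultOutputCoder() {
    return VarLongCoder.of();
  }
}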
UnboundedSource
UnboundedSource represents an infinite data stream from which the Dataflow service may read, possibly in parallel. UnboundedSource contains a set of abstract methods that the service uses to support streaming reads in parallel; these include checkpointing for failure recovery, record IDs to prevent data duplication, and watermarking for estimating data completeness in downstream parts of your pipeline.
To implement an UnboundedSource, your subclass must override the following abstract methods:
- generateInitialSplits: The Dataflow service uses this method to generate a list of UnboundedSource objects which represent the number of sub-stream instances from which the service should read in parallel.
- getCheckpointMarkCoder: The Dataflow service uses this method to obtain the Coder for the checkpoints for your source (if any).
- requiresDeduping: The Dataflow service uses this method to determine whether the data requires explicit removal of duplicate records. If this method returns true, the Dataflow service will automatically insert a step to remove duplicates from your source's output.
- createReader: Creates the associated UnboundedReader for this UnboundedSource.
Implementing the Reader Subclass
You'll need to create a subclass of either BoundedReader or UnboundedReader to be returned by your source subclass's createReader method. The Dataflow service uses the methods in your Reader (whether bounded or unbounded) to do the actual reading of your dataset.
BoundedReader and UnboundedReader have similar basic interfaces, which you'll need to define. In addition, there are some methods unique to UnboundedReader that you'll need to implement for working with unbounded data, and an optional method you can implement if you want your BoundedReader to take advantage of Dataflow's Dynamic Work Rebalancing feature. There are also minor differences in the semantics of the start() and advance() methods when using UnboundedReader.
Reader Methods Common to Both BoundedReader and UnboundedReader
The Dataflow service uses the following methods to read data using BoundedReader or UnboundedReader:
- start: Initializes the Reader and advances to the first record to be read. This method is called exactly once when Dataflow begins reading your data, and is a good place to put expensive operations needed for initialization.
- advance: Advances the reader to the next valid record. This method must return false if there is no more input available. BoundedReader should stop reading once advance returns false, but UnboundedReader can return true in future calls once more data is available from your stream.
- getCurrent: Returns the data record at the current position, last read by start or advance.
- getCurrentTimestamp: Returns the timestamp for the current data record. You only need to override getCurrentTimestamp if your source reads data that has intrinsic timestamps. The Dataflow service uses this value to set the intrinsic timestamp for each element in the resulting output PCollection.
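A minimal sketch of these methods for the hypothetical MyCountingSource shown earlier follows; getCurrentTimestamp is not overridden because the records carry no intrinsic timestamps, and splitAtFraction support is discussed in the Dynamic Work Rebalancing section below:
Java
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import java.io.IOException;

// A hypothetical BoundedReader for MyCountingSource.
public class MyCountingReader extends BoundedSource.BoundedReader<Long> {
  private final MyCountingSource source;
  private long current;

  public MyCountingReader(MyCountingSource source) {
    this.source = source;
    this.current = source.getStartIndex() - 1;
  }

  @Override
  public boolean start() throws IOException {
    // Called exactly once; position the reader on the first record, if any.
    current = source.getStartIndex();
    return current < source.getEndIndex();
  }

  @Override
  public boolean advance() throws IOException {
    // Move to the next record; return false once the range is exhausted.
    current++;
    return current < source.getEndIndex();
  }

  @Override
  public Long getCurrent() {
    // Return the record at the current position, last set by start or advance.
    return current;
  }

  @Override
  public BoundedSource<Long> getCurrentSource() {
    return source;
  }

  @Override
  public void close() throws IOException {
    // Nothing to release for this in-memory example.
  }
}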
Reader Methods Unique to UnboundedReader
In addition to the basic Reader interface, UnboundedReader has some additional methods for managing reads from an unbounded data source:
- getCurrentRecordId: Returns a unique identifier for the current record. The Dataflow service uses these record IDs to filter out duplicate records. If your data has logical IDs present in each record, you can have this method return them; otherwise, you can return a hash of the record contents, using at least a 128-bit hash. (It is not recommended to use Java's Object.hashCode(), as a 32-bit hash is generally insufficient for preventing collisions.)
- getWatermark: Returns a watermark that your Reader provides. The watermark is the approximate lower bound on timestamps of future elements to be read by your Reader. The Dataflow service uses the watermark as an estimate of data completeness. Watermarks are used in Dataflow's Windowing and Trigger features.
- getCheckpointMark: The Dataflow service uses this method to create a checkpoint in your data stream. The checkpoint represents the UnboundedReader's progress, which can be used for failure recovery. Different data streams may use different checkpointing methods; some sources might require received records to be acknowledged, while others might use positional checkpointing. You'll need to tailor this method to the most appropriate checkpointing scheme. For example, you might have this method return the most recently acked record(s).
Note: Implementing getCurrentRecordId is optional if your source uses a checkpointing scheme that uniquely identifies each record. However, record IDs can still be useful if upstream systems writing data to your source occasionally produce duplicate records that your source might then read.
Note: getCheckpointMark is optional; you don't need to implement it if your data does not have meaningful checkpoints. However, if you choose not to implement checkpointing in your source, you may encounter duplicate data or data loss in your pipeline, depending on whether your data source tries to re-send records in case of errors.
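As a sketch of what a checkpoint for an acknowledgement-based stream might look like, the class below records the IDs delivered since the last checkpoint and acks them when the checkpoint is finalized. Only the UnboundedSource.CheckpointMark interface and its finalizeCheckpoint method come from the SDK; the client interface and acknowledge call are hypothetical stand-ins for your stream's own ack mechanism:
Java
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;

// A hypothetical checkpoint for an ack-based stream. The Dataflow service
// encodes this object (using the coder from getCheckpointMarkCoder) and calls
// finalizeCheckpoint once the records it covers are durably committed.
public class MyCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {

  /** Hypothetical client for the upstream system; not part of the SDK. */
  public interface MyStreamClient {
    void acknowledge(List<String> recordIds) throws IOException;
  }

  private final List<String> recordIdsToAck;      // records read since the last checkpoint
  private final transient MyStreamClient client;  // live connection; not serialized

  public MyCheckpointMark(List<String> recordIdsToAck, MyStreamClient client) {
    this.recordIdsToAck = recordIdsToAck;
    this.client = client;
  }

  @Override
  public void finalizeCheckpoint() throws IOException {
    // Acknowledge the records so the upstream system will not redeliver them.
    if (client != null) {
      client.acknowledge(recordIdsToAck);
    }
  }
}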
Using your BoundedSource with Dynamic Work Rebalancing
If your source provides bounded data, you can have your BoundedReader work with the Dataflow service's Dynamic Work Rebalancing feature by implementing the method splitAtFraction. The Dataflow service may call splitAtFraction concurrently with start or advance on a given reader so that the remaining data in your Source can be split and redistributed to other workers.
When you implement splitAtFraction, your code must produce a mutually-exclusive set of splits where the union of those splits matches the total data set.
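The variant of the hypothetical MyCountingReader below illustrates one way to do this: splitAtFraction shrinks the reader's own end position and returns the remainder as a new source. It is a simplified illustration only; a production reader would normally lean on the SDK's range-tracker helpers to get the concurrency details right:
Java
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import java.io.IOException;

// A hypothetical BoundedReader over [startIndex, endIndex) that supports
// dynamic work rebalancing. Methods are synchronized because the service may
// call splitAtFraction concurrently with start or advance.
public class MySplittableCountingReader extends BoundedSource.BoundedReader<Long> {
  private final long startIndex;
  private long endIndex;  // may shrink when splitAtFraction succeeds
  private long current;

  public MySplittableCountingReader(MyCountingSource source) {
    this.startIndex = source.getStartIndex();
    this.endIndex = source.getEndIndex();
    this.current = startIndex - 1;
  }

  @Override
  public synchronized boolean start() throws IOException {
    current = startIndex;
    return current < endIndex;
  }

  @Override
  public synchronized boolean advance() throws IOException {
    current++;
    return current < endIndex;
  }

  @Override
  public synchronized Long getCurrent() {
    return current;
  }

  @Override
  public synchronized BoundedSource<Long> getCurrentSource() {
    // Reflects the (possibly shrunken) primary range.
    return new MyCountingSource(startIndex, endIndex);
  }

  @Override
  public synchronized BoundedSource<Long> splitAtFraction(double fraction) {
    // Propose a split point at the given fraction of the current range.
    long splitIndex = startIndex + (long) (fraction * (endIndex - startIndex));
    if (splitIndex <= current || splitIndex >= endIndex) {
      return null;  // Already read past the split point, or nothing left to give away.
    }
    // Shrink the primary to [startIndex, splitIndex) and hand back the residual;
    // together the two ranges still cover exactly the original data set.
    BoundedSource<Long> residual = new MyCountingSource(splitIndex, endIndex);
    endIndex = splitIndex;
    return residual;
  }

  @Override
  public void close() throws IOException {}
}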
Convenience Source and Reader Base Classes
The Dataflow SDK contains some convenient abstract base classes to help you create Source and Reader classes that work with common data storage formats, like files.
FileBasedSource
If your data source uses files, you can derive your Source and Reader classes from the FileBasedSource and FileBasedReader abstract base classes in the Dataflow SDK for Java. FileBasedSource is a bounded source subclass that implements code common to Dataflow sources that interact with files, including:
- File pattern expansion
- Sequential record reading
- Split points
XmlSource
If your data source uses XML-formatted files, you can derive your Source class from the XmlSource abstract base class in the Dataflow SDK for Java. XmlSource extends FileBasedSource and provides additional methods for parsing XML files, such as setting the XML elements that designate the file root and the individual records in the file.
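For example, a configuration might look like the snippet below, assuming a JAXB-annotated Record class and the builder-style methods found in the 1.x Java SDK; the file path, element names, and Record class are hypothetical:
Java
// Record is a hypothetical JAXB-annotated class (@XmlRootElement) matching
// the <record> elements in the input files.
XmlSource<Record> source = XmlSource.<Record>from("/path/to/input-*.xml")
    .withRootElement("records")      // element wrapping each whole file
    .withRecordElement("record")     // element wrapping each individual record
    .withRecordClass(Record.class);  // class each record is unmarshalled into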
Reading from a Custom Source
To read data from a custom source in your pipeline, you apply the SDK generic Read transform and pass your custom source as a parameter using the .from operation:
Java
MySource source = new MySource(false, file.getPath(), 64, null);
p.apply("ReadFileData", Read.from(source));
Creating a Custom Sink
To create a custom data sink for your pipeline, you'll need to provide the format-specific logic that tells the Dataflow service how to write bounded data from your pipeline's PCollections to an output sink, such as a directory or file system, a database table, etc. The Dataflow service writes bundles of data in parallel using multiple workers.
Note: Dataflow currently only supports writing bounded data to a custom output sink.
You supply the writing logic by creating the following classes:
- A subclass of the Dataflow SDK abstract base class Sink. Sink describes a location or resource that your pipeline can write to in parallel. Your Sink subclass might contain fields such as the resource or file location, database table name, etc.
- A subclass of Sink.WriteOperation. Sink.WriteOperation represents the state of a single parallel write operation to the output location described in your Sink. Your WriteOperation subclass must define the initialization and finalization processes of the parallel write.
- A subclass of Sink.Writer. Sink.Writer writes a bundle of elements from an input PCollection to your designated data sink.
Implementing the Sink Subclass
Your Sink subclass describes the location or resource to which your pipeline writes its output. This might include a file system location, the name of a database table or dataset, etc. Your Sink subclass must validate that the output location can be written to, and create WriteOperations that define how to write data to that output location.
To implement a Sink, your subclass must override the following abstract methods:
- validate: This method ensures that the output location for the pipeline data is valid and can be written to. validate should ensure the file can be opened, that the output directory exists, that the user has access permissions for the database table, etc. The Dataflow service calls validate at pipeline creation time.
- createWriteOperation: This method creates a Sink.WriteOperation object that defines how to write to the output location.
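A minimal sketch of a Sink for a hypothetical table-based output follows; all names are illustrative, and the exact signatures may differ slightly by SDK version:
Java
import com.google.cloud.dataflow.sdk.io.Sink;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;

// A hypothetical Sink that writes String records to a named table in some
// external system. Like a Source, it must be Serializable and effectively
// immutable, so it only holds final configuration fields.
public class MyTableSink extends Sink<String> {
  private final String tableName;

  public MyTableSink(String tableName) {
    this.tableName = tableName;
  }

  public String getTableName() { return tableName; }

  @Override
  public void validate(PipelineOptions options) {
    // Called at pipeline creation time. Check that the output location is
    // usable, for example that the table exists and the caller may write to it.
    if (tableName == null || tableName.isEmpty()) {
      throw new IllegalArgumentException("A table name must be provided");
    }
  }

  @Override
  public Sink.WriteOperation<String, ?> createWriteOperation(PipelineOptions options) {
    return new MyTableWriteOperation(this);  // Sketched in the next section.
  }
}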
Implementing the WriteOperation Subclass
Your WriteOperation subclass defines how to write a bundle of elements to the output location defined in your Sink. The WriteOperation performs the necessary initialization and finalization for a parallel write.
To implement a WriteOperation, your subclass must override the following abstract methods:
- initialize: This method performs any necessary initialization before writing to the output location. The Dataflow service calls this method before writing begins. You can use initialize to, for example, create a temporary output directory.
- finalize: This method handles the results of a write performed by a Writer class. Your implementation of finalize should perform clean-up from any failed writes or writes that were successfully re-tried, and must be capable of locating any temporary or partial output written by failed writes. As finalize may be called multiple times in the case of failure or retry, a best practice is to make your implementation of finalize atomic; if that's not possible, you must make your implementation of finalize idempotent.
- createWriter: This method creates a Sink.Writer object that writes a bundle of data to the output location defined in your Sink.
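Continuing the hypothetical MyTableSink, the WriteOperation below stages each bundle's output and commits it during finalization. Its writer-result type is String (the name of the staging table each Writer wrote to). In the 1.x Java SDK the base class also asks for getSink and a Coder for the writer result; the table-client calls are left as comments because they stand in for whatever your storage system provides:
Java
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.io.Sink;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;

// A hypothetical WriteOperation for MyTableSink. Each Writer returns the name
// of the staging table it wrote; finalize commits those tables to the final
// output and cleans up anything left behind by failed or retried bundles.
public class MyTableWriteOperation extends Sink.WriteOperation<String, String> {
  private final MyTableSink sink;

  public MyTableWriteOperation(MyTableSink sink) {
    this.sink = sink;
  }

  @Override
  public void initialize(PipelineOptions options) throws Exception {
    // Called once before any bundles are written, for example to create a
    // temporary staging area:
    // myTableClient.createStagingArea(sink.getTableName());   // hypothetical
  }

  @Override
  public void finalize(Iterable<String> writerResults, PipelineOptions options)
      throws Exception {
    // Only the results of successful bundle writes appear in writerResults.
    // Because finalize may itself be retried, this logic must be atomic or
    // at least idempotent.
    for (String stagingTable : writerResults) {
      // myTableClient.copyInto(sink.getTableName(), stagingTable);  // hypothetical
    }
    // myTableClient.deleteStagingArea(sink.getTableName());          // hypothetical
  }

  @Override
  public Sink.Writer<String, String> createWriter(PipelineOptions options) throws Exception {
    return new MyTableWriter(this);  // Sketched in the next section.
  }

  @Override
  public Sink<String> getSink() {
    return sink;
  }

  @Override
  public Coder<String> getWriterResultCoder() {
    // The writer result (a staging table name) is encoded as a String.
    return StringUtf8Coder.of();
  }
}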
Implementing the Writer Subclass
Your Writer subclass implements the logic for writing a single bundle of records to the output location defined in your Sink. The Dataflow service may instantiate multiple instances of your Writer in different threads on the same worker, so access to any static members or methods must be thread-safe.
To implement a Writer, your subclass must override the following abstract methods:
- open: This method performs any initialization for the bundle of records to be written, such as creating a temporary file for writing. The Dataflow service calls this method once at the start of the write, and passes it a unique bundle ID for the bundle of records to be written.
- write: This method writes a single record to the output location. The Dataflow service calls write for each value in the bundle.
- close: This method finishes writing and closes any resources used for writing the bundle. close must return a writer result, which the enclosing WriteOperation will use to identify successful writes. The Dataflow service calls this method once at the end of the write.
Handling Bundle IDs
When the service calls Writer.open, it will pass a unique bundle ID for the records to be written. Your Writer must use this bundle ID to ensure that its output does not interfere with that of other Writer instances that might have been created in parallel. This is particularly important as the Dataflow service may re-try writes multiple times in case of failure.
For example, if your Sink's output is file-based, your Writer class might use the bundle ID as a filename suffix to ensure that your Writer writes its records to a unique output file not used by other Writers. You can then have your Writer's close method return that file location as part of the write result.
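Putting this together for the hypothetical MyTableSink, the Writer below uses the bundle ID to derive a unique staging-table name, writes each record there, and returns that name from close; the table-client calls are again commented placeholders for your storage system's API:
Java
import com.google.cloud.dataflow.sdk.io.Sink;

// A hypothetical Writer for MyTableSink. The bundle ID passed to open names a
// per-bundle staging table so that retried or parallel bundles never collide;
// close returns that name as the writer result for finalize to commit.
public class MyTableWriter extends Sink.Writer<String, String> {
  private final MyTableWriteOperation writeOperation;
  private String stagingTableName;

  public MyTableWriter(MyTableWriteOperation writeOperation) {
    this.writeOperation = writeOperation;
  }

  @Override
  public void open(String uId) throws Exception {
    // Derive a unique output location from the bundle ID.
    MyTableSink sink = (MyTableSink) writeOperation.getSink();
    stagingTableName = sink.getTableName() + "_staging_" + uId;
    // myTableClient.createTable(stagingTableName);   // hypothetical
  }

  @Override
  public void write(String value) throws Exception {
    // Called once per element in the bundle.
    // myTableClient.insert(stagingTableName, value);  // hypothetical
  }

  @Override
  public String close() throws Exception {
    // The writer result identifies this bundle's successfully written output.
    return stagingTableName;
  }

  @Override
  public Sink.WriteOperation<String, String> getWriteOperation() {
    return writeOperation;
  }
}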
You can see a model of how to implement Sink, WriteOperation, and Writer along with their required abstract methods in the Dataflow SDK's example implementation of DatastoreIO.
Convenience Sink and Writer Base Classes
The Dataflow SDK contains some convenient abstract base classes to help you create Sink and Writer classes that work with common data storage formats, like files.
FileBasedSink
If your data sink uses files, you can derive your Sink, WriteOperation, and Writer classes from the FileBasedSink, FileBasedWriteOperation, and FileBasedWriter abstract base classes in the Dataflow SDK for Java. These classes implement code common to Dataflow sinks that interact with files, including:
- Setting file headers and footers
- Sequential record writing
- Setting the output MIME type
FileBasedSink and its subclasses support writing to both local files and to files in Google Cloud Storage. For more information, see the example implementation for FileBasedSink called XmlSink in the Dataflow SDK for Java.
Writing to a Custom Sink
To write data to a custom sink in your pipeline, you apply the SDK generic Write transform and pass your custom sink as a parameter using the .to operation:
Java
p.apply("WriteResults", Write.to(new MySink()));