Custom Sources and Sinks

The Dataflow SDKs provide an extensible API that you can use to create custom data sources and sinks. You'll need to create a custom data source or sink if you want your pipeline to read data from (or write data to) a data source or sink for which the Dataflow SDKs do not provide native support.

You create a custom source by extending the Dataflow SDK's abstract Source subclasses such as BoundedSource or UnboundedSource. You create a custom sink by extending the Dataflow SDK's abstract Sink base class. You can use the extensible API to create custom sources that read either bounded (batch) or unbounded (streaming) data, and to create sinks that write bounded data only.

Dataflow plans to add support for custom sinks that write unbounded data in a future release.

Basic Code Requirements for Custom Sources and Sinks

The Dataflow service uses the classes you provide to read and/or write data using multiple worker instances in parallel. As such, the code you provide for Source and Sink subclasses must meet some basic requirements:

Serializability

Your Source or Sink subclass, whether bounded or unbounded, must be Serializable. The Dataflow service may create multiple instances of your Source or Sink subclass to be sent to multiple remote workers to facilitate reading or writing in parallel.

Immutability

Your Source or Sink subclass must be effectively immutable. All private fields must be declared final, and all private variables of collection type must be effectively immutable. If your class has setter methods, those methods must return an independent copy of the object with the relevant field modified.

Use mutable state in your Source or Sink subclass only for lazy evaluation of expensive computations that the source needs; in that case, you must declare all mutable instance variables transient.
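
For example, the following sketch illustrates the pattern with a hypothetical configuration class (the class, its fields, and the withBatchSize method are illustrative only); the same rules apply whether the fields live on a helper class like this or directly on your Source or Sink subclass: all fields are final, and the setter-style method returns an independent copy instead of mutating the instance.

  // Hypothetical, effectively immutable configuration for a custom source.
  public class MySourceConfig implements Serializable {
    private final String filePattern;
    private final int batchSize;

    public MySourceConfig(String filePattern, int batchSize) {
      this.filePattern = filePattern;
      this.batchSize = batchSize;
    }

    // Returns an independent copy with the relevant field modified, rather
    // than mutating this instance.
    public MySourceConfig withBatchSize(int newBatchSize) {
      return new MySourceConfig(filePattern, newBatchSize);
    }
  }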

Thread-Safety

If you build your custom source to work with Dataflow's Dynamic Work Rebalancing feature, it is critical that you make your code thread-safe. The Dataflow SDK for Java provides a helper class to make this easier. See Using Your BoundedSource with Dynamic Work Rebalancing below for more details.

Testability

It is critical to exhaustively unit-test all of your Source and Sink subclasses, especially if you build your classes to work with advanced features such as Dataflow's Dynamic Work Rebalancing. A minor implementation error can lead to data corruption or data loss (such as skipping or duplicating records) that can be hard to detect.

To assist in testing sources, the Dataflow SDK provides the SourceTestUtils class. SourceTestUtils contains utilities for automatically verifying some of the properties of your BoundedSource implementation. You can use SourceTestUtils to increase your implementation's test coverage using a wide range of inputs with relatively few lines of code.
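
For example, a test might verify that reading a source directly, reading its bundles, and dynamically splitting it all produce consistent results. The following is a minimal sketch, assuming a hypothetical MyBoundedSource (like the one sketched later in this document) and a JUnit test method declared as throws Exception:

  PipelineOptions options = PipelineOptionsFactory.create();
  // Hypothetical source that produces one record per offset in [0, 100).
  MyBoundedSource source = new MyBoundedSource("gs://my-bucket/input*", 0, 100);

  // Read all records directly from the source.
  assertEquals(100, SourceTestUtils.readFromSource(source, options).size());

  // Reading the source in bundles should yield the same records as reading
  // the source directly.
  SourceTestUtils.assertSourcesEqualReferenceSource(
      source, source.splitIntoBundles(16, options), options);

  // Exhaustively exercise splitAtFraction at every position and fraction.
  SourceTestUtils.assertSplitAtFractionExhaustive(source, options);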

Creating a Custom Source

To create a custom data source for your pipeline, you'll need to provide the format-specific logic that tells the Dataflow service how to read data from your input source, and how to split your data source into multiple parts so that multiple worker instances can read your data in parallel. If you're creating a custom data source that reads unbounded data, you'll need to provide additional logic for managing your source's watermark and optional checkpointing.

You supply the logic for your custom source by creating the following classes:

  • A subclass of BoundedSource if you want to read a finite (batch) data set, or a subclass of UnboundedSource if you want to read an infinite (streaming) data set. These subclasses describe the data you want to read, including the data's location and parameters (such as how much data to read).
  • A subclass of the Dataflow SDK class Source.Reader. Each Source must have an associated Reader that captures all the state involved in reading from that Source. This can include things like file handles, RPC connections, and other parameters that depend on the specific requirements of the data format you want to read.

    The Reader class hierarchy mirrors the Source hierarchy. If you're extending BoundedSource, you'll need to provide an associated BoundedReader; if you're extending UnboundedSource, you'll need to provide an associated UnboundedReader.

Implementing the Source Subclass

You'll need to create a subclass of either BoundedSource or UnboundedSource, depending on whether your data is a finite batch or an infinite stream. In either case, your Source subclass must override the abstract methods in the superclass. When using your custom data source, the Dataflow service uses these methods to estimate the size of your data set and to split it up for parallel reading.

Your Source subclass should also manage basic information about your data source, such as the location. For example, the example Source implementation in Dataflow's DatastoreIO class takes as arguments the host, datasetID, and query used to obtain data from Datastore.

BoundedSource

BoundedSource represents a finite data set from which the Dataflow service may read, possibly in parallel. BoundedSource contains a set of abstract methods that the service uses to split the data set for reading by multiple remote workers.

To implement a BoundedSource, your subclass must override the following abstract methods:

  • splitIntoBundles: The Dataflow service uses this method to split your finite data into bundles of a given size.
  • getEstimatedSizeBytes: The Dataflow service uses this method to estimate the total size of your data, in bytes.
  • producesSortedKeys: A method to tell the Dataflow service whether your source produces key/value pairs in sorted order. If your source doesn't produce key/value pairs, your implementation of this method must return false.
  • createReader: Creates the associated BoundedReader for this BoundedSource.

You can see a model of how to implement BoundedSource and the required abstract methods in the Dataflow SDK's example implementation of DatastoreIO.
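
As a rough sketch, a BoundedSource subclass that reads string records from an offset range of some resource might look like the following. The class, its fields, and the reader it creates are hypothetical; note that the Source base class also declares validate and getDefaultOutputCoder, shown here with trivial implementations.

  // Hypothetical BoundedSource over an offset range of a resource.
  public class MyBoundedSource extends BoundedSource<String> {
    private final String resourceSpec;   // For example, a file pattern.
    private final long startOffset;
    private final long endOffset;

    public MyBoundedSource(String resourceSpec, long startOffset, long endOffset) {
      this.resourceSpec = resourceSpec;
      this.startOffset = startOffset;
      this.endOffset = endOffset;
    }

    @Override
    public List<MyBoundedSource> splitIntoBundles(
        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
      // Divide [startOffset, endOffset) into sub-ranges of roughly the desired
      // size; each sub-range becomes an independently readable source.
      List<MyBoundedSource> bundles = new ArrayList<>();
      for (long start = startOffset; start < endOffset; start += desiredBundleSizeBytes) {
        long end = Math.min(start + desiredBundleSizeBytes, endOffset);
        bundles.add(new MyBoundedSource(resourceSpec, start, end));
      }
      return bundles;
    }

    @Override
    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
      return endOffset - startOffset;
    }

    @Override
    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
      return false;  // This source does not produce key/value pairs.
    }

    @Override
    public BoundedReader<String> createReader(PipelineOptions options) {
      // See the hypothetical MyBoundedReader sketched under "Implementing the
      // Reader Subclass" below.
      return new MyBoundedReader(this, startOffset, endOffset);
    }

    @Override
    public void validate() {
      // Check that resourceSpec is well-formed, for example.
    }

    @Override
    public Coder<String> getDefaultOutputCoder() {
      return StringUtf8Coder.of();
    }
  }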

UnboundedSource

UnboundedSource represents an infinite data stream from which the Dataflow service may read, possibly in parallel. UnboundedSource contains a set of abstract methods that the service uses to support streaming reads in parallel; these include checkpointing for failure recovery, record IDs to prevent data duplication, and watermarking for estimating data completeness in downstream parts of your pipeline.

To implement an UnboundedSource, your subclass must override the following abstract methods:

  • generateInitialSplits: The Dataflow service uses this method to generate a list of UnboundedSource objects, each of which represents a sub-stream from which the service should read in parallel.
  • getCheckpointMarkCoder: The Dataflow service uses this method to obtain the Coder for the checkpoints for your source (if any).
  • requiresDeduping: The Dataflow service uses this method to determine whether the data requires explicit removal of duplicate records. If this method returns true, the Dataflow service will automatically insert a step to remove duplicates from your source's output.
  • createReader: Creates the associated UnboundedReader for this UnboundedSource.
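
As a rough, hypothetical sketch, an UnboundedSource that reads from a sharded message queue might look like the following. MyCheckpointMark and MyUnboundedReader are sketched later in this document; note that in the SDK, createReader for an unbounded source also receives the reader's most recent checkpoint mark (if any) so that reading can resume from it.

  // Hypothetical UnboundedSource over a sharded message queue topic.
  public class MyUnboundedSource
      extends UnboundedSource<String, MyCheckpointMark> {
    private final String topic;
    private final int numShards;

    public MyUnboundedSource(String topic, int numShards) {
      this.topic = topic;
      this.numShards = numShards;
    }

    @Override
    public List<MyUnboundedSource> generateInitialSplits(
        int desiredNumSplits, PipelineOptions options) throws Exception {
      // One sub-source per shard; the service reads the splits in parallel.
      List<MyUnboundedSource> splits = new ArrayList<>();
      for (int shard = 0; shard < numShards; shard++) {
        splits.add(new MyUnboundedSource(topic + "-" + shard, 1));
      }
      return splits;
    }

    @Override
    public Coder<MyCheckpointMark> getCheckpointMarkCoder() {
      return SerializableCoder.of(MyCheckpointMark.class);
    }

    @Override
    public boolean requiresDeduping() {
      // The hypothetical queue may re-deliver messages, so ask the service to
      // deduplicate based on record IDs.
      return true;
    }

    @Override
    public UnboundedReader<String> createReader(
        PipelineOptions options, MyCheckpointMark checkpointMark) {
      return new MyUnboundedReader(this, checkpointMark);
    }

    @Override
    public void validate() {
      // Check that topic and numShards are well-formed, for example.
    }

    @Override
    public Coder<String> getDefaultOutputCoder() {
      return StringUtf8Coder.of();
    }
  }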

Implementing the Reader Subclass

You'll need to create a subclass of either BoundedReader or UnboundedReader to be returned by your source subclass's createReader method. The Dataflow service uses the methods in your Reader (whether bounded or unbounded) to do the actual reading of your dataset.

BoundedReader and UnboundedReader share a similar basic interface, which you'll need to define. In addition, UnboundedReader has some methods you'll need to implement for working with unbounded data, and BoundedReader has an optional method you can implement if you want to take advantage of Dataflow's Dynamic Work Rebalancing feature. There are also minor differences in the semantics of the start() and advance() methods when using UnboundedReader.

Reader Methods Common to Both BoundedReader and UnboundedReader

The Dataflow service uses the following methods to read data using BoundedReader or UnboundedReader:

  • start: Initializes the Reader and advances to the first record to be read. This method is called exactly once when Dataflow begins reading your data, and is a good place to put expensive operations needed for initialization.
  • advance: Advances the reader to the next valid record. This method must return false if there is no more input available. BoundedReader should stop reading once advance returns false, but UnboundedReader can return true in future calls once more data is available from your stream.
  • getCurrent: Returns the data record at the current position, last read by start or advance.
  • getCurrentTimestamp: Returns the timestamp for the current data record. You only need to override getCurrentTimestamp if your source reads data that has intrinsic timestamps. The Dataflow service uses this value to set the intrinsic timestamp for each element in the resulting output PCollection.
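
A rough sketch of a BoundedReader for the hypothetical MyBoundedSource above might look like the following; the readRecordAt helper stands in for whatever format-specific I/O your source actually requires.

  // Hypothetical reader over an offset range of MyBoundedSource.
  public class MyBoundedReader extends BoundedSource.BoundedReader<String> {
    private final MyBoundedSource source;
    private final long startOffset;
    private final long endOffset;
    private long currentOffset;
    private String currentRecord;

    public MyBoundedReader(MyBoundedSource source, long startOffset, long endOffset) {
      this.source = source;
      this.startOffset = startOffset;
      this.endOffset = endOffset;
    }

    @Override
    public boolean start() throws IOException {
      // Open file handles or connections here, then advance to the first record.
      currentOffset = startOffset - 1;
      return advance();
    }

    @Override
    public boolean advance() throws IOException {
      currentOffset++;
      if (currentOffset >= endOffset) {
        return false;  // No more input in this bundle.
      }
      currentRecord = readRecordAt(currentOffset);
      return true;
    }

    @Override
    public String getCurrent() {
      return currentRecord;  // The record last read by start() or advance().
    }

    @Override
    public void close() throws IOException {
      // Release any file handles or connections opened in start().
    }

    @Override
    public BoundedSource<String> getCurrentSource() {
      return source;
    }

    // Hypothetical format-specific read; replace with your own I/O logic.
    private String readRecordAt(long offset) throws IOException {
      return "record-" + offset;
    }
  }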

Reader Methods Unique to UnboundedReader

In addition to the basic Reader interface, UnboundedReader has some additional methods for managing reads from an unbounded data source:

  • getCurrentRecordId: Returns a unique identifier for the current record. The Dataflow service uses these record IDs to filter out duplicate records. If your data has logical IDs present in each record, you can have this method return them; otherwise, you can return a hash of the record contents, using at least a 128-bit hash. (It is not recommended to use Java's Object.hashCode(), as a 32-bit hash is generally insufficient for preventing collisions.)

    Note: Implementing getCurrentRecordId is optional if your source uses a checkpointing scheme that uniquely identifies each record. However, record IDs can still be useful if upstream systems writing data to your source occasionally produce duplicate records that your source might then read.

  • getWatermark: Returns a watermark that your Reader provides. The watermark is the approximate lower bound on timestamps of future elements to be read by your Reader. The Dataflow service uses the watermark as an estimate of data completeness. Watermarks are used in Dataflow's Windowing and Trigger features.
  • getCheckpointMark: The Dataflow service uses this method to create a checkpoint in your data stream. The checkpoint represents the UnboundedReader's progress, which can be used for failure recovery. Different data streams may use different checkpointing methods; some sources might require received records to be acknowledged, while others might use positional checkpointing. You'll need to tailor this method to the most appropriate checkpointing scheme. For example, you might have this method return the most recently acked record(s).

    Note: getCheckpointMark is optional; you don't need to implement it if your data does not have meaningful checkpoints. However, if you choose not to implement checkpointing in your source, you may encounter duplicate data or data loss in your pipeline, depending on whether your data source tries to re-send records in case of errors.
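
As a rough illustration, the hypothetical MyUnboundedReader for the queue-backed source sketched earlier might implement these methods along the following lines. The fields referenced here (pendingMessageIds, oldestPendingTimestamp, maxDelay, currentMessageId) are hypothetical reader state, and the common start, advance, and getCurrent methods would follow the same pattern as the bounded reader sketch above.

  // Hypothetical checkpoint mark: the messages read since the last checkpoint.
  // finalizeCheckpoint() is called once the checkpoint has been durably
  // committed, which is a safe point to acknowledge the messages to the queue.
  public class MyCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
    private final List<String> messageIdsToAck;

    public MyCheckpointMark(List<String> messageIdsToAck) {
      this.messageIdsToAck = messageIdsToAck;
    }

    @Override
    public void finalizeCheckpoint() throws IOException {
      // Acknowledge messageIdsToAck to the hypothetical queue service.
    }
  }

  // Inside MyUnboundedReader:

  @Override
  public Instant getWatermark() {
    // Approximate lower bound on the timestamps of records not yet read; for
    // example, the timestamp of the oldest unread message minus allowed skew.
    return oldestPendingTimestamp.minus(maxDelay);
  }

  @Override
  public UnboundedSource.CheckpointMark getCheckpointMark() {
    // Record which messages have been read but not yet acknowledged.
    return new MyCheckpointMark(new ArrayList<>(pendingMessageIds));
  }

  @Override
  public byte[] getCurrentRecordId() {
    // The hypothetical queue attaches a unique ID to every message.
    return currentMessageId.getBytes(StandardCharsets.UTF_8);
  }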

Using your BoundedSource with Dynamic Work Rebalancing

If your source provides bounded data, you can have your BoundedReader work with the Dataflow service's Dynamic Work Rebalancing feature by implementing the method splitAtFraction. The Dataflow service may call splitAtFraction concurrently with start or advance on a given reader so that the remaining data in your Source can be split and redistributed to other workers.

When you implement splitAtFraction, your code must produce a set of mutually exclusive splits whose union matches the total data set.
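
The SDK's range tracker helpers (such as OffsetRangeTracker for offset-based sources) encapsulate most of the thread-safe bookkeeping this requires. As a rough sketch, the hypothetical offset-based reader from earlier could route its progress through a tracker along these lines (the rangeTracker field, originalEndOffset, and createSourceForRange helper are hypothetical):

  // Inside the hypothetical BoundedReader: delegate claiming positions and
  // splitting to an OffsetRangeTracker, which is safe to use when the service
  // calls splitAtFraction concurrently with advance.

  @Override
  public boolean advance() throws IOException {
    long nextOffset = currentOffset + 1;
    // Claim the next position before emitting its record; if that part of the
    // range has been split off to another worker, stop reading.
    if (!rangeTracker.tryReturnRecordAt(true /* isAtSplitPoint */, nextOffset)) {
      return false;
    }
    currentOffset = nextOffset;
    currentRecord = readRecordAt(currentOffset);
    return true;
  }

  @Override
  public Double getFractionConsumed() {
    return rangeTracker.getFractionConsumed();
  }

  @Override
  public BoundedSource<String> splitAtFraction(double fraction) {
    long splitOffset = rangeTracker.getPositionForFractionConsumed(fraction);
    if (!rangeTracker.trySplitAtPosition(splitOffset)) {
      return null;  // Splitting here is not possible; keep the current range.
    }
    // This reader keeps [startOffset, splitOffset); the residual range
    // [splitOffset, originalEndOffset) is returned as a new source for the
    // service to hand to another worker.
    return createSourceForRange(splitOffset, originalEndOffset);
  }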

Convenience Source and Reader Base Classes

The Dataflow SDK contains some convenient abstract base classes to help you create Source and Reader classes that work with common data storage formats, like files.

FileBasedSource

If your data source uses files, you can derive your Source and Reader classes from the FileBasedSource and FileBasedReader abstract base classes in the Dataflow SDK for Java. FileBasedSource is a bounded source subclass that implements code common to Dataflow sources that interact with files, including:

  • File pattern expansion
  • Sequential record reading
  • Split points

XmlSource

If your data source uses XML-formatted files, you can derive your Source class from the XmlSource abstract base class in the Dataflow SDK for Java. XmlSource extends FileBasedSource and provides additional methods for parsing XML files, such as setting the XML elements that designate the file root and the individual records in the file.
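
For example, reading records from XML files of the form <records><record>...</record></records> might look roughly like the following, assuming a JAXB-annotated Record class:

  // Hypothetical example: parse each <record> element inside the <records>
  // root of the matched files into a Record object.
  XmlSource<Record> source = XmlSource.<Record>from("gs://my-bucket/data/*.xml")
      .withRootElement("records")
      .withRecordElement("record")
      .withRecordClass(Record.class);

  p.apply("ReadXmlRecords", Read.from(source));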

Reading from a Custom Source

To read data from a custom source in your pipeline, you apply the SDK's generic Read transform and pass your custom source as a parameter using the .from operation:

Java

  MySource source = new MySource(false, file.getPath(), 64, null);
  p.apply("ReadFileData", Read.from(source))

Creating a Custom Sink

To create a custom data sink for your pipeline, you'll need to provide the format-specific logic that tells the Dataflow service how to write bounded data from your pipeline's PCollections to an output sink, such as a directory or file system, a database table, etc. The Dataflow service writes bundles of data in parallel using multiple workers.

Note: Dataflow currently only supports writing bounded data to a custom output sink.

You supply the writing logic by creating the following classes:

  • A subclass of the Dataflow SDK abstract base class Sink. Sink describes a location or resource that your pipeline can write to in parallel. Your Sink subclass might contain fields such as the resource or file location, database table name, etc.
  • A subclass of Sink.WriteOperation. Sink.WriteOperation represents the state of a single parallel write operation to the output location described in your Sink. Your WriteOperation subclass must define the initialization and finalization processes of the parallel write.
  • A subclass of Sink.Writer. Sink.Writer writes a bundle of elements from an input PCollection to your designated data sink.

Implementing the Sink Subclass

Your Sink subclass describes the location or resource to which your pipeline writes its output. This might include a file system location, the name of a database table or dataset, etc. Your Sink subclass must validate that the output location can be written to, and create WriteOperations that define how to write data to that output location.

To implement a Sink, your subclass must override the following abstract methods:

  • validate: This method ensures that the output location for the pipeline data is valid and can be written to. validate should ensure the file can be opened, that the output directory exists, that the user has access permissions for the database table, etc. The Dataflow service calls validate at pipeline creation time.
  • createWriteOperation: This method creates a Sink.WriteOperation object that defines how to write to the output location.
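
As a rough sketch, a Sink subclass for a hypothetical file-style output might look like the following; the write operation it creates is sketched in the next section.

  // Hypothetical Sink that writes text records under an output path prefix.
  public class MySink extends Sink<String> {
    private final String outputPathPrefix;

    public MySink(String outputPathPrefix) {
      this.outputPathPrefix = outputPathPrefix;
    }

    @Override
    public void validate(PipelineOptions options) {
      // Called at pipeline creation time; for example, check that the output
      // location exists and is writable, and fail fast if it is not.
    }

    @Override
    public WriteOperation<String, ?> createWriteOperation(PipelineOptions options) {
      return new MyWriteOperation(this);
    }

    public String getOutputPathPrefix() {
      return outputPathPrefix;
    }
  }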

Implementing the WriteOperation Subclass

Your WriteOperation subclass defines how to write a bundle of elements to the output location defined in your Sink. The WriteOperation performs the necessary initialization and finalization for a parallel write.

To implement a WriteOperation, your subclass must override the following abstract methods:

  • initialize: This method performs any necessary initialization before writing to the output location. The Dataflow service calls this method before writing begins. You can use initialize to, for example, create a temporary output directory.
  • finalize: This method handles the results of a write performed by a Writer class. Your implementation of finalize should perform clean-up from any failed writes or writes that were successfully re-tried, and must be capable of locating any temporary or partial output written by failed writes.

    As finalize may be called multiple times in the case of failure or retry, a best practice is to make your implementation of finalize atomic; if that's not possible, you must make your implementation of finalize idempotent.
  • createWriter: This method creates a Sink.Writer object that writes a bundle of data to the output location defined in your Sink.
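
A rough sketch of the corresponding WriteOperation might look like this; its writer result type is the name of the temporary file each Writer produced, and the promotion helper is hypothetical. In addition to the three methods above, the SDK's WriteOperation also declares getSink and getWriterResultCoder, included here for completeness.

  // Hypothetical WriteOperation for MySink. The writer result (WriteT) is the
  // name of the temporary file written by each Writer.
  public class MyWriteOperation extends Sink.WriteOperation<String, String> {
    private final MySink sink;

    public MyWriteOperation(MySink sink) {
      this.sink = sink;
    }

    @Override
    public void initialize(PipelineOptions options) throws Exception {
      // For example, create a temporary output directory under the prefix.
    }

    @Override
    public void finalize(Iterable<String> writerResults, PipelineOptions options)
        throws Exception {
      // Promote each successfully written temporary file to its final name.
      // finalize may run more than once, so this must be atomic or idempotent.
      for (String tempFileName : writerResults) {
        promoteToFinalLocation(tempFileName);
      }
      // Also clean up temporary output left behind by failed or retried
      // bundles that do not appear in writerResults.
    }

    @Override
    public Sink.Writer<String, String> createWriter(PipelineOptions options) {
      return new MyWriter(this, sink.getOutputPathPrefix());
    }

    @Override
    public Sink<String> getSink() {
      return sink;
    }

    @Override
    public Coder<String> getWriterResultCoder() {
      return StringUtf8Coder.of();
    }

    // Hypothetical helper: atomically rename or copy a temporary file to its
    // final location, doing nothing if it has already been promoted.
    private void promoteToFinalLocation(String tempFileName) throws Exception {
      // Format- and storage-specific promotion logic goes here.
    }
  }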

Implementing the Writer Subclass

Your Writer subclass implements the logic for writing a single bundle of records to the output location defined in your Sink. The Dataflow service may instantiate multiple instances of your Writer in different threads on the same worker, so access to any static members or methods must be thread-safe.

To implement a Writer, your subclass must override the following abstract methods:

  • open: This method performs any initialization for the bundle of records to be written, such as creating a temporary file for writing. The Dataflow service calls this method once at the start of the write, and passes it a unique bundle ID for the bundle of records to be written.
  • write: This method writes a single record to the output location. The Dataflow service calls write for each value in the bundle.
  • close: This method finishes writing and closes any resources used for writing the bundle. close must return a writer result, which the enclosing WriteOperation will use to identify successful writes. The Dataflow service calls this method once at the end of the write.

Handling Bundle IDs

When the service calls Writer.open, it will pass a unique bundle ID for the records to be written. Your Writer must use this bundle ID to ensure that its output does not interfere with that of other Writer instances that might have been created in parallel. This is particularly important as the Dataflow service may re-try writes multiple times in case of failure.

For example, if your Sink's output is file-based, your Writer class might use the bundle ID as a filename suffix to ensure that your Writer writes its records to a unique output file not used by other Writers. You can then have your Writer's close method return that file location as part of the write result.
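
Putting open, write, and close together, the hypothetical Writer for MySink might use the bundle ID along these lines. The temporary-file naming scheme is illustrative only, and getWriteOperation, which the SDK's Writer also declares, is included for completeness.

  // Hypothetical Writer that writes one bundle to a uniquely named temporary
  // file and reports that file's name as its writer result.
  public class MyWriter extends Sink.Writer<String, String> {
    private final MyWriteOperation writeOperation;
    private final String outputPathPrefix;
    private String tempFileName;
    private PrintWriter out;

    public MyWriter(MyWriteOperation writeOperation, String outputPathPrefix) {
      this.writeOperation = writeOperation;
      this.outputPathPrefix = outputPathPrefix;
    }

    @Override
    public void open(String uId) throws Exception {
      // The bundle ID keeps this writer's output separate from that of other
      // writers and from retried attempts of the same bundle.
      tempFileName = outputPathPrefix + "-temp-" + uId;
      out = new PrintWriter(new FileWriter(tempFileName));
    }

    @Override
    public void write(String value) throws Exception {
      out.println(value);  // Called once per element in the bundle.
    }

    @Override
    public String close() throws Exception {
      out.close();
      // The writer result identifies this bundle's output so that
      // WriteOperation.finalize can promote it to its final location.
      return tempFileName;
    }

    @Override
    public Sink.WriteOperation<String, String> getWriteOperation() {
      return writeOperation;
    }
  }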

You can see a model of how to implement Sink, WriteOperation, and Writer along with their required abstract methods in the Dataflow SDK's example implementation of DatastoreIO.

Convenience Sink and Writer Base Classes

The Dataflow SDK contains some convenient abstract base classes to help you create Sink, WriteOperation, and Writer classes that work with common data storage formats, like files.

FileBasedSink

If your data sink uses files, you can derive your Sink, WriteOperation, and Writer classes from the FileBasedSink, FileBasedWriteOperation, and FileBasedWriter abstract base classes in the Dataflow SDK for Java. These classes implement code common to Dataflow sinks that interact with files, including:

  • Setting file headers and footers
  • Sequential record writing
  • Setting the output MIME type

FileBasedSink and its subclasses support writing to both local files and files in Google Cloud Storage. For more information, see the example FileBasedSink implementation, XmlSink, in the Dataflow SDK for Java.

Writing to a Custom Sink

To write data to a custom sink in your pipeline, you apply the SDK's generic Write transform and pass your custom sink as a parameter using the .to operation:

Java

  p.apply("WriteResults", Write.to(new MySink()));
