Dataflow managed I/O

The managed I/O connector is an Apache Beam transform that provides a common API for creating sources and sinks. On the backend, Dataflow treats the managed I/O connector as a service, which allows Dataflow to manage runtime operations for the connector. You can then focus on the business logic in your pipeline, rather than managing these details.

You create the managed I/O connector using Apache Beam code, just like any other I/O connector. You specify a source or sink to instantiate and pass in a set of configuration parameters.

For more information about the managed I/O API, see Managed in the Apache Beam Java SDK documentation.

Use managed I/O with Apache Iceberg

Starting with Apache Beam SDK for Java 2.56.0, the managed I/O connector supports reading and writing Apache Iceberg catalogs.


Add the following dependencies to your project:





The Apache Iceberg source and sink use the following configuration parameters:

  • table (string). The name of the Apache Iceberg table. Example: "db.table1".
  • catalog_config (map). The catalog configuration. Contains the following fields:
    • catalog_name (string). The name of the catalog. Example: "local".
    • catalog_type (string). The type of catalog. Supported values: "hadoop", "hive", "rest".
    • warehouse_location (string). The warehouse location. Example: "file://path/to/warehouse".
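The parameters form a nested map. The table key sits at the top level, and the catalog fields live inside catalog_config. The following plain-Java sketch has no Beam dependency. It assembles that map and rejects catalog types outside the supported values listed here. The class and method names are hypothetical. The check reflects only the values in this list, not necessarily every catalog type that a given Beam release accepts.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class IcebergConfigSketch {
  // Supported catalog_type values, as listed in the parameter list.
  static final List<String> SUPPORTED_CATALOG_TYPES = Arrays.asList("hadoop", "hive", "rest");

  // Builds the nested configuration: "table" at the top level and the catalog
  // fields inside a "catalog_config" map. LinkedHashMap keeps insertion order
  // so that the printed form is deterministic.
  static Map<String, Object> build(String table, String catalogName,
      String catalogType, String warehouseLocation) {
    if (!SUPPORTED_CATALOG_TYPES.contains(catalogType)) {
      throw new IllegalArgumentException("Unsupported catalog_type: " + catalogType);
    }
    Map<String, Object> catalogConfig = new LinkedHashMap<>();
    catalogConfig.put("catalog_name", catalogName);
    catalogConfig.put("catalog_type", catalogType);
    catalogConfig.put("warehouse_location", warehouseLocation);

    Map<String, Object> config = new LinkedHashMap<>();
    config.put("table", table);
    config.put("catalog_config", catalogConfig);
    return config;
  }

  public static void main(String[] args) {
    System.out.println(build("db.table1", "local", "hadoop", "file://path/to/warehouse"));
  }
}
```

Running main prints {table=db.table1, catalog_config={catalog_name=local, catalog_type=hadoop, warehouse_location=file://path/to/warehouse}}. This matches the shape of the config map that the following example passes to withConfig.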


The following example shows how to create a pipeline that reads from Apache Iceberg and writes the records to a text file. This example assumes the catalog has a table named db.table1 with two fields named id and name.


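// Configure the Iceberg source I/O.
Map<String, Object> catalogConfig = ImmutableMap.<String, Object>builder()
    .put("catalog_name", "local")
    .put("warehouse_location", location)
    .put("catalog_type", "hadoop")
    .build();

ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
  .put("table", "db.table1")
  .put("catalog_config", catalogConfig)
  .build();

// Build the pipeline.
// 'pipeline', 'location', and 'outputPath' are assumed to be defined elsewhere.
PCollectionRowTuple.empty(pipeline)
    .apply(Managed.read(Managed.ICEBERG).withConfig(config))
    .get("output")
    // Format each record as a string with the format 'id:name'.
    .apply(MapElements
        .into(TypeDescriptors.strings())
        .via((Row row) -> String.format("%d:%s",
            row.getInt64("id"), row.getString("name"))))
    // Write to a text file.
    .apply(TextIO.write().to(outputPath).withSuffix(".txt"));

pipeline.run().waitUntilFinish();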

You can also put the configuration parameters into a YAML file and provide a URL to the file. The following YAML specifies the same configuration as the previous code example:

catalog_config:
  catalog_name: "local"
  warehouse_location: "<location>"
  catalog_type: "hadoop"
table: "db.table1"

Pass the URL to the YAML file as follows:
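Generating the YAML file from the same parameters can help keep the file and the code consistent. The following plain-Java sketch has no Beam dependency. It renders the nested configuration as the YAML shown earlier, writes it to a temporary local file, and prints the file's file: URI. The class and method names are hypothetical. The Managed.read(Managed.ICEBERG).withConfigUrl(...) call appears only as a comment, because it is an assumption about the Managed API in your Beam version. You should also verify that the pipeline can read a local file: URI from where it is built, compared with a Cloud Storage path, for example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;

class YamlConfigSketch {
  // Renders scalar values and one level of nested maps as YAML, quoting every scalar.
  // Assumes values contain no double quotes or backslashes, so nothing is escaped.
  static String toYaml(Map<String, Object> config) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, Object> e : config.entrySet()) {
      Object value = e.getValue();
      if (value instanceof Map) {
        sb.append(e.getKey()).append(":\n");
        for (Map.Entry<?, ?> n : ((Map<?, ?>) value).entrySet()) {
          sb.append("  ").append(n.getKey()).append(": \"").append(n.getValue()).append("\"\n");
        }
      } else {
        sb.append(e.getKey()).append(": \"").append(value).append("\"\n");
      }
    }
    return sb.toString();
  }

  // Writes the YAML to a new temporary file and returns the file's URI as a string.
  static String writeTempConfig(String yaml) {
    try {
      Path file = Files.createTempFile("iceberg-config", ".yaml");
      Files.write(file, yaml.getBytes(StandardCharsets.UTF_8));
      return file.toUri().toString();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Map<String, Object> catalogConfig = new LinkedHashMap<>();
    catalogConfig.put("catalog_name", "local");
    catalogConfig.put("warehouse_location", "<location>");
    catalogConfig.put("catalog_type", "hadoop");

    Map<String, Object> config = new LinkedHashMap<>();
    config.put("catalog_config", catalogConfig);
    config.put("table", "db.table1");

    String yaml = toYaml(config);
    System.out.print(yaml);
    String configUrl = writeTempConfig(yaml);
    // Assumed Beam usage (not executed here):
    //   Managed.read(Managed.ICEBERG).withConfigUrl(configUrl)
    System.out.println(configUrl);
  }
}
```

The sketch quotes every scalar to match the YAML example. A real YAML library would handle escaping and deeper nesting, which this hand-rolled renderer deliberately does not.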

