Dataflow managed I/O

The managed I/O connector is an Apache Beam transform that provides a common API for creating sources and sinks. On the backend, Dataflow treats the managed I/O connector as a service, which allows Dataflow to manage runtime operations for the connector. You can then focus on the business logic in your pipeline, rather than managing these details.

You create the managed I/O connector using Apache Beam code, just like any other I/O connector. You specify a source or sink to instantiate and pass in a set of configuration parameters.
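For example, instantiating a managed source follows the general shape shown in the following sketch. The sketch assumes an existing Pipeline object named pipeline; the Apache Iceberg source and its full set of configuration keys are described in the next section.

Java

// Name a managed source and pass its configuration as a map of parameters.
Map<String, Object> config = ImmutableMap.<String, Object>builder()
    .put("table", "db.table1")  // Connector-specific parameters; see the Iceberg example below.
    .build();

PCollectionRowTuple output = PCollectionRowTuple.empty(pipeline)
    .apply(Managed.read(Managed.ICEBERG)  // Or Managed.write(...) for a sink.
        .withConfig(config));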

For more information about the managed I/O API, see Managed in the Apache Beam Java SDK documentation.

Use managed I/O with Apache Iceberg

Starting with the Apache Beam SDK for Java version 2.56.0, the managed I/O connector supports reading from and writing to Apache Iceberg tables.

Add the following dependencies:

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>${beam.version}</version>
</dependency>

The following example shows how to create a pipeline that reads from Apache Iceberg and writes the records to a text file. This example assumes the catalog has a table named db.table1 with two fields named id and name.
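For reference, the assumed table layout corresponds to the following Beam schema, with id as an int64 field and name as a string field. This snippet is only illustrative; the example itself doesn't need to declare the schema explicitly.

Java

// Beam schema matching the assumed db.table1 layout.
Schema tableSchema = Schema.builder()
    .addInt64Field("id")
    .addStringField("name")
    .build();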

Java

// Configure the Iceberg source I/O.
Map<String, Object> catalogConfig = ImmutableMap.<String, Object>builder()
    .put("catalog_name", "local")
    .put("warehouse_location", location)
    .put("catalog_type", "hadoop")
    .build();

Map<String, Object> config = ImmutableMap.<String, Object>builder()
    .put("table", "db.table1")
    .put("catalog_config", catalogConfig)
    .build();

// Build the pipeline.
PCollectionRowTuple.empty(pipeline).apply(
        Managed.read(Managed.ICEBERG)
            .withConfig(config))
    .get("output")
    // Format each record as a string with the format 'id:name'.
    .apply(MapElements
        .into(TypeDescriptors.strings())
        .via((Row row) -> String.format("%d:%s",
            row.getInt64("id"),
            row.getString("name"))))
    // Write to a text file.
    .apply(
        TextIO.write()
            .to(outputPath)
            .withNumShards(1)
            .withSuffix(".txt"));

You can also put the configuration parameters into a YAML file and provide a URL to the file. The following YAML specifies the same configuration as the previous code example:

catalog_config:
  catalog_name: "local"
  warehouse_location: "<location>"
  catalog_type: "hadoop"
table: "db.table1"

Pass the URL to the YAML file as follows:

Java

PCollectionRowTuple.empty(pipeline).apply(
        Managed.read(Managed.ICEBERG)
            .withConfigUrl("gs://path/to/config.yaml"));
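
The managed I/O connector can also write to Apache Iceberg. The following is a minimal sketch, not part of the original example: it assumes a PCollection<Row> named rows whose schema matches db.table1, reuses the config map from the read example, and assumes the input collection is tagged as "input", matching the "output" tag convention used by the read transform.

Java

// Write records to the Iceberg table by using the managed Iceberg sink.
// `rows` is a PCollection<Row> whose schema matches db.table1.
PCollectionRowTuple.of("input", rows)
    .apply(Managed.write(Managed.ICEBERG)
        .withConfig(config));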