The managed I/O connector is an Apache Beam transform that provides a common API for creating sources and sinks. On the backend, Dataflow treats the managed I/O connector as a service, which allows Dataflow to manage runtime operations for the connector. You can then focus on the business logic in your pipeline, rather than managing these details.
You create the managed I/O connector using Apache Beam code, just like any other I/O connector. You specify a source or sink to instantiate and pass in a set of configuration parameters.
For more information about the managed I/O API, see Managed in the Apache Beam Java SDK documentation.
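For example, the following sketch shows the general shape: you name the connector and pass its configuration as a map. This is an illustrative fragment only; the config map is a placeholder, a complete read example appears in the next section, and Managed.write is the sink-side counterpart.
Java
// Instantiate a managed source by name and pass its configuration map.
// (Sketch only: config is defined elsewhere; use Managed.write for a sink.)
PCollectionRowTuple records = PCollectionRowTuple.empty(pipeline)
    .apply(Managed.read(Managed.ICEBERG).withConfig(config));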
Use managed I/O with Apache Iceberg
Starting with Apache Beam SDK for Java 2.56.0, the managed I/O connector supports reading from and writing to Apache Iceberg tables.
Add the following dependencies:
Java
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>${beam.version}</version>
</dependency>
The following example shows how to create a pipeline that reads from Apache Iceberg and writes the records to a text file. This example assumes that the catalog has a table named db.table1 with two fields, id and name.
Java
// Configure the Iceberg source I/O.
Map<String, Object> catalogConfig = ImmutableMap.<String, Object>builder()
    .put("catalog_name", "local")
    .put("warehouse_location", location)
    .put("catalog_type", "hadoop")
    .build();

Map<String, Object> config = ImmutableMap.<String, Object>builder()
    .put("table", "db.table1")
    .put("catalog_config", catalogConfig)
    .build();

// Build the pipeline. The managed source emits Beam Rows under the "output" tag.
PCollectionRowTuple.empty(pipeline)
    .apply(Managed.read(Managed.ICEBERG).withConfig(config))
    .get("output")
    // Format each record as a string with the format 'id:name'.
    .apply(MapElements
        .into(TypeDescriptors.strings())
        .via((Row row) -> String.format("%d:%s",
            row.getInt64("id"),
            row.getString("name"))))
    // Write to a text file.
    .apply(TextIO.write()
        .to(outputPath)
        .withNumShards(1)
        .withSuffix(".txt"));
You can also put the configuration parameters into a YAML file and provide a URL to the file. The following YAML specifies the same configuration as the previous code example:
catalog_config:
  catalog_name: "local"
  warehouse_location: "<location>"
  catalog_type: "hadoop"
table: "db.table1"
Pass the URL to the YAML file as follows:
Java
PCollectionRowTuple.empty(pipeline)
    .apply(Managed.read(Managed.ICEBERG)
        .withConfigUrl("gs://path/to/config.yaml"));
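Writing follows the same pattern, using Managed.write instead of Managed.read. The following is a minimal sketch rather than an official example: it reuses catalogConfig from the read example and assumes a PCollection<Row> named rows whose schema matches the id and name fields of db.table1, with the managed Iceberg sink consuming its input under the "input" tag.
Java
// Configure the Iceberg sink with the same catalog settings as the read example.
Map<String, Object> sinkConfig = ImmutableMap.<String, Object>builder()
    .put("table", "db.table1")
    .put("catalog_config", catalogConfig)
    .build();

// Write the rows to the Iceberg table. The "input" tag is an assumption based
// on the managed transform's single-input convention.
PCollectionRowTuple.of("input", rows)
    .apply(Managed.write(Managed.ICEBERG).withConfig(sinkConfig));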