托管式 I/O 连接器是一种 Apache Beam 转换,可提供用于创建来源和接收器的通用 API。在后端,Dataflow 将托管式 I/O 连接器视为服务,让 Dataflow 能够管理连接器的运行时操作。然后,您可以专注于流水线中的业务逻辑,而无需管理这些细节。
您可以使用 Apache Beam 代码创建托管式 I/O 连接器,就像创建任何其他 I/O 连接器一样。您可以指定要实例化的来源或接收器并传入一组配置参数。
如需详细了解托管式 I/O API,请参阅 Apache Beam Java SDK 文档中的 Managed
。
将托管式 I/O 与 Apache Iceberg 搭配使用
从 Java 版 Apache Beam SDK 2.56.0 开始,托管式 I/O 连接器支持读取和写入 Apache Iceberg 目录。
添加以下依赖项:
Java
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-managed</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-iceberg</artifactId>
<version>2.56.0</version>
</dependency>
以下示例展示了如何创建从 Apache Iceberg 读取数据并将记录写入文本文件的流水线。此示例假定目录有一个名为 db.table1
的表,其中包含两个分别名为 id
和 name
的字段。
Java
// Configure the Iceberg source I/O.
Map catalogConfig = ImmutableMap.<String, Object>builder()
.put("catalog_name", "local")
.put("warehouse_location", location)
.put("catalog_type", "hadoop")
.build();
ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
.put("table", "db.table1")
.put("catalog_config", catalogConfig)
.build();
// Build the pipeline.
PCollectionRowTuple.empty(pipeline).apply(
Managed.read(Managed.ICEBERG)
.withConfig(config))
.get("output")
// Format each record as a string with the format 'id:name'.
.apply(MapElements
.into(TypeDescriptors.strings())
.via((row -> {
return String.format("%d:%s",
row.getInt64("id"),
row.getString("name"));
})))
// Write to a text file.
.apply(
TextIO.write()
.to(outputPath)
.withNumShards(1)
.withSuffix(".txt"));
您还可以将配置参数放入 YAML 文件中,并提供该文件的网址。以下 YAML 指定与上一代码示例相同的配置:
catalog_config:
catalog_name: "local"
warehouse_location: "<location>"
catalog_type: "hadoop"
table: "db.table1"
传递 YAML 文件的网址,如下所示:
Java
PCollectionRowTuple.empty(pipeline).apply(
Managed.read(Managed.ICEBERG)
.withConfigUrl("gs://path/to/config.yaml"))