Dataflow 托管式 I/O

托管式 I/O 连接器是一种 Apache Beam 转换,可提供用于创建来源和接收器的通用 API。在后端,Dataflow 将托管式 I/O 连接器视为服务,让 Dataflow 能够管理连接器的运行时操作。然后,您可以专注于流水线中的业务逻辑,而无需管理这些细节。

您可以使用 Apache Beam 代码创建托管式 I/O 连接器,就像创建任何其他 I/O 连接器一样。您可以指定要实例化的来源或接收器并传入一组配置参数。

如需详细了解托管式 I/O API,请参阅 Apache Beam Java SDK 文档中的 Managed

将托管式 I/O 与 Apache Iceberg 搭配使用

从 Java 版 Apache Beam SDK 2.56.0 开始,托管式 I/O 连接器支持读取和写入 Apache Iceberg 目录。

添加以下依赖项:

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>2.56.0</version>
</dependency>

以下示例展示了如何创建从 Apache Iceberg 读取数据并将记录写入文本文件的流水线。此示例假定目录有一个名为 db.table1 的表,其中包含两个分别名为 idname 的字段。

Java

// Configure the Iceberg source I/O.
Map catalogConfig = ImmutableMap.<String, Object>builder()
    .put("catalog_name", "local")
    .put("warehouse_location", location)
    .put("catalog_type", "hadoop")
    .build();

ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
  .put("table", "db.table1")
  .put("catalog_config", catalogConfig)
  .build();

// Build the pipeline.
PCollectionRowTuple.empty(pipeline).apply(
        Managed.read(Managed.ICEBERG)
              .withConfig(config))
    .get("output")
    // Format each record as a string with the format 'id:name'.
    .apply(MapElements
        .into(TypeDescriptors.strings())
        .via((row -> {
          return String.format("%d:%s",
              row.getInt64("id"),
              row.getString("name"));
        })))
    // Write to a text file.
    .apply(
        TextIO.write()
            .to(outputPath)
            .withNumShards(1)
            .withSuffix(".txt"));

您还可以将配置参数放入 YAML 文件中,并提供该文件的网址。以下 YAML 指定与上一代码示例相同的配置:

catalog_config:
  catalog_name: "local"
  warehouse_location: "<location>"
  catalog_type: "hadoop"
table: "db.table1"

传递 YAML 文件的网址,如下所示:

Java

PCollectionRowTuple.empty(pipeline).apply(
        Managed.read(Managed.ICEBERG)
              .withConfigUrl("gs://path/to/config.yaml"))