Dataflow マネージド I/O

マネージド I/O コネクタは、ソースとシンクを作成するための共通の API を提供する Apache Beam 変換です。バックエンドでは、Dataflow はマネージド I/O コネクタをサービスとして扱います。これにより、Dataflow はコネクタのランタイム オペレーションを管理できます。これにより、これらの詳細を管理することではなく、パイプラインのビジネス ロジックに注力できます。

他の I/O コネクタと同様に、Apache Beam コードを使用してマネージド I/O コネクタを作成します。インスタンス化するソースまたはシンクを指定して、一連の構成パラメータを渡します。

マネージド I/O API の詳細については、Apache Beam Java SDK ドキュメントの Managed をご覧ください。

Apache Iceberg でマネージド I/O を使用する

Apache Beam SDK for Java 2.56.0 以降、マネージド I/O コネクタは Apache Iceberg カタログの読み取りと書き込みをサポートしています。

次の依存関係を追加します。

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>2.56.0</version>
</dependency>

次の例は、Apache Iceberg から読み取り、レコードをテキスト ファイルに書き込むパイプラインを作成する方法を示しています。この例では、カタログに db.table1 というテーブルがあり、idname という 2 つのフィールドがあることを前提としています。

Java

// Configure the Iceberg source I/O.
Map catalogConfig = ImmutableMap.<String, Object>builder()
    .put("catalog_name", "local")
    .put("warehouse_location", location)
    .put("catalog_type", "hadoop")
    .build();

ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
  .put("table", "db.table1")
  .put("catalog_config", catalogConfig)
  .build();

// Build the pipeline.
PCollectionRowTuple.empty(pipeline).apply(
        Managed.read(Managed.ICEBERG)
              .withConfig(config))
    .get("output")
    // Format each record as a string with the format 'id:name'.
    .apply(MapElements
        .into(TypeDescriptors.strings())
        .via((row -> {
          return String.format("%d:%s",
              row.getInt64("id"),
              row.getString("name"));
        })))
    // Write to a text file.
    .apply(
        TextIO.write()
            .to(outputPath)
            .withNumShards(1)
            .withSuffix(".txt"));

構成パラメータを YAML ファイルに配置し、ファイルの URL を指定することもできます。次の YAML は、前のコード例と同じ構成を指定しています。

catalog_config:
  catalog_name: "local"
  warehouse_location: "<location>"
  catalog_type: "hadoop"
table: "db.table1"

次のように、URL を YAML ファイルに渡します。

Java

PCollectionRowTuple.empty(pipeline).apply(
        Managed.read(Managed.ICEBERG)
              .withConfigUrl("gs://path/to/config.yaml"))