マネージド I/O コネクタは、ソースとシンクを作成するための共通の API を提供する Apache Beam 変換です。バックエンドでは、Dataflow はマネージド I/O コネクタをサービスとして扱います。これにより、Dataflow はコネクタのランタイム オペレーションを管理できます。これにより、これらの詳細を管理することではなく、パイプラインのビジネス ロジックに注力できます。
他の I/O コネクタと同様に、Apache Beam コードを使用してマネージド I/O コネクタを作成します。インスタンス化するソースまたはシンクを指定して、一連の構成パラメータを渡します。
マネージド I/O API の詳細については、Apache Beam Java SDK ドキュメントの Managed
をご覧ください。
Apache Iceberg でマネージド I/O を使用する
Apache Beam SDK for Java 2.56.0 以降、マネージド I/O コネクタは Apache Iceberg カタログの読み取りと書き込みをサポートしています。
次の依存関係を追加します。
Java
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-managed</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-iceberg</artifactId>
<version>2.56.0</version>
</dependency>
次の例は、Apache Iceberg から読み取り、レコードをテキスト ファイルに書き込むパイプラインを作成する方法を示しています。この例では、カタログに db.table1
というテーブルがあり、id
と name
という 2 つのフィールドがあることを前提としています。
Java
// Configure the Iceberg source I/O.
Map catalogConfig = ImmutableMap.<String, Object>builder()
.put("catalog_name", "local")
.put("warehouse_location", location)
.put("catalog_type", "hadoop")
.build();
ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
.put("table", "db.table1")
.put("catalog_config", catalogConfig)
.build();
// Build the pipeline.
PCollectionRowTuple.empty(pipeline).apply(
Managed.read(Managed.ICEBERG)
.withConfig(config))
.get("output")
// Format each record as a string with the format 'id:name'.
.apply(MapElements
.into(TypeDescriptors.strings())
.via((row -> {
return String.format("%d:%s",
row.getInt64("id"),
row.getString("name"));
})))
// Write to a text file.
.apply(
TextIO.write()
.to(outputPath)
.withNumShards(1)
.withSuffix(".txt"));
構成パラメータを YAML ファイルに配置し、ファイルの URL を指定することもできます。次の YAML は、前のコード例と同じ構成を指定しています。
catalog_config:
catalog_name: "local"
warehouse_location: "<location>"
catalog_type: "hadoop"
table: "db.table1"
次のように、URL を YAML ファイルに渡します。
Java
PCollectionRowTuple.empty(pipeline).apply(
Managed.read(Managed.ICEBERG)
.withConfigUrl("gs://path/to/config.yaml"))