Menulis dari Dataflow ke Apache Iceberg

Untuk menulis dari Dataflow ke Apache Iceberg, gunakan konektor I/O terkelola.

I/O Terkelola mendukung kemampuan berikut untuk Apache Iceberg:

Katalog
Kemampuan baca Pembacaan batch
Kemampuan tulis
  • Operasi tulis batch
  • Penulisan streaming
  • Tujuan dinamis
  • Pembuatan tabel dinamis

Untuk tabel BigQuery untuk Apache Iceberg, gunakan konektor BigQueryIO dengan BigQuery Storage API. Tabel harus sudah ada; pembuatan tabel dinamis tidak didukung.

Dependensi

Tambahkan dependensi berikut ke project Anda:

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>${beam.version}</version>
</dependency>

Konfigurasi

I/O Terkelola menggunakan parameter konfigurasi berikut untuk Apache Iceberg:

Konfigurasi baca dan tulis Jenis data Deskripsi
table string ID tabel Apache Iceberg. Contoh: "db.table1".
catalog_name string Nama katalog. Contoh: "local".
catalog_properties map Peta properti konfigurasi untuk katalog Apache Iceberg. Properti yang diperlukan bergantung pada katalog. Untuk informasi selengkapnya, lihat CatalogUtil dalam dokumentasi Apache Iceberg.
config_properties map Kumpulan properti konfigurasi Hadoop opsional. Untuk informasi selengkapnya, lihat CatalogUtil dalam dokumentasi Apache Iceberg.
Konfigurasi tulis Jenis data Deskripsi
triggering_frequency_seconds bilangan bulat Untuk pipeline operasi tulis streaming, frekuensi saat sink mencoba membuat snapshot, dalam detik.

Tujuan dinamis

I/O Terkelola untuk Apache Iceberg mendukung tujuan dinamis. Daripada menulis ke satu tabel tetap, konektor dapat memilih tabel tujuan secara dinamis berdasarkan nilai kolom dalam data yang masuk.

Untuk menggunakan tujuan dinamis, berikan template untuk parameter konfigurasi table. Untuk informasi selengkapnya, lihat Tujuan dinamis.

Contoh

Contoh berikut menulis data JSON dalam memori ke tabel Apache Iceberg.

Java

Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.JsonToRow;
import org.apache.beam.sdk.values.PCollectionRowTuple;

public class ApacheIcebergWrite {
  static final List<String> TABLE_ROWS = Arrays.asList(
      "{\"id\":0, \"name\":\"Alice\"}",
      "{\"id\":1, \"name\":\"Bob\"}",
      "{\"id\":2, \"name\":\"Charles\"}"
  );

  static final String CATALOG_TYPE = "hadoop";

  // The schema for the table rows.
  public static final Schema SCHEMA = new Schema.Builder()
      .addStringField("name")
      .addInt64Field("id")
      .build();

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);

    @Description("The name of the table to write to")
    String getTableName();

    void setTableName(String value);
  }

  public static void main(String[] args) {

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    //   --tableName= $TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg source I/O
    Map catalogConfig = ImmutableMap.<String, Object>builder()
        .put("warehouse", options.getWarehouseLocation())
        .put("type", CATALOG_TYPE)
        .build();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", options.getTableName())
        .put("catalog_name", options.getCatalogName())
        .put("catalog_properties", catalogConfig)
        .build();

    // Build the pipeline.
    pipeline.apply(Create.of(TABLE_ROWS))
        .apply(JsonToRow.withSchema(SCHEMA))
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));

    pipeline.run().waitUntilFinish();
  }
}

Langkah berikutnya