Write from Dataflow to Apache Iceberg

To write from Dataflow to Apache Iceberg, use the managed I/O connector.

Dependencies

Add the following dependencies to your project:

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>${beam.version}</version>
</dependency>

Configuration

Managed I/O uses the following configuration parameters for Apache Iceberg:

Name Data type Description
table string The identifier of the Apache Iceberg table. Example: "db.table1".
catalog_name string The name of the catalog. Example: "local".
catalog_properties map A map of configuration properties for the Apache Iceberg catalog. The required properties depend on the catalog. For more information, see CatalogUtil in the Apache Iceberg documentation.
config_properties map An optional set of Hadoop configuration properties. For more information, see CatalogUtil in the Apache Iceberg documentation.
triggering_frequency_seconds integer For streaming write pipelines, the frequency at which the sink attempts to produce snapshots, in seconds.
drop list of strings A list of field names to drop when writing the table.
keep list of strings A list of field names to keep when writing the table.
only string The name of exactly one field to keep when writing the table. All other fields are dropped. The value of the field must be a row type.

The drop, keep, and only parameters are mutually exclusive. You can set at most one of them.

Dynamic destinations

Instead of writing to a single table, the Apache Iceberg I/O connector can dynamically select a destination table based on field values within the incoming records.

To use dynamic destinations, provide a template for the table name (the table configuration parameter). The template can include field names within curly brackets. At runtime, the connector substitutes the record's field values to determine the destination table.

For example, you could set the destination to "flights.{country}.{airport}". For each input record, the connector substitutes the values of country and airport to get the table name. If a record has values country=usa and airport=RDU, its destination table is flights.usa.RDU.

You can specify nested fields by using dot-notation. For example: {top.middle.nested}.

You might want to filter out certain fields before they are written to the destination table. You can use the drop, keep, or only parameters for this purpose. These filtering parameters let you include destination metadata in the input records without writing that data to the tables.

Example

The following example writes in-memory JSON data to an Apache Iceberg table.

Java

To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.JsonToRow;
import org.apache.beam.sdk.values.PCollectionRowTuple;

public class ApacheIcebergWrite {
  static final List<String> TABLE_ROWS = Arrays.asList(
      "{\"id\":0, \"name\":\"Alice\"}",
      "{\"id\":1, \"name\":\"Bob\"}",
      "{\"id\":2, \"name\":\"Charles\"}"
  );

  static final String CATALOG_TYPE = "hadoop";

  // The schema for the table rows.
  public static final Schema SCHEMA = new Schema.Builder()
      .addStringField("name")
      .addInt64Field("id")
      .build();

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);

    @Description("The name of the table to write to")
    String getTableName();

    void setTableName(String value);
  }

  public static void main(String[] args) {

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    //   --tableName= $TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg source I/O
    Map catalogConfig = ImmutableMap.<String, Object>builder()
        .put("warehouse", options.getWarehouseLocation())
        .put("type", CATALOG_TYPE)
        .build();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", options.getTableName())
        .put("catalog_name", options.getCatalogName())
        .put("catalog_properties", catalogConfig)
        .build();

    // Build the pipeline.
    pipeline.apply(Create.of(TABLE_ROWS))
        .apply(JsonToRow.withSchema(SCHEMA))
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));

    pipeline.run().waitUntilFinish();
  }
}