使用 BigLake REST 目录将数据流式写入 Apache Iceberg

如需使用 BigLake REST 目录从 Dataflow 写入 Apache Iceberg,请使用托管式 I/O 连接器。

托管式 I/O 支持 Apache Iceberg 的以下功能:

目录
  • Hadoop
  • Hive
  • 基于 REST 的目录
  • BigQuery Metastore(如果不使用 Runner v2,则需要使用 Apache Beam SDK 2.62.0 或更高版本)
读取功能
  • 批处理读取
写入功能
  • 批处理写入
  • 流式写入
  • 动态目标
  • 动态表格创建

对于 Apache Iceberg 的 BigQuery 表,请将 BigQueryIO 连接器与 BigQuery Storage API 搭配使用。该表必须已经存在;不支持动态表格创建。

前提条件

设置 BigLake。按照“将 BigLake Metastore 与 Iceberg REST 目录搭配使用”中的说明,为您的 Google Cloud Platform 项目配置所需的权限。请务必了解该页面上所述的 BigLake Iceberg REST 目录的限制。

依赖项

将以下依赖项添加到项目中:

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.iceberg</groupId>
  <artifactId>iceberg-gcp</artifactId>
  <version>${iceberg.version}</version>
</dependency>

示例

以下示例演示了一个流处理流水线,该流水线使用由 BigLake Metastore 支持的 REST 目录将数据写入 Apache Iceberg 表。

Java

如需向 Dataflow 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅“为本地开发环境设置身份验证”。

import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

/**
 * A streaming pipeline that writes data to an Iceberg table using the REST catalog.
 *
 * <p>This example demonstrates writing to an Iceberg table backed by the BigLake Metastore. For
 * more information on BigLake, see the documentation at
 * https://cloud.google.com/bigquery/docs/blms-rest-catalog.
 *
 * <p>The pipeline generates one synthetic element every five seconds, converts each element to a
 * {@link Row} matching {@link #SCHEMA}, and hands the rows to the managed Iceberg sink.
 */
public class ApacheIcebergRestCatalogStreamingWrite {

  // The schema for the generated records.
  public static final Schema SCHEMA =
      Schema.builder().addStringField("user_id").addInt64Field("click_count").build();

  // OAuth scope requested for the access token sent to the REST catalog.
  private static final String CLOUD_PLATFORM_SCOPE =
      "https://www.googleapis.com/auth/cloud-platform";

  // Value passed to the sink as "triggering_frequency_seconds".
  private static final int TRIGGERING_FREQUENCY_SECONDS = 20;

  /** Pipeline options for this example. */
  public interface Options extends GcpOptions, StreamingOptions {
    @Description(
        "Warehouse location where the table's data will be written to. "
            + "BigLake only supports Single Region buckets")
    @Validation.Required
    String getWarehouse();

    void setWarehouse(String warehouse);

    @Description("The URI for the REST catalog")
    @Validation.Required
    @Default.String("https://biglake.googleapis.com/iceberg/v1beta/restcatalog")
    String getCatalogUri();

    void setCatalogUri(String value);

    @Description("The name of the table to write to")
    @Validation.Required
    String getIcebergTable();

    void setIcebergTable(String value);

    @Description("The name of the Apache Iceberg catalog")
    @Validation.Required
    String getCatalogName();

    void setCatalogName(String catalogName);
  }

  /**
   * The main entry point for the pipeline.
   *
   * @param args Command-line arguments
   * @throws IOException If there is an issue with Google Credentials
   * @throws IllegalArgumentException If no Google Cloud project is configured
   */
  public static void main(String[] args) throws IOException {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    options.setStreaming(true);

    Map<String, String> catalogProps = buildCatalogProperties(options);
    Map<String, Object> icebergWriteConfig = buildWriteConfig(options, catalogProps);

    Pipeline p = Pipeline.create(options);

    p.apply(
            "GenerateSequence",
            GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5)))
        .apply(
            "ConvertToRows",
            MapElements.into(TypeDescriptors.rows())
                .via(
                    i ->
                        Row.withSchema(SCHEMA)
                            .withFieldValue("user_id", "user-" + (i % 10))
                            .withFieldValue("click_count", i % 100)
                            .build()))
        .setCoder(RowCoder.of(SCHEMA))
        .apply("WriteToIceberg", Managed.write(Managed.ICEBERG).withConfig(icebergWriteConfig));

    p.run();
  }

  /**
   * Builds the properties used to connect to the REST catalog.
   *
   * <p>The project is validated before the access token is requested so that a missing project
   * fails fast with an actionable message, instead of surfacing as a {@code
   * NullPointerException} from the immutable map builder after a network round trip.
   *
   * @param options Pipeline options supplying the catalog URI, warehouse and project
   * @return An immutable map of catalog properties
   * @throws IOException If the access token cannot be obtained
   * @throws IllegalArgumentException If the project option is null or empty
   */
  private static Map<String, String> buildCatalogProperties(Options options) throws IOException {
    String project = options.getProject();
    if (project == null || project.isEmpty()) {
      throw new IllegalArgumentException(
          "A Google Cloud project is required for the x-goog-user-project header; "
              + "pass --project=<PROJECT_ID>.");
    }

    // Note: The token expires in 1 hour. Users may need to re-run the pipeline.
    // Future updates to Iceberg and the BigLake Metastore will support token refreshing.
    String accessToken = fetchAccessToken();

    return ImmutableMap.<String, String>builder()
        .put("type", "rest")
        .put("uri", options.getCatalogUri())
        .put("warehouse", options.getWarehouse())
        .put("header.x-goog-user-project", project)
        .put("header.Authorization", "Bearer " + accessToken)
        .put("rest-metrics-reporting-enabled", "false")
        .build();
  }

  /**
   * Requests an access token from the application default credentials.
   *
   * @return The token value to send as a bearer token
   * @throws IOException If the credentials cannot be loaded or refreshed
   */
  private static String fetchAccessToken() throws IOException {
    return GoogleCredentials.getApplicationDefault()
        .createScoped(CLOUD_PLATFORM_SCOPE)
        .refreshAccessToken()
        .getTokenValue();
  }

  /**
   * Builds the configuration map handed to the managed Iceberg write transform.
   *
   * @param options Pipeline options supplying the table and catalog names
   * @param catalogProps Catalog connection properties to embed in the configuration
   * @return An immutable configuration map for {@code Managed.write(Managed.ICEBERG)}
   */
  private static Map<String, Object> buildWriteConfig(
      Options options, Map<String, String> catalogProps) {
    return ImmutableMap.<String, Object>builder()
        .put("table", options.getIcebergTable())
        .put("catalog_properties", catalogProps)
        .put("catalog_name", options.getCatalogName())
        .put("triggering_frequency_seconds", TRIGGERING_FREQUENCY_SECONDS)
        .build();
  }
}

后续步骤