使用 BigLake REST 目錄將串流寫入 Apache Iceberg

如要使用 BigLake REST 目錄,從 Dataflow 寫入 Apache Iceberg,請使用受管理 I/O 連接器

代管 I/O 支援 Apache Iceberg 的下列功能:

目錄
讀取功能 批次讀取
寫入功能

如果是 Apache Iceberg 專用 BigQuery 資料表,請搭配 BigQuery Storage API 使用 BigQueryIO 連接器。資料表必須已存在,不支援動態建立資料表。

必要條件

設定 BigLake。請按照「搭配 Iceberg REST 目錄使用 BigLake Metastore」一文的說明,為 Google Cloud Platform 專案設定必要權限。請務必瞭解該頁面所述的 BigLake Iceberg REST 目錄限制。

依附元件

將下列依附元件新增至專案:

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.iceberg</groupId>
  <artifactId>iceberg-gcp</artifactId>
  <version>${iceberg.version}</version>
</dependency>

範例

以下範例示範串流管道,該管道使用 REST 目錄將資料寫入 Apache Iceberg 資料表,並由 BigLake Metastore 支援。

Java

如要向 Dataflow 進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。

import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

/**
 * A streaming pipeline that writes data to an Iceberg table using the REST catalog.
 *
 * <p>This example demonstrates writing to an Iceberg table backed by the BigLake Metastore. For
 * more information on BigLake, see the documentation at
 * https://cloud.google.com/bigquery/docs/blms-rest-catalog.
 */
public class ApacheIcebergRestCatalogStreamingWrite {

  // The schema for the generated records.
  public static final Schema SCHEMA =
      Schema.builder().addStringField("user_id").addInt64Field("click_count").build();

  /** Pipeline options for this example. */
  public interface Options extends GcpOptions, StreamingOptions {
    @Description(
        "Warehouse location where the table's data will be written to. "
            + "BigLake only supports Single Region buckets")
    @Validation.Required
    String getWarehouse();

    void setWarehouse(String warehouse);

    @Description("The URI for the REST catalog")
    @Validation.Required
    @Default.String("https://biglake.googleapis.com/iceberg/v1beta/restcatalog")
    String getCatalogUri();

    void setCatalogUri(String value);

    @Description("The name of the table to write to")
    @Validation.Required
    String getIcebergTable();

    void setIcebergTable(String value);

    @Description("The name of the Apache Iceberg catalog")
    @Validation.Required
    String getCatalogName();

    void setCatalogName(String catalogName);
  }

  /**
   * The main entry point for the pipeline.
   *
   * @param args Command-line arguments
   * @throws IOException If there is an issue with Google Credentials
   */
  public static void main(String[] args) throws IOException {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    options.setStreaming(true);

    // Note: The token expires in 1 hour. Users may need to re-run the pipeline.
    // Future updates to Iceberg and the BigLake Metastore will support token refreshing.
    Map<String, String> catalogProps =
        ImmutableMap.<String, String>builder()
            .put("type", "rest")
            .put("uri", options.getCatalogUri())
            .put("warehouse", options.getWarehouse())
            .put("header.x-goog-user-project", options.getProject())
            .put(
                "header.Authorization",
                "Bearer "
                    + GoogleCredentials.getApplicationDefault()
                        .createScoped("https://www.googleapis.com/auth/cloud-platform")
                        .refreshAccessToken()
                        .getTokenValue())
            .put("rest-metrics-reporting-enabled", "false")
            .build();

    Map<String, Object> icebergWriteConfig =
        ImmutableMap.<String, Object>builder()
            .put("table", options.getIcebergTable())
            .put("catalog_properties", catalogProps)
            .put("catalog_name", options.getCatalogName())
            .put("triggering_frequency_seconds", 20)
            .build();

    Pipeline p = Pipeline.create(options);

    p.apply(
            "GenerateSequence",
            GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5)))
        .apply(
            "ConvertToRows",
            MapElements.into(TypeDescriptors.rows())
                .via(
                    i ->
                        Row.withSchema(SCHEMA)
                            .withFieldValue("user_id", "user-" + (i % 10))
                            .withFieldValue("click_count", i % 100)
                            .build()))
        .setCoder(RowCoder.of(SCHEMA))
        .apply("WriteToIceberg", Managed.write(Managed.ICEBERG).withConfig(icebergWriteConfig));

    p.run();
  }
}

後續步驟