CDC-Lesevorgang aus Apache Iceberg mit BigLake REST Catalog

Wenn Sie CDC-Ereignisse (Change Data Capture) aus Apache Iceberg mit dem BigLake REST Catalog lesen möchten, verwenden Sie den von Apache Beam verwalteten E/A-Connector.

Die verwaltete E/A unterstützt die folgenden Funktionen für Apache Iceberg:

Kataloge
  • Hadoop
  • Hive
  • REST-basierte Kataloge
  • BigQuery-Metastore (erfordert Apache Beam SDK 2.62.0 oder höher, wenn nicht Runner v2 verwendet wird)
Lesefunktionen Batchlesevorgang
Schreibfunktionen
  • Batchschreibvorgang
  • Streaming-Schreibvorgang
  • Dynamische Ziele
  • Dynamische Tabellenerstellung

Verwenden Sie für BigQuery-Tabellen für Apache Iceberg den BigQueryIO-Connector mit der BigQuery Storage API. Die Tabelle muss bereits vorhanden sein. Das dynamische Erstellen von Tabellen wird nicht unterstützt.

Beschränkungen

  1. Apache Iceberg CDC wird nur über die Managed API unterstützt. Die Funktionen des Dienstes für verwaltete Transformationen sind noch nicht aktiviert. Mit Änderungen rechnen, die sich auf die Abwärtskompatibilität auswirken
  2. Die CDC Managed API liest nur Snapshots, die nur angehängt werden. Die vollständige CDC ist noch nicht verfügbar.

Vorbereitung

  1. BigLake einrichten Konfigurieren Sie Ihr Google Cloud-Projekt mit den erforderlichen Berechtigungen. Folgen Sie dazu der Anleitung unter BigLake Metastore mit dem Iceberg REST-Katalog verwenden. Machen Sie sich mit den Einschränkungen des BigLake-Iceberg-REST-Katalogs vertraut, die auf dieser Seite beschrieben werden.
  2. Iceberg-Quelltabelle erstellen: In diesem Beispiel wird davon ausgegangen, dass Sie eine Apache Iceberg-Tabelle haben. Dazu können Sie die Pipeline verwenden, die unter Streaming Write to Apache Iceberg with BigLake REST Catalog beschrieben wird.

Abhängigkeiten

Fügen Sie Ihrem Projekt die folgenden Abhängigkeiten hinzu:

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.iceberg</groupId>
  <artifactId>iceberg-gcp</artifactId>
  <version>${iceberg.version}</version>
</dependency>

Beispiel

Im folgenden Beispiel wird eine Streamingpipeline veranschaulicht, die CDC-Ereignisse aus einer Apache Iceberg-Tabelle liest, Nutzerklicks aggregiert und die Ergebnisse in eine andere Apache Iceberg-Tabelle schreibt.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

/**
 * A streaming pipeline that reads CDC events from an Iceberg table, aggregates user clicks, and
 * writes the results to another Iceberg table. For more information on BigLake, 
 * see the documentation at https://cloud.google.com/bigquery/docs/blms-rest-catalog.
 *
 * <p>This pipeline can be used to process the output of {@link
 * ApacheIcebergRestCatalogStreamingWrite}.
 */
public class ApacheIcebergCdcRead {

  // Schema for the source table containing click events.
  public static final Schema SOURCE_SCHEMA =
      Schema.builder().addStringField("user_id").addInt64Field("click_count").build();

  // Schema for the destination table containing aggregated click counts.
  public static final Schema DESTINATION_SCHEMA =
      Schema.builder().addStringField("user_id").addInt64Field("total_clicks").build();

  /** Pipeline options for this example. */
  public interface Options extends GcpOptions {
    @Description("The source Iceberg table to read CDC events from")
    @Validation.Required
    String getSourceTable();

    void setSourceTable(String sourceTable);

    @Description("The destination Iceberg table to write aggregated results to")
    @Validation.Required
    String getDestinationTable();

    void setDestinationTable(String destinationTable);

    @Description("Warehouse location for the Iceberg catalog")
    @Validation.Required
    String getWarehouse();

    void setWarehouse(String warehouse);

    @Description("The URI for the REST catalog")
    @Default.String("https://biglake.googleapis.com/iceberg/v1beta/restcatalog")
    String getCatalogUri();

    void setCatalogUri(String value);

    @Description("The name of the Iceberg catalog")
    @Validation.Required
    String getCatalogName();

    void setCatalogName(String catalogName);
  }

  public static void main(String[] args) throws IOException {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    // Note: The token expires in 1 hour. Users may need to re-run the pipeline.
    // Future updates to Iceberg and the BigLake Metastore will support token refreshing.
    Map<String, String> catalogProps =
        ImmutableMap.<String, String>builder()
            .put("type", "rest")
            .put("uri", options.getCatalogUri())
            .put("warehouse", options.getWarehouse())
            .put("header.x-goog-user-project", options.getProject())
            .put(
                "header.Authorization",
                "Bearer "
                    + GoogleCredentials.getApplicationDefault()
                        .createScoped("https://www.googleapis.com/auth/cloud-platform")
                        .refreshAccessToken()
                        .getTokenValue())
            .put("rest-metrics-reporting-enabled", "false")
            .build();

    Pipeline p = Pipeline.create(options);

    // Configure the Iceberg CDC read
    Map<String, Object> icebergReadConfig =
        ImmutableMap.<String, Object>builder()
            .put("table", options.getSourceTable())
            .put("catalog_name", options.getCatalogName())
            .put("catalog_properties", catalogProps)
            .put("streaming", Boolean.TRUE)
            .put("poll_interval_seconds", 20)
            .build();

    PCollection<Row> cdcEvents =
        p.apply("ReadFromIceberg", Managed.read(Managed.ICEBERG_CDC).withConfig(icebergReadConfig))
            .getSinglePCollection()
            .setRowSchema(SOURCE_SCHEMA);

    PCollection<Row> aggregatedRows =
        cdcEvents
            .apply("ApplyWindow", Window.into(FixedWindows.of(Duration.standardSeconds(30))))
            .apply(
                "ExtractUserAndCount",
                MapElements.into(
                        TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.longs()))
                    .via(
                        row -> {
                          String userId = row.getString("user_id");
                          Long clickCount = row.getInt64("click_count");
                          return KV.of(userId, clickCount == null ? 0L : clickCount);
                        }))
            .apply("SumClicksPerUser", Sum.longsPerKey())
            .apply(
                "FormatToRow",
                MapElements.into(TypeDescriptors.rows())
                    .via(
                        kv ->
                            Row.withSchema(DESTINATION_SCHEMA)
                                .withFieldValue("user_id", kv.getKey())
                                .withFieldValue("total_clicks", kv.getValue())
                                .build()))
            .setCoder(RowCoder.of(DESTINATION_SCHEMA));

    // Configure the Iceberg write
    Map<String, Object> icebergWriteConfig =
        ImmutableMap.<String, Object>builder()
            .put("table", options.getDestinationTable())
            .put("catalog_properties", catalogProps)
            .put("catalog_name", options.getCatalogName())
            .put("triggering_frequency_seconds", 30)
            .build();

    aggregatedRows.apply(
        "WriteToIceberg", Managed.write(Managed.ICEBERG).withConfig(icebergWriteConfig));

    p.run();
  }
}

Nächste Schritte