To write from Dataflow to Apache Iceberg using the BigLake REST Catalog, use the
[managed I/O connector](/dataflow/docs/guides/managed-io-iceberg).

Managed I/O supports the following capabilities for Apache Iceberg:

| Capability | Support |
|------------|---------|
| Catalogs | Hadoop, Hive, REST-based catalogs, [BigQuery metastore](/bigquery/docs/about-bqms) (requires Apache Beam SDK 2.62.0 or later if not using Runner v2) |
| Read capabilities | Batch read |
| Write capabilities | Batch write, streaming write, [dynamic destinations](/dataflow/docs/guides/write-to-iceberg#dynamic-destinations), dynamic table creation |

For [BigQuery tables for Apache Iceberg](/bigquery/docs/iceberg-tables), use the
[`BigQueryIO` connector](https://beam.apache.org/documentation/io/built-in/google-bigquery/)
with the BigQuery Storage API. The table must already exist; dynamic table
creation is not supported.

Prerequisites

**Set up BigLake.** Configure your Google Cloud project with the required
permissions by following
[Use the BigLake Metastore with the Iceberg REST catalog](/bigquery/docs/blms-rest-catalog).
Make sure that you understand the limitations of the BigLake Iceberg REST
catalog described on that page.

Dependencies

Add the following dependencies to your project:

Java

```xml
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.iceberg</groupId>
  <artifactId>iceberg-gcp</artifactId>
  <version>${iceberg.version}</version>
</dependency>
```
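Managed I/O uses the same configuration-map style for reads as for writes. The
following is a minimal sketch of the batch-read capability listed in the table
above; it is not taken from this page's example. The table name, catalog name,
and bucket are hypothetical placeholders, and the REST catalog authentication
headers are omitted here (the streaming example in the next section shows how
to set them).

```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class IcebergBatchReadSketch {
  public static void main(String[] args) {
    // Placeholder catalog properties. In practice, reuse the REST catalog
    // properties (including the Authorization header) from the example below.
    Map<String, String> catalogProps =
        ImmutableMap.of(
            "type", "rest",
            "uri", "https://biglake.googleapis.com/iceberg/v1beta/restcatalog",
            "warehouse", "gs://YOUR_BUCKET"); // hypothetical bucket

    Map<String, Object> icebergReadConfig =
        ImmutableMap.<String, Object>builder()
            .put("table", "example_db.example_table") // hypothetical table
            .put("catalog_name", "example_catalog") // hypothetical catalog name
            .put("catalog_properties", catalogProps)
            .build();

    Pipeline pipeline = Pipeline.create();

    // Managed.read produces a PCollectionRowTuple; getSinglePCollection()
    // returns the single PCollection<Row> it contains.
    PCollection<Row> rows =
        pipeline
            .apply(Managed.read(Managed.ICEBERG).withConfig(icebergReadConfig))
            .getSinglePCollection();

    // ...transform or write `rows` here...

    pipeline.run();
  }
}
```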
Example

The following example demonstrates a streaming pipeline that writes data to an
Apache Iceberg table using the REST catalog, backed by the BigLake Metastore.

Java

To authenticate to Dataflow, set up Application Default Credentials. For more
information, see
[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).
```java
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

/**
 * A streaming pipeline that writes data to an Iceberg table using the REST catalog.
 *
 * <p>This example demonstrates writing to an Iceberg table backed by the BigLake Metastore. For
 * more information on BigLake, see the documentation at
 * https://cloud.google.com/bigquery/docs/blms-rest-catalog.
 */
public class ApacheIcebergRestCatalogStreamingWrite {

  // The schema for the generated records.
  public static final Schema SCHEMA =
      Schema.builder().addStringField("user_id").addInt64Field("click_count").build();

  /** Pipeline options for this example. */
  public interface Options extends GcpOptions, StreamingOptions {
    @Description(
        "Warehouse location where the table's data will be written to. "
            + "BigLake only supports Single Region buckets")
    @Validation.Required
    String getWarehouse();

    void setWarehouse(String warehouse);

    @Description("The URI for the REST catalog")
    @Validation.Required
    @Default.String("https://biglake.googleapis.com/iceberg/v1beta/restcatalog")
    String getCatalogUri();

    void setCatalogUri(String value);

    @Description("The name of the table to write to")
    @Validation.Required
    String getIcebergTable();

    void setIcebergTable(String value);

    @Description("The name of the Apache Iceberg catalog")
    @Validation.Required
    String getCatalogName();

    void setCatalogName(String catalogName);
  }

  /**
   * The main entry point for the pipeline.
   *
   * @param args Command-line arguments
   * @throws IOException If there is an issue with Google Credentials
   */
  public static void main(String[] args) throws IOException {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    options.setStreaming(true);

    // Note: The token expires in 1 hour. Users may need to re-run the pipeline.
    // Future updates to Iceberg and the BigLake Metastore will support token refreshing.
    Map<String, String> catalogProps =
        ImmutableMap.<String, String>builder()
            .put("type", "rest")
            .put("uri", options.getCatalogUri())
            .put("warehouse", options.getWarehouse())
            .put("header.x-goog-user-project", options.getProject())
            .put(
                "header.Authorization",
                "Bearer "
                    + GoogleCredentials.getApplicationDefault()
                        .createScoped("https://www.googleapis.com/auth/cloud-platform")
                        .refreshAccessToken()
                        .getTokenValue())
            .put("rest-metrics-reporting-enabled", "false")
            .build();

    Map<String, Object> icebergWriteConfig =
        ImmutableMap.<String, Object>builder()
            .put("table", options.getIcebergTable())
            .put("catalog_properties", catalogProps)
            .put("catalog_name", options.getCatalogName())
            .put("triggering_frequency_seconds", 20)
            .build();

    Pipeline p = Pipeline.create(options);

    p.apply(
            "GenerateSequence",
            GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5)))
        .apply(
            "ConvertToRows",
            MapElements.into(TypeDescriptors.rows())
                .via(
                    i ->
                        Row.withSchema(SCHEMA)
                            .withFieldValue("user_id", "user-" + (i % 10))
                            .withFieldValue("click_count", i % 100)
                            .build()))
        .setCoder(RowCoder.of(SCHEMA))
        .apply("WriteToIceberg", Managed.write(Managed.ICEBERG).withConfig(icebergWriteConfig));

    p.run();
  }
}
```
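The capabilities table also lists dynamic destinations: the table name in the
write configuration can be a template that references record fields, so each
record is routed to the table named by its field values. The following is a
minimal sketch, not part of the original example. It assumes rows with a
hypothetical string field named `country`, and the `{country}` placeholder
syntax follows the dynamic destinations guide linked above; treat it as a
starting point rather than a verified configuration.

```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;

public class IcebergDynamicDestinationsSketch {
  // Hypothetical schema with a routing field.
  public static final Schema SCHEMA =
      Schema.builder().addStringField("country").addInt64Field("flight_count").build();

  public static void main(String[] args) {
    // Placeholder catalog properties; in practice, reuse the REST catalog
    // properties (including the Authorization header) from the example above.
    Map<String, String> catalogProps =
        ImmutableMap.of(
            "type", "rest",
            "uri", "https://biglake.googleapis.com/iceberg/v1beta/restcatalog",
            "warehouse", "gs://YOUR_BUCKET"); // hypothetical bucket

    // The {country} placeholder is resolved per record, so each row lands in
    // the table named after its country value.
    Map<String, Object> writeConfig =
        ImmutableMap.<String, Object>builder()
            .put("table", "example_db.flights_{country}") // hypothetical template
            .put("catalog_name", "example_catalog") // hypothetical catalog name
            .put("catalog_properties", catalogProps)
            .build();

    Pipeline p = Pipeline.create();
    p.apply(
            Create.of(
                Row.withSchema(SCHEMA).addValues("US", 10L).build(),
                Row.withSchema(SCHEMA).addValues("DE", 7L).build()))
        .setCoder(RowCoder.of(SCHEMA))
        .apply(Managed.write(Managed.ICEBERG).withConfig(writeConfig));

    p.run();
  }
}
```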
[[["Easy to understand","easyToUnderstand","thumb-up"],["Solved my problem","solvedMyProblem","thumb-up"],["Other","otherUp","thumb-up"]],[["Hard to understand","hardToUnderstand","thumb-down"],["Incorrect information or sample code","incorrectInformationOrSampleCode","thumb-down"],["Missing the information/samples I need","missingTheInformationSamplesINeed","thumb-down"],["Other","otherDown","thumb-down"]],["Last updated 2025-09-09 UTC."],[],[],null,["To write from Dataflow to Apache Iceberg using the BigLake REST Catalog, use the\n[managed I/O connector](/dataflow/docs/guides/managed-io-iceberg).\n\nManaged I/O supports the following capabilities for Apache Iceberg:\n\n| Catalogs | - Hadoop - Hive - REST-based catalogs - [BigQuery metastore](/bigquery/docs/about-bqms) (requires Apache Beam SDK 2.62.0 or later if not using Runner v2) |\n| Read capabilities | Batch read |\n| Write capabilities | - Batch write - Streaming write - [Dynamic destinations](/dataflow/docs/guides/write-to-iceberg#dynamic-destinations) - Dynamic table creation |\n|--------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|\n\nFor [BigQuery tables for Apache Iceberg](/bigquery/docs/iceberg-tables),\nuse the\n[`BigQueryIO` connector](https://beam.apache.org/documentation/io/built-in/google-bigquery/)\nwith BigQuery Storage API. The table must already exist; dynamic table creation is\nnot supported.\n\nPrerequisites\n\n**Set up BigLake.** Configure your Google Cloud Platform project with the\nrequired permissions by following\n[Use the BigLake Metastore with the Iceberg REST catalog](/bigquery/docs/blms-rest-catalog).\nMake sure that you understand the limitations of BigLake\nIceberg REST Catalog described on that page.\n\nDependencies\n\nAdd the following dependencies to your project: \n\nJava \n\n \u003cdependency\u003e\n \u003cgroupId\u003eorg.apache.beam\u003c/groupId\u003e\n \u003cartifactId\u003ebeam-sdks-java-managed\u003c/artifactId\u003e\n \u003cversion\u003e${beam.version}\u003c/version\u003e\n \u003c/dependency\u003e\n\n \u003cdependency\u003e\n \u003cgroupId\u003eorg.apache.beam\u003c/groupId\u003e\n \u003cartifactId\u003ebeam-sdks-java-io-iceberg\u003c/artifactId\u003e\n \u003cversion\u003e${beam.version}\u003c/version\u003e\n \u003c/dependency\u003e\n\n \u003cdependency\u003e\n \u003cgroupId\u003eorg.apache.iceberg\u003c/groupId\u003e\n \u003cartifactId\u003eiceberg-gcp\u003c/artifactId\u003e\n \u003cversion\u003e${iceberg.version}\u003c/version\u003e\n \u003c/dependency\u003e\n\nExample\n\nThe following example demonstrates a streaming pipeline that writes data to an Apache Iceberg table using the REST catalog, backed by the BigLake Metastore. 
\n\nJava\n\n\nTo authenticate to Dataflow, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import com.google.auth.oauth2.https://cloud.google.com/java/docs/reference/google-auth-library/latest/com.google.auth.oauth2.GoogleCredentials.html;\n import com.google.common.collect.ImmutableMap;\n import java.io.IOException;\n import java.util.Map;\n import org.apache.beam.sdk.Pipeline;\n import org.apache.beam.sdk.coders.RowCoder;\n import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;\n import org.apache.beam.sdk.io.GenerateSequence;\n import org.apache.beam.sdk.managed.Managed;\n import org.apache.beam.sdk.options.Default;\n import org.apache.beam.sdk.options.Description;\n import org.apache.beam.sdk.options.PipelineOptionsFactory;\n import org.apache.beam.sdk.options.StreamingOptions;\n import org.apache.beam.sdk.options.Validation;\n import org.apache.beam.sdk.schemas.Schema;\n import org.apache.beam.sdk.transforms.MapElements;\n import org.apache.beam.sdk.values.Row;\n import org.apache.beam.sdk.values.TypeDescriptors;\n import org.joda.time.Duration;\n\n /**\n * A streaming pipeline that writes data to an Iceberg table using the REST catalog.\n *\n * \u003cp\u003eThis example demonstrates writing to an Iceberg table backed by the BigLake Metastore. For\n * more information on BigLake, see the documentation at\n * https://cloud.google.com/bigquery/docs/blms-rest-catalog.\n */\n public class ApacheIcebergRestCatalogStreamingWrite {\n\n // The schema for the generated records.\n public static final Schema SCHEMA =\n Schema.builder().addStringField(\"user_id\").addInt64Field(\"click_count\").build();\n\n /** Pipeline options for this example. */\n public interface Options extends GcpOptions, StreamingOptions {\n @Description(\n \"Warehouse location where the table's data will be written to. \"\n + \"BigLake only supports Single Region buckets\")\n @Validation.Required\n String getWarehouse();\n\n void setWarehouse(String warehouse);\n\n @Description(\"The URI for the REST catalog\")\n @Validation.Required\n @Default.String(\"https://biglake.googleapis.com/iceberg/v1beta/restcatalog\")\n String getCatalogUri();\n\n void setCatalogUri(String value);\n\n @Description(\"The name of the table to write to\")\n @Validation.Required\n String getIcebergTable();\n\n void setIcebergTable(String value);\n\n @Description(\"The name of the Apache Iceberg catalog\")\n @Validation.Required\n String getCatalogName();\n\n void setCatalogName(String catalogName);\n }\n\n /**\n * The main entry point for the pipeline.\n *\n * @param args Command-line arguments\n * @throws IOException If there is an issue with Google Credentials\n */\n public static void main(String[] args) throws IOException {\n Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);\n options.setStreaming(true);\n\n // Note: The token expires in 1 hour. 
What's next

- [CDC Read from Apache Iceberg with BigLake REST Catalog](/dataflow/docs/guides/cdc-read-from-iceberg-biglake).
- Learn more about [Managed I/O](/dataflow/docs/guides/managed-io).
- Learn more about [BigLake REST Catalog](/bigquery/docs/blms-rest-catalog).