[[["容易理解","easyToUnderstand","thumb-up"],["確實解決了我的問題","solvedMyProblem","thumb-up"],["其他","otherUp","thumb-up"]],[["難以理解","hardToUnderstand","thumb-down"],["資訊或程式碼範例有誤","incorrectInformationOrSampleCode","thumb-down"],["缺少我需要的資訊/範例","missingTheInformationSamplesINeed","thumb-down"],["翻譯問題","translationIssue","thumb-down"],["其他","otherDown","thumb-down"]],["上次更新時間:2025-09-10 (世界標準時間)。"],[],[],null,["To write from Dataflow to Apache Iceberg using the BigLake REST Catalog, use the\n[managed I/O connector](/dataflow/docs/guides/managed-io-iceberg).\n\nManaged I/O supports the following capabilities for Apache Iceberg:\n\n| Catalogs | - Hadoop - Hive - REST-based catalogs - [BigQuery metastore](/bigquery/docs/about-bqms) (requires Apache Beam SDK 2.62.0 or later if not using Runner v2) |\n| Read capabilities | Batch read |\n| Write capabilities | - Batch write - Streaming write - [Dynamic destinations](/dataflow/docs/guides/write-to-iceberg#dynamic-destinations) - Dynamic table creation |\n|--------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|\n\nFor [BigQuery tables for Apache Iceberg](/bigquery/docs/iceberg-tables),\nuse the\n[`BigQueryIO` connector](https://beam.apache.org/documentation/io/built-in/google-bigquery/)\nwith BigQuery Storage API. The table must already exist; dynamic table creation is\nnot supported.\n\nPrerequisites\n\n**Set up BigLake.** Configure your Google Cloud Platform project with the\nrequired permissions by following\n[Use the BigLake Metastore with the Iceberg REST catalog](/bigquery/docs/blms-rest-catalog).\nMake sure that you understand the limitations of BigLake\nIceberg REST Catalog described on that page.\n\nDependencies\n\nAdd the following dependencies to your project: \n\nJava \n\n \u003cdependency\u003e\n \u003cgroupId\u003eorg.apache.beam\u003c/groupId\u003e\n \u003cartifactId\u003ebeam-sdks-java-managed\u003c/artifactId\u003e\n \u003cversion\u003e${beam.version}\u003c/version\u003e\n \u003c/dependency\u003e\n\n \u003cdependency\u003e\n \u003cgroupId\u003eorg.apache.beam\u003c/groupId\u003e\n \u003cartifactId\u003ebeam-sdks-java-io-iceberg\u003c/artifactId\u003e\n \u003cversion\u003e${beam.version}\u003c/version\u003e\n \u003c/dependency\u003e\n\n \u003cdependency\u003e\n \u003cgroupId\u003eorg.apache.iceberg\u003c/groupId\u003e\n \u003cartifactId\u003eiceberg-gcp\u003c/artifactId\u003e\n \u003cversion\u003e${iceberg.version}\u003c/version\u003e\n \u003c/dependency\u003e\n\nExample\n\nThe following example demonstrates a streaming pipeline that writes data to an Apache Iceberg table using the REST catalog, backed by the BigLake Metastore. 
Example

The following example demonstrates a streaming pipeline that writes data to an
Apache Iceberg table using the REST catalog, backed by the BigLake Metastore.

Java

To authenticate to Dataflow, set up Application Default Credentials.
For more information, see
[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).

    import com.google.auth.oauth2.GoogleCredentials;
    import com.google.common.collect.ImmutableMap;
    import java.io.IOException;
    import java.util.Map;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.RowCoder;
    import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.managed.Managed;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.StreamingOptions;
    import org.apache.beam.sdk.options.Validation;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.Row;
    import org.apache.beam.sdk.values.TypeDescriptors;
    import org.joda.time.Duration;

    /**
     * A streaming pipeline that writes data to an Iceberg table using the REST catalog.
     *
     * <p>This example demonstrates writing to an Iceberg table backed by the BigLake Metastore. For
     * more information on BigLake, see the documentation at
     * https://cloud.google.com/bigquery/docs/blms-rest-catalog.
     */
    public class ApacheIcebergRestCatalogStreamingWrite {

      // The schema for the generated records.
      public static final Schema SCHEMA =
          Schema.builder().addStringField("user_id").addInt64Field("click_count").build();

      /** Pipeline options for this example. */
      public interface Options extends GcpOptions, StreamingOptions {
        @Description(
            "Warehouse location where the table's data will be written to. "
                + "BigLake only supports Single Region buckets")
        @Validation.Required
        String getWarehouse();

        void setWarehouse(String warehouse);

        @Description("The URI for the REST catalog")
        @Validation.Required
        @Default.String("https://biglake.googleapis.com/iceberg/v1beta/restcatalog")
        String getCatalogUri();

        void setCatalogUri(String value);

        @Description("The name of the table to write to")
        @Validation.Required
        String getIcebergTable();

        void setIcebergTable(String value);

        @Description("The name of the Apache Iceberg catalog")
        @Validation.Required
        String getCatalogName();

        void setCatalogName(String catalogName);
      }

      /**
       * The main entry point for the pipeline.
       *
       * @param args Command-line arguments
       * @throws IOException If there is an issue with Google Credentials
       */
      public static void main(String[] args) throws IOException {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        options.setStreaming(true);

        // Note: The token expires in 1 hour. Users may need to re-run the pipeline.
        // Future updates to Iceberg and the BigLake Metastore will support token refreshing.
        Map<String, String> catalogProps =
            ImmutableMap.<String, String>builder()
                .put("type", "rest")
                .put("uri", options.getCatalogUri())
                .put("warehouse", options.getWarehouse())
                .put("header.x-goog-user-project", options.getProject())
                .put(
                    "header.Authorization",
                    "Bearer "
                        + GoogleCredentials.getApplicationDefault()
                            .createScoped("https://www.googleapis.com/auth/cloud-platform")
                            .refreshAccessToken()
                            .getTokenValue())
                .put("rest-metrics-reporting-enabled", "false")
                .build();

        Map<String, Object> icebergWriteConfig =
            ImmutableMap.<String, Object>builder()
                .put("table", options.getIcebergTable())
                .put("catalog_properties", catalogProps)
                .put("catalog_name", options.getCatalogName())
                .put("triggering_frequency_seconds", 20)
                .build();

        Pipeline p = Pipeline.create(options);

        p.apply(
                "GenerateSequence",
                GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5)))
            .apply(
                "ConvertToRows",
                MapElements.into(TypeDescriptors.rows())
                    .via(
                        i ->
                            Row.withSchema(SCHEMA)
                                .withFieldValue("user_id", "user-" + (i % 10))
                                .withFieldValue("click_count", i % 100)
                                .build()))
            .setCoder(RowCoder.of(SCHEMA))
            .apply("WriteToIceberg", Managed.write(Managed.ICEBERG).withConfig(icebergWriteConfig));

        p.run();
      }
    }
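The capabilities table above also lists dynamic destinations and dynamic table creation. With Managed I/O, the destination can be derived per record by using a template in the `table` property, where a field name in curly braces is resolved against each record; see the [dynamic destinations](/dataflow/docs/guides/write-to-iceberg#dynamic-destinations) guide for the exact semantics. A minimal sketch, assuming a hypothetical `region` string field in the record schema:

    // Sketch only: routes each record to a table named after its "region" field,
    // which is a hypothetical field not present in the example schema above.
    // Destination tables that don't exist yet can be created on demand.
    Map<String, Object> dynamicWriteConfig =
        ImmutableMap.<String, Object>builder()
            .put("table", "my_namespace.clicks_{region}") // per-record table template
            .put("catalog_name", options.getCatalogName())
            .put("catalog_properties", catalogProps)
            .put("triggering_frequency_seconds", 20)
            .build();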
What's next

- [CDC Read from Apache Iceberg with BigLake REST Catalog](/dataflow/docs/guides/cdc-read-from-iceberg-biglake).
- Learn more about [Managed I/O](/dataflow/docs/guides/managed-io).
- Learn more about [BigLake REST Catalog](/bigquery/docs/blms-rest-catalog).