To write from Dataflow to Apache Iceberg using the BigLake REST Catalog, use the
[managed I/O connector](/dataflow/docs/guides/managed-io-iceberg).

Managed I/O supports the following capabilities for Apache Iceberg:

| Capability | Details |
|---|---|
| Catalogs | Hadoop, Hive, REST-based catalogs, [BigQuery metastore](/bigquery/docs/about-bqms) (requires Apache Beam SDK 2.62.0 or later if not using Runner v2) |
| Read capabilities | Batch read |
| Write capabilities | Batch write, streaming write, [dynamic destinations](/dataflow/docs/guides/write-to-iceberg#dynamic-destinations), dynamic table creation |

For [BigQuery tables for Apache Iceberg](/bigquery/docs/iceberg-tables), use the
[`BigQueryIO` connector](https://beam.apache.org/documentation/io/built-in/google-bigquery/)
with the BigQuery Storage API. The table must already exist; dynamic table creation is
not supported.

Prerequisites

Set up BigLake. Configure your Google Cloud project with the required permissions by
following [Use the BigLake Metastore with the Iceberg REST catalog](/bigquery/docs/blms-rest-catalog).
Make sure that you understand the limitations of the BigLake Iceberg REST Catalog
described on that page.

Dependencies

Add the following dependencies to your project:

Java

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-managed</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-iceberg</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.iceberg</groupId>
      <artifactId>iceberg-gcp</artifactId>
      <version>${iceberg.version}</version>
    </dependency>
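The `${beam.version}` and `${iceberg.version}` placeholders are Maven properties that
your build must define. The following POM block is a minimal sketch; the version
numbers are illustrative only (the capabilities list above requires Beam 2.62.0 or
later in one configuration), so pin versions you have tested:

    <properties>
      <!-- Illustrative versions only; use versions you have validated. -->
      <beam.version>2.62.0</beam.version>
      <iceberg.version>1.6.1</iceberg.version>
    </properties>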
Example

The following example demonstrates a streaming pipeline that writes data to an
Apache Iceberg table using the REST catalog, backed by the BigLake Metastore.

Java
To authenticate to Dataflow, set up Application Default Credentials. For more
information, see
[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).
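If you develop locally, a common way to set up Application Default Credentials is
through the Google Cloud CLI; this assumes the gcloud CLI is installed and you can
sign in with a browser:

    # Opens a browser sign-in flow and stores local credentials that
    # GoogleCredentials.getApplicationDefault() can discover.
    gcloud auth application-default login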
    import com.google.auth.oauth2.GoogleCredentials;
    import com.google.common.collect.ImmutableMap;
    import java.io.IOException;
    import java.util.Map;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.RowCoder;
    import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.managed.Managed;
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.StreamingOptions;
    import org.apache.beam.sdk.options.Validation;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.Row;
    import org.apache.beam.sdk.values.TypeDescriptors;
    import org.joda.time.Duration;

    /**
     * A streaming pipeline that writes data to an Iceberg table using the REST catalog.
     *
     * <p>This example demonstrates writing to an Iceberg table backed by the BigLake Metastore. For
     * more information on BigLake, see the documentation at
     * https://cloud.google.com/bigquery/docs/blms-rest-catalog.
     */
    public class ApacheIcebergRestCatalogStreamingWrite {

      // The schema for the generated records.
      public static final Schema SCHEMA =
          Schema.builder().addStringField("user_id").addInt64Field("click_count").build();

      /** Pipeline options for this example. */
      public interface Options extends GcpOptions, StreamingOptions {
        @Description(
            "Warehouse location where the table's data will be written to. "
                + "BigLake only supports Single Region buckets")
        @Validation.Required
        String getWarehouse();

        void setWarehouse(String warehouse);

        @Description("The URI for the REST catalog")
        @Validation.Required
        @Default.String("https://biglake.googleapis.com/iceberg/v1beta/restcatalog")
        String getCatalogUri();

        void setCatalogUri(String value);

        @Description("The name of the table to write to")
        @Validation.Required
        String getIcebergTable();

        void setIcebergTable(String value);

        @Description("The name of the Apache Iceberg catalog")
        @Validation.Required
        String getCatalogName();

        void setCatalogName(String catalogName);
      }

      /**
       * The main entry point for the pipeline.
       *
       * @param args Command-line arguments
       * @throws IOException If there is an issue with Google Credentials
       */
      public static void main(String[] args) throws IOException {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        options.setStreaming(true);

        // Catalog properties passed through to the Iceberg REST catalog client.
        // Note: The token expires in 1 hour. Users may need to re-run the pipeline.
        // Future updates to Iceberg and the BigLake Metastore will support token refreshing.
        Map<String, String> catalogProps =
            ImmutableMap.<String, String>builder()
                .put("type", "rest")
                .put("uri", options.getCatalogUri())
                .put("warehouse", options.getWarehouse())
                .put("header.x-goog-user-project", options.getProject())
                .put(
                    "header.Authorization",
                    "Bearer "
                        + GoogleCredentials.getApplicationDefault()
                            .createScoped("https://www.googleapis.com/auth/cloud-platform")
                            .refreshAccessToken()
                            .getTokenValue())
                .put("rest-metrics-reporting-enabled", "false")
                .build();

        // Configuration for the Managed I/O Iceberg write transform.
        Map<String, Object> icebergWriteConfig =
            ImmutableMap.<String, Object>builder()
                .put("table", options.getIcebergTable())
                .put("catalog_properties", catalogProps)
                .put("catalog_name", options.getCatalogName())
                .put("triggering_frequency_seconds", 20)
                .build();

        Pipeline p = Pipeline.create(options);

        // Generate one synthetic element every five seconds, convert each to a Row,
        // and write the rows to the Iceberg table through Managed I/O.
        p.apply(
                "GenerateSequence",
                GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5)))
            .apply(
                "ConvertToRows",
                MapElements.into(TypeDescriptors.rows())
                    .via(
                        i ->
                            Row.withSchema(SCHEMA)
                                .withFieldValue("user_id", "user-" + (i % 10))
                                .withFieldValue("click_count", i % 100)
                                .build()))
            .setCoder(RowCoder.of(SCHEMA))
            .apply("WriteToIceberg", Managed.write(Managed.ICEBERG).withConfig(icebergWriteConfig));

        p.run();
      }
    }
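To launch the example on Dataflow, pass the options defined by the `Options`
interface as pipeline arguments. The following invocation is a minimal sketch,
assuming a Maven build; `PROJECT_ID`, `REGION`, `BUCKET`, `NAMESPACE.TABLE_NAME`,
and `CATALOG_NAME` are placeholder values, and the warehouse bucket must be
single-region, as the option description notes:

    mvn compile exec:java \
      -Dexec.mainClass=ApacheIcebergRestCatalogStreamingWrite \
      -Dexec.args="--runner=DataflowRunner \
        --project=PROJECT_ID \
        --region=REGION \
        --warehouse=gs://BUCKET/warehouse \
        --icebergTable=NAMESPACE.TABLE_NAME \
        --catalogName=CATALOG_NAME"

Because the pipeline fetches a bearer token once at construction time, keep the
caveat from the code comments in mind: the token expires after one hour, so a
long-running job may need to be relaunched.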
[[["이해하기 쉬움","easyToUnderstand","thumb-up"],["문제가 해결됨","solvedMyProblem","thumb-up"],["기타","otherUp","thumb-up"]],[["이해하기 어려움","hardToUnderstand","thumb-down"],["잘못된 정보 또는 샘플 코드","incorrectInformationOrSampleCode","thumb-down"],["필요한 정보/샘플이 없음","missingTheInformationSamplesINeed","thumb-down"],["번역 문제","translationIssue","thumb-down"],["기타","otherDown","thumb-down"]],["최종 업데이트: 2025-09-10(UTC)"],[],[],null,["To write from Dataflow to Apache Iceberg using the BigLake REST Catalog, use the\n[managed I/O connector](/dataflow/docs/guides/managed-io-iceberg).\n\nManaged I/O supports the following capabilities for Apache Iceberg:\n\n| Catalogs | - Hadoop - Hive - REST-based catalogs - [BigQuery metastore](/bigquery/docs/about-bqms) (requires Apache Beam SDK 2.62.0 or later if not using Runner v2) |\n| Read capabilities | Batch read |\n| Write capabilities | - Batch write - Streaming write - [Dynamic destinations](/dataflow/docs/guides/write-to-iceberg#dynamic-destinations) - Dynamic table creation |\n|--------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|\n\nFor [BigQuery tables for Apache Iceberg](/bigquery/docs/iceberg-tables),\nuse the\n[`BigQueryIO` connector](https://beam.apache.org/documentation/io/built-in/google-bigquery/)\nwith BigQuery Storage API. The table must already exist; dynamic table creation is\nnot supported.\n\nPrerequisites\n\n**Set up BigLake.** Configure your Google Cloud Platform project with the\nrequired permissions by following\n[Use the BigLake Metastore with the Iceberg REST catalog](/bigquery/docs/blms-rest-catalog).\nMake sure that you understand the limitations of BigLake\nIceberg REST Catalog described on that page.\n\nDependencies\n\nAdd the following dependencies to your project: \n\nJava \n\n \u003cdependency\u003e\n \u003cgroupId\u003eorg.apache.beam\u003c/groupId\u003e\n \u003cartifactId\u003ebeam-sdks-java-managed\u003c/artifactId\u003e\n \u003cversion\u003e${beam.version}\u003c/version\u003e\n \u003c/dependency\u003e\n\n \u003cdependency\u003e\n \u003cgroupId\u003eorg.apache.beam\u003c/groupId\u003e\n \u003cartifactId\u003ebeam-sdks-java-io-iceberg\u003c/artifactId\u003e\n \u003cversion\u003e${beam.version}\u003c/version\u003e\n \u003c/dependency\u003e\n\n \u003cdependency\u003e\n \u003cgroupId\u003eorg.apache.iceberg\u003c/groupId\u003e\n \u003cartifactId\u003eiceberg-gcp\u003c/artifactId\u003e\n \u003cversion\u003e${iceberg.version}\u003c/version\u003e\n \u003c/dependency\u003e\n\nExample\n\nThe following example demonstrates a streaming pipeline that writes data to an Apache Iceberg table using the REST catalog, backed by the BigLake Metastore. 
\n\nJava\n\n\nTo authenticate to Dataflow, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import com.google.auth.oauth2.https://cloud.google.com/java/docs/reference/google-auth-library/latest/com.google.auth.oauth2.GoogleCredentials.html;\n import com.google.common.collect.ImmutableMap;\n import java.io.IOException;\n import java.util.Map;\n import org.apache.beam.sdk.Pipeline;\n import org.apache.beam.sdk.coders.RowCoder;\n import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;\n import org.apache.beam.sdk.io.GenerateSequence;\n import org.apache.beam.sdk.managed.Managed;\n import org.apache.beam.sdk.options.Default;\n import org.apache.beam.sdk.options.Description;\n import org.apache.beam.sdk.options.PipelineOptionsFactory;\n import org.apache.beam.sdk.options.StreamingOptions;\n import org.apache.beam.sdk.options.Validation;\n import org.apache.beam.sdk.schemas.Schema;\n import org.apache.beam.sdk.transforms.MapElements;\n import org.apache.beam.sdk.values.Row;\n import org.apache.beam.sdk.values.TypeDescriptors;\n import org.joda.time.Duration;\n\n /**\n * A streaming pipeline that writes data to an Iceberg table using the REST catalog.\n *\n * \u003cp\u003eThis example demonstrates writing to an Iceberg table backed by the BigLake Metastore. For\n * more information on BigLake, see the documentation at\n * https://cloud.google.com/bigquery/docs/blms-rest-catalog.\n */\n public class ApacheIcebergRestCatalogStreamingWrite {\n\n // The schema for the generated records.\n public static final Schema SCHEMA =\n Schema.builder().addStringField(\"user_id\").addInt64Field(\"click_count\").build();\n\n /** Pipeline options for this example. */\n public interface Options extends GcpOptions, StreamingOptions {\n @Description(\n \"Warehouse location where the table's data will be written to. \"\n + \"BigLake only supports Single Region buckets\")\n @Validation.Required\n String getWarehouse();\n\n void setWarehouse(String warehouse);\n\n @Description(\"The URI for the REST catalog\")\n @Validation.Required\n @Default.String(\"https://biglake.googleapis.com/iceberg/v1beta/restcatalog\")\n String getCatalogUri();\n\n void setCatalogUri(String value);\n\n @Description(\"The name of the table to write to\")\n @Validation.Required\n String getIcebergTable();\n\n void setIcebergTable(String value);\n\n @Description(\"The name of the Apache Iceberg catalog\")\n @Validation.Required\n String getCatalogName();\n\n void setCatalogName(String catalogName);\n }\n\n /**\n * The main entry point for the pipeline.\n *\n * @param args Command-line arguments\n * @throws IOException If there is an issue with Google Credentials\n */\n public static void main(String[] args) throws IOException {\n Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);\n options.setStreaming(true);\n\n // Note: The token expires in 1 hour. 
What's next

- [CDC Read from Apache Iceberg with BigLake REST Catalog](/dataflow/docs/guides/cdc-read-from-iceberg-biglake).
- Learn more about [Managed I/O](/dataflow/docs/guides/managed-io).
- Learn more about [BigLake REST Catalog](/bigquery/docs/blms-rest-catalog).