| Configuration | Data type | Description |
|---|---|---|
| `catalog_properties` | map | A map of configuration properties for the Apache Iceberg catalog. The required properties depend on the catalog. For more information, see `CatalogUtil` in the Apache Iceberg documentation. |
| `config_properties` | map | An optional set of Hadoop configuration properties. For more information, see `CatalogUtil` in the Apache Iceberg documentation. |
| Write configuration | Data type | Description |
|---|---|---|
| `triggering_frequency_seconds` | integer | For streaming write pipelines, the frequency in seconds at which the sink attempts to produce snapshots (see the sketch after this table). |
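For example, a streaming pipeline that should commit a snapshot roughly every 30 seconds might build its write configuration as follows. This is a minimal sketch to drop into a pipeline's `main` method; the table name, catalog name, and warehouse path are placeholder values, not part of the examples on this page:

```java
import com.google.common.collect.ImmutableMap;

// A minimal sketch of a streaming write configuration. The table name,
// catalog name, and warehouse path are placeholder values.
ImmutableMap<String, Object> streamingConfig = ImmutableMap.<String, Object>builder()
    .put("table", "db.events")                         // placeholder table name
    .put("catalog_name", "my_catalog")                 // placeholder catalog name
    .put("catalog_properties", ImmutableMap.of(
        "type", "hadoop",
        "warehouse", "gs://example-bucket/warehouse")) // placeholder warehouse URI
    // Attempt to produce a snapshot roughly every 30 seconds.
    .put("triggering_frequency_seconds", 30)
    .build();
```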
Dynamic destinations
--------------------

Managed I/O for Apache Iceberg supports dynamic destinations. Instead of writing to a single fixed table, the connector can dynamically select a destination table based on field values within the incoming records.

To use dynamic destinations, provide a template for the `table` configuration parameter, as sketched below. For more information, see [Dynamic destinations](/dataflow/docs/guides/managed-io#dynamic-destinations).
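For instance, a `table` value of `"flights-{airport}"` routes each record to a table derived from that record's `airport` field, which is the pattern the full example later on this page uses. A minimal sketch, assuming a `catalogProperties` map like the ones built in the examples below:

```java
ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
    .put("catalog_name", "my_catalog")             // placeholder catalog name
    .put("catalog_properties", catalogProperties)  // assumed catalog map, as in the examples below
    // "{airport}" is replaced with each record's "airport" field value,
    // yielding destination tables such as "flights-ORD" and "flights-SYD".
    .put("table", "flights-{airport}")
    .build();
```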
Examples
--------

The following examples show how to use Managed I/O to write to Apache Iceberg.

### Write to an Apache Iceberg table

The following example writes in-memory JSON data to an Apache Iceberg table.

### Java

To authenticate to Dataflow, set up Application Default Credentials. For more information, see [Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).
```java
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.JsonToRow;

public class ApacheIcebergWrite {
  static final List<String> TABLE_ROWS = Arrays.asList(
      "{\"id\":0, \"name\":\"Alice\"}",
      "{\"id\":1, \"name\":\"Bob\"}",
      "{\"id\":2, \"name\":\"Charles\"}"
  );

  static final String CATALOG_TYPE = "hadoop";

  // The schema for the table rows.
  public static final Schema SCHEMA = new Schema.Builder()
      .addStringField("name")
      .addInt64Field("id")
      .build();

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);

    @Description("The name of the table to write to")
    String getTableName();

    void setTableName(String value);
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    //   --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg catalog.
    Map<String, Object> catalogConfig = ImmutableMap.<String, Object>builder()
        .put("warehouse", options.getWarehouseLocation())
        .put("type", CATALOG_TYPE)
        .build();

    // Configure the Iceberg sink.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", options.getTableName())
        .put("catalog_name", options.getCatalogName())
        .put("catalog_properties", catalogConfig)
        .build();

    // Build the pipeline.
    pipeline.apply(Create.of(TABLE_ROWS))
        .apply(JsonToRow.withSchema(SCHEMA))
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));

    pipeline.run().waitUntilFinish();
  }
}
```
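As the comment in `main` suggests, you might run this pipeline locally by passing arguments such as `--runner=DirectRunner --warehouseLocation=gs://BUCKET/warehouse --catalogName=CATALOG --tableName=db.table` (all placeholder values). With the `hadoop` catalog type, the warehouse location is a directory URI, and Managed I/O can create the table if it doesn't already exist.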
### Write with dynamic destinations

The following example writes to different Apache Iceberg tables based on a field in the input data.

### Java

To authenticate to Dataflow, set up Application Default Credentials. For more information, see [Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).
```java
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.JsonToRow;

public class ApacheIcebergDynamicDestinations {

  // The schema for the table rows.
  public static final Schema SCHEMA = new Schema.Builder()
      .addInt64Field("id")
      .addStringField("name")
      .addStringField("airport")
      .build();

  // The data to write to the table, formatted as JSON strings.
  static final List<String> TABLE_ROWS = List.of(
      "{\"id\":0, \"name\":\"Alice\", \"airport\": \"ORD\" }",
      "{\"id\":1, \"name\":\"Bob\", \"airport\": \"SYD\" }",
      "{\"id\":2, \"name\":\"Charles\", \"airport\": \"ORD\" }"
  );

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);
  }

  // Write JSON data to Apache Iceberg, using dynamic destinations to determine the Iceberg table
  // where Dataflow writes each record. The JSON data contains a field named "airport". The
  // Dataflow pipeline writes to Iceberg tables with the naming pattern "flights-{airport}".
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg catalog.
    Map<String, Object> catalogConfig = ImmutableMap.<String, Object>builder()
        .put("warehouse", options.getWarehouseLocation())
        .put("type", "hadoop")
        .build();

    // Configure the Iceberg sink.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("catalog_name", options.getCatalogName())
        .put("catalog_properties", catalogConfig)
        // Route the incoming records based on the value of the "airport" field.
        .put("table", "flights-{airport}")
        // Specify which fields to keep from the input data.
        .put("keep", Arrays.asList("name", "id"))
        .build();

    // Build the pipeline.
    pipeline
        // Read in-memory JSON data.
        .apply(Create.of(TABLE_ROWS))
        // Convert the JSON records to Row objects.
        .apply(JsonToRow.withSchema(SCHEMA))
        // Write each Row to Apache Iceberg.
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));

    // Run the pipeline.
    pipeline.run().waitUntilFinish();
  }
}
```
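Note that the `airport` field is used only to choose the destination table. Because the `keep` option lists only `name` and `id`, the records written to each `flights-{airport}` table contain just those two fields; the routing field itself is dropped from the output.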
[[["이해하기 쉬움","easyToUnderstand","thumb-up"],["문제가 해결됨","solvedMyProblem","thumb-up"],["기타","otherUp","thumb-up"]],[["이해하기 어려움","hardToUnderstand","thumb-down"],["잘못된 정보 또는 샘플 코드","incorrectInformationOrSampleCode","thumb-down"],["필요한 정보/샘플이 없음","missingTheInformationSamplesINeed","thumb-down"],["번역 문제","translationIssue","thumb-down"],["기타","otherDown","thumb-down"]],["최종 업데이트: 2025-08-08(UTC)"],[[["\u003cp\u003eManaged I/O connector enables writing from Dataflow to Apache Iceberg, supporting batch and streaming writes, as well as dynamic destinations and dynamic table creation.\u003c/p\u003e\n"],["\u003cp\u003eSupported catalogs for Apache Iceberg include Hadoop, Hive, REST-based catalogs, and BigQuery metastore, with specific dependencies or version requirements for some.\u003c/p\u003e\n"],["\u003cp\u003eThe \u003ccode\u003eBigQueryIO\u003c/code\u003e connector with BigQuery Storage API should be used for BigQuery tables for Apache Iceberg, noting that these tables must already exist, as dynamic table creation is not supported.\u003c/p\u003e\n"],["\u003cp\u003eConfiguration for Managed I/O to write to Apache Iceberg involves specifying parameters like \u003ccode\u003etable\u003c/code\u003e, \u003ccode\u003ecatalog_name\u003c/code\u003e, \u003ccode\u003ecatalog_properties\u003c/code\u003e, \u003ccode\u003econfig_properties\u003c/code\u003e, and \u003ccode\u003etriggering_frequency_seconds\u003c/code\u003e.\u003c/p\u003e\n"],["\u003cp\u003eManaged I/O for Apache Iceberg supports writing to multiple tables dynamically based on field values within incoming records, demonstrated through examples that showcase both standard and dynamic table writing processes.\u003c/p\u003e\n"]]],[],null,["# Write from Dataflow to Apache Iceberg\n\nTo write from Dataflow to Apache Iceberg, use the\n[managed I/O connector](/dataflow/docs/guides/managed-io-iceberg).\n\nManaged I/O supports the following capabilities for Apache Iceberg:\n\nFor [BigQuery tables for Apache Iceberg](/bigquery/docs/iceberg-tables),\nuse the\n[`BigQueryIO` connector](https://beam.apache.org/documentation/io/built-in/google-bigquery/)\nwith BigQuery Storage API. The table must already exist; dynamic table creation is\nnot supported.\n\nDependencies\n------------\n\nAdd the following dependencies to your project: \n\n### Java\n\n \u003cdependency\u003e\n \u003cgroupId\u003eorg.apache.beam\u003c/groupId\u003e\n \u003cartifactId\u003ebeam-sdks-java-managed\u003c/artifactId\u003e\n \u003cversion\u003e${beam.version}\u003c/version\u003e\n \u003c/dependency\u003e\n\n \u003cdependency\u003e\n \u003cgroupId\u003eorg.apache.beam\u003c/groupId\u003e\n \u003cartifactId\u003ebeam-sdks-java-io-iceberg\u003c/artifactId\u003e\n \u003cversion\u003e${beam.version}\u003c/version\u003e\n \u003c/dependency\u003e\n\nDynamic destinations\n--------------------\n\nManaged I/O for Apache Iceberg supports dynamic destinations. Instead of\nwriting to a single fixed table, the connector can dynamically select a\ndestination table based on field values within the incoming records.\n\nTo use dynamic destinations, provide a template for the `table` configuration\nparameter. For more information, see\n[Dynamic destinations](/dataflow/docs/guides/managed-io#dynamic-destinations).\n\nExamples\n--------\n\nThe following examples show how to use Managed I/O to write to\nApache Iceberg.\n\n### Write to an Apache Iceberg table\n\nThe following example writes in-memory JSON data to an Apache Iceberg table. 
\n\n### Java\n\n\nTo authenticate to Dataflow, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import com.google.common.collect.ImmutableMap;\n import java.util.Arrays;\n import java.util.List;\n import java.util.Map;\n import org.apache.beam.sdk.Pipeline;\n import org.apache.beam.sdk.managed.Managed;\n import org.apache.beam.sdk.options.Description;\n import org.apache.beam.sdk.options.PipelineOptions;\n import org.apache.beam.sdk.options.PipelineOptionsFactory;\n import org.apache.beam.sdk.schemas.Schema;\n import org.apache.beam.sdk.transforms.Create;\n import org.apache.beam.sdk.transforms.JsonToRow;\n import org.apache.beam.sdk.values.PCollectionRowTuple;\n\n public class ApacheIcebergWrite {\n static final List\u003cString\u003e TABLE_ROWS = Arrays.asList(\n \"{\\\"id\\\":0, \\\"name\\\":\\\"Alice\\\"}\",\n \"{\\\"id\\\":1, \\\"name\\\":\\\"Bob\\\"}\",\n \"{\\\"id\\\":2, \\\"name\\\":\\\"Charles\\\"}\"\n );\n\n static final String CATALOG_TYPE = \"hadoop\";\n\n // The schema for the table rows.\n public static final Schema SCHEMA = new Schema.Builder()\n .addStringField(\"name\")\n .addInt64Field(\"id\")\n .build();\n\n public interface Options extends PipelineOptions {\n @Description(\"The URI of the Apache Iceberg warehouse location\")\n String getWarehouseLocation();\n\n void setWarehouseLocation(String value);\n\n @Description(\"The name of the Apache Iceberg catalog\")\n String getCatalogName();\n\n void setCatalogName(String value);\n\n @Description(\"The name of the table to write to\")\n String getTableName();\n\n void setTableName(String value);\n }\n\n public static void main(String[] args) {\n\n // Parse the pipeline options passed into the application. Example:\n // --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \\\n // --tableName= $TABLE_NAME\n // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options\n Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);\n Pipeline pipeline = Pipeline.create(options);\n\n // Configure the Iceberg source I/O\n Map catalogConfig = ImmutableMap.\u003cString, Object\u003ebuilder()\n .put(\"warehouse\", options.getWarehouseLocation())\n .put(\"type\", CATALOG_TYPE)\n .build();\n\n ImmutableMap\u003cString, Object\u003e config = ImmutableMap.\u003cString, Object\u003ebuilder()\n .put(\"table\", options.getTableName())\n .put(\"catalog_name\", options.getCatalogName())\n .put(\"catalog_properties\", catalogConfig)\n .build();\n\n // Build the pipeline.\n pipeline.apply(Create.of(TABLE_ROWS))\n .apply(JsonToRow.withSchema(SCHEMA))\n .apply(Managed.write(Managed.ICEBERG).withConfig(config));\n\n pipeline.run().waitUntilFinish();\n }\n }\n\n\u003cbr /\u003e\n\n### Write with dynamic destinations\n\nThe following example writes to different Apache Iceberg tables based on a\nfield in the input data. 
\n\n### Java\n\n\nTo authenticate to Dataflow, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import com.google.common.collect.ImmutableMap;\n import java.util.Arrays;\n import java.util.List;\n import java.util.Map;\n import org.apache.beam.sdk.Pipeline;\n import org.apache.beam.sdk.PipelineResult;\n import org.apache.beam.sdk.managed.Managed;\n import org.apache.beam.sdk.options.Description;\n import org.apache.beam.sdk.options.PipelineOptions;\n import org.apache.beam.sdk.options.PipelineOptionsFactory;\n import org.apache.beam.sdk.schemas.Schema;\n import org.apache.beam.sdk.transforms.Create;\n import org.apache.beam.sdk.transforms.JsonToRow;\n\n public class ApacheIcebergDynamicDestinations {\n\n // The schema for the table rows.\n public static final Schema SCHEMA = new Schema.Builder()\n .addInt64Field(\"id\")\n .addStringField(\"name\")\n .addStringField(\"airport\")\n .build();\n\n // The data to write to table, formatted as JSON strings.\n static final List\u003cString\u003e TABLE_ROWS = List.of(\n \"{\\\"id\\\":0, \\\"name\\\":\\\"Alice\\\", \\\"airport\\\": \\\"ORD\\\" }\",\n \"{\\\"id\\\":1, \\\"name\\\":\\\"Bob\\\", \\\"airport\\\": \\\"SYD\\\" }\",\n \"{\\\"id\\\":2, \\\"name\\\":\\\"Charles\\\", \\\"airport\\\": \\\"ORD\\\" }\"\n );\n\n public interface Options extends PipelineOptions {\n @Description(\"The URI of the Apache Iceberg warehouse location\")\n String getWarehouseLocation();\n\n void setWarehouseLocation(String value);\n\n @Description(\"The name of the Apache Iceberg catalog\")\n String getCatalogName();\n\n void setCatalogName(String value);\n }\n\n // Write JSON data to Apache Iceberg, using dynamic destinations to determine the Iceberg table\n // where Dataflow writes each record. The JSON data contains a field named \"airport\". The\n // Dataflow pipeline writes to Iceberg tables with the naming pattern \"flights-{airport}\".\n public static void main(String[] args) {\n // Parse the pipeline options passed into the application. 
Example:\n // --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \\\n // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options\n Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);\n Pipeline pipeline = Pipeline.create(options);\n\n // Configure the Iceberg source I/O\n Map catalogConfig = ImmutableMap.\u003cString, Object\u003ebuilder()\n .put(\"warehouse\", options.getWarehouseLocation())\n .put(\"type\", \"hadoop\")\n .build();\n\n ImmutableMap\u003cString, Object\u003e config = ImmutableMap.\u003cString, Object\u003ebuilder()\n .put(\"catalog_name\", options.getCatalogName())\n .put(\"catalog_properties\", catalogConfig)\n // Route the incoming records based on the value of the \"airport\" field.\n .put(\"table\", \"flights-{airport}\")\n // Specify which fields to keep from the input data.\n .put(\"keep\", Arrays.asList(\"name\", \"id\"))\n .build();\n\n // Build the pipeline.\n pipeline\n // Read in-memory JSON data.\n .apply(Create.of(TABLE_ROWS))\n // Convert the JSON records to Row objects.\n .apply(JsonToRow.withSchema(SCHEMA))\n // Write each Row to Apache Iceberg.\n .apply(Managed.write(Managed.ICEBERG).withConfig(config));\n\n // Run the pipeline.\n pipeline.run().waitUntilFinish();\n }\n }\n\n\u003cbr /\u003e\n\nWhat's next\n-----------\n\n- [Read from Apache Iceberg](/dataflow/docs/guides/read-from-iceberg).\n- Learn more about [Managed I/O](/dataflow/docs/guides/managed-io)."]]