importcom.google.common.collect.ImmutableMap;importjava.util.Map;importorg.apache.beam.sdk.Pipeline;importorg.apache.beam.sdk.io.TextIO;importorg.apache.beam.sdk.managed.Managed;importorg.apache.beam.sdk.options.Description;importorg.apache.beam.sdk.options.PipelineOptions;importorg.apache.beam.sdk.options.PipelineOptionsFactory;importorg.apache.beam.sdk.transforms.MapElements;importorg.apache.beam.sdk.values.PCollectionRowTuple;importorg.apache.beam.sdk.values.TypeDescriptors;publicclassApacheIcebergRead{staticfinalStringCATALOG_TYPE="hadoop";publicinterfaceOptionsextendsPipelineOptions{@Description("The URI of the Apache Iceberg warehouse location")StringgetWarehouseLocation();voidsetWarehouseLocation(Stringvalue);@Description("Path to write the output file")StringgetOutputPath();voidsetOutputPath(Stringvalue);@Description("The name of the Apache Iceberg catalog")StringgetCatalogName();voidsetCatalogName(Stringvalue);@Description("The name of the table to write to")StringgetTableName();voidsetTableName(Stringvalue);}publicstaticvoidmain(String[]args){// Parse the pipeline options passed into the application. Example:// --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \// --tableName= $TABLE_NAME --outputPath=$OUTPUT_FILE// For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-optionsOptionsoptions=PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);Pipelinepipeline=Pipeline.create(options);// Configure the Iceberg source I/OMapcatalogConfig=ImmutableMap.<String,Object>builder().put("warehouse",options.getWarehouseLocation()).put("type",CATALOG_TYPE).build();ImmutableMap<String,Object>config=ImmutableMap.<String,Object>builder().put("table",options.getTableName()).put("catalog_name",options.getCatalogName()).put("catalog_properties",catalogConfig).build();// Build the pipeline.pipeline.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection()// Format each record as a string with the format 'id:name'..apply(MapElements.into(TypeDescriptors.strings()).via((row->{returnString.format("%d:%s",row.getInt64("id"),row.getString("name"));})))// Write to a text file..apply(TextIO.write().to(options.getOutputPath()).withNumShards(1).withSuffix(".txt"));pipeline.run().waitUntilFinish();}}
[[["Easy to understand","easyToUnderstand","thumb-up"],["Solved my problem","solvedMyProblem","thumb-up"],["Other","otherUp","thumb-up"]],[["Hard to understand","hardToUnderstand","thumb-down"],["Incorrect information or sample code","incorrectInformationOrSampleCode","thumb-down"],["Missing the information/samples I need","missingTheInformationSamplesINeed","thumb-down"],["Other","otherDown","thumb-down"]],["Last updated 2025-08-21 UTC."],[[["\u003cp\u003eThe Managed I/O connector facilitates reading from Apache Iceberg to Dataflow, supporting Hadoop, Hive, REST-based catalogs, and BigQuery metastore.\u003c/p\u003e\n"],["\u003cp\u003eManaged I/O for Apache Iceberg enables batch reads, batch writes, streaming writes, dynamic destinations, and dynamic table creation.\u003c/p\u003e\n"],["\u003cp\u003eFor BigQuery tables, the \u003ccode\u003eBigQueryIO\u003c/code\u003e connector is used, requiring pre-existing tables without support for dynamic table creation.\u003c/p\u003e\n"],["\u003cp\u003eReading from Apache Iceberg is achieved by using the \u003ccode\u003eManaged.read(Managed.ICEBERG)\u003c/code\u003e in a pipeline, which can then be transformed and outputted to various destinations, like text files as shown in the provided example.\u003c/p\u003e\n"],["\u003cp\u003eTo use the managed Iceberg I/O connectors, you must include the \u003ccode\u003ebeam-sdks-java-managed\u003c/code\u003e and \u003ccode\u003ebeam-sdks-java-io-iceberg\u003c/code\u003e dependencies.\u003c/p\u003e\n"]]],[],null,[]]