Read from Apache Iceberg
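Use the Dataflow managed I/O transform to read data from Apache Iceberg.
Explore further
For detailed documentation that includes this code sample, see the following:
- [Read from Apache Iceberg to Dataflow](/dataflow/docs/guides/read-from-iceberg)
Code sample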
Java
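To authenticate to Dataflow, set up Application Default Credentials.
For more information, see [Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).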
```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ApacheIcebergRead {

  static final String CATALOG_TYPE = "hadoop";

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("Path to write the output file")
    String getOutputPath();

    void setOutputPath(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);

    @Description("The name of the table to read from")
    String getTableName();

    void setTableName(String value);
  }

  public static void main(String[] args) {

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    //   --tableName=$TABLE_NAME --outputPath=$OUTPUT_FILE
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg source I/O
    Map<String, Object> catalogConfig = ImmutableMap.<String, Object>builder()
        .put("warehouse", options.getWarehouseLocation())
        .put("type", CATALOG_TYPE)
        .build();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", options.getTableName())
        .put("catalog_name", options.getCatalogName())
        .put("catalog_properties", catalogConfig)
        .build();

    // Build the pipeline.
    pipeline.apply(Managed.read(Managed.ICEBERG).withConfig(config))
        .getSinglePCollection()
        // Format each record as a string with the format 'id:name'.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((row -> {
              return String.format("%d:%s",
                  row.getInt64("id"),
                  row.getString("name"));
            })))
        // Write to a text file.
        .apply(
            TextIO.write()
                .to(options.getOutputPath())
                .withNumShards(1)
                .withSuffix(".txt"));

    pipeline.run().waitUntilFinish();
  }
}
```

What's next
To search and filter code samples for other Google Cloud products, see the [Google Cloud sample browser](/docs/samples?product=dataflow).

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
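The sample on this page hands `Managed.read(Managed.ICEBERG)` a nested configuration map and then formats each row as `id:name`. Both steps are plain Java, so their shape can be checked without running a pipeline. The sketch below is a minimal illustration that uses only `java.util` instead of Guava and Beam. The class `IcebergReadConfigSketch`, its methods `buildConfig` and `formatRecord`, and the placeholder warehouse, catalog, and table values are hypothetical names introduced here, not part of the sample or of Apache Beam.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of the sample or of Apache Beam. */
final class IcebergReadConfigSketch {

  // The sample hard-codes a Hadoop catalog; this sketch mirrors that choice.
  static final String CATALOG_TYPE = "hadoop";

  private IcebergReadConfigSketch() {}

  /** Builds a map with the same keys and nesting as the sample's config. */
  static Map<String, Object> buildConfig(
      String warehouseLocation, String catalogName, String tableName) {
    // Inner map: properties describing the catalog itself.
    Map<String, Object> catalogProperties = new LinkedHashMap<>();
    catalogProperties.put("warehouse", warehouseLocation);
    catalogProperties.put("type", CATALOG_TYPE);

    // Outer map: the table to read plus the nested catalog properties.
    Map<String, Object> config = new LinkedHashMap<>();
    config.put("table", tableName);
    config.put("catalog_name", catalogName);
    config.put("catalog_properties", Collections.unmodifiableMap(catalogProperties));
    return Collections.unmodifiableMap(config);
  }

  /** Formats one record as 'id:name', as the sample's MapElements step does. */
  static String formatRecord(long id, String name) {
    return String.format("%d:%s", id, name);
  }

  public static void main(String[] args) {
    // Placeholder values chosen for illustration only.
    Map<String, Object> config =
        buildConfig("gs://example-bucket/warehouse", "example_catalog", "db.users");
    System.out.println(config);
    System.out.println(formatRecord(42L, "Ada")); // prints 42:Ada
  }
}
```

Keeping the map construction and the formatting in small static methods like these is one way to unit test them separately from the pipeline. In the sample itself, the same keys are built with `ImmutableMap` and passed directly to `withConfig`.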