Read Avro format
Use the BigQueryIO connector to read data from a BigQuery table in Avro format.
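Explore further
For detailed documentation that includes this code sample, see the following:
- [Read from BigQuery to Dataflow](/dataflow/docs/guides/read-from-bigquery)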
Code sample
Java
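To authenticate to Dataflow, set up Application Default Credentials.
For more information, see [Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).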
The sample defines a custom data type, `MyData`, with `name` and `age` fields to hold each record read from the table. A `FromSchemaAndRecord` function converts the Avro records returned by BigQuery into `MyData` instances. The pipeline reads the table with `BigQueryIO.read()` and that parsing function, then prints the name and age of each record to standard output.

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadAvro {

  // A custom datatype to hold a record from the source table.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    // Function to convert Avro records to MyData instances.
    public static class FromSchemaAndRecord
        implements SerializableFunction<SchemaAndRecord, MyData> {
      @Override public MyData apply(SchemaAndRecord elem) {
        MyData data = new MyData();
        GenericRecord record = elem.getRecord();
        data.name = ((Utf8) record.get("user_name")).toString();
        data.age = (Long) record.get("age");
        return data;
      }
    }
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    // Note: ExamplePipelineOptions is not defined in this sample. It is assumed to be
    // a PipelineOptions sub-interface that exposes getProjectId(), getDatasetName(),
    // and getTableName().
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into Avro records, using an application-defined parsing function.
        .apply(BigQueryIO.read(new MyData.FromSchemaAndRecord())
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<MyData>.
        .apply(MapElements
            .into(TypeDescriptor.of(MyData.class))
            .via((MyData x) -> {
              System.out.printf("Name: %s, Age: %d%n", x.name, x.age);
              return x;
            }));
    pipeline.run().waitUntilFinish();
  }
}
```

What's next
To search and filter code samples for other Google Cloud products, see the [Google Cloud sample browser](/docs/samples?product=dataflow).

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
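The conversion step in the Java sample casts `user_name` to `Utf8` and `age` to `Long` directly, so a missing or null field would throw at runtime. The following is a minimal sketch of a null-tolerant variant that runs without Beam or a BigQuery table. It rests on several assumptions that are not part of the original sample: a plain `Map<String, Object>` stands in for `GenericRecord`, a `StringBuilder` stands in for `Utf8` (both are non-`String` `CharSequence` values), and the class name `RecordConversionSketch` and helper `fromRecord` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class RecordConversionSketch {

  // Plain stand-in for the sample's MyData type (no Beam coder annotation here).
  public static class MyData {
    public String name;
    public Long age;
  }

  // Hypothetical helper: a null-tolerant version of the sample's apply() body.
  // The Map plays the role of GenericRecord; both return Object from get(String).
  public static MyData fromRecord(Map<String, Object> record) {
    MyData data = new MyData();

    // The sample receives strings as Utf8, a CharSequence that is not a String,
    // so convert with toString() instead of casting to String.
    Object rawName = record.get("user_name");
    data.name = (rawName == null) ? null : rawName.toString();

    // Accept any numeric value and widen it to Long; keep null as null.
    Object rawAge = record.get("age");
    data.age = (rawAge == null) ? null : Long.valueOf(((Number) rawAge).longValue());

    return data;
  }

  public static void main(String[] args) {
    // A fully populated row.
    Map<String, Object> row = new HashMap<>();
    row.put("user_name", new StringBuilder("Alice"));
    row.put("age", 30L);
    MyData full = fromRecord(row);
    System.out.printf("Name: %s, Age: %s%n", full.name, full.age);

    // A row with the age field missing.
    Map<String, Object> sparse = new HashMap<>();
    sparse.put("user_name", "Bob");
    MyData partial = fromRecord(sparse);
    System.out.printf("Name: %s, Age: %s%n", partial.name, partial.age);
  }
}
```

If this approach suits your table, the same null checks could be moved into `FromSchemaAndRecord.apply`, replacing the map lookups with `record.get(...)` on the `GenericRecord`. Whether null handling is needed depends on whether the source columns are nullable, which the original sample does not state.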