Streaming ke BigQuery dengan pemrosesan tepat satu kali
Tetap teratur dengan koleksi
Simpan dan kategorikan konten berdasarkan preferensi Anda.
Menggunakan Storage Write API untuk melakukan streaming dari Dataflow ke BigQuery dengan pemrosesan tepat sekali
Mempelajari lebih lanjut
Untuk dokumentasi mendetail yang menyertakan contoh kode ini, lihat artikel berikut:
Contoh kode
Kecuali dinyatakan lain, konten di halaman ini dilisensikan berdasarkan Lisensi Creative Commons Attribution 4.0, sedangkan contoh kode dilisensikan berdasarkan Lisensi Apache 2.0. Untuk mengetahui informasi selengkapnya, lihat Kebijakan Situs Google Developers. Java adalah merek dagang terdaftar dari Oracle dan/atau afiliasinya.
[[["Mudah dipahami","easyToUnderstand","thumb-up"],["Memecahkan masalah saya","solvedMyProblem","thumb-up"],["Lainnya","otherUp","thumb-up"]],[["Sulit dipahami","hardToUnderstand","thumb-down"],["Informasi atau kode contoh salah","incorrectInformationOrSampleCode","thumb-down"],["Informasi/contoh yang saya butuhkan tidak ada","missingTheInformationSamplesINeed","thumb-down"],["Masalah terjemahan","translationIssue","thumb-down"],["Lainnya","otherDown","thumb-down"]],[],[[["\u003cp\u003eThis content demonstrates how to use the Storage Write API to stream data from Dataflow to BigQuery with exactly-once processing.\u003c/p\u003e\n"],["\u003cp\u003eThe provided Java code sample simulates streaming data, maps it into BigQuery TableRow objects, and writes these rows to a specified BigQuery table.\u003c/p\u003e\n"],["\u003cp\u003eThe code utilizes \u003ccode\u003eBigQueryIO.writeTableRows()\u003c/code\u003e with the \u003ccode\u003eSTORAGE_WRITE_API\u003c/code\u003e method and sets a triggering frequency to ensure exactly-once processing.\u003c/p\u003e\n"],["\u003cp\u003eThe code shows how to handle potential errors during the BigQuery write operation by capturing failed inserts, showcasing how to log or direct them to another queue.\u003c/p\u003e\n"],["\u003cp\u003eThe code sample requires setting up Application Default Credentials for authentication with Dataflow, as well as defining project ID, dataset name, and table name as pipeline options.\u003c/p\u003e\n"]]],[],null,["# Stream to BigQuery with exactly-once processing\n\nUse the Storage Write API to stream from Dataflow to BigQuery with exactly-once processing\n\nExplore further\n---------------\n\n\nFor detailed documentation that includes this code sample, see the following:\n\n- [Write from Dataflow to BigQuery](/dataflow/docs/guides/write-to-bigquery)\n\nCode sample\n-----------\n\n### Java\n\n\nTo authenticate to Dataflow, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import com.google.api.services.bigquery.model.TableRow;\n import org.apache.beam.sdk.Pipeline;\n import org.apache.beam.sdk.PipelineResult;\n import org.apache.beam.sdk.coders.StringUtf8Coder;\n import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;\n import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;\n import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;\n import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;\n import org.apache.beam.sdk.options.PipelineOptionsFactory;\n import org.apache.beam.sdk.testing.TestStream;\n import org.apache.beam.sdk.transforms.MapElements;\n import org.apache.beam.sdk.values.TimestampedValue;\n import org.apache.beam.sdk.values.TypeDescriptor;\n import org.apache.beam.sdk.values.TypeDescriptors;\n import org.joda.time.Duration;\n import org.joda.time.Instant;\n\n public class BigQueryStreamExactlyOnce {\n // Create a PTransform that sends simulated streaming data. In a real application, the data\n // source would be an external source, such as Pub/Sub.\n private static TestStream\u003cString\u003e createEventSource() {\n Instant startTime = new Instant(0);\n return TestStream.create(StringUtf8Coder.of())\n .advanceWatermarkTo(startTime)\n .addElements(\n TimestampedValue.of(\"Alice,20\", startTime),\n TimestampedValue.of(\"Bob,30\",\n startTime.plus(Duration.standardSeconds(1))),\n TimestampedValue.of(\"Charles,40\",\n startTime.plus(Duration.standardSeconds(2))),\n TimestampedValue.of(\"Dylan,Invalid value\",\n startTime.plus(Duration.standardSeconds(2))))\n .advanceWatermarkToInfinity();\n }\n\n public static PipelineResult main(String[] args) {\n // Parse the pipeline options passed into the application. Example:\n // --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME\n // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options\n PipelineOptionsFactory.register(ExamplePipelineOptions.class);\n ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)\n .withValidation()\n .as(ExamplePipelineOptions.class);\n options.setStreaming(true);\n\n // Create a pipeline and apply transforms.\n Pipeline pipeline = Pipeline.create(options);\n pipeline\n // Add a streaming data source.\n .apply(createEventSource())\n // Map the event data into TableRow objects.\n .apply(MapElements\n .into(TypeDescriptor.of(TableRow.class))\n .via((String x) -\u003e {\n String[] columns = x.split(\",\");\n return new TableRow().set(\"user_name\", columns[0]).set(\"age\", columns[1]);\n }))\n // Write the rows to BigQuery\n .apply(BigQueryIO.writeTableRows()\n .to(String.format(\"%s:%s.%s\",\n options.getProjectId(),\n options.getDatasetName(),\n options.getTableName()))\n .withCreateDisposition(CreateDisposition.CREATE_NEVER)\n .withWriteDisposition(WriteDisposition.WRITE_APPEND)\n .withMethod(Write.Method.STORAGE_WRITE_API)\n // For exactly-once processing, set the triggering frequency.\n .withTriggeringFrequency(Duration.standardSeconds(5)))\n // Get the collection of write errors.\n .getFailedStorageApiInserts()\n .apply(MapElements.into(TypeDescriptors.strings())\n // Process each error. In production systems, it's useful to write the errors to\n // another destination, such as a dead-letter table or queue.\n .via(\n x -\u003e {\n System.out.println(\"Failed insert: \" + x.getErrorMessage());\n System.out.println(\"Row: \" + x.getRow());\n return \"\";\n }));\n return pipeline.run();\n }\n }\n\nWhat's next\n-----------\n\n\nTo search and filter code samples for other Google Cloud products, see the\n[Google Cloud sample browser](/docs/samples?product=dataflow)."]]