Mit genau einmaliger Verarbeitung an BigQuery streamen
Mit Sammlungen den Überblick behalten
Sie können Inhalte basierend auf Ihren Einstellungen speichern und kategorisieren.
Storage Write API zum Streamen von Dataflow zu BigQuery mit genau einmaliger Verarbeitung verwenden
Weitere Informationen
Eine ausführliche Dokumentation, die dieses Codebeispiel enthält, finden Sie hier:
Codebeispiel
Nächste Schritte
Wenn Sie nach Codebeispielen für andere Google Cloud -Produkte suchen und filtern möchten, können Sie den Google Cloud -Beispielbrowser verwenden.
Sofern nicht anders angegeben, sind die Inhalte dieser Seite unter der Creative Commons Attribution 4.0 License und Codebeispiele unter der Apache 2.0 License lizenziert. Weitere Informationen finden Sie in den Websiterichtlinien von Google Developers. Java ist eine eingetragene Marke von Oracle und/oder seinen Partnern.
[[["Leicht verständlich","easyToUnderstand","thumb-up"],["Mein Problem wurde gelöst","solvedMyProblem","thumb-up"],["Sonstiges","otherUp","thumb-up"]],[["Schwer verständlich","hardToUnderstand","thumb-down"],["Informationen oder Beispielcode falsch","incorrectInformationOrSampleCode","thumb-down"],["Benötigte Informationen/Beispiele nicht gefunden","missingTheInformationSamplesINeed","thumb-down"],["Problem mit der Übersetzung","translationIssue","thumb-down"],["Sonstiges","otherDown","thumb-down"]],[],[[["\u003cp\u003eThis content demonstrates how to use the Storage Write API to stream data from Dataflow to BigQuery with exactly-once processing.\u003c/p\u003e\n"],["\u003cp\u003eThe provided Java code sample simulates streaming data, maps it into BigQuery TableRow objects, and writes these rows to a specified BigQuery table.\u003c/p\u003e\n"],["\u003cp\u003eThe code utilizes \u003ccode\u003eBigQueryIO.writeTableRows()\u003c/code\u003e with the \u003ccode\u003eSTORAGE_WRITE_API\u003c/code\u003e method and sets a triggering frequency to ensure exactly-once processing.\u003c/p\u003e\n"],["\u003cp\u003eThe code shows how to handle potential errors during the BigQuery write operation by capturing failed inserts, showcasing how to log or direct them to another queue.\u003c/p\u003e\n"],["\u003cp\u003eThe code sample requires setting up Application Default Credentials for authentication with Dataflow, as well as defining project ID, dataset name, and table name as pipeline options.\u003c/p\u003e\n"]]],[],null,["# Stream to BigQuery with exactly-once processing\n\nUse the Storage Write API to stream from Dataflow to BigQuery with exactly-once processing\n\nExplore further\n---------------\n\n\nFor detailed documentation that includes this code sample, see the following:\n\n- [Write from Dataflow to BigQuery](/dataflow/docs/guides/write-to-bigquery)\n\nCode sample\n-----------\n\n### Java\n\n\nTo authenticate to Dataflow, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import com.google.api.services.bigquery.model.TableRow;\n import org.apache.beam.sdk.Pipeline;\n import org.apache.beam.sdk.PipelineResult;\n import org.apache.beam.sdk.coders.StringUtf8Coder;\n import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;\n import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;\n import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;\n import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;\n import org.apache.beam.sdk.options.PipelineOptionsFactory;\n import org.apache.beam.sdk.testing.TestStream;\n import org.apache.beam.sdk.transforms.MapElements;\n import org.apache.beam.sdk.values.TimestampedValue;\n import org.apache.beam.sdk.values.TypeDescriptor;\n import org.apache.beam.sdk.values.TypeDescriptors;\n import org.joda.time.Duration;\n import org.joda.time.Instant;\n\n public class BigQueryStreamExactlyOnce {\n // Create a PTransform that sends simulated streaming data. In a real application, the data\n // source would be an external source, such as Pub/Sub.\n private static TestStream\u003cString\u003e createEventSource() {\n Instant startTime = new Instant(0);\n return TestStream.create(StringUtf8Coder.of())\n .advanceWatermarkTo(startTime)\n .addElements(\n TimestampedValue.of(\"Alice,20\", startTime),\n TimestampedValue.of(\"Bob,30\",\n startTime.plus(Duration.standardSeconds(1))),\n TimestampedValue.of(\"Charles,40\",\n startTime.plus(Duration.standardSeconds(2))),\n TimestampedValue.of(\"Dylan,Invalid value\",\n startTime.plus(Duration.standardSeconds(2))))\n .advanceWatermarkToInfinity();\n }\n\n public static PipelineResult main(String[] args) {\n // Parse the pipeline options passed into the application. Example:\n // --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME\n // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options\n PipelineOptionsFactory.register(ExamplePipelineOptions.class);\n ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)\n .withValidation()\n .as(ExamplePipelineOptions.class);\n options.setStreaming(true);\n\n // Create a pipeline and apply transforms.\n Pipeline pipeline = Pipeline.create(options);\n pipeline\n // Add a streaming data source.\n .apply(createEventSource())\n // Map the event data into TableRow objects.\n .apply(MapElements\n .into(TypeDescriptor.of(TableRow.class))\n .via((String x) -\u003e {\n String[] columns = x.split(\",\");\n return new TableRow().set(\"user_name\", columns[0]).set(\"age\", columns[1]);\n }))\n // Write the rows to BigQuery\n .apply(BigQueryIO.writeTableRows()\n .to(String.format(\"%s:%s.%s\",\n options.getProjectId(),\n options.getDatasetName(),\n options.getTableName()))\n .withCreateDisposition(CreateDisposition.CREATE_NEVER)\n .withWriteDisposition(WriteDisposition.WRITE_APPEND)\n .withMethod(Write.Method.STORAGE_WRITE_API)\n // For exactly-once processing, set the triggering frequency.\n .withTriggeringFrequency(Duration.standardSeconds(5)))\n // Get the collection of write errors.\n .getFailedStorageApiInserts()\n .apply(MapElements.into(TypeDescriptors.strings())\n // Process each error. In production systems, it's useful to write the errors to\n // another destination, such as a dead-letter table or queue.\n .via(\n x -\u003e {\n System.out.println(\"Failed insert: \" + x.getErrorMessage());\n System.out.println(\"Row: \" + x.getRow());\n return \"\";\n }));\n return pipeline.run();\n }\n }\n\nWhat's next\n-----------\n\n\nTo search and filter code samples for other Google Cloud products, see the\n[Google Cloud sample browser](/docs/samples?product=dataflow)."]]