# Stream to BigQuery with exactly-once processing

Use the Storage Write API to stream from Dataflow to BigQuery with exactly-once processing.
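
As background for the phrase "exactly-once": the Storage Write API documentation describes this guarantee in terms of stream offsets, where an append that targets an offset already written is rejected instead of being stored twice. The following is a minimal, self-contained sketch of that idea only. `ToyStream`, `Result`, and `append` are hypothetical names for illustration; they are not part of the BigQuery or Beam APIs, and the sketch does not show how `BigQueryIO` behaves internally.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetDedupSketch {
  // Outcome names loosely mirror the status codes an offset-based append can produce.
  enum Result { APPENDED, ALREADY_EXISTS, OUT_OF_RANGE }

  // Hypothetical in-memory stand-in for a write stream that tracks its own length.
  static final class ToyStream {
    private final List<String> rows = new ArrayList<>();

    // Appends the batch only if the caller's offset equals the current end of the stream.
    Result append(long offset, List<String> batch) {
      if (offset < rows.size()) {
        return Result.ALREADY_EXISTS; // A retry of data that was already stored.
      }
      if (offset > rows.size()) {
        return Result.OUT_OF_RANGE; // A gap: earlier data is missing.
      }
      rows.addAll(batch);
      return Result.APPENDED;
    }

    List<String> rows() {
      return List.copyOf(rows);
    }
  }

  public static void main(String[] args) {
    ToyStream stream = new ToyStream();
    List<String> batch = List.of("Alice,20", "Bob,30");

    System.out.println(stream.append(0, batch)); // APPENDED
    // Simulate a retry after a lost acknowledgement: same batch, same offset.
    System.out.println(stream.append(0, batch)); // ALREADY_EXISTS
    System.out.println(stream.append(2, List.of("Charles,40"))); // APPENDED

    System.out.println(stream.rows()); // [Alice,20, Bob,30, Charles,40]
  }
}
```

Because the retry carries the same offset as the first attempt, it is detected and dropped, so each row appears once even though the batch was sent twice.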

Explore further
---------------

For detailed documentation that includes this code sample, see the following:

- [Write from Dataflow to BigQuery](/dataflow/docs/guides/write-to-bigquery)

Code sample
-----------

### Java

To authenticate to Dataflow, set up Application Default Credentials. For more information, see [Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).

The sample simulates streaming data, maps each element to a BigQuery `TableRow` object, and writes the rows to the specified BigQuery table. It uses `BigQueryIO.writeTableRows()` with the `STORAGE_WRITE_API` method and sets a triggering frequency for exactly-once processing. Failed inserts are captured so that they can be logged or sent to another queue. The project ID, dataset name, and table name are supplied as pipeline options.

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.testing.TestStream;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.apache.beam.sdk.values.TypeDescriptor;
    import org.apache.beam.sdk.values.TypeDescriptors;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    public class BigQueryStreamExactlyOnce {
      // Create a PTransform that sends simulated streaming data. In a real application, the data
      // source would be an external source, such as Pub/Sub.
      private static TestStream<String> createEventSource() {
        Instant startTime = new Instant(0);
        return TestStream.create(StringUtf8Coder.of())
            .advanceWatermarkTo(startTime)
            .addElements(
                TimestampedValue.of("Alice,20", startTime),
                TimestampedValue.of("Bob,30",
                    startTime.plus(Duration.standardSeconds(1))),
                TimestampedValue.of("Charles,40",
                    startTime.plus(Duration.standardSeconds(2))),
                TimestampedValue.of("Dylan,Invalid value",
                    startTime.plus(Duration.standardSeconds(2))))
            .advanceWatermarkToInfinity();
      }

      public static PipelineResult main(String[] args) {
        // Parse the pipeline options passed into the application. Example:
        //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
        // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
        // ExamplePipelineOptions is a custom options interface that is not shown in this sample.
        // As used below, it must expose getProjectId(), getDatasetName(), and getTableName(),
        // and support setStreaming().
        PipelineOptionsFactory.register(ExamplePipelineOptions.class);
        ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(ExamplePipelineOptions.class);
        options.setStreaming(true);

        // Create a pipeline and apply transforms.
        Pipeline pipeline = Pipeline.create(options);
        pipeline
            // Add a streaming data source.
            .apply(createEventSource())
            // Map the event data into TableRow objects.
            .apply(MapElements
                .into(TypeDescriptor.of(TableRow.class))
                .via((String x) -> {
                  String[] columns = x.split(",");
                  return new TableRow().set("user_name", columns[0]).set("age", columns[1]);
                }))
            // Write the rows to BigQuery.
            .apply(BigQueryIO.writeTableRows()
                .to(String.format("%s:%s.%s",
                    options.getProjectId(),
                    options.getDatasetName(),
                    options.getTableName()))
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withMethod(Write.Method.STORAGE_WRITE_API)
                // For exactly-once processing, set the triggering frequency.
                .withTriggeringFrequency(Duration.standardSeconds(5)))
            // Get the collection of write errors.
            .getFailedStorageApiInserts()
            .apply(MapElements.into(TypeDescriptors.strings())
                // Process each error. In production systems, it's useful to write the errors to
                // another destination, such as a dead-letter table or queue.
                .via(
                    x -> {
                      System.out.println("Failed insert: " + x.getErrorMessage());
                      System.out.println("Row: " + x.getRow());
                      return "";
                    }));
        return pipeline.run();
      }
    }

What's next
-----------

To search and filter code samples for other Google Cloud products, see the [Google Cloud sample browser](/docs/samples?product=dataflow).
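
The mapping and error-handling steps of the Java sample can be tried without a pipeline. The sketch below uses only the standard library to split the same simulated events into `user_name` and `age` columns and to separate rows that would likely be rejected. It rests on one stated assumption: that the target table declares `age` as an integer column (the sample does not show the table schema). `RowMappingSketch`, `toRow`, and `isValid` are hypothetical names, and the local check only imitates the validation that, in the real pipeline, surfaces through `getFailedStorageApiInserts()`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RowMappingSketch {
  // Splits a "name,age" event into the two columns used by the sample.
  static Map<String, String> toRow(String event) {
    String[] columns = event.split(",");
    Map<String, String> row = new LinkedHashMap<>();
    row.put("user_name", columns[0]);
    row.put("age", columns[1]);
    return row;
  }

  // Assumption: the table's age column is an integer, so non-numeric values are invalid.
  static boolean isValid(Map<String, String> row) {
    try {
      Integer.parseInt(row.get("age"));
      return true;
    } catch (NumberFormatException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    List<String> events =
        List.of("Alice,20", "Bob,30", "Charles,40", "Dylan,Invalid value");

    List<Map<String, String>> accepted = new ArrayList<>();
    List<Map<String, String>> deadLetter = new ArrayList<>();
    for (String event : events) {
      Map<String, String> row = toRow(event);
      // Route each row to the main output or to a stand-in dead-letter list.
      (isValid(row) ? accepted : deadLetter).add(row);
    }

    System.out.println("Accepted rows: " + accepted.size());
    System.out.println("Dead-letter rows: " + deadLetter);
  }
}
```

Under the stated schema assumption, the three well-formed events are accepted and the `Dylan,Invalid value` event lands in the dead-letter list, which corresponds to the row the sample's error handler is meant to report.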