Stream to BigQuery with exactly-once processing
Use the Storage Write API to stream from Dataflow to BigQuery with exactly-once processing.
Explore further
For detailed documentation that includes this code sample, see the following:

Write from Dataflow to BigQuery: /dataflow/docs/guides/write-to-bigquery
Code sample
[[["Facile à comprendre","easyToUnderstand","thumb-up"],["J'ai pu résoudre mon problème","solvedMyProblem","thumb-up"],["Autre","otherUp","thumb-up"]],[["Difficile à comprendre","hardToUnderstand","thumb-down"],["Informations ou exemple de code incorrects","incorrectInformationOrSampleCode","thumb-down"],["Il n'y a pas l'information/les exemples dont j'ai besoin","missingTheInformationSamplesINeed","thumb-down"],["Problème de traduction","translationIssue","thumb-down"],["Autre","otherDown","thumb-down"]],[],[[["\u003cp\u003eThis content demonstrates how to use the Storage Write API to stream data from Dataflow to BigQuery with exactly-once processing.\u003c/p\u003e\n"],["\u003cp\u003eThe provided Java code sample simulates streaming data, maps it into BigQuery TableRow objects, and writes these rows to a specified BigQuery table.\u003c/p\u003e\n"],["\u003cp\u003eThe code utilizes \u003ccode\u003eBigQueryIO.writeTableRows()\u003c/code\u003e with the \u003ccode\u003eSTORAGE_WRITE_API\u003c/code\u003e method and sets a triggering frequency to ensure exactly-once processing.\u003c/p\u003e\n"],["\u003cp\u003eThe code shows how to handle potential errors during the BigQuery write operation by capturing failed inserts, showcasing how to log or direct them to another queue.\u003c/p\u003e\n"],["\u003cp\u003eThe code sample requires setting up Application Default Credentials for authentication with Dataflow, as well as defining project ID, dataset name, and table name as pipeline options.\u003c/p\u003e\n"]]],[],null,["# Stream to BigQuery with exactly-once processing\n\nUse the Storage Write API to stream from Dataflow to BigQuery with exactly-once processing\n\nExplore further\n---------------\n\n\nFor detailed documentation that includes this code sample, see the following:\n\n- [Write from Dataflow to BigQuery](/dataflow/docs/guides/write-to-bigquery)\n\nCode sample\n-----------\n\n### Java\n\n\nTo authenticate to Dataflow, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import com.google.api.services.bigquery.model.TableRow;\n import org.apache.beam.sdk.Pipeline;\n import org.apache.beam.sdk.PipelineResult;\n import org.apache.beam.sdk.coders.StringUtf8Coder;\n import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;\n import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;\n import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;\n import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;\n import org.apache.beam.sdk.options.PipelineOptionsFactory;\n import org.apache.beam.sdk.testing.TestStream;\n import org.apache.beam.sdk.transforms.MapElements;\n import org.apache.beam.sdk.values.TimestampedValue;\n import org.apache.beam.sdk.values.TypeDescriptor;\n import org.apache.beam.sdk.values.TypeDescriptors;\n import org.joda.time.Duration;\n import org.joda.time.Instant;\n\n public class BigQueryStreamExactlyOnce {\n // Create a PTransform that sends simulated streaming data. 
In a real application, the data\n // source would be an external source, such as Pub/Sub.\n private static TestStream\u003cString\u003e createEventSource() {\n Instant startTime = new Instant(0);\n return TestStream.create(StringUtf8Coder.of())\n .advanceWatermarkTo(startTime)\n .addElements(\n TimestampedValue.of(\"Alice,20\", startTime),\n TimestampedValue.of(\"Bob,30\",\n startTime.plus(Duration.standardSeconds(1))),\n TimestampedValue.of(\"Charles,40\",\n startTime.plus(Duration.standardSeconds(2))),\n TimestampedValue.of(\"Dylan,Invalid value\",\n startTime.plus(Duration.standardSeconds(2))))\n .advanceWatermarkToInfinity();\n }\n\n public static PipelineResult main(String[] args) {\n // Parse the pipeline options passed into the application. Example:\n // --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME\n // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options\n PipelineOptionsFactory.register(ExamplePipelineOptions.class);\n ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)\n .withValidation()\n .as(ExamplePipelineOptions.class);\n options.setStreaming(true);\n\n // Create a pipeline and apply transforms.\n Pipeline pipeline = Pipeline.create(options);\n pipeline\n // Add a streaming data source.\n .apply(createEventSource())\n // Map the event data into TableRow objects.\n .apply(MapElements\n .into(TypeDescriptor.of(TableRow.class))\n .via((String x) -\u003e {\n String[] columns = x.split(\",\");\n return new TableRow().set(\"user_name\", columns[0]).set(\"age\", columns[1]);\n }))\n // Write the rows to BigQuery\n .apply(BigQueryIO.writeTableRows()\n .to(String.format(\"%s:%s.%s\",\n options.getProjectId(),\n options.getDatasetName(),\n options.getTableName()))\n .withCreateDisposition(CreateDisposition.CREATE_NEVER)\n .withWriteDisposition(WriteDisposition.WRITE_APPEND)\n .withMethod(Write.Method.STORAGE_WRITE_API)\n // For exactly-once processing, set the triggering frequency.\n .withTriggeringFrequency(Duration.standardSeconds(5)))\n // Get the collection of write errors.\n .getFailedStorageApiInserts()\n .apply(MapElements.into(TypeDescriptors.strings())\n // Process each error. In production systems, it's useful to write the errors to\n // another destination, such as a dead-letter table or queue.\n .via(\n x -\u003e {\n System.out.println(\"Failed insert: \" + x.getErrorMessage());\n System.out.println(\"Row: \" + x.getRow());\n return \"\";\n }));\n return pipeline.run();\n }\n }\n\nWhat's next\n-----------\n\n\nTo search and filter code samples for other Google Cloud products, see the\n[Google Cloud sample browser](/docs/samples?product=dataflow)."]]
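The sample references an ExamplePipelineOptions interface that is not shown on this page. The following is a minimal sketch of what such an options interface could look like, assuming only the three custom options the pipeline reads (the descriptions and the use of StreamingOptions as the parent interface are illustrative, not taken from the original sample):

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;

// Hypothetical sketch: custom pipeline options matching the getters the sample calls
// (getProjectId, getDatasetName, getTableName). Extending StreamingOptions lets the
// sample call options.setStreaming(true).
public interface ExamplePipelineOptions extends StreamingOptions {
  @Description("Google Cloud project ID that contains the BigQuery dataset.")
  @Validation.Required
  String getProjectId();
  void setProjectId(String value);

  @Description("BigQuery dataset name.")
  @Validation.Required
  String getDatasetName();
  void setDatasetName(String value);

  @Description("BigQuery table name.")
  @Validation.Required
  String getTableName();
  void setTableName(String value);
}

These options are passed on the command line as --projectId, --datasetName, and --tableName, alongside the usual runner options. Note that because the write uses CreateDisposition.CREATE_NEVER, the destination table must already exist with columns matching the user_name and age fields set on each TableRow.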