Stream to BigQuery with exactly-once processing
Use the Storage Write API to stream from Dataflow to BigQuery with exactly-once processing.
Explore further
For detailed documentation that includes this code sample, see the following:
Write from Dataflow to BigQuery (/dataflow/docs/guides/write-to-bigquery)
Code sample
Java
To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment (/docs/authentication/set-up-adc-local-dev-environment).
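If you want to confirm that Application Default Credentials are visible to your local Java environment before launching the pipeline, a minimal check might look like the following sketch. This snippet is not part of the original sample; it assumes the google-auth-library-oauth2-http library is on the classpath, which Beam's Google Cloud IO module typically pulls in.

import com.google.auth.oauth2.GoogleCredentials;
import java.io.IOException;

public class AdcCheck {
  public static void main(String[] args) throws IOException {
    // getApplicationDefault() throws an IOException if no Application Default
    // Credentials can be found in the environment (for example, none configured
    // via the gcloud CLI or the GOOGLE_APPLICATION_CREDENTIALS variable).
    GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
    System.out.println("Loaded Application Default Credentials: " + credentials);
  }
}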
[[["わかりやすい","easyToUnderstand","thumb-up"],["問題の解決に役立った","solvedMyProblem","thumb-up"],["その他","otherUp","thumb-up"]],[["わかりにくい","hardToUnderstand","thumb-down"],["情報またはサンプルコードが不正確","incorrectInformationOrSampleCode","thumb-down"],["必要な情報 / サンプルがない","missingTheInformationSamplesINeed","thumb-down"],["翻訳に関する問題","translationIssue","thumb-down"],["その他","otherDown","thumb-down"]],[],[[["\u003cp\u003eThis content demonstrates how to use the Storage Write API to stream data from Dataflow to BigQuery with exactly-once processing.\u003c/p\u003e\n"],["\u003cp\u003eThe provided Java code sample simulates streaming data, maps it into BigQuery TableRow objects, and writes these rows to a specified BigQuery table.\u003c/p\u003e\n"],["\u003cp\u003eThe code utilizes \u003ccode\u003eBigQueryIO.writeTableRows()\u003c/code\u003e with the \u003ccode\u003eSTORAGE_WRITE_API\u003c/code\u003e method and sets a triggering frequency to ensure exactly-once processing.\u003c/p\u003e\n"],["\u003cp\u003eThe code shows how to handle potential errors during the BigQuery write operation by capturing failed inserts, showcasing how to log or direct them to another queue.\u003c/p\u003e\n"],["\u003cp\u003eThe code sample requires setting up Application Default Credentials for authentication with Dataflow, as well as defining project ID, dataset name, and table name as pipeline options.\u003c/p\u003e\n"]]],[],null,["# Stream to BigQuery with exactly-once processing\n\nUse the Storage Write API to stream from Dataflow to BigQuery with exactly-once processing\n\nExplore further\n---------------\n\n\nFor detailed documentation that includes this code sample, see the following:\n\n- [Write from Dataflow to BigQuery](/dataflow/docs/guides/write-to-bigquery)\n\nCode sample\n-----------\n\n### Java\n\n\nTo authenticate to Dataflow, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import com.google.api.services.bigquery.model.TableRow;\n import org.apache.beam.sdk.Pipeline;\n import org.apache.beam.sdk.PipelineResult;\n import org.apache.beam.sdk.coders.StringUtf8Coder;\n import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;\n import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;\n import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;\n import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;\n import org.apache.beam.sdk.options.PipelineOptionsFactory;\n import org.apache.beam.sdk.testing.TestStream;\n import org.apache.beam.sdk.transforms.MapElements;\n import org.apache.beam.sdk.values.TimestampedValue;\n import org.apache.beam.sdk.values.TypeDescriptor;\n import org.apache.beam.sdk.values.TypeDescriptors;\n import org.joda.time.Duration;\n import org.joda.time.Instant;\n\n public class BigQueryStreamExactlyOnce {\n // Create a PTransform that sends simulated streaming data. 
What's next

To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser (/docs/samples?product=dataflow).