新しい BigQuery Storage Write API を使用して、JSON データを BigQuery にストリーミングする方法を学ぶ
Google Cloud Japan Team
※この投稿は米国時間 2022 年 3 月 9 日に、Google Cloud blog に投稿されたものの抄訳です。
Google BigQuery Write API は、1 つの統合型 API で高性能なバッチ処理とストリーミングを提供しています。このシリーズの前回の投稿では、BigQuery Write API を紹介しました。今回の投稿では、Java クライアント ライブラリを使用して、JSON データを BigQuery にストリーミングする方法をお伝えします。
Write API は、プロトコル バッファ形式のバイナリデータの使用を想定しています。そのため、この API は高スループットのストリーミングに非常に有効です。しかし、プロトコル バッファを扱うのは若干難しい場合もあります。多くのアプリケーションにとって、JSON データはより利便性が高いデータ形式です。Java 用の BigQuery クライアント ライブラリでは、JsonStreamWriter を通して、両方の長所を提供します。JsonStreamWriter は、JSON レコードの形式でデータを受け入れ、JSON オブジェクトをバイナリ プロトコル バッファに自動的に変換してから、ネットワーク経由で送信します。
では、詳細を見てみましょう。
シナリオ
GitHub commit のデータを BigQuery にストリーミングするシナリオについて考えてみましょう。このデータを使用して、commit アクティビティに関する分析情報をリアルタイムで取得できます。
この例では、ローカル ファイルからデータを読み込みます。しかし、このデータをイベント形式やログファイルからのストリーミング形式で受け取るアプリケーションを想像するかもしれません。実際、互換性のある形式にできるのであれば、データの出所は関係ないという点が Write API のメリットの一つです。
ソースファイルの各行は、次のような構造になっています。
宛先テーブルを作成する
まず、ストリーミングされたデータを受け取るためのテーブルを BigQuery に作成する必要があります。BigQuery でテーブルを作成する方法はいくつかありますが、最も簡単な方法の一つは以下のように CREATE TABLE クエリを実行することです。
テーブルへのデータのストリーミング
テーブルを作成したので、そのテーブルにデータを書き込みます。
Write API は、1 回限りの配信を必要とするストリーミング アプリケーションに適したコミットモードや、ストリームレベルのトランザクションを伴うバッチ書き込みに適した保留モードをはじめとする、いくつかのモードをサポートしています。この例では、Write API のデフォルト ストリームを使用します。デフォルト ストリームは、at-least-once セマンティクスを必要としますが、より確実な exactly-one 保証は必要ないストリーミング アプリケーションに適しています。Git commit には一意の commit ID があるので、必要な場合は宛先テーブルで重複を識別できます。
まず、JsonStreamWriter を初期化し、以下のように、宛先テーブルの名前とテーブル スキーマを渡します。
これでデータファイルを読み込んで、Write API にデータを送信する準備が整いました。ベスト プラクティスとして、一度に 1 行ではなく、バッチでデータを送信することを推奨します。次のようにして各 JSON レコードを JSONObject に読み込み、それらのバッチを JSONArray に収集します。
レコードの各バッチを書き込むには、JsonStreamWriter.append メソッドを呼び出します。このメソッドは非同期で、ApiFuture を返します。パフォーマンスを最大限に高めるためには、Future が完了するのを待つ必要があります。代わりに、引き続き append の呼び出しを行い、結果を非同期で処理します。
この例では、完了コールバックを登録します。以下のようにコールバック内で、append が成功したかどうかを確認できます。
テーブル スキーマの更新処理
BigQuery では、既存のテーブル スキーマを、特定の制限された仕様で変更できます。たとえば、commit 作成者のメールアドレスを格納する、メールという名前のフィールドを追加するとします。オリジナルのスキーマにあるデータによって、このフィールドに NULL 値が設定されます。
データをストリーミングしている間にスキーマが変更された場合、Java クライアント ライブラリでは、自動的に更新されたスキーマに再接続されます。この例のシナリオでは、アプリケーションは中間処理をせずに、直接 Write API にデータを渡します。スキーマの変更に下位互換性がある限り、アプリケーションは中断することなくストリーミングを続行できます。テーブル スキーマが更新された後、新しいフィールドのデータの送信を開始できます。
注: スキーマの更新は、クライアント ライブラリにはすぐに表示されませんが、数分程度で検出されます。
スキーマの変更をプログラムで確認するには、append メソッドの完了後に AppendRowsResponse.hasUpdatedSchema を呼び出してください。詳細については、Write API ドキュメントのスキーマの操作をご覧ください。
データをクエリする
データは、BigQuery に取り込まれるとすぐに分析に利用できます。たとえば以下のように、何曜日が最も commit が多いかを確認するクエリを実行できるようになりました。
最も commit が多いのは火曜日だということがわかりました。予想どおりですが、土曜日と日曜日が最も commit が少なくなっています。
結論
この記事では、Java クライアント ライブラリを使用して、JSON データを BigQuery に簡単にストリーミングする方法を学びました。GitHub で完全なソースコードを確認できます。コミットモードと保留モードの使用方法など、Write API の詳細については、BigQuery Storage Write API のドキュメントをご覧ください。
- テクニカル ライター Veronica Wasson