Storage Write API を使用したデータのストリーミング
このドキュメントでは、BigQuery Storage Write API を使用して BigQuery にデータをストリーミングする方法について説明します。
ストリーミングのシナリオでは、データは継続的に到着し、最小限のレイテンシで読み取りに使用できる必要があります。ストリーミング ワークロードに BigQuery Storage Write API を使用する場合は、どのような保証が必要かを検討してください。
- アプリケーションで at-least-once セマンティクスのみが必要な場合は、デフォルト ストリームを使用します。
- exactly-once セマンティクスが必要な場合は、コミットタイプで 1 つ以上のストリームを作成し、ストリーム オフセットを使用して exactly-once の書き込みを保証します。
コミットタイプでは、サーバーが書き込みリクエストを確認するとすぐに、ストリームに書き込まれたデータがクエリに使用できるようになります。デフォルト ストリームでもコミットタイプが使用されますが、exactly-once は保証されません。
at-least-once セマンティクスにデフォルト ストリームを使用する
アプリケーションが宛先テーブルに重複するレコードが表示される可能性を受け入れることができる場合は、ストリーミング シナリオにデフォルト ストリームを使用することをおすすめします。
次のコードでは、デフォルト ストリームにデータを書き込む方法を示します。
Java
BigQuery 用のクライアント ライブラリをインストールして使用する方法については、BigQuery クライアント ライブラリをご覧ください。詳細については、BigQuery Java API のリファレンス ドキュメントをご覧ください。
BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証を設定するをご覧ください。
Node.js
BigQuery 用のクライアント ライブラリをインストールして使用する方法については、BigQuery クライアント ライブラリをご覧ください。
BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証を設定するをご覧ください。
多重化を使用する
デフォルト ストリームに対してのみ、ストリーム ライター レベルで多重化を有効にします。Java で多重化を有効にするには、StreamWriter
オブジェクトまたは JsonStreamWriter
オブジェクトを作成するときに setEnableConnectionPool
メソッドを呼び出します。
// One possible way for constructing StreamWriter StreamWriter.newBuilder(streamName) .setWriterSchema(protoSchema) .setEnableConnectionPool(true) .build(); // One possible way for constructing JsonStreamWriter JsonStreamWriter.newBuilder(tableName, bigqueryClient) .setEnableConnectionPool(true) .build();
Go で多重化を有効にするには、接続共有(多重化)をご覧ください。
exactly-once セマンティクスにコミットタイプを使用する
exactly-once セマンティクスが必要な場合は、コミットタイプで書き込みストリームを作成します。コミットタイプでは、クライアントがバックエンドから確認応答を受け取るとすぐにレコードをクエリで使用できます。
コミットタイプでは、レコード オフセットを使用して、ストリーム内で exactly-once 配信を行います。レコード オフセットを使用して、アプリケーションは AppendRows
の呼び出しごとに次の追加オフセットを指定します。オフセット値が次に追加されたオフセットに一致する場合にのみ、書き込みオペレーションが実行されます。詳細については、ストリーム オフセットを管理して exactly-once セマンティクスを実現するをご覧ください。
オフセットを指定しない場合、レコードはストリームの現在の末尾に追加されます。この場合、追記リクエストでエラーが発生したときに再試行すると、ストリーム内でレコードが複数回出現する可能性があります。
コミットタイプを使用するには、次の手順を行います。
Java
CreateWriteStream
を呼び出して、コミットタイプで 1 つ以上のストリームを作成します。- ストリームごとに、
AppendRows
をループで呼び出して、レコードのバッチを書き込みます。 - ストリームごとに
FinalizeWriteStream
を呼び出して、ストリームを解放します。このメソッドを呼び出した後は、ストリームにこれ以上行を書き込むことはできません。コミットタイプではこの手順を省略できますが、アクティブなストリームの上限を超えないようにする効果があります。詳細については、ストリーム作成のレートを制限するをご覧ください。
Node.js
createWriteStreamFullResponse
を呼び出して、コミットタイプで 1 つ以上のストリームを作成します。- ストリームごとに、
appendRows
をループで呼び出して、レコードのバッチを書き込みます。 - ストリームごとに
finalize
を呼び出して、ストリームを解放します。このメソッドを呼び出した後は、ストリームにこれ以上行を書き込むことはできません。コミットタイプではこの手順を省略できますが、アクティブなストリームの上限を超えないようにする効果があります。詳細については、ストリーム作成のレートを制限するをご覧ください。
ストリームを明示的に削除することはできません。ストリームは、システムが定義した有効期間(TTL)に従います。
- ストリームにトラフィックがない場合、コミットされたストリームの TTL は 3 日間です。
- ストリームにトラフィックがない場合、バッファリングされたストリームの TTL は、デフォルトで 7 日間です。
次のコードでは、コミットタイプを使用する方法を示します。
Java
BigQuery 用のクライアント ライブラリをインストールして使用する方法については、BigQuery クライアント ライブラリをご覧ください。詳細については、BigQuery Java API のリファレンス ドキュメントをご覧ください。
BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証を設定するをご覧ください。
Node.js
BigQuery 用のクライアント ライブラリをインストールして使用する方法については、BigQuery クライアント ライブラリをご覧ください。
BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証を設定するをご覧ください。