BigQuery Storage Write API のベスト プラクティス

このドキュメントでは、BigQuery Storage Write API の使用に関するベスト プラクティスについて説明します。このドキュメントを読む前に、BigQuery Storage Write API の概要をご覧ください。

ストリーム作成のレートを制限する

ストリームを作成する前に、デフォルト ストリームを使用できるかどうかを検討してください。ストリーミング シナリオでは、デフォルト ストリームのほうが割り当て制限が少なく、アプリケーションによって作成されたストリームを使用する場合よりもスケーリングがしやすくなります。アプリケーションによって作成されたストリームを使用する場合は、追加のストリームを作成する前に、各ストリームで最大スループットを活用するようにしてください。たとえば、非同期書き込みを使用します。

アプリケーションによって作成されたストリームの場合は、CreateWriteStream を頻繁に呼び出さないようにします。一般に、1 秒あたりの呼び出し数が 40~50 を超えると、API 呼び出しのレイテンシが大幅に増加します(25 秒より大きくなります)。アプリケーションでコールド スタートを許容して、ストリームの数を段階的に増やし、CreateWriteStream 呼び出しのレートを制限できるようにしてください。DeadlineExceeded エラーで失敗しないように、呼び出しが完了するまでの待機期間を長く設定することもできます。また、CreateWriteStream 呼び出しの最大レートには、より長期の割り当てがあります。ストリームの作成はリソースを大量に消費するプロセスです。ストリームの作成レートを小さくし、既存のストリームを完全に活用するのが、この制限を超えないようにする最善の方法です。

接続プールの管理

AppendRows メソッドは、ストリームへの双方向接続を作成します。デフォルト ストリームでは複数の接続を開くことができますが、アプリケーションが作成したストリームではアクティブな接続は 1 つだけです。

デフォルト ストリームを使用する場合は、Storage Write API の多重化を使用して、共有接続で複数の宛先テーブルに書き込むことができます。プール接続を多重化すると、スループットとリソースの使用率が改善されます。ワークフローの同時接続数が 20 を超える場合は、多重化の使用をおすすめします。多重化は Java と Go で使用できます。Java での実装の詳細については、多重化の使用をご覧ください。Go での実装の詳細については、接続共有(多重化)をご覧ください。at-least-once セマンティクスの Beam コネクタを使用する場合は、UseStorageApiConnectionPool で多重化を有効にできます。Dataproc Spark コネクタでは、デフォルトで多重化が有効になっています。

最高のパフォーマンスを得るには、できるだけ多くのデータ書き込みに 1 つの接続を使用します。1 回の書き込みに 1 つの接続を使用しないでください。また、多数の小規模な書き込みでストリームの開始と終了が発生しないようにしてください。

プロジェクトごとの同時接続数には割り当てがあります。上限を超えると、AppendRows の呼び出しは失敗します。ただし、同時接続の割り当ては増やすことができるため、通常はスケーリングの制限要因になるわけではありません。

AppendRows を呼び出すたびに、新しいデータ ライター オブジェクトが作成されます。そのため、アプリケーションによって作成されたストリームの場合、接続数は作成されたストリームの数に対応します。通常、1 つの接続で少なくとも 1 MBps のスループットがサポートされます。上限は、ネットワーク帯域幅、データのスキーマ、サーバーの負荷などの要因によって異なりますが、10 MBps を超える場合もあります。

プロジェクトごとの合計スループットにも割り当てがあります。これは、Storage Write API サービスを通過するすべての接続の 1 秒あたりのバイト数を表します。プロジェクトでこの割り当てを超えた場合は、割り当て上限の引き上げをリクエストできます。その場合、同時接続の割り当てなど、付随する割り当ても同じ比率で増やす必要があります。

ストリーム オフセットを管理して 1 回限りのセマンティクスを実現する

Storage Write API は、現在のストリームの最後への書き込みのみを許可しますが、これはデータが追加されると移動します。ストリーム内の現在の位置は、ストリームの開始からのオフセットとして指定されます。

アプリケーションによって作成されたストリームに書き込むときに、ストリーム オフセットを指定すると、1 回限りの書き込みセマンティクスを実現できます。

オフセットを指定すると、書き込みオペレーションはべき等になります。これにより、ネットワーク エラーやサーバーからのレスポンスがないときでも、安全に再試行できるようになります。オフセットに関連する次のエラーを処理します。

  • ALREADY_EXISTSStorageErrorCode.OFFSET_ALREADY_EXISTS): 行がすでに書き込まれています。このエラーは無視してかまいません。
  • OUT_OF_RANGEStorageErrorCode.OFFSET_OUT_OF_RANGE): 前の書き込みオペレーションが失敗しました。最後に成功した書き込みから再試行します。

なお、これらのエラーは間違ったオフセット値を設定した場合にも発生する可能性があるため、オフセットを慎重に管理する必要があります。

ストリーム オフセットを使用する前に、1 回限りのセマンティクスが必要かどうかを検討してください。たとえば、アップストリームのデータ パイプラインが 1 回以上の書き込みのみを保証している場合や、データの取り込み後に重複を簡単に検出できる場合は、1 回限りの書き込みが必要でない可能性があります。その場合は、行オフセットを追跡する必要がないデフォルト ストリームの使用をおすすめします。

AppendRows の呼び出しをブロックしない

AppendRows メソッドは非同期です。書き込みのそれぞれに対するレスポンスをブロックすることなく、一連の書き込みを個別に送信できます。双方向接続のレスポンス メッセージは、リクエストがキューに追加された順序で到着します。最大のスループットを得るには、レスポンスをブロックせずに AppendRows を呼び出します。

スキーマの更新を処理する

データ ストリーミングのシナリオでは、テーブル スキーマは通常、ストリーミング パイプラインの外部で管理されます。null を許容する新しいフィールドの追加など、スキーマは時間の経過とともに進化するのが一般的です。堅牢なパイプラインにするには、帯域外のスキーマ更新にも対応する必要があります。

Storage Write API は、次のようなテーブル スキーマをサポートしています。

  • 最初の書き込みリクエストにはスキーマが含まれます。
  • データの各行は、バイナリ プロトコル バッファとして送信します。BigQuery はデータをスキーマにマッピングします。
  • null を許容するフィールドは省略できますが、現在のスキーマに存在しないフィールドを含めることはできません。追加フィールドを含む行を送信すると、Storage Write API は StorageErrorCode.SCHEMA_MISMATCH_EXTRA_FIELD を含む StorageError を返します。

ペイロードで新しいフィールドを送信する場合は、まず BigQuery のテーブル スキーマを更新する必要があります。Storage Write API は、短期間(数分程度)でスキーマ変更を検出します。Storage Write API がスキーマの変更を検出すると、AppendRowsResponse レスポンス メッセージには、新しいスキーマを記述する TableSchema オブジェクトが含まれます。

更新されたスキーマを使用してデータを送信するには、既存の接続を閉じて、新しいスキーマで新しい接続を開く必要があります。

Java クライアント。Java クライアント ライブラリは、JsonStreamWriter クラスによりスキーマ更新のための追加機能を提供します。スキーマの更新後、JsonStreamWriter は更新されたスキーマに自動的に再接続します。接続を明示的に閉じて再度開く必要はありません。スキーマの変更をプログラムで確認するには、append メソッドの完了後に AppendRowsResponse.hasUpdatedSchema を呼び出します。

また、入力データ内の不明なフィールドを無視するように JsonStreamWriter を構成することもできます。この動作を設定するには、setIgnoreUnknownFields を呼び出します。この動作は、従来の tabledata.insertAll API を使用する場合の ignoreUnknownValues オプションに似ています。ただし、不明なフィールドが通知なく破棄されるため、意図しないデータ損失につながる可能性があります。