このドキュメントでは、Apache Beam BigQuery I/O コネクタを使用して Dataflow から BigQuery にデータを書き込む方法について説明します。
BigQuery I/O コネクタは Apache Beam SDK で使用できます。最新バージョンの SDK を使用することをおすすめします。詳細については、Apache Beam 2.x SDK をご覧ください。
Python の言語間変換サポートも利用できます。
概要
BigQuery I/O コネクタでは、BigQuery への書き込みで次のメソッドがサポートされています。
STORAGE_WRITE_API
。このモードでは、コネクタは BigQuery Storage Write API を使用して BigQuery ストレージに直接書き込みを行います。Storage Write API は、ストリーミングの取り込みとバッチ読み込みを 1 つの高性能 API にまとめたものです。このモードでは exactly-once セマンティクスが保証されます。STORAGE_API_AT_LEAST_ONCE
。このモードでは Storage Write API も使用しますが、at-least-once セマンティクスが提供されます。このモードを使用すると、ほとんどのパイプラインでレイテンシが短縮されます。ただし、重複書き込みが発生する可能性があります。FILE_LOADS
。このモードでは、入力データを Cloud Storage のステージング ファイルに書き込みます。その後、BigQuery の読み込みジョブを実行して、データを BigQuery に読み込みます。このモードは、バッチ パイプラインで最もよく見られる制限付きPCollections
のデフォルトです。STREAMING_INSERTS
。このモードでは、コネクタは以前のストリーミング API を使用します。このモードは、制限なしPCollections
のデフォルトですが、新しいプロジェクトでは推奨されません。
書き込み方法を選択する際は、次の点を考慮してください。
- ストリーミング ジョブの場合は、
STORAGE_WRITE_API
またはSTORAGE_API_AT_LEAST_ONCE
の使用を検討してください。これらのモードでは、中間ステージング ファイルを使用せずに、BigQuery ストレージに直接書き込みます。 - 1 回以上のストリーミング モードを使用してパイプラインを実行する場合は、書き込みモードを
STORAGE_API_AT_LEAST_ONCE
に設定します。この設定はより効率的で、1 回以上のストリーミング モードのセマンティクスと一致します。 - ファイルの読み込みと Storage Write API では割り当てと上限が異なります。
- 読み込みジョブは、共有 BigQuery スロットプールまたは予約済みスロットのいずれかを使用します。予約済みスロットを使用するには、
PIPELINE
タイプの予約割り当てのプロジェクトで読み込みジョブを実行します。共有 BigQuery スロットプールを使用する場合、読み込みジョブは無料です。ただし、BigQuery は共有プールで使用可能な容量について保証しません。詳細については、予約の概要をご覧ください。
並列処理
ストリーミング パイプラインでの
FILE_LOADS
とSTORAGE_WRITE_API
の場合、コネクタはデータを多数のファイルまたはストリームにシャーディングします。通常は、withAutoSharding
を呼び出して自動シャーディングを有効にすることをおすすめします。バッチ パイプラインの
FILE_LOADS
の場合、コネクタはパーティション分割ファイルにデータを書き込み、その後、BigQuery に並列に読み込まれます。バッチ パイプライン内の
STORAGE_WRITE_API
の場合、各ワーカーはシャードの合計数によって BigQuery に書き込む 1 つ以上のストリームを作成します。STORAGE_API_AT_LEAST_ONCE
の場合、デフォルトの書き込みストリームが 1 つあります。このストリームには複数のワーカーが追加されます。
パフォーマンス
次の表に、BigQuery I/O のさまざまな読み取りオプションのパフォーマンス指標を示します。ワークロードは、Apache Beam SDK 2.49.0 for Java を使用して、1 つの e2-standard2
ワーカーで実行されています。Runner v2 は使用されていません。
1 億件のレコード | 1 KB | 1 列 | スループット(バイト) | スループット(要素) |
---|---|---|
ストレージ書き込み | 55 MBps | 54,000 要素/秒 |
Avro の読み込み | 78 MBps | 77,000 要素/秒 |
Json の読み込み | 54 MBps | 53,000 要素/秒 |
これらの指標は、単純なバッチ パイプラインに基づいています。これは I/O コネクタ間でのパフォーマンスの比較を目的としており、必ずしも実際のパイプラインを表すものではありません。Dataflow パイプラインのパフォーマンスは複雑で、VM タイプ、処理されるデータ、外部のソースとシンクのパフォーマンス、ユーザーコードに左右されます。指標は Java SDK の実行に基づくものであり、他の言語 SDK のパフォーマンス特性を表すものではありません。詳細については、Beam I/O のパフォーマンスをご覧ください。
ベスト プラクティス
このセクションでは、Dataflow から BigQuery に書き込むためのベスト プラクティスを説明します。
一般的な考慮事項
Storage Write API には割り当て上限があります。コネクタは、ほとんどのパイプラインでこれらの上限を処理します。ただし、一部のシナリオでは使用可能な Storage Write API ストリームが使い果たされる可能性があります。たとえば、特に負荷の高いワークロードが長時間実行されるジョブでは、多数の宛先を持つ自動シャーディングと自動スケーリングを使用するパイプラインで、この問題が発生することがあります。この問題が発生した場合は、
STORAGE_WRITE_API_AT_LEAST_ONCE
の使用を検討してください。これにより、問題を回避できます。Google Cloud の指標を使用して、Storage Write API の割り当て使用量をモニタリングします。
ファイル読み込みを使用する場合、Avro は通常 JSON よりパフォーマンスが優れています。Avro を使用するには、
withAvroFormatFunction
を呼び出します。デフォルトでは、読み込みジョブは Dataflow ジョブと同じプロジェクトで実行されます。別のプロジェクトを指定するには、
withLoadJobProjectId
を呼び出します。Java SDK を使用する場合は、BigQuery テーブルのスキーマを表すクラスを作成することを検討してください。次に、パイプラインで
useBeamSchema
を呼び出して、Apache BeamRow
型と BigQuery のTableRow
型を自動的に変換します。スキーマクラスの例については、ExampleModel.java
をご覧ください。何千ものフィールドを含む複雑なスキーマを持つテーブルを読み込む場合は、
withMaxBytesPerPartition
を呼び出して、読み込みジョブごとの最大サイズを小さく設定することを検討してください。
ストリーミング パイプライン
ストリーミング パイプラインには、次の推奨事項が適用されます。
ストリーミング パイプラインには、Storage Write API(
STORAGE_WRITE_API
またはSTORAGE_API_AT_LEAST_ONCE
)を使用することをおすすめします。ストリーミング パイプラインはファイル読み込みを使用できますが、次のような欠点があります。
可能であれば、
STORAGE_WRITE_API_AT_LEAST_ONCE
の使用を検討してください。その結果、重複するレコードが BigQuery に書き込まれる可能性がありますが、STORAGE_WRITE_API
よりも低コストで、スケーラビリティも高くなります。通常は
STREAMING_INSERTS
は使用しないでください。ストリーミング挿入には Storage Write API よりもコストがかかり、パフォーマンスも低くなります。データ シャーディングにより、ストリーミング パイプラインのパフォーマンスが向上します。ほとんどのパイプラインで、自動シャーディングが出発点として適しています。ただし、次のようにシャーディングを調整できます。
STORAGE_WRITE_API
の場合は、withNumStorageWriteApiStreams
を呼び出して、書き込みストリームの数を設定します。FILE_LOADS
の場合は、withNumFileShards
を呼び出してファイル シャードの数を設定します。
ストリーミング挿入を使用する場合は、
retryTransientErrors
を再試行ポリシーとして設定することをおすすめします。
バッチ パイプライン
バッチ パイプラインには、次の推奨事項が適用されます。
ほとんどの大規模なバッチ パイプラインでは、まず
FILE_LOADS
を試すことをおすすめします。バッチ パイプラインはSTORAGE_WRITE_API
を使用できますが、大規模(1,000 個以上の vCPU)な場合や同時実行のパイプラインが実行されている場合は、割り当ての上限を超える可能性があります。Apache Beam は、バッチSTORAGE_WRITE_API
ジョブの書き込みストリームの最大数をスロットリングしないため、ジョブは最終的に BigQuery Storage API の上限に達します。FILE_LOADS
を使用すると、共有 BigQuery スロットプールまたは予約済みスロットプールのいずれかが使い果たされる可能性があります。この種の障害が発生した場合は、次の方法をお試しください。- ジョブのワーカーの最大数またはワーカーのサイズを減らす。
- 予約スロットを追加購入する。
STORAGE_WRITE_API
の使用を検討する。
小規模から中規模のパイプライン(1,000 vCPU 未満)では、
STORAGE_WRITE_API
を使用することが効果的な場合があります。このような小規模なジョブで、デッドレター キューが必要な場合や、FILE_LOADS
共有スロットプールが不十分な場合は、STORAGE_WRITE_API
の使用を検討してください。重複データを許容できる場合は、
STORAGE_WRITE_API_AT_LEAST_ONCE
の使用を検討してください。このモードでは、重複するレコードが BigQuery に書き込まれる可能性がありますが、STORAGE_WRITE_API
オプションよりも低コストです。書き込みモードは、パイプラインの特性に応じて異なるパフォーマンスを発揮する場合があります。ワークロードに最適な書き込みモードをテストして見つけてください。
行レベルのエラーを処理する
このセクションでは、不適切な形式の入力データやスキーマの不一致が原因で発生する、行レベルのエラーの処理方法について説明します。
Storage Write API の場合、書き込みできない行は別の PCollection
に配置されます。このコレクションを取得するには、WriteResult
オブジェクトで getFailedStorageApiInserts
を呼び出します。このアプローチの例については、BigQuery にデータをストリーミングするをご覧ください。
後で処理できるように、デッドレター キューまたはテーブルにエラーを送信することをおすすめします。このパターンの詳細については、BigQueryIO
デッドレター パターンをご覧ください。
FILE_LOADS
でデータの読み込み中にエラーが発生した場合、読み込みジョブが失敗し、パイプラインがランタイム例外をスローします。エラーは Dataflow ログまたは BigQuery ジョブ履歴で確認できます。I/O コネクタは、個々の失敗した行に関する情報を返しません。
エラーのトラブルシューティングの詳細については、BigQuery コネクタエラーをご覧ください。
例
次の例では、Dataflow を使用して BigQuery に書き込む方法を示します。
既存のテーブルに書き込む
次の例では、PCollection<MyData>
を BigQuery に書き込むバッチ パイプラインを作成します(MyData
はカスタムデータ型です)。
BigQueryIO.write()
メソッドは、書き込みオペレーションの構成に使用される BigQueryIO.Write<T>
タイプを返します。詳細については、Apache Beam ドキュメントのテーブルへの書き込みをご覧ください。このコードサンプルは、既存のテーブル(CREATE_NEVER
)に書き込み、新しい行をテーブル(WRITE_APPEND
)に追加します。
Java
Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
新規または既存のテーブルに書き込む
次の例では、create disposition を CREATE_IF_NEEDED
に設定することで、宛先テーブルが存在しない場合に新しいテーブルを作成します。このオプションを使用する場合は、テーブル スキーマを指定する必要があります。コネクタは、新しいテーブルを作成するときにこのスキーマを使用します。
Java
Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
BigQuery にデータをストリーミングする
次の例は、書き込みモードを STORAGE_WRITE_API
に設定して、exactly-once セマンティクスを使用してデータをストリーミングする方法を示しています。
すべてのストリーミング パイプラインが exactly-once セマンティクスを必要とするわけではありません。たとえば、宛先テーブルから重複を手動で削除できる場合があります。シナリオでレコードの重複が許容される場合は、書き込みメソッドを STORAGE_API_AT_LEAST_ONCE
に設定して、exactly-once セマンティクスを使用することを検討してください。通常、この方法は効率的で、ほとんどのパイプラインでレイテンシが短縮されます。
Java
Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
次のステップ
- BigQuery I/O コネクタの詳細を確認する。Apache Beam のドキュメントをご覧ください。
- Storage Write API を使用した BigQuery へのデータのストリーミング(ブログ投稿)を読む。