このドキュメントでは、Apache Beam BigQuery I/O コネクタを使用して BigQuery から Dataflow にデータを読み取る方法について説明します。
概要
BigQuery I/O コネクタでは、BigQuery からの読み取りを行うために 2 つのオプションがサポートされています。
- テーブルの直接読み取り。BigQuery Storage Read API を使用するため、これは最速のオプションとなります。
- エクスポート ジョブ。このオプションでは、BigQuery はテーブルデータを Cloud Storage に書き込むエクスポート ジョブを実行します。その後、コネクタがエクスポートされたデータを Cloud Storage から読み取ります。このオプションはエクスポート ステップを必要とするため、効率的ではありません。
デフォルトのオプションはエクスポート ジョブです。直接読み取りを指定するには、withMethod(Method.DIRECT_READ)
を呼び出します。
コネクタは、テーブルデータを PCollection
にシリアル化します。PCollection
内の各要素は 1 つのテーブル行を表します。コネクタは、次のシリアル化方法をサポートしています。
- Avro 形式のレコードとしてデータを読み取る。この方法では、Avro レコードを解析してカスタムデータ型に変換する関数を用意します。
- データを
TableRow
オブジェクトとして読み取る。この方法は、カスタムデータ型が必要ないため便利です。ただし、通常 Avro 形式のレコードを読み取る場合よりもパフォーマンスが低くなります。
並列処理
このコネクタの並列処理は読み取り方法によって異なります。
直接読み取り: I/O コネクタは、エクスポート リクエストのサイズに基づいて動的な数のストリームを生成します。これらのストリームを BigQuery から並行して読み取ります。
エクスポート ジョブ: BigQuery は、Cloud Storage に書き込むファイルの数を決定します。ファイルの数は、クエリとデータ量によって異なります。I/O コネクタは、エクスポートされたファイルを並行して読み取ります。
パフォーマンス
次の表に、BigQuery I/O のさまざまな読み取りオプションのパフォーマンス指標を示します。ワークロードは、Apache Beam SDK 2.49.0 for Java を使用して、1 つの e2-standard2
ワーカーで実行されています。Runner v2 は使用されていません。
1 億件のレコード | 1 KB | 1 列 | スループット(バイト) | スループット(要素) |
---|---|---|
ストレージ読み取り | 120 MBps | 88,000 要素/秒 |
Avro エクスポート | 105 MBps | 78,000 要素/秒 |
JSON のエクスポート | 110 MBps | 81,000 要素/秒 |
これらの指標は、単純なバッチ パイプラインに基づいています。これは I/O コネクタ間でのパフォーマンスの比較を目的としており、必ずしも実際のパイプラインを表すものではありません。Dataflow パイプラインのパフォーマンスは複雑で、VM タイプ、処理されるデータ、外部のソースとシンクのパフォーマンス、ユーザーコードに左右されます。指標は Java SDK の実行に基づくものであり、他の言語 SDK のパフォーマンス特性を表すものではありません。詳細については、Beam I/O のパフォーマンスをご覧ください。
ベスト プラクティス
通常は、テーブルの直接読み取り(
Method.DIRECT_READ
)を使用することをおすすめします。Storage Read API は、データをエクスポートする中間手順を必要としないため、エクスポート ジョブよりもデータ パイプラインに適しています。直接読み取りを使用すると、Storage Read API の使用量に対して料金が発生します。BigQuery の料金ページのデータ抽出の料金をご覧ください。
エクスポート ジョブに追加料金はかかりません。ただし、エクスポート ジョブには上限があります。タイミングを優先し、コストを調整できる大規模なデータ移動の場合は、直接読み取りをおすすめします。
Storage Read API には割り当て上限があります。Google Cloud の指標を使用して、割り当ての使用状況をモニタリングします。
Storage Read API を使用すると、次のようなリースの有効期限エラーとセッション タイムアウト エラーがログに表示されることがあります。
DEADLINE_EXCEEDED
Server Unresponsive
StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session
`
これらのエラーは、オペレーションがタイムアウトよりも時間がかかる場合に発生することがあります。通常は、実行時間が 6 時間を超えるパイプラインで発生します。この問題を軽減するには、ファイル エクスポートに切り替えます。
Java SDK を使用する場合は、BigQuery テーブルのスキーマを表すクラスを作成することを検討してください。次に、パイプラインで
useBeamSchema
を呼び出して、Apache BeamRow
型と BigQuery のTableRow
型を自動的に変換します。スキーマクラスの例については、ExampleModel.java
をご覧ください。
例
このセクションのコードサンプルでは、テーブルを直接読み取ります。
エクスポート ジョブを使用する場合は、withMethod
の呼び出しを省略するか、Method.EXPORT
を指定します。次に、--tempLocation
パイプライン オプションを設定して、エクスポートされたファイルの Cloud Storage バケットを指定します。
これらのコード例では、ソーステーブルに次の列があることを前提としています。
name
(文字列)age
(整数)
JSON スキーマ ファイルとして指定。
[
{"name":"user_name","type":"STRING","mode":"REQUIRED"},
{"name":"age","type":"INTEGER","mode":"REQUIRED"}
]
Avro 形式のレコードを読み取る
BigQuery データを Avro 形式のレコードに読み取るには、read(SerializableFunction)
メソッドを使用します。このメソッドは、SchemaAndRecord
オブジェクトを解析し、カスタムデータ型を返すアプリケーション定義関数を使用します。コネクタからの出力はカスタムデータ型の PCollection
です。
次のコードは、BigQuery テーブルから PCollection<MyData>
を読み取ります。ここで、MyData
はアプリケーション定義クラスです。
Java
Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
read
メソッドは SerializableFunction<SchemaAndRecord, T>
インターフェースを使用します。このインターフェースには、Avro レコードからカスタム データクラスに変換する関数が定義されています。上のコードサンプルでは、MyData.apply
メソッドがこの変換関数を実装しています。このサンプル関数は、Avro レコードの name
フィールドと age
フィールドを解析し、MyData
インスタンスを返します。
読み取る BigQuery テーブルを指定するには、前の例のように from
メソッドを呼び出します。詳細については、BigQuery I/O コネクタ ドキュメントのテーブル名をご覧ください。
TableRow
オブジェクトを読み取る
readTableRows
メソッドは、BigQuery データを TableRow
オブジェクトの PCollection
に読み込みます。各 TableRow
は、テーブルデータの単一行を保持する Key-Value ペアのマップです。from
メソッドを呼び出して、読み取る BigQuery テーブルを指定します。
次のコードは、BigQuery テーブルから PCollection<TableRows>
を読み取ります。
Java
Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
また、この例には、TableRow
ディクショナリの値にアクセスする方法も示されています。整数値は、BigQuery でエクスポートされた JSON 形式と一致するように文字列としてエンコードされます。
列の射影とフィルタリング
直接読み取り(Method.DIRECT_READ
)を使用する場合、BigQuery から読み取られてネットワーク経由で送信されるデータの量を減らすことで、読み取りオペレーションを効率化できます。
- 列射影:
withSelectedFields
を呼び出して、テーブルから列のサブセットを読み取ります。この方法では、テーブルに多数の列がある場合に効率的に読み取ることができます。 - 行フィルタリング:
withRowRestriction
を呼び出して、サーバー側でデータをフィルタリングする述語を指定します。
フィルタの述語は確定的である必要があります。また、集計はサポートされていません。
次の例では、"user_name"
列と "age"
列を射影し、述語 "age > 18"
と一致しない行を除外しています。
Java
Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
クエリ結果から読み取る
上記の例は、テーブルから行を読み取る方法を示しています。fromQuery
を呼び出すと、SQL クエリの結果から読み取ることもできます。この方法では、計算作業の一部を BigQuery に移行します。 また、このメソッドを使用してビューにクエリを実行し、BigQuery ビューまたはマテリアライズド ビューから読み取ることもできます。
次の例では、BigQuery 一般公開データセットに対してクエリを実行し、結果を読み取ります。パイプラインの実行後、BigQuery ジョブ履歴でクエリジョブを確認できます。
Java
Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
次のステップ
- BigQuery I/O コネクタのドキュメントを確認する。
- Google 提供のテンプレートのリストを確認する。