Storage Write API を使用したデータのストリーミング
このドキュメントでは、BigQuery Storage Write API を使用して BigQuery にデータをストリーミングする方法について説明します。
ストリーミングのシナリオでは、データは継続的に到着し、最小限のレイテンシで読み取りに使用できる必要があります。ストリーミング ワークロードに BigQuery Storage Write API を使用する場合は、どのような保証が必要かを検討してください。
- アプリケーションで少なくとも 1 回のセマンティクスのみが必要な場合は、デフォルト ストリームを使用します。
- 1 回限りのセマンティクスが必要な場合は、コミットタイプで 1 つ以上のストリームを作成し、ストリーム オフセットを使用して 1 回限りの書き込みを保証します。
コミットタイプでは、サーバーが書き込みリクエストを確認するとすぐに、ストリームに書き込まれたデータがクエリに使用できるようになります。デフォルト ストリームでもコミットタイプが使用されますが、1 回限りの配信は保証されません。
少なくとも 1 回セマンティクスにデフォルト ストリームを使用する
アプリケーションが宛先テーブルに重複するレコードが表示される可能性を受け入れることができる場合は、ストリーミング シナリオにデフォルト ストリームを使用することをおすすめします。
次のコードでは、デフォルト ストリームにデータを書き込む方法を示します。
Java
BigQuery 用のクライアント ライブラリをインストールして使用する方法については、BigQuery クライアント ライブラリをご覧ください。詳細については、BigQuery Java API のリファレンス ドキュメントをご覧ください。
BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証情報を設定するをご覧ください。
Node.js
BigQuery 用のクライアント ライブラリをインストールして使用する方法については、BigQuery クライアント ライブラリをご覧ください。
BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証情報を設定するをご覧ください。
Python
次の例は、デフォルト ストリームを使用して 2 つのフィールドを持つレコードを挿入する方法を示しています。
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import json
import sample_data_pb2
# The list of columns from the table's schema to search in the given data to write to BigQuery.
TABLE_COLUMNS_TO_CHECK = [
"name",
"age"
]
# Function to create a batch of row data to be serialized.
def create_row_data(data):
row = sample_data_pb2.SampleData()
for field in TABLE_COLUMNS_TO_CHECK:
# Optional fields will be passed as null if not provided
if field in data:
setattr(row, field, data[field])
return row.SerializeToString()
class BigQueryStorageWriteAppend(object):
# The stream name is: projects/{project}/datasets/{dataset}/tables/{table}/_default
def append_rows_proto2(
project_id: str, dataset_id: str, table_id: str, data: dict
):
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
stream_name = f'{parent}/_default'
write_stream = types.WriteStream()
# Create a template with fields needed for the first request.
request_template = types.AppendRowsRequest()
# The request must contain the stream name.
request_template.write_stream = stream_name
# Generating the protocol buffer representation of the message descriptor.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Construct an AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# Append proto2 serialized bytes to the serialized_rows repeated field using create_row_data.
proto_rows = types.ProtoRows()
for row in data:
proto_rows.serialized_rows.append(create_row_data(row))
# Appends data to the given stream.
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
append_rows_stream.send(request)
print(f"Rows to table: '{parent}' have been written.")
if __name__ == "__main__":
###### Uncomment the below block to provide additional logging capabilities ######
#logging.basicConfig(
# level=logging.DEBUG,
# format="%(asctime)s [%(levelname)s] %(message)s",
# handlers=[
# logging.StreamHandler()
# ]
#)
###### Uncomment the above block to provide additional logging capabilities ######
with open('entries.json', 'r') as json_file:
data = json.load(json_file)
# Change this to your specific BigQuery project, dataset, table details
BigQueryStorageWriteAppend.append_rows_proto2("PROJECT_ID","DATASET_ID", "TABLE_ID ",data=data)
このサンプルコードは、コンパイルされたプロトコル モジュール sample_data_pb2.py
に依存しています。コンパイルされたモジュールを作成するには、protoc --python_out=. sample_data.proto
コマンドを実行します。ここで、protoc
はプロトコル バッファ コンパイラです。sample_data.proto
ファイルは、Python の例で使用されるメッセージの形式を定義します。protoc
コンパイラをインストールするには、プロトコル バッファ - Google のデータ交換フォーマットの手順に沿って操作します。
sample_data.proto
ファイルの内容は次のとおりです。
message SampleData {
required string name = 1;
required int64 age = 2;
}
このスクリプトは、BigQuery テーブルに挿入するサンプル行データを含む entities.json
ファイルを使用します。
{"name": "Jim", "age": 35}
{"name": "Jane", "age": 27}
多重化を使用する
デフォルト ストリームに対してのみ、ストリーム ライター レベルで多重化を有効にします。Java で多重化を有効にするには、StreamWriter
オブジェクトまたは JsonStreamWriter
オブジェクトを作成するときに setEnableConnectionPool
メソッドを呼び出します。
// One possible way for constructing StreamWriter StreamWriter.newBuilder(streamName) .setWriterSchema(protoSchema) .setEnableConnectionPool(true) .build(); // One possible way for constructing JsonStreamWriter JsonStreamWriter.newBuilder(tableName, bigqueryClient) .setEnableConnectionPool(true) .build();
Go で多重化を有効にするには、接続共有(多重化)をご覧ください。
1 回限りのセマンティクスにコミットタイプを使用する
1 回限りの書き込みセマンティクスが必要な場合は、コミットタイプで書き込みストリームを作成します。コミットタイプでは、クライアントがバックエンドから確認応答を受け取るとすぐにレコードをクエリで使用できます。
コミットタイプでは、レコード オフセットを使用して、ストリーム内で 1 回だけ配信を行います。レコード オフセットを使用して、アプリケーションは AppendRows
の呼び出しごとに次の追加オフセットを指定します。オフセット値が次に追加されたオフセットに一致する場合にのみ、書き込みオペレーションが実行されます。詳細については、ストリーム オフセットを管理して 1 回限りのセマンティクスを実現するをご覧ください。
オフセットを指定しない場合、レコードはストリームの現在の末尾に追加されます。この場合、追加リクエストがエラーを返すと、ストリーム内でレコードが複数回出現する可能性があります。
コミットタイプを使用するには、次の手順を行います。
Java
CreateWriteStream
を呼び出して、コミットタイプで 1 つ以上のストリームを作成します。- ストリームごとに、
AppendRows
をループで呼び出して、レコードのバッチを書き込みます。 - ストリームごとに
FinalizeWriteStream
を呼び出して、ストリームを解放します。このメソッドを呼び出した後は、ストリームにこれ以上行を書き込むことはできません。コミットタイプではこの手順を省略できますが、アクティブなストリームの上限を超えないようにする効果があります。詳細については、ストリーム作成のレートを制限するをご覧ください。
Node.js
createWriteStreamFullResponse
を呼び出して、コミットタイプで 1 つ以上のストリームを作成します。- ストリームごとに、
appendRows
をループで呼び出して、レコードのバッチを書き込みます。 - ストリームごとに
finalize
を呼び出して、ストリームを解放します。このメソッドを呼び出した後は、ストリームにこれ以上行を書き込むことはできません。コミットタイプではこの手順を省略できますが、アクティブなストリームの上限を超えないようにする効果があります。詳細については、ストリーム作成のレートを制限するをご覧ください。
ストリームを明示的に削除することはできません。ストリームは、システムが定義した有効期間(TTL)に従います。
- ストリームにトラフィックがない場合、コミットされたストリームの TTL は 3 日間です。
- ストリームにトラフィックがない場合、バッファリングされたストリームの TTL は、デフォルトで 7 日間です。
次のコードでは、コミットモードを使用する方法を示します。
Java
BigQuery 用のクライアント ライブラリをインストールして使用する方法については、BigQuery クライアント ライブラリをご覧ください。詳細については、BigQuery Java API のリファレンス ドキュメントをご覧ください。
BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証情報を設定するをご覧ください。
Node.js
BigQuery 用のクライアント ライブラリをインストールして使用する方法については、BigQuery クライアント ライブラリをご覧ください。
BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証情報を設定するをご覧ください。