Storage Write API を使用したデータのストリーミング
このドキュメントでは、BigQuery Storage Write API を使用して BigQuery にデータをストリーミングする方法について説明します。
ストリーミングのシナリオでは、データは継続的に到着し、最小限のレイテンシで読み取りに使用できる必要があります。ストリーミング ワークロードに BigQuery Storage Write API を使用する場合は、どのような保証が必要かを検討してください。
- アプリケーションで at-least-once セマンティクスのみが必要な場合は、デフォルト ストリームを使用します。
- exactly-once セマンティクスが必要な場合は、コミットタイプで 1 つ以上のストリームを作成し、ストリーム オフセットを使用して exactly-once の書き込みを保証します。
コミットタイプでは、サーバーが書き込みリクエストを確認するとすぐに、ストリームに書き込まれたデータがクエリに使用できるようになります。デフォルト ストリームでもコミットタイプが使用されますが、exactly-once は保証されません。
at-least-once セマンティクスにデフォルト ストリームを使用する
アプリケーションが宛先テーブルに重複するレコードが表示される可能性を受け入れることができる場合は、ストリーミング シナリオにデフォルト ストリームを使用することをおすすめします。
次のコードでは、デフォルト ストリームにデータを書き込む方法を示します。
Java
BigQuery 用のクライアント ライブラリをインストールして使用する方法については、BigQuery クライアント ライブラリをご覧ください。詳細については、BigQuery Java API のリファレンス ドキュメントをご覧ください。
BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証情報を設定するをご覧ください。
Node.js
BigQuery 用のクライアント ライブラリをインストールして使用する方法については、BigQuery クライアント ライブラリをご覧ください。
BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証情報を設定するをご覧ください。
Python
次の例は、デフォルト ストリームを使用して 2 つのフィールドを持つレコードを挿入する方法を示しています。
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import json
import sample_data_pb2
# The list of columns from the table's schema to search in the given data to write to BigQuery.
TABLE_COLUMNS_TO_CHECK = [
    "name",
    "age"
    ]
# Function to create a batch of row data to be serialized.
def create_row_data(data):
    row = sample_data_pb2.SampleData()
    for field in TABLE_COLUMNS_TO_CHECK:
      # Optional fields will be passed as null if not provided
      if field in data:
        setattr(row, field, data[field])
    return row.SerializeToString()
class BigQueryStorageWriteAppend(object):
    # The stream name is: projects/{project}/datasets/{dataset}/tables/{table}/_default
    def append_rows_proto2(
        project_id: str, dataset_id: str, table_id: str, data: dict
    ):
        write_client = bigquery_storage_v1.BigQueryWriteClient()
        parent = write_client.table_path(project_id, dataset_id, table_id)
        stream_name = f'{parent}/_default'
        write_stream = types.WriteStream()
        # Create a template with fields needed for the first request.
        request_template = types.AppendRowsRequest()
        # The request must contain the stream name.
        request_template.write_stream = stream_name
        # Generating the protocol buffer representation of the message descriptor.
        proto_schema = types.ProtoSchema()
        proto_descriptor = descriptor_pb2.DescriptorProto()
        sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
        proto_schema.proto_descriptor = proto_descriptor
        proto_data = types.AppendRowsRequest.ProtoData()
        proto_data.writer_schema = proto_schema
        request_template.proto_rows = proto_data
        # Construct an AppendRowsStream to send an arbitrary number of requests to a stream.
        append_rows_stream = writer.AppendRowsStream(write_client, request_template)
        # Append proto2 serialized bytes to the serialized_rows repeated field using create_row_data.
        proto_rows = types.ProtoRows()
        for row in data:
            proto_rows.serialized_rows.append(create_row_data(row))
        # Appends data to the given stream.
        request = types.AppendRowsRequest()
        proto_data = types.AppendRowsRequest.ProtoData()
        proto_data.rows = proto_rows
        request.proto_rows = proto_data
        append_rows_stream.send(request)
        print(f"Rows to table: '{parent}' have been written.")
if __name__ == "__main__":
    ###### Uncomment the below block to provide additional logging capabilities ######
    #logging.basicConfig(
    #    level=logging.DEBUG,
    #    format="%(asctime)s [%(levelname)s] %(message)s",
    #    handlers=[
    #        logging.StreamHandler()
    #    ]
    #)
    ###### Uncomment the above block to provide additional logging capabilities ######
    with open('entries.json', 'r') as json_file:
        data = json.load(json_file)
    # Change this to your specific BigQuery project, dataset, table details
    BigQueryStorageWriteAppend.append_rows_proto2("PROJECT_ID","DATASET_ID", "TABLE_ID ",data=data)
このサンプルコードは、コンパイルされたプロトコル モジュール sample_data_pb2.py に依存しています。コンパイルされたモジュールを作成するには、protoc --python_out=. sample_data.proto コマンドを実行します。ここで、protoc はプロトコル バッファ コンパイラです。sample_data.proto ファイルは、Python の例で使用されるメッセージの形式を定義します。protoc コンパイラをインストールするには、プロトコル バッファ - Google のデータ交換フォーマットの手順に沿って操作します。
sample_data.proto ファイルの内容は次のとおりです。
message SampleData {
  required string name = 1;
  required int64 age = 2;
}
このスクリプトでは、entries.json ファイルを使用します。このファイルには、BigQuery テーブルに挿入するサンプルの行データが含まれています。
{"name": "Jim", "age": 35}
{"name": "Jane", "age": 27}
多重化を使用する
デフォルト ストリームに対してのみ、ストリーム ライター レベルで多重化を有効にします。Java で多重化を有効にするには、StreamWriter オブジェクトまたは JsonStreamWriter オブジェクトを作成するときに setEnableConnectionPool メソッドを呼び出します。
接続プールを有効にすると、Java クライアント ライブラリがバックグラウンドで接続を管理し、既存の接続がビジー状態と判断された場合は接続をスケールアップします。自動スケールアップをより効果的に行うには、maxInflightRequests の上限を下げることを検討してください。
// One possible way for constructing StreamWriter StreamWriter.newBuilder(streamName) .setWriterSchema(protoSchema) .setEnableConnectionPool(true) .setMaxInflightRequests(100) .build(); // One possible way for constructing JsonStreamWriter JsonStreamWriter.newBuilder(tableName, bigqueryClient) .setEnableConnectionPool(true) .setMaxInflightRequests(100) .build();
Go で多重化を有効にするには、接続共有(多重化)をご覧ください。
exactly-once セマンティクスにコミットタイプを使用する
exactly-once セマンティクスが必要な場合は、コミットタイプで書き込みストリームを作成します。コミットタイプでは、クライアントがバックエンドから確認応答を受け取るとすぐにレコードをクエリで使用できます。
コミットタイプでは、レコード オフセットを使用して、ストリーム内で exactly-once 配信を行います。レコード オフセットを使用して、アプリケーションは AppendRows の呼び出しごとに次の追加オフセットを指定します。オフセット値が次に追加されたオフセットに一致する場合にのみ、書き込みオペレーションが実行されます。詳細については、ストリーム オフセットを管理して exactly-once セマンティクスを実現するをご覧ください。
オフセットを指定しない場合、レコードはストリームの現在の末尾に追加されます。この場合、追記リクエストでエラーが発生したときに再試行すると、ストリーム内でレコードが複数回出現する可能性があります。
コミットタイプを使用するには、次の手順を行います。
Java
- CreateWriteStreamを呼び出して、コミットタイプで 1 つ以上のストリームを作成します。
- ストリームごとに、AppendRowsをループで呼び出して、レコードのバッチを書き込みます。
- ストリームごとに FinalizeWriteStreamを呼び出して、ストリームを解放します。このメソッドを呼び出した後は、ストリームにこれ以上行を書き込むことはできません。コミットタイプではこの手順を省略できますが、アクティブなストリームの上限を超えないようにする効果があります。詳細については、ストリーム作成のレートを制限するをご覧ください。
Node.js
- createWriteStreamFullResponseを呼び出して、コミットタイプで 1 つ以上のストリームを作成します。
- ストリームごとに、appendRowsをループで呼び出して、レコードのバッチを書き込みます。
- ストリームごとに finalizeを呼び出して、ストリームを解放します。このメソッドを呼び出した後は、ストリームにこれ以上行を書き込むことはできません。コミットタイプではこの手順を省略できますが、アクティブなストリームの上限を超えないようにする効果があります。詳細については、ストリーム作成のレートを制限するをご覧ください。
ストリームを明示的に削除することはできません。ストリームは、システムが定義した有効期間(TTL)に従います。
- ストリームにトラフィックがない場合、コミットされたストリームの TTL は 3 日間です。
- ストリームにトラフィックがない場合、バッファリングされたストリームの TTL は、デフォルトで 7 日間です。
次のコードでは、コミットタイプを使用する方法を示します。
Java
BigQuery 用のクライアント ライブラリをインストールして使用する方法については、BigQuery クライアント ライブラリをご覧ください。詳細については、BigQuery Java API のリファレンス ドキュメントをご覧ください。
BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証情報を設定するをご覧ください。
Node.js
BigQuery 用のクライアント ライブラリをインストールして使用する方法については、BigQuery クライアント ライブラリをご覧ください。
BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、クライアント ライブラリの認証情報を設定するをご覧ください。
Apache Arrow 形式を使用してデータを取り込む
次のコードは、Apache Arrow 形式を使用してデータを取り込む方法を示しています。詳細なエンドツーエンドの例については、GitHub の PyArrow の例をご覧ください。
Python
この例では、デフォルト ストリームを使用してシリアル化された PyArrow テーブルを取り込む方法を示します。
from google.cloud.bigquery_storage_v1 import types as gapic_types
from google.cloud.bigquery_storage_v1.writer import AppendRowsStream
from google.cloud import bigquery_storage_v1
def append_rows_with_pyarrow(
  pyarrow_table: pyarrow.Table,
  project_id: str,
  dataset_id: str,
  table_id: str,
):
  bqstorage_write_client = bigquery_storage_v1.BigQueryWriteClient()
  # Create request_template.
  request_template = gapic_types.AppendRowsRequest()
  request_template.write_stream = (
      f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/_default"
  )
  arrow_data = gapic_types.AppendRowsRequest.ArrowData()
  arrow_data.writer_schema.serialized_schema = (
      pyarrow_table.schema.serialize().to_pybytes()
  )
  request_template.arrow_rows = arrow_data
  # Create AppendRowsStream.
  append_rows_stream = AppendRowsStream(
      bqstorage_write_client,
      request_template,
  )
  # Create request with table data.
  request = gapic_types.AppendRowsRequest()
  request.arrow_rows.rows.serialized_record_batch = (
      pyarrow_table.to_batches()[0].serialize().to_pybytes()
  )
  # Send request.
  future = append_rows_stream.send(request)
  # Wait for result.
  future.result()