Melakukan streaming data menggunakan Storage Write API
Dokumen ini menjelaskan cara menggunakan BigQuery Storage Write API untuk melakukan streaming data ke BigQuery.
Dalam skenario streaming, data tiba secara terus-menerus dan harus tersedia untuk bacaan dengan latensi minimal. Saat menggunakan BigQuery Storage Write API untukworkload streaming, pertimbangkan jaminan yang Anda perlukan:
- Jika aplikasi Anda hanya memerlukan semantik setidaknya satu kali, gunakan streaming default.
- Jika Anda memerlukan semantik tepat satu kali, buat satu atau beberapa streaming dalam jenis yang di-commit dan gunakan offset streaming untuk menjamin penulisan tepat satu kali.
Dalam jenis yang di-commit, data yang ditulis ke streaming data tersedia untuk kueri segera setelah server mengonfirmasi permintaan tulis. Aliran data default juga menggunakan jenis yang di-commit, tetapi tidak memberikan jaminan tepat satu kali.
Menggunakan aliran data default untuk semantik minimal satu kali
Jika aplikasi Anda dapat menerima kemungkinan data duplikat muncul di tabel tujuan, sebaiknya gunakan streaming default untuk skenario streaming.
Kode berikut menunjukkan cara menulis data ke streaming data default:
Java
Untuk mempelajari cara menginstal dan menggunakan library klien untuk BigQuery, lihat library klien BigQuery. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi API Java BigQuery.
Untuk melakukan autentikasi ke BigQuery, siapkan Kredensial Default Aplikasi. Untuk informasi selengkapnya, lihat Menyiapkan autentikasi untuk library klien.
Node.js
Untuk mempelajari cara menginstal dan menggunakan library klien untuk BigQuery, lihat library klien BigQuery.
Untuk melakukan autentikasi ke BigQuery, siapkan Kredensial Default Aplikasi. Untuk informasi selengkapnya, lihat Menyiapkan autentikasi untuk library klien.
Python
Contoh ini menunjukkan cara menyisipkan data dengan dua kolom menggunakan streaming data default:
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import json
import sample_data_pb2
# The list of columns from the table's schema to search in the given data to write to BigQuery.
TABLE_COLUMNS_TO_CHECK = [
"name",
"age"
]
# Function to create a batch of row data to be serialized.
def create_row_data(data):
row = sample_data_pb2.SampleData()
for field in TABLE_COLUMNS_TO_CHECK:
# Optional fields will be passed as null if not provided
if field in data:
setattr(row, field, data[field])
return row.SerializeToString()
class BigQueryStorageWriteAppend(object):
# The stream name is: projects/{project}/datasets/{dataset}/tables/{table}/_default
def append_rows_proto2(
project_id: str, dataset_id: str, table_id: str, data: dict
):
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
stream_name = f'{parent}/_default'
write_stream = types.WriteStream()
# Create a template with fields needed for the first request.
request_template = types.AppendRowsRequest()
# The request must contain the stream name.
request_template.write_stream = stream_name
# Generating the protocol buffer representation of the message descriptor.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Construct an AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# Append proto2 serialized bytes to the serialized_rows repeated field using create_row_data.
proto_rows = types.ProtoRows()
for row in data:
proto_rows.serialized_rows.append(create_row_data(row))
# Appends data to the given stream.
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
append_rows_stream.send(request)
print(f"Rows to table: '{parent}' have been written.")
if __name__ == "__main__":
###### Uncomment the below block to provide additional logging capabilities ######
#logging.basicConfig(
# level=logging.DEBUG,
# format="%(asctime)s [%(levelname)s] %(message)s",
# handlers=[
# logging.StreamHandler()
# ]
#)
###### Uncomment the above block to provide additional logging capabilities ######
with open('entries.json', 'r') as json_file:
data = json.load(json_file)
# Change this to your specific BigQuery project, dataset, table details
BigQueryStorageWriteAppend.append_rows_proto2("PROJECT_ID","DATASET_ID", "TABLE_ID ",data=data)
Contoh kode ini bergantung pada modul protokol yang dikompilasi sample_data_pb2.py
. Untuk membuat modul yang dikompilasi, jalankan perintah protoc --python_out=. sample_data.proto
, dengan protoc
sebagai compiler buffering protokol. File sample_data.proto
menentukan format pesan yang digunakan dalam contoh Python. Untuk menginstal compiler protoc
, ikuti petunjuk di Buffer Protokol - Format pertukaran data Google.
Berikut adalah isi file sample_data.proto
:
message SampleData {
required string name = 1;
required int64 age = 2;
}
Skrip ini menggunakan file entities.json
, yang berisi contoh data baris yang akan disisipkan ke dalam tabel BigQuery:
{"name": "Jim", "age": 35}
{"name": "Jane", "age": 27}
Menggunakan multiplexing
Anda mengaktifkan
multiplexing
pada level penulis streaming hanya untuk streaming default. Untuk mengaktifkan multiplexing di
Java, panggil metode setEnableConnectionPool
saat Anda membuat
objek StreamWriter
atau JsonStreamWriter
:
// One possible way for constructing StreamWriter StreamWriter.newBuilder(streamName) .setWriterSchema(protoSchema) .setEnableConnectionPool(true) .build(); // One possible way for constructing JsonStreamWriter JsonStreamWriter.newBuilder(tableName, bigqueryClient) .setEnableConnectionPool(true) .build();
Untuk mengaktifkan multiplexing di Go, lihat Connection Sharing (Multiplexing).
Menggunakan jenis yang di-commit untuk semantik tepat satu kali
Jika Anda memerlukan semantik penulisan tepat satu kali, buat streaming data tulis dalam jenis yang di-commit. Dalam jenis commit, data tersedia untuk kueri segera setelah klien menerima konfirmasi dari backend.
Jenis yang di-commit memberikan pengiriman tepat satu kali dalam streaming melalui penggunaan
offset data. Dengan menggunakan offset data, aplikasi akan menentukan offset penambahan
berikutnya di setiap panggilan ke AppendRows
. Operasi tulis
hanya dilakukan jika nilai offset cocok dengan offset penambahan berikutnya. Untuk mengetahui informasi
selengkapnya, lihat
Mengelola offset streaming untuk mencapai semantik tepat satu kali.
Jika Anda tidak memberikan offset, kumpulan data akan ditambahkan ke akhir streaming saat ini. Dalam hal ini, jika permintaan tambahkan menampilkan error, mencoba lagi dapat menyebabkan data muncul lebih dari sekali dalam streaming.
Untuk menggunakan jenis yang di-commit, lakukan langkah-langkah berikut:
Java
- Panggil
CreateWriteStream
untuk membuat satu atau beberapa streaming dalam jenis yang di-commit. - Untuk setiap aliran data, panggil
AppendRows
dalam satu loop untuk menulis batch kumpulan data. - Panggil
FinalizeWriteStream
untuk setiap streaming guna merilis streaming. Setelah memanggil metode ini, Anda tidak dapat menulis baris lagi ke aliran data. Langkah ini bersifat opsional dalam jenis commit, tetapi membantu mencegah melampaui batas pada streaming aktif. Untuk mengetahui informasi selengkapnya, lihat Membatasi kecepatan pembuatan streaming.
Node.js
- Panggil
createWriteStreamFullResponse
untuk membuat satu atau beberapa streaming dalam jenis yang di-commit. - Untuk setiap aliran data, panggil
appendRows
dalam satu loop untuk menulis batch kumpulan data. - Panggil
finalize
untuk setiap streaming guna merilis streaming. Setelah memanggil metode ini, Anda tidak dapat menulis baris lagi ke aliran data. Langkah ini bersifat opsional dalam jenis commit, tetapi membantu mencegah melampaui batas pada streaming aktif. Untuk mengetahui informasi selengkapnya, lihat Membatasi kecepatan pembuatan streaming.
Anda tidak dapat menghapus streaming secara eksplisit. Streaming mengikuti time to live (TTL) yang ditentukan sistem:
- Streaming yang di-commit memiliki TTL selama tiga hari jika tidak ada traffic pada streaming.
- Streaming yang di-buffer secara default memiliki TTL selama tujuh hari jika tidak ada traffic pada streaming.
Kode berikut menunjukkan cara menggunakan jenis yang di-commit:
Java
Untuk mempelajari cara menginstal dan menggunakan library klien untuk BigQuery, lihat library klien BigQuery. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi API Java BigQuery.
Untuk melakukan autentikasi ke BigQuery, siapkan Kredensial Default Aplikasi. Untuk informasi selengkapnya, lihat Menyiapkan autentikasi untuk library klien.
Node.js
Untuk mempelajari cara menginstal dan menggunakan library klien untuk BigQuery, lihat library klien BigQuery.
Untuk melakukan autentikasi ke BigQuery, siapkan Kredensial Default Aplikasi. Untuk informasi selengkapnya, lihat Menyiapkan autentikasi untuk library klien.