Daten mit der Storage Write API streamen
In diesem Dokument wird beschrieben, wie Sie mit der BigQuery Storage Write API Daten in BigQuery streamen.
In Streaming-Szenarien kommen Daten kontinuierlich an und sollten für Lesevorgänge mit minimaler Latenz verfügbar sein. Überlegen Sie bei der Verwendung der BigQuery Storage Write API für Streaming-Arbeitslasten, welche Garantien Sie benötigen:
- Wenn Ihre Anwendung eine "Mindestens einmal"-Semantik benötigt, verwenden Sie den Standardstream.
- Wenn Sie eine "Genau einmal"-Semantik benötigen, erstellen Sie einen oder mehrere Streams vom Typ „Zugesichert“ und verwenden Sie Stream-Offsets, um „Genau einmal“-Schreibvorgänge zu gewährleisten.
Beim Typ „Zugesichert“ stehen Daten, die in den Stream geschrieben werden, für eine Abfrage zur Verfügung, sobald der Server die Schreibanfrage bestätigt hat. Der Standardstream verwendet auch den Typ „Zugesichert“, bietet jedoch keine „Genau einmal“-Garantien.
Standardstream für "Mindestens einmal"-Semantik verwenden
Wenn Ihre Anwendung die Möglichkeit doppelter Datensätze akzeptiert, die in der Zieltabelle angezeigt werden, empfehlen wir die Verwendung des Standardstreams für Streaming-Szenarien.
Der folgende Code zeigt, wie Daten in den Standardstream geschrieben werden:
Java
Informationen zum Installieren und Verwenden der Clientbibliothek für BigQuery finden Sie unter BigQuery-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur BigQuery Java API.
Richten Sie zur Authentifizierung bei BigQuery die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für Clientbibliotheken einrichten.
Node.js
Informationen zum Installieren und Verwenden der Clientbibliothek für BigQuery finden Sie unter BigQuery-Clientbibliotheken.
Richten Sie zur Authentifizierung bei BigQuery die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für Clientbibliotheken einrichten.
Python
In diesem Beispiel wird gezeigt, wie ein Datensatz mit zwei Feldern mit dem Standardstream eingefügt wird:
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import json
import sample_data_pb2
# The list of columns from the table's schema to search in the given data to write to BigQuery.
TABLE_COLUMNS_TO_CHECK = [
"name",
"age"
]
# Function to create a batch of row data to be serialized.
def create_row_data(data):
row = sample_data_pb2.SampleData()
for field in TABLE_COLUMNS_TO_CHECK:
# Optional fields will be passed as null if not provided
if field in data:
setattr(row, field, data[field])
return row.SerializeToString()
class BigQueryStorageWriteAppend(object):
# The stream name is: projects/{project}/datasets/{dataset}/tables/{table}/_default
def append_rows_proto2(
project_id: str, dataset_id: str, table_id: str, data: dict
):
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
stream_name = f'{parent}/_default'
write_stream = types.WriteStream()
# Create a template with fields needed for the first request.
request_template = types.AppendRowsRequest()
# The request must contain the stream name.
request_template.write_stream = stream_name
# Generating the protocol buffer representation of the message descriptor.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Construct an AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# Append proto2 serialized bytes to the serialized_rows repeated field using create_row_data.
proto_rows = types.ProtoRows()
for row in data:
proto_rows.serialized_rows.append(create_row_data(row))
# Appends data to the given stream.
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
append_rows_stream.send(request)
print(f"Rows to table: '{parent}' have been written.")
if __name__ == "__main__":
###### Uncomment the below block to provide additional logging capabilities ######
#logging.basicConfig(
# level=logging.DEBUG,
# format="%(asctime)s [%(levelname)s] %(message)s",
# handlers=[
# logging.StreamHandler()
# ]
#)
###### Uncomment the above block to provide additional logging capabilities ######
with open('entries.json', 'r') as json_file:
data = json.load(json_file)
# Change this to your specific BigQuery project, dataset, table details
BigQueryStorageWriteAppend.append_rows_proto2("PROJECT_ID","DATASET_ID", "TABLE_ID ",data=data)
Dieses Codebeispiel hängt vom kompilierten Protokollmodul sample_data_pb2.py
ab. Führen Sie zum Erstellen des kompilierten Moduls den Befehl protoc --python_out=. sample_data.proto
aus, wobei protoc
der Protokollzwischenspeicher-Compiler ist. Die Datei sample_data.proto
definiert das Format der im Python-Beispiel verwendeten Nachrichten. Folgen Sie der Anleitung unter Protokollpuffer – Das Datenaustauschformat von Google, um den protoc
-Compiler zu installieren.
Der Inhalt der Datei sample_data.proto
sieht so aus:
message SampleData {
required string name = 1;
required int64 age = 2;
}
Dieses Script verwendet die Datei entities.json
, die Beispielzeilendaten enthält, die in die BigQuery-Tabelle eingefügt werden sollen:
{"name": "Jim", "age": 35}
{"name": "Jane", "age": 27}
Multiplexing verwenden
Sie aktivieren Multiplexing auf der Stream-Writer-Ebene nur für den Standard-Stream. Zur Aktivierung des Multiplexing in Java rufen Sie die setEnableConnectionPool
-Methode beim Erstellen eines StreamWriter
- oder JsonStreamWriter
-Objekts auf:
// One possible way for constructing StreamWriter StreamWriter.newBuilder(streamName) .setWriterSchema(protoSchema) .setEnableConnectionPool(true) .build(); // One possible way for constructing JsonStreamWriter JsonStreamWriter.newBuilder(tableName, bigqueryClient) .setEnableConnectionPool(true) .build();
Informationen zum Aktivieren des Multiplexing in Go finden Sie unter Verbindungsfreigabe (Multiplexing).
Typ „Zugesichert“ für „Genau einmal“-Semantik verwenden
Wenn Sie eine „Genau einmal“-Semantik für Schreibvorgänge benötigen, erstellen Sie einen Schreibstream vom Typ „Zugesichert“. Beim Typ „Zugesichert“ sind Datensätze für die Abfrage verfügbar, sobald der Client vom Backend die Bestätigung erhält.
Der Typ „Zugesichert“ bietet eine „Genau einmal“-Übermittlung in einem Stream über die Verwendung von Datensatz-Offsets. Mithilfe von Datensatz-Offsets gibt die Anwendung bei jedem Aufruf von AppendRows
das nächste Anfüge-Offset an. Der Schreibvorgang wird nur ausgeführt, wenn der Versatzwert dem nächsten Anfüge-Offset entspricht. Weitere Informationen finden Sie unter Stream-Offsets für eine „Exactly-Once“-Semantik verwalten.
Wenn Sie keinen Offset angeben, werden Datensätze an das aktuelle Ende des Streams angehängt. Wenn in diesem Fall eine Anfügungsanfrage einen Fehler zurückgibt, kann ein erneuter Versuch dazu führen, dass der Datensatz mehr als einmal im Stream auftaucht.
Führen Sie die folgenden Schritte aus, um den Typ „Zugesichert“ zu verwenden:
Java
- Rufen Sie
CreateWriteStream
auf, um einen oder mehrere Streams vom Typ „Zugesichert“ zu erstellen. - Rufen Sie für jeden Stream
AppendRows
in einer Schleife auf, um Datensätze in Batches zu schreiben. - Rufen Sie
FinalizeWriteStream
für jeden Stream auf, um den Stream freizugeben. Nach dem Aufrufen dieser Methode können Sie keine weiteren Zeilen in den Stream schreiben. Dieser Schritt ist beim Typ „Zugesichert“ optional, verhindert jedoch, dass das Limit für aktive Streams überschritten wird. Weitere Informationen finden Sie unter Rate der Streamerstellung begrenzen.
Node.js
- Rufen Sie
createWriteStreamFullResponse
auf, um einen oder mehrere Streams vom Typ „Zugesichert“ zu erstellen. - Rufen Sie für jeden Stream
appendRows
in einer Schleife auf, um Datensätze in Batches zu schreiben. - Rufen Sie
finalize
für jeden Stream auf, um den Stream freizugeben. Nach dem Aufrufen dieser Methode können Sie keine weiteren Zeilen in den Stream schreiben. Dieser Schritt ist beim Typ „Zugesichert“ optional, verhindert jedoch, dass das Limit für aktive Streams überschritten wird. Weitere Informationen finden Sie unter Rate der Streamerstellung begrenzen.
Sie können einen Stream nicht explizit löschen. Streams folgen der systemdefinierten Gültigkeitsdauer (TTL):
- Ein zugesicherter Stream hat eine TTL von drei Tagen, wenn kein Traffic im Stream vorhanden ist.
- Ein gepufferter Stream hat standardmäßig eine TTL von sieben Tagen, wenn kein Traffic im Stream vorhanden ist.
Der folgende Code zeigt die Verwendung des Typs „Zugesichert“.
Java
Informationen zum Installieren und Verwenden der Clientbibliothek für BigQuery finden Sie unter BigQuery-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur BigQuery Java API.
Richten Sie zur Authentifizierung bei BigQuery die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für Clientbibliotheken einrichten.
Node.js
Informationen zum Installieren und Verwenden der Clientbibliothek für BigQuery finden Sie unter BigQuery-Clientbibliotheken.
Richten Sie zur Authentifizierung bei BigQuery die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für Clientbibliotheken einrichten.