Eseguire lo streaming dei dati utilizzando l'API Storage Write
Questo documento descrive come utilizzare l'API BigQuery Storage Write per inserire flussi di dati in BigQuery.
Negli scenari di streaming, i dati arrivano continuamente e dovrebbero essere disponibili per le letture con latenza minima. Quando utilizzi l'API BigQuery Storage Write per i carichi di lavoro streaming, valuta le garanzie di cui hai bisogno:
- Se la tua applicazione ha bisogno solo della semantica almeno una volta, utilizza lo stream predefinito.
- Se hai bisogno della semantica exactly-once, crea uno o più stream in tipo di commit e utilizza gli offset dello stream per garantire le scritture exactly-once.
Nel tipo di commit, i dati scritti nello stream sono disponibili per le query non appena il server conferma la richiesta di scrittura. Anche lo stream predefinito utilizza il tipo impegnato, ma non fornisce garanzie di invio esattamente una volta.
Utilizza lo stream predefinito per la semantica almeno una volta
Se la tua applicazione può accettare la possibilità che nella tabella di destinazione vengano visualizzati record duplicati, ti consigliamo di utilizzare lo stream predefinito per gli scenari di streaming.
Il seguente codice mostra come scrivere i dati nello stream predefinito:
Java
Per scoprire come installare e utilizzare la libreria client per BigQuery, consulta Librerie client di BigQuery. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Java.
Per autenticarti in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, consulta Configurare l'autenticazione per le librerie client.
Node.js
Per scoprire come installare e utilizzare la libreria client per BigQuery, consulta Librerie client di BigQuery.
Per autenticarti in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, consulta Configurare l'autenticazione per le librerie client.
Python
Questo esempio mostra come inserire un record con due campi utilizzando lo stream predefinito:
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import json
import sample_data_pb2
# The list of columns from the table's schema to search in the given data to write to BigQuery.
TABLE_COLUMNS_TO_CHECK = [
"name",
"age"
]
# Function to create a batch of row data to be serialized.
def create_row_data(data):
row = sample_data_pb2.SampleData()
for field in TABLE_COLUMNS_TO_CHECK:
# Optional fields will be passed as null if not provided
if field in data:
setattr(row, field, data[field])
return row.SerializeToString()
class BigQueryStorageWriteAppend(object):
# The stream name is: projects/{project}/datasets/{dataset}/tables/{table}/_default
def append_rows_proto2(
project_id: str, dataset_id: str, table_id: str, data: dict
):
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
stream_name = f'{parent}/_default'
write_stream = types.WriteStream()
# Create a template with fields needed for the first request.
request_template = types.AppendRowsRequest()
# The request must contain the stream name.
request_template.write_stream = stream_name
# Generating the protocol buffer representation of the message descriptor.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Construct an AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# Append proto2 serialized bytes to the serialized_rows repeated field using create_row_data.
proto_rows = types.ProtoRows()
for row in data:
proto_rows.serialized_rows.append(create_row_data(row))
# Appends data to the given stream.
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
append_rows_stream.send(request)
print(f"Rows to table: '{parent}' have been written.")
if __name__ == "__main__":
###### Uncomment the below block to provide additional logging capabilities ######
#logging.basicConfig(
# level=logging.DEBUG,
# format="%(asctime)s [%(levelname)s] %(message)s",
# handlers=[
# logging.StreamHandler()
# ]
#)
###### Uncomment the above block to provide additional logging capabilities ######
with open('entries.json', 'r') as json_file:
data = json.load(json_file)
# Change this to your specific BigQuery project, dataset, table details
BigQueryStorageWriteAppend.append_rows_proto2("PROJECT_ID","DATASET_ID", "TABLE_ID ",data=data)
Questo esempio di codice dipende dal modulo di protocollo compilato sample_data_pb2.py
. Per creare il modulo compilato, esegui il comando protoc --python_out=. sample_data.proto
, dove protoc
è il compilatore del buffer del protocollo. Il file sample_data.proto
definisce il formato
dei messaggi utilizzati nell'esempio Python. Per installare il compilatore protoc
, segui le istruzioni riportate in Protocol Buffers - Google's data interchange format.
Ecco i contenuti del file sample_data.proto
:
message SampleData {
required string name = 1;
required int64 age = 2;
}
Questo script utilizza il file entities.json
, che contiene i dati di riga di esempio da inserire nella tabella BigQuery:
{"name": "Jim", "age": 35}
{"name": "Jane", "age": 27}
Utilizzare il multiplexing
Attiva il
multiplexing
a livello di stream writer solo per lo stream predefinito. Per attivare il multiplexing in Java, chiama il metodo setEnableConnectionPool
quando crei un oggetto StreamWriter
o JsonStreamWriter
:
// One possible way for constructing StreamWriter StreamWriter.newBuilder(streamName) .setWriterSchema(protoSchema) .setEnableConnectionPool(true) .build(); // One possible way for constructing JsonStreamWriter JsonStreamWriter.newBuilder(tableName, bigqueryClient) .setEnableConnectionPool(true) .build();
Per attivare il multiplexing in Go, consulta Condivisione della connessione (multiplexing).
Utilizza il tipo di commit per la semantica esattamente una volta
Se hai bisogno di una semantica di scrittura esattamente una volta, crea uno stream di scrittura di tipo commit. Nel tipo di commit, i record sono disponibili per le query non appena il client riceve il riconoscimento dal back-end.
Il tipo di commit garantisce la consegna "exactly-once" all'interno di uno stream tramite l'utilizzo di offset dei record. Utilizzando gli offset dei record, l'applicazione specifica l'offset di accodamento successivo in ogni chiamata a AppendRows
. L'operazione di scrittura viene eseguita solo se il valore dell'offset corrisponde all'offset di accodamento successivo. Per ulteriori informazioni, consulta Gestire gli offset dello stream per ottenere la semantica esattamente una volta.
Se non fornisci un offset, i record vengono aggiunti alla fine corrente dello stream. In questo caso, se una richiesta di accodamento restituisce un errore, riprovare potrebbe comportare la visualizzazione del record più di una volta nello stream.
Per utilizzare il tipo di commit, segui questi passaggi:
Java
- Chiama
CreateWriteStream
per creare uno o più stream nel tipo di commit. - Per ogni stream, chiama
AppendRows
in un ciclo per scrivere batch di record. - Chiama
FinalizeWriteStream
per ogni stream per rilasciarlo. Dopo aver chiamato questo metodo, non puoi più scrivere righe nello stream. Questo passaggio è facoltativo per il tipo di impegno, ma aiuta a evitare di superare il limite di stream attivi. Per ulteriori informazioni, consulta Limitare la frequenza di creazione dei flussi.
Node.js
- Chiama
createWriteStreamFullResponse
per creare uno o più stream nel tipo di commit. - Per ogni stream, chiama
appendRows
in un ciclo per scrivere batch di record. - Chiama
finalize
per ogni stream per rilasciarlo. Dopo aver chiamato questo metodo, non puoi più scrivere righe nello stream. Questo passaggio è facoltativo per il tipo di impegno, ma aiuta a evitare di superare il limite di stream attivi. Per ulteriori informazioni, consulta Limitare la frequenza di creazione dei flussi.
Non puoi eliminare uno stream in modo esplicito. Gli stream rispettano la durata (TTL) definita dal sistema:
- Uno stream impegnato ha un TTL di tre giorni se non c'è traffico nello stream.
- Per impostazione predefinita, uno stream con buffer ha un TTL di sette giorni se non c'è traffico nello stream.
Il seguente codice mostra come utilizzare il tipo di commit:
Java
Per scoprire come installare e utilizzare la libreria client per BigQuery, consulta Librerie client di BigQuery. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Java.
Per autenticarti in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, consulta Configurare l'autenticazione per le librerie client.
Node.js
Per scoprire come installare e utilizzare la libreria client per BigQuery, consulta Librerie client di BigQuery.
Per autenticarti in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, consulta Configurare l'autenticazione per le librerie client.