Transmita dados através da API Storage Write
Este documento descreve como usar a API BigQuery Storage Write para fazer stream de dados para o BigQuery.
Em cenários de streaming, os dados chegam continuamente e devem estar disponíveis para leituras com uma latência mínima. Quando usar a API Storage Write do BigQuery para cargas de trabalho de streaming, considere as garantias de que precisa:
- Se a sua aplicação só precisar de semântica de, pelo menos, uma vez, use a transmissão predefinida.
- Se precisar de semântica exatamente uma vez, crie uma ou mais streams do tipo committed e use deslocamentos de streams para garantir escritas exatamente uma vez.
No tipo committed, os dados escritos na stream estão disponíveis para consulta assim que o servidor reconhece o pedido de escrita. A stream predefinida também usa o tipo committed, mas não oferece garantias de exatamente uma vez.
Use a stream predefinida para a semântica de pelo menos uma vez
Se a sua aplicação puder aceitar a possibilidade de registos duplicados serem apresentados na tabela de destino, recomendamos que use a stream predefinida para cenários de streaming.
O código seguinte mostra como escrever dados na stream predefinida:
Java
Para saber como instalar e usar a biblioteca cliente do BigQuery, consulte o artigo Bibliotecas cliente do BigQuery. Para mais informações, consulte a API Java BigQuery documentação de referência.
Para se autenticar no BigQuery, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para bibliotecas de cliente.
Node.js
Para saber como instalar e usar a biblioteca cliente do BigQuery, consulte o artigo Bibliotecas cliente do BigQuery.
Para se autenticar no BigQuery, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para bibliotecas de cliente.
Python
Este exemplo mostra como inserir um registo com dois campos usando a stream predefinida:
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import json
import sample_data_pb2
# The list of columns from the table's schema to search in the given data to write to BigQuery.
TABLE_COLUMNS_TO_CHECK = [
"name",
"age"
]
# Function to create a batch of row data to be serialized.
def create_row_data(data):
row = sample_data_pb2.SampleData()
for field in TABLE_COLUMNS_TO_CHECK:
# Optional fields will be passed as null if not provided
if field in data:
setattr(row, field, data[field])
return row.SerializeToString()
class BigQueryStorageWriteAppend(object):
# The stream name is: projects/{project}/datasets/{dataset}/tables/{table}/_default
def append_rows_proto2(
project_id: str, dataset_id: str, table_id: str, data: dict
):
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
stream_name = f'{parent}/_default'
write_stream = types.WriteStream()
# Create a template with fields needed for the first request.
request_template = types.AppendRowsRequest()
# The request must contain the stream name.
request_template.write_stream = stream_name
# Generating the protocol buffer representation of the message descriptor.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Construct an AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# Append proto2 serialized bytes to the serialized_rows repeated field using create_row_data.
proto_rows = types.ProtoRows()
for row in data:
proto_rows.serialized_rows.append(create_row_data(row))
# Appends data to the given stream.
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
append_rows_stream.send(request)
print(f"Rows to table: '{parent}' have been written.")
if __name__ == "__main__":
###### Uncomment the below block to provide additional logging capabilities ######
#logging.basicConfig(
# level=logging.DEBUG,
# format="%(asctime)s [%(levelname)s] %(message)s",
# handlers=[
# logging.StreamHandler()
# ]
#)
###### Uncomment the above block to provide additional logging capabilities ######
with open('entries.json', 'r') as json_file:
data = json.load(json_file)
# Change this to your specific BigQuery project, dataset, table details
BigQueryStorageWriteAppend.append_rows_proto2("PROJECT_ID","DATASET_ID", "TABLE_ID ",data=data)
Este exemplo de código depende do módulo de protocolo compilado sample_data_pb2.py
. Para criar o módulo compilado, execute o comando protoc --python_out=. sample_data.proto
, em que protoc
é o compilador de buffers de protocolo. O ficheiro sample_data.proto
define o formato das mensagens usadas no exemplo de Python. Para instalar o compilador protoc
, siga as instruções em Buffers de protocolo: o formato de intercâmbio de dados da Google.
Seguem-se os conteúdos do ficheiro sample_data.proto
:
message SampleData {
required string name = 1;
required int64 age = 2;
}
Este script usa o ficheiro entries.json
, que contém dados de linhas de exemplo a inserir na tabela do BigQuery:
{"name": "Jim", "age": 35}
{"name": "Jane", "age": 27}
Use a multiplexagem
Ativa a
multiplexagem
ao nível do gravador de streams apenas para a stream predefinida. Para ativar a multiplexagem em Java, chame o método setEnableConnectionPool
quando construir um objeto StreamWriter
ou JsonStreamWriter
.
Depois de ativar o conjunto de ligações, a biblioteca de cliente Java gere as suas ligações em segundo plano, aumentando as ligações se as ligações existentes forem consideradas demasiado ocupadas. Para que o aumento automático seja mais eficaz, deve considerar diminuir o limite.maxInflightRequests
// One possible way for constructing StreamWriter StreamWriter.newBuilder(streamName) .setWriterSchema(protoSchema) .setEnableConnectionPool(true) .setMaxInflightRequests(100) .build(); // One possible way for constructing JsonStreamWriter JsonStreamWriter.newBuilder(tableName, bigqueryClient) .setEnableConnectionPool(true) .setMaxInflightRequests(100) .build();
Para ativar a multiplexagem no Go, consulte o artigo Partilha de ligações (multiplexagem).
Use o tipo committed para semântica exatamente uma vez
Se precisar de semântica de escrita exatamente uma vez, crie um fluxo de escrita do tipo committed. No tipo comprometido, os registos estão disponíveis para consulta assim que o cliente recebe a confirmação do back-end.
O tipo Committed oferece uma entrega exatamente uma vez numa stream através da utilização de
desvios de registos. Ao usar desvios de registo, a aplicação especifica o desvio de anexação seguinte em cada chamada para AppendRows
. A operação de escrita só é realizada se o valor de deslocamento corresponder ao deslocamento de anexação seguinte. Para mais
informações, consulte o artigo
Faça a gestão dos desvios de streams para alcançar uma semântica exatamente uma vez.
Se não fornecer um desvio, os registos são anexados ao fim atual da stream. Nesse caso, se um pedido de anexação devolver um erro, a repetição do mesmo pode fazer com que o registo apareça mais do que uma vez na stream.
Para usar o tipo comprometido, siga estes passos:
Java
- Chame
CreateWriteStream
para criar uma ou mais streams do tipo comprometido. - Para cada stream, chame
AppendRows
num ciclo para escrever lotes de registos. - Chame
FinalizeWriteStream
para cada stream para libertar a stream. Depois de chamar este método, não pode escrever mais linhas na stream. Este passo é opcional no tipo de compromisso, mas ajuda a evitar exceder o limite de streams ativas. Para mais informações, consulte o artigo Limite a taxa de criação de streams.
Node.js
- Chame
createWriteStreamFullResponse
para criar uma ou mais streams do tipo comprometido. - Para cada stream, chame
appendRows
num ciclo para escrever lotes de registos. - Chame
finalize
para cada stream para libertar a stream. Depois de chamar este método, não pode escrever mais linhas na stream. Este passo é opcional no tipo de compromisso, mas ajuda a evitar exceder o limite de streams ativas. Para mais informações, consulte o artigo Limite a taxa de criação de streams.
Não é possível eliminar um stream explicitamente. Os streams seguem o tempo de vida (TTL) definido pelo sistema:
- Uma stream comprometida tem um TTL de três dias se não houver tráfego na stream.
- Por predefinição, uma stream com buffer tem um TTL de sete dias se não houver tráfego na stream.
O código seguinte mostra como usar o tipo committed:
Java
Para saber como instalar e usar a biblioteca cliente do BigQuery, consulte o artigo Bibliotecas cliente do BigQuery. Para mais informações, consulte a API Java BigQuery documentação de referência.
Para se autenticar no BigQuery, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para bibliotecas de cliente.
Node.js
Para saber como instalar e usar a biblioteca cliente do BigQuery, consulte o artigo Bibliotecas cliente do BigQuery.
Para se autenticar no BigQuery, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para bibliotecas de cliente.
Use o formato Apache Arrow para carregar dados
O código seguinte mostra como carregar dados usando o formato Apache Arrow. Para um exemplo mais detalhado e completo, consulte o exemplo do PyArrow no GitHub.
Python
Este exemplo mostra como carregar uma tabela PyArrow serializada através do fluxo predefinido.
from google.cloud.bigquery_storage_v1 import types as gapic_types
from google.cloud.bigquery_storage_v1.writer import AppendRowsStream
from google.cloud import bigquery_storage_v1
def append_rows_with_pyarrow(
pyarrow_table: pyarrow.Table,
project_id: str,
dataset_id: str,
table_id: str,
):
bqstorage_write_client = bigquery_storage_v1.BigQueryWriteClient()
# Create request_template.
request_template = gapic_types.AppendRowsRequest()
request_template.write_stream = (
f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/_default"
)
arrow_data = gapic_types.AppendRowsRequest.ArrowData()
arrow_data.writer_schema.serialized_schema = (
pyarrow_table.schema.serialize().to_pybytes()
)
request_template.arrow_rows = arrow_data
# Create AppendRowsStream.
append_rows_stream = AppendRowsStream(
bqstorage_write_client,
request_template,
)
# Create request with table data.
request = gapic_types.AppendRowsRequest()
request.arrow_rows.rows.serialized_record_batch = (
pyarrow_table.to_batches()[0].serialize().to_pybytes()
)
# Send request.
future = append_rows_stream.send(request)
# Wait for result.
future.result()