Transmitir dados usando a API StorageWrite
Neste documento, descrevemos como usar a API BigQuery Storage Write para fazer streaming de dados para o BigQuery.
Em cenários de streaming, os dados chegam continuamente e precisam estar disponíveis para leituras com latência mínima. Ao usar a API BigQuery Storage write para cargas de trabalho de streaming, pense em quais garantias são necessárias:
- Se o app só precisa da semântica do tipo "pelo menos uma vez", use o stream padrão.
- Se você precisar de semântica exatamente uma vez, crie um ou mais streams no modo confirmado e use deslocamentos de stream para garantir gravações exatamente uma vez.
No modo confirmado, os dados gravados no stream ficam disponíveis para consulta assim que o servidor reconhece a solicitação de gravação. O stream padrão também usa o modo confirmado, mas não oferece garantias exatas.
Usar o fluxo padrão para semânticas do tipo "pelo menos uma vez"
Se o aplicativo puder aceitar a possibilidade de registros duplicados aparecerem na tabela de destino, recomendamos o uso do stream padrão para cenários de streaming.
O código a seguir mostra como gravar dados no stream padrão:
Java
Para saber como instalar e usar a biblioteca de cliente do BigQuery, consulte Bibliotecas do cliente do BigQuery. Para mais informações, consulte a documentação de referência da API BigQuery Java.
Para autenticar no BigQuery, configure o Application Default Credentials. Para mais informações, acesse Configurar a autenticação para bibliotecas de cliente.
Node.js
Para saber como instalar e usar a biblioteca de cliente do BigQuery, consulte Bibliotecas do cliente do BigQuery.
Para autenticar no BigQuery, configure o Application Default Credentials. Para mais informações, acesse Configurar a autenticação para bibliotecas de cliente.
Python
Este exemplo mostra como inserir um registro com dois campos usando o fluxo padrão:
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import json
import sample_data_pb2
# The list of columns from the table's schema to search in the given data to write to BigQuery.
TABLE_COLUMNS_TO_CHECK = [
"name",
"age"
]
# Function to create a batch of row data to be serialized.
def create_row_data(data):
row = sample_data_pb2.SampleData()
for field in TABLE_COLUMNS_TO_CHECK:
# Optional fields will be passed as null if not provided
if field in data:
setattr(row, field, data[field])
return row.SerializeToString()
class BigQueryStorageWriteAppend(object):
# The stream name is: projects/{project}/datasets/{dataset}/tables/{table}/_default
def append_rows_proto2(
project_id: str, dataset_id: str, table_id: str, data: dict
):
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
stream_name = f'{parent}/_default'
write_stream = types.WriteStream()
# Create a template with fields needed for the first request.
request_template = types.AppendRowsRequest()
# The request must contain the stream name.
request_template.write_stream = stream_name
# Generating the protocol buffer representation of the message descriptor.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Construct an AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# Append proto2 serialized bytes to the serialized_rows repeated field using create_row_data.
proto_rows = types.ProtoRows()
for row in data:
proto_rows.serialized_rows.append(create_row_data(row))
# Appends data to the given stream.
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
append_rows_stream.send(request)
print(f"Rows to table: '{parent}' have been written.")
if __name__ == "__main__":
###### Uncomment the below block to provide additional logging capabilities ######
#logging.basicConfig(
# level=logging.DEBUG,
# format="%(asctime)s [%(levelname)s] %(message)s",
# handlers=[
# logging.StreamHandler()
# ]
#)
###### Uncomment the above block to provide additional logging capabilities ######
with open('entries.json', 'r') as json_file:
data = json.load(json_file)
# Change this to your specific BigQuery project, dataset, table details
BigQueryStorageWriteAppend.append_rows_proto2("PROJECT_ID","DATASET_ID", "TABLE_ID ",data=data)
O exemplo de código depende do módulo de protocolo compilado sample_data_pb2.py
. Para criar o módulo compilado, execute o
comando protoc --python_out=. sample_data.proto
, em que protoc
é o
compilador de buffer de protocolo. O arquivo sample_data.proto
define o formato
das mensagens usadas no exemplo do Python. Para instalar o compilador protoc
, siga as instruções em Buffers de protocolo: o formato de troca de dados do Google.
Confira o conteúdo do arquivo sample_data.proto
:
message SampleData {
required string name = 1;
required int64 age = 2;
}
Este script consome o arquivo entities.json
, que contém dados de amostra de linhas para serem inseridos na tabela do BigQuery:
{"name": "Jim", "age": 35}
{"name": "Jane", "age": 27}
Usar multiplexação
Você ativa a multiplexação no nível do gravador de stream apenas para o stream padrão. Para ativar a multiplexação no Java, chame o método setEnableConnectionPool
ao criar um objeto StreamWriter
ou JsonStreamWriter
:
// One possible way for constructing StreamWriter StreamWriter.newBuilder(streamName) .setWriterSchema(protoSchema) .setEnableConnectionPool(true) .build(); // One possible way for constructing JsonStreamWriter JsonStreamWriter.newBuilder(tableName, bigqueryClient) .setEnableConnectionPool(true) .build();
Para ativar a multiplexação no Go, consulte Compartilhamento de conexão (multiplexação).
Usar o modo confirmado para a semântica exatamente uma vez
Se você precisar de semântica de gravação única, crie um stream de gravação no modo comprometido. No modo confirmado, os registros estão disponíveis para consulta assim que o cliente recebe a confirmação do back-end.
O modo confirmado fornece uma entrega única uma vez em um stream por meio de deslocamentos de registro. Ao usar deslocamentos de registro, o aplicativo especifica o próximo
deslocamento de anexação em cada chamada para AppendRows
. A operação de gravação só será executada se o valor de ajuste corresponder ao próximo deslocamento de anexação. Para mais
informações, consulte
Gerenciar deslocamentos de stream para conseguir semânticas exatamente uma vez.
Se você não fornecer um deslocamento, os registros serão anexados à extremidade atual do fluxo. Nesse caso, se um pedido de anexo retornar um erro, uma nova tentativa poderá fazer com que o registro apareça mais de uma vez no stream.
Para usar o tipo de compromisso, siga estas etapas:
Java
- Chame
CreateWriteStream
para criar um ou mais streams no modo confirmado. - Para cada stream, chame
AppendRows
em um loop para gravar lotes de registros. - Chame
FinalizeWriteStream
para cada stream para liberar o stream. Depois de chamar esse método, não será possível gravar mais linhas no stream. Essa etapa é opcional no modo confirmado, mas ajuda a evitar exceder o limite de fluxos ativos. Para mais informações, consulte Limitar a taxa de criação de stream.
Node.js
- Chame
createWriteStreamFullResponse
para criar um ou mais streams no modo confirmado. - Para cada stream, chame
appendRows
em um loop para gravar lotes de registros. - Chame
finalize
para cada stream para liberar o stream. Depois de chamar esse método, não será possível gravar mais linhas no stream. Essa etapa é opcional no modo confirmado, mas ajuda a evitar exceder o limite de fluxos ativos. Para mais informações, consulte Limitar a taxa de criação de stream.
Não é possível excluir um stream explicitamente. Os streams seguem o time to live (TTL) definido pelo sistema:
- Se não houver tráfego no stream, ele será transmitido por TTL em três dias.
- Por padrão, um stream em buffer tem um TTL de sete dias se não houver tráfego nele.
O código a seguir mostra como usar o modo confirmado.
Java
Para saber como instalar e usar a biblioteca de cliente do BigQuery, consulte Bibliotecas do cliente do BigQuery. Para mais informações, consulte a documentação de referência da API BigQuery Java.
Para autenticar no BigQuery, configure o Application Default Credentials. Para mais informações, acesse Configurar a autenticação para bibliotecas de cliente.
Node.js
Para saber como instalar e usar a biblioteca de cliente do BigQuery, consulte Bibliotecas do cliente do BigQuery.
Para autenticar no BigQuery, configure o Application Default Credentials. Para mais informações, acesse Configurar a autenticação para bibliotecas de cliente.