Diffuser des données à l'aide de l'API Storage Write
Ce document explique comment utiliser l'API BigQuery Storage Write pour diffuser des données en streaming dans BigQuery.
Dans les scénarios de streaming, les données arrivent en continu et doivent être disponibles pour les lectures avec une latence minimale. Lorsque vous utilisez l'API BigQuery Storage Write pour les charges de travail de streaming, tenez compte des garanties dont vous avez besoin :
- Si votre application n'a besoin que de la sémantique de type "au moins une fois", utilisez le flux par défaut.
- Si vous avez besoin de la sémantique de type "exactement une fois", créez un ou plusieurs flux en type commit et utilisez des décalages de flux pour garantir des écritures de type "exactement une fois".
En type commit, les données écrites dans le flux sont disponibles pour la requête dès que le serveur a confirmé la requête d'écriture. Le flux par défaut utilise également le type commit, mais n'offre pas de garanties de type "exactement une fois".
Utiliser le flux par défaut pour la sémantique de type "au moins une fois"
Si votre application peut accepter que des enregistrements en double apparaissent dans la table de destination, nous vous recommandons d'utiliser le flux par défaut pour les scénarios de streaming.
Le code suivant montre comment écrire des données dans le flux par défaut :
Java
Pour savoir comment installer et utiliser la bibliothèque cliente pour BigQuery, consultez la page sur les bibliothèques clientes BigQuery. Pour en savoir plus, consultez la documentation de référence de l'API BigQuery pour Java.
Pour vous authentifier auprès de BigQuery, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour les bibliothèques clientes.
Node.js
Pour savoir comment installer et utiliser la bibliothèque cliente pour BigQuery, consultez la page sur les bibliothèques clientes BigQuery.
Pour vous authentifier auprès de BigQuery, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour les bibliothèques clientes.
Python
Cet exemple montre comment insérer un enregistrement avec deux champs à l'aide du flux par défaut:
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import json
import sample_data_pb2
# The list of columns from the table's schema to search in the given data to write to BigQuery.
TABLE_COLUMNS_TO_CHECK = [
"name",
"age"
]
# Function to create a batch of row data to be serialized.
def create_row_data(data):
row = sample_data_pb2.SampleData()
for field in TABLE_COLUMNS_TO_CHECK:
# Optional fields will be passed as null if not provided
if field in data:
setattr(row, field, data[field])
return row.SerializeToString()
class BigQueryStorageWriteAppend(object):
# The stream name is: projects/{project}/datasets/{dataset}/tables/{table}/_default
def append_rows_proto2(
project_id: str, dataset_id: str, table_id: str, data: dict
):
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
stream_name = f'{parent}/_default'
write_stream = types.WriteStream()
# Create a template with fields needed for the first request.
request_template = types.AppendRowsRequest()
# The request must contain the stream name.
request_template.write_stream = stream_name
# Generating the protocol buffer representation of the message descriptor.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Construct an AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# Append proto2 serialized bytes to the serialized_rows repeated field using create_row_data.
proto_rows = types.ProtoRows()
for row in data:
proto_rows.serialized_rows.append(create_row_data(row))
# Appends data to the given stream.
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
append_rows_stream.send(request)
print(f"Rows to table: '{parent}' have been written.")
if __name__ == "__main__":
###### Uncomment the below block to provide additional logging capabilities ######
#logging.basicConfig(
# level=logging.DEBUG,
# format="%(asctime)s [%(levelname)s] %(message)s",
# handlers=[
# logging.StreamHandler()
# ]
#)
###### Uncomment the above block to provide additional logging capabilities ######
with open('entries.json', 'r') as json_file:
data = json.load(json_file)
# Change this to your specific BigQuery project, dataset, table details
BigQueryStorageWriteAppend.append_rows_proto2("PROJECT_ID","DATASET_ID", "TABLE_ID ",data=data)
Cet exemple de code dépend du module de protocole compilé sample_data_pb2.py
. Pour créer le module compilé, exécutez la commande protoc --python_out=. sample_data.proto
, où protoc
est le compilateur de tampon de protocole. Le fichier sample_data.proto
définit le format des messages utilisés dans l'exemple Python. Pour installer le compilateur protoc
, suivez les instructions de la page Protocol Buffers : format d'échange de données de Google.
Voici le contenu du fichier sample_data.proto
:
message SampleData {
required string name = 1;
required int64 age = 2;
}
Ce script utilise le fichier entities.json
, qui contient des exemples de données de ligne à insérer dans la table BigQuery:
{"name": "Jim", "age": 35}
{"name": "Jane", "age": 27}
Utiliser le multiplexage
Vous activez le multiplexing au niveau du rédacteur de flux pour le flux par défaut uniquement. Pour activer le multiplexage Java, appelez la méthode setEnableConnectionPool
lorsque vous créez un objet StreamWriter
ou JsonStreamWriter
:
// One possible way for constructing StreamWriter StreamWriter.newBuilder(streamName) .setWriterSchema(protoSchema) .setEnableConnectionPool(true) .build(); // One possible way for constructing JsonStreamWriter JsonStreamWriter.newBuilder(tableName, bigqueryClient) .setEnableConnectionPool(true) .build();
Pour activer le multiplexage Go, consultez la section Partage de connexion (multiplexage).
Utiliser le type commit pour la sémantique de type "exactement une fois"
Si vous avez besoin d'une sémantique d'écriture de type "exactement une fois", créez un flux d'écriture en type commit. En type commit, les enregistrements sont disponibles pour la requête dès que le client reçoit l'accusé de réception du backend.
Le type commit permet d'effectuer une distribution "exactement une fois" dans un flux au moyen de décalages d'enregistrement. En utilisant des décalages d'enregistrement, l'application spécifie le prochain décalage d'ajout dans chaque appel à AppendRows
. L'opération d'écriture n'est effectuée que si la valeur de décalage correspond au décalage d'ajout suivant. Pour plus d'informations, consultez la section Gérer les décalages de flux pour obtenir une sémantique de type "exactement une fois".
Si vous ne fournissez pas de décalage, les enregistrements sont ajoutés à la fin actuelle du flux. Dans ce cas, si une requête d'ajout renvoie une erreur, toute nouvelle tentative pourrait entraîner l'affichage de l'enregistrement plusieurs fois dans le flux.
Pour utiliser le type commit, procédez comme suit :
Java
- Appelez
CreateWriteStream
pour créer un ou plusieurs flux en type commit. - Pour chaque flux, appelez
AppendRows
dans une boucle pour écrire des lots d'enregistrements. - Appelez
FinalizeWriteStream
pour chaque flux afin de libérer le flux. Après avoir appelé cette méthode, vous ne pouvez plus écrire de lignes dans le flux. Cette étape est facultative en type commit, mais permet d'éviter de dépasser la limite pour les flux actifs. Pour en savoir plus, consultez la section Limiter le taux de création de flux.
Node.js
- Appelez
createWriteStreamFullResponse
pour créer un ou plusieurs flux en type commit. - Pour chaque flux, appelez
appendRows
dans une boucle pour écrire des lots d'enregistrements. - Appelez
finalize
pour chaque flux afin de libérer le flux. Après avoir appelé cette méthode, vous ne pouvez plus écrire de lignes dans le flux. Cette étape est facultative en type commit, mais permet d'éviter de dépasser la limite pour les flux actifs. Pour en savoir plus, consultez la section Limiter le taux de création de flux.
Vous ne pouvez pas supprimer un flux explicitement. Les flux suivent la valeur TTL (Time To Live) définie par le système :
- Un flux validé a une valeur TTL de trois jours s'il ne présente pas de trafic.
- Par défaut, un flux mis en mémoire tampon a une valeur TTL de sept jours s'il ne présente pas de trafic.
Le code suivant montre comment utiliser le type commit :
Java
Pour savoir comment installer et utiliser la bibliothèque cliente pour BigQuery, consultez la page sur les bibliothèques clientes BigQuery. Pour en savoir plus, consultez la documentation de référence de l'API BigQuery pour Java.
Pour vous authentifier auprès de BigQuery, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour les bibliothèques clientes.
Node.js
Pour savoir comment installer et utiliser la bibliothèque cliente pour BigQuery, consultez la page sur les bibliothèques clientes BigQuery.
Pour vous authentifier auprès de BigQuery, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour les bibliothèques clientes.