Cette page décrit les flux de modifications dans Spanner pour les bases de données en dialecte Google SQL et en dialecte PostgreSQL, y compris:
- Modèle de partitionnement basé sur la division
- Format et contenu des enregistrements de flux de modifications
- Syntaxe de bas niveau utilisée pour interroger ces enregistrements
- Exemple de workflow de requête
Vous utilisez l'API Spanner pour interroger directement les flux de modifications. Les applications qui utilisent Dataflow pour lire les données de flux de modifications n'ont pas besoin de travailler directement avec le modèle de données décrit ici.
Pour obtenir un guide d'introduction plus complet sur les flux de modifications, consultez la section Présentation des flux de modifications.
Partitions de flux de modifications
Lorsqu'une modification se produit sur une table surveillée par un flux de modifications, Spanner écrit un enregistrement de flux de modifications correspondant dans la base de données, de manière synchrone dans la même transaction que la modification de données. Cela signifie que si la transaction réussit, Spanner a également capturé et conservé la modification. En interne, Spanner colocalise l'enregistrement du flux de modifications et la modification des données afin qu'ils soient traités par le même serveur afin de réduire les coûts d'écriture.
Dans le cadre de la DML pour une division particulière, Spanner ajoute l'écriture à la division de données du flux de modifications correspondante dans la même transaction. En raison de cette colocalisation, les flux de modifications n'ajoutent pas de coordination supplémentaire entre les ressources de diffusion, ce qui réduit les coûts liés à la validation des transactions.
Spanner s'adapte en divisant et en fusionnant de manière dynamique les données en fonction de la charge et de la taille de la base de données, et en répartissant les divisions entre les ressources de diffusion.
Pour permettre aux flux de modifications d'écrire et de lire à l'échelle, Spanner divise et fusionne le stockage interne des flux de modifications avec les données de la base de données, ce qui évite automatiquement les points chauds. Pour permettre la lecture des enregistrements de flux de modifications en quasi-temps réel à mesure que les écritures de base de données évoluent, l'API Spanner est conçue pour qu'un flux de modifications puisse être interrogé simultanément à l'aide de partitions de flux de modifications. Les partitions de flux de modifications sont mappées sur les divisions de données de flux de modifications contenant les enregistrements de flux de modifications. Les partitions d'un flux de modifications changent de manière dynamique au fil du temps et sont corrélées à la façon dont Spanner divise et fusionne de manière dynamique les données de la base de données.
Une partition de flux de modifications contient des enregistrements pour une plage de clés immuables pour une période spécifique. Toute partition de flux de modifications peut être divisée en une ou plusieurs partitions de flux de modifications, ou fusionnée avec d'autres partitions de flux de modifications. Lorsque ces événements de fractionnement ou de fusion se produisent, des partitions enfants sont créées pour capturer les modifications de leurs plages de clés immuables respectives pour la période suivante. En plus des enregistrements de modification des données, une requête de flux de modifications renvoie des enregistrements de partition enfant pour avertir les lecteurs des nouvelles partitions de flux de modifications à interroger, ainsi que des enregistrements de battement de cœur pour indiquer la progression lorsque aucune écriture n'a été effectuée récemment.
Lorsque vous interrogez une partition de flux de modifications spécifique, les enregistrements de modification sont renvoyés par ordre de code temporel de validation. Chaque enregistrement de modification est renvoyé exactement une fois. L'ordre des enregistrements de modification n'est pas garanti entre les partitions du flux de modifications. Les enregistrements de modification pour une clé primaire spécifique ne sont renvoyés que sur une seule partition pour une période donnée.
En raison de la lignée de partition parent-enfant, afin de traiter les modifications d'une clé particulière dans l'ordre du code temporel de validation, les enregistrements renvoyés par les partitions enfants ne doivent être traités qu'après les enregistrements de toutes les partitions parents.
Modifier les fonctions de lecture du flux et la syntaxe des requêtes
GoogleSQL
Pour interroger des flux de modifications, utilisez l'API ExecuteStreamingSql
. Spanner crée automatiquement une fonction de lecture spéciale avec le flux de modifications. La fonction de lecture permet d'accéder aux enregistrements du flux de modifications. La convention d'attribution de noms des fonctions de lecture est READ_change_stream_name
.
En supposant qu'un flux de modifications SingersNameStream
existe dans la base de données, la syntaxe de requête pour GoogleSQL est la suivante:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
La fonction de lecture accepte les arguments suivants:
Nom de l'argument | Type | Obligatoire ? | Description |
---|---|---|---|
start_timestamp |
TIMESTAMP |
Obligatoire | Indique que les enregistrements dont la valeur commit_timestamp est supérieure ou égale à start_timestamp doivent être renvoyés. La valeur doit être comprise dans la période de conservation du flux de modifications, et être inférieure ou égale à l'heure actuelle, et supérieure ou égale au code temporel de la création du flux de modifications. |
end_timestamp |
TIMESTAMP |
Facultatif (par défaut: NULL ) |
Indique que les enregistrements dont la valeur commit_timestamp est inférieure ou égale à end_timestamp doivent être renvoyés. La valeur doit être comprise dans la période de conservation du flux de modifications et supérieure ou égale à start_timestamp . La requête se termine après avoir renvoyé tous les ChangeRecords jusqu'à end_timestamp ou lorsque vous mettez fin à la connexion. Si end_timestamp est défini sur NULL ou n'est pas spécifié, l'exécution de la requête se poursuit jusqu'à ce que tous les ChangeRecords soient renvoyés ou jusqu'à ce que vous mettiez fin à la connexion. |
partition_token |
STRING |
Facultatif (par défaut: NULL ) |
Spécifie la partition de flux de modifications à interroger en fonction du contenu des enregistrements de partitions enfants. Si NULL ou aucune valeur n'est spécifiée, cela signifie que le lecteur interroge le flux de modifications pour la première fois et qu'il n'a obtenu aucun jeton de partition spécifique à partir duquel effectuer des requêtes. |
heartbeat_milliseconds |
INT64 |
Obligatoire | Détermine la fréquence à laquelle un ChangeRecord de battement de cœur est renvoyé en cas d'absence de transactions validées dans cette partition.
La valeur doit être comprise entre 1,000 (une seconde) et 300,000 (cinq minutes). |
read_options |
ARRAY |
Facultatif (par défaut: NULL ) |
Ajoute des options de lecture réservées à une utilisation ultérieure. La seule valeur autorisée est NULL . |
Nous vous recommandons de créer une méthode d'assistance pour créer le texte de la requête de la fonction de lecture et y associer des paramètres, comme illustré dans l'exemple suivant.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
Pour interroger des flux de modifications, utilisez l'API ExecuteStreamingSql
. Spanner crée automatiquement une fonction de lecture spéciale avec le flux de modifications. La fonction de lecture permet d'accéder aux enregistrements du flux de modifications. La convention d'attribution de noms des fonctions de lecture est spanner.read_json_change_stream_name
.
En supposant qu'un flux de modifications SingersNameStream
existe dans la base de données, la syntaxe de requête pour PostgreSQL est la suivante:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
La fonction de lecture accepte les arguments suivants:
Nom de l'argument | Type | Obligatoire ? | Description |
---|---|---|---|
start_timestamp |
timestamp with time zone |
Obligatoire | Indique que les enregistrements de modification dont commit_timestamp est supérieur ou égal à start_timestamp doivent être renvoyés. La valeur doit être comprise dans la période de conservation du flux de modifications, et être inférieure ou égale à l'heure actuelle, et supérieure ou égale au code temporel de la création du flux de modifications. |
end_timestamp |
timestamp with timezone |
Facultatif (par défaut: NULL ) |
Indique que les enregistrements de modification dont commit_timestamp est inférieur ou égal à end_timestamp doivent être renvoyés. La valeur doit être comprise dans la période de conservation du flux de modifications et supérieure ou égale à start_timestamp .
La requête se termine après avoir renvoyé tous les enregistrements de modification jusqu'à end_timestamp ou jusqu'à ce que vous mettiez fin à la connexion.
Si la valeur est NULL , l'exécution de la requête se poursuit jusqu'à ce que tous les enregistrements de modification soient renvoyés ou jusqu'à ce que vous mettiez fin à la connexion. |
partition_token |
text |
Facultatif (par défaut: NULL ) |
Spécifie la partition de flux de modifications à interroger en fonction du contenu des enregistrements de partitions enfants. Si NULL ou aucune valeur n'est spécifiée, cela signifie que le lecteur interroge le flux de modifications pour la première fois et qu'il n'a obtenu aucun jeton de partition spécifique à partir duquel effectuer des requêtes. |
heartbeat_milliseconds |
bigint |
Obligatoire | Détermine la fréquence à laquelle un ChangeRecord de battement de cœur est renvoyé lorsqu'aucune transaction n'est validée dans cette partition.
La valeur doit être comprise entre 1,000 (une seconde) et 300,000 (cinq minutes). |
null |
null |
Obligatoire | Réservé pour une utilisation ultérieure |
Nous vous recommandons de créer une méthode d'assistance pour créer le texte de la fonction de lecture et y associer des paramètres, comme illustré dans l'exemple suivant.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
Modifier le format des enregistrements de flux
GoogleSQL
La fonction de lecture des flux de modifications renvoie une seule colonne ChangeRecord
de type ARRAY<STRUCT<...>>
. Dans chaque ligne, ce tableau contient toujours un seul élément.
Les éléments du tableau ont le type suivant:
STRUCT < data_change_record ARRAY<STRUCT<...>>, heartbeat_record ARRAY<STRUCT<...>>, child_partitions_record ARRAY<STRUCT<...>> >
Ce STRUCT
contient trois champs: data_change_record
, heartbeat_record
et child_partitions_record
, chacun de type ARRAY<STRUCT<...>>
. Dans chaque ligne renvoyée par la fonction de lecture du flux de modifications, un seul de ces trois champs contient une valeur. Les deux autres sont vides ou NULL
. Ces champs de tableau ne contiennent qu'un seul élément.
Les sections suivantes examinent chacun de ces trois types d'enregistrements.
PostgreSQL
La fonction de lecture des flux de modifications renvoie une seule colonne ChangeRecord
de type JSON
avec la structure suivante:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
Cet objet comporte trois clés possibles: data_change_record
, heartbeat_record
et child_partitions_record
. Le type de valeur correspondant est JSON
. Dans chaque ligne renvoyée par la fonction de lecture du flux de modifications, une seule de ces trois clés existe.
Les sections suivantes examinent chacun de ces trois types d'enregistrements.
Enregistrements de modification des données
Un enregistrement de modification de données contient un ensemble de modifications apportées à une table avec le même type de modification (insertion, mise à jour ou suppression) validé au même horodatage de validation dans une partition de flux de modifications pour la même transaction. Plusieurs enregistrements de modification de données peuvent être renvoyés pour la même transaction sur plusieurs partitions de flux de modifications.
Tous les enregistrements de modification de données comportent des champs commit_timestamp
, server_transaction_id
et record_sequence
, qui déterminent ensemble l'ordre dans le flux de modifications pour un enregistrement de flux. Ces trois champs suffisent à dériver l'ordre des modifications et à assurer la cohérence externe.
Notez que plusieurs transactions peuvent avoir le même code temporel de validation si elles touchent des données qui ne se chevauchent pas. Le champ server_transaction_id
permet de distinguer l'ensemble de modifications (potentiellement dans les partitions de flux de modifications) émises au cours de la même transaction. L'associer aux champs record_sequence
et number_of_records_in_transaction
vous permet également de mettre en tampon et d'organiser tous les enregistrements d'une transaction particulière.
Les champs d'un enregistrement de modification de données incluent les éléments suivants:
GoogleSQL
Champ | Type | Description |
---|---|---|
commit_timestamp |
TIMESTAMP |
Indique le code temporel de la modification. |
record_sequence |
STRING |
Indique le numéro de séquence de l'enregistrement dans la transaction.
Les numéros de séquence sont uniques et augmentent de façon monotone (mais ne sont pas nécessairement contigus) dans une transaction. Triez les enregistrements pour le même server_transaction_id par record_sequence afin de reconstruire l'ordre des modifications dans la transaction.
Spanner peut optimiser cet ordre pour améliorer les performances. Il ne correspond pas toujours à l'ordre d'origine que vous fournissez. |
server_transaction_id |
STRING |
Fournit une chaîne unique qui représente la transaction dans laquelle la modification a été validée. Cette valeur ne doit être utilisée que dans le contexte du traitement des enregistrements de flux de modifications et n'est pas corrélée avec l'ID de transaction dans l'API de Spanner. |
is_last_record_in_transaction_in_partition |
BOOL |
Indique s'il s'agit du dernier enregistrement d'une transaction dans la partition actuelle. |
table_name |
STRING |
Nom de la table concernée par la modification. |
value_capture_type |
STRING |
Décrit le type de capture de valeur spécifié dans la configuration du flux de modifications lorsque cette modification a été capturée. Le type de capture de valeur peut être l'un des suivants:
Par défaut, il s'agit de |
column_types |
[ { "name": "STRING", "type": { "code": "STRING" }, "is_primary_key": BOOLEAN "ordinal_position": NUMBER }, ... ] |
Indique le nom de la colonne, son type, s'il s'agit d'une clé primaire et la position de la colonne telle que définie dans le schéma (ordinal_position ). La première colonne d'une table du schéma a une position ordinale de 1 . Le type de colonne peut être imbriqué pour les colonnes de tableau. Le format correspond à la structure de type décrite dans la documentation de référence de l'API Spanner.
|
mods |
[ { "keys": {"STRING" : "STRING"}, "new_values": { "STRING" : "VALUE-TYPE", [...] }, "old_values": { "STRING" : "VALUE-TYPE", [...] }, }, [...] ] |
Décrit les modifications apportées, y compris les valeurs de clé primaire, les anciennes valeurs et les nouvelles valeurs des colonnes modifiées ou suivies.
La disponibilité et le contenu des anciennes et nouvelles valeurs dépendent de l'value_capture_type configuré. Les champs new_values et old_values ne contiennent que les colonnes non clés. |
mod_type |
STRING |
Décrit le type de modification. Spécifiez l'un des contrôles suivants : INSERT , UPDATE ou DELETE . |
number_of_records_in_transaction |
INT64 |
Indique le nombre d'enregistrements de modifications de données faisant partie de cette transaction dans toutes les partitions de flux de modifications. |
number_of_partitions_in_transaction |
INT64 |
Indique le nombre de partitions qui renvoient des enregistrements de modification de données pour cette transaction. |
transaction_tag |
STRING |
Indique le tag de transaction associé à cette transaction. |
is_system_transaction |
BOOL |
Indique si la transaction est une transaction système. |
PostgreSQL
Champ | Type | Description |
---|---|---|
commit_timestamp |
STRING |
Indique le code temporel de la modification. |
record_sequence |
STRING |
Indique le numéro de séquence de l'enregistrement dans la transaction.
Les numéros de séquence sont uniques et augmentent de façon monotone (mais ne sont pas nécessairement contigus) dans une transaction. Triez les enregistrements pour le même server_transaction_id par record_sequence afin de reconstruire l'ordre des modifications dans la transaction. |
server_transaction_id |
STRING |
Fournit une chaîne unique qui représente la transaction dans laquelle la modification a été validée. La valeur ne doit être utilisée que dans le contexte du traitement des enregistrements de flux de modifications et n'est pas corrélée avec l'ID de transaction dans l'API de Spanner. |
is_last_record_in_transaction_in_partition |
BOOLEAN |
Indique s'il s'agit du dernier enregistrement d'une transaction dans la partition actuelle. |
table_name |
STRING |
Indique le nom de la table concernée par la modification. |
value_capture_type |
STRING |
Décrit le type de capture de valeur spécifié dans la configuration du flux de modifications lorsque cette modification a été capturée. Le type de capture de valeur peut être l'un des suivants:
Par défaut, il s'agit de |
column_types |
[ { "name": "STRING", "type": { "code": "STRING" }, "is_primary_key": BOOLEAN "ordinal_position": NUMBER }, ... ] |
Indique le nom de la colonne, le type de colonne, s'il s'agit d'une clé primaire et la position de la colonne telle que définie dans le schéma (ordinal_position ). La première colonne d'une table du schéma a une position ordinale de 1 . Le type de colonne peut être imbriqué pour les colonnes de tableau. Le format correspond à la structure de type décrite dans la documentation de référence de l'API Spanner.
|
mods |
[ { "keys": {"STRING" : "STRING"}, "new_values": { "STRING" : "VALUE-TYPE", [...] }, "old_values": { "STRING" : "VALUE-TYPE", [...] }, }, [...] ] |
Décrit les modifications apportées, y compris les valeurs de clé primaire, les anciennes valeurs et les nouvelles valeurs des colonnes modifiées ou suivies. La disponibilité et le contenu des anciennes et nouvelles valeurs dépendent de l'value_capture_type configuré. Les champs new_values et old_values ne contiennent que les colonnes non clés.
|
mod_type |
STRING |
Décrit le type de modification. Spécifiez l'un des contrôles suivants : INSERT , UPDATE ou DELETE . |
number_of_records_in_transaction |
INT64 |
Indique le nombre d'enregistrements de modifications de données faisant partie de cette transaction dans toutes les partitions de flux de modifications. |
number_of_partitions_in_transaction |
NUMBER |
Indique le nombre de partitions qui renvoient des enregistrements de modification de données pour cette transaction. |
transaction_tag |
STRING |
Indique le tag de transaction associé à cette transaction. |
is_system_transaction |
BOOLEAN |
Indique si la transaction est une transaction système. |
Exemple d'enregistrement de modification des données
Vous trouverez ci-dessous deux exemples d'enregistrements de modification de données. Ils décrivent une seule transaction impliquant un transfert entre deux comptes. Les deux comptes se trouvent dans des partitions de flux de modifications distinctes.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
L'enregistrement de modification de données suivant est un exemple d'enregistrement avec le type de capture de valeur NEW_VALUES
. Notez que seules les nouvelles valeurs sont renseignées.
Seule la colonne LastUpdate
a été modifiée. Seule cette colonne a donc été renvoyée.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
L'enregistrement de modification de données suivant est un exemple d'enregistrement avec le type de capture de valeur NEW_ROW
. Seule la colonne LastUpdate
a été modifiée, mais toutes les colonnes suivies sont renvoyées.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
L'enregistrement de modification de données suivant est un exemple d'enregistrement avec le type de capture de valeur NEW_ROW_AND_OLD_VALUES
. Seule la colonne LastUpdate
a été modifiée, mais toutes les colonnes suivies sont renvoyées. Ce type de capture de valeur capture la nouvelle valeur et l'ancienne valeur de LastUpdate
.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Enregistrements de pulsation
Lorsqu'un enregistrement de battement de cœur est renvoyé, il indique que toutes les modifications avec un commit_timestamp
inférieur ou égal à l'timestamp
de l'enregistrement de battement de cœur ont été renvoyées, et que les futurs enregistrements de données de cette partition doivent avoir des codes temporels de validation plus élevés que ceux renvoyés par l'enregistrement de battement de cœur. Les enregistrements de battement de cœur sont renvoyés lorsqu'aucune modification de données n'est écrite dans une partition. Lorsque des modifications de données sont écrites dans la partition, data_change_record.commit_timestamp
peut être utilisé à la place de heartbeat_record.timestamp
pour indiquer que le lecteur progresse dans la lecture de la partition.
Vous pouvez utiliser les enregistrements de battement de cœur renvoyés sur les partitions pour synchroniser les lecteurs sur toutes les partitions. Une fois que tous les lecteurs ont reçu un battement de cœur supérieur ou égal à un code temporel A
, ou qu'ils ont reçu des données ou des enregistrements de partition enfant supérieurs ou égaux à ce code temporel A
, ils savent qu'ils ont reçu tous les enregistrements validés à ce code temporel A
ou avant, et peuvent commencer à traiter les enregistrements mis en mémoire tampon (par exemple, en triant les enregistrements interpartitions par code temporel et en les regroupant par server_transaction_id
).
Un enregistrement de battement de cœur ne contient qu'un seul champ:
GoogleSQL
Champ | Type | Description |
---|---|---|
timestamp |
TIMESTAMP |
Indique l'horodatage de l'enregistrement de pulsation. |
PostgreSQL
Champ | Type | Description |
---|---|---|
timestamp |
STRING |
Indique l'horodatage de l'enregistrement de pulsation. |
Exemple d'enregistrement de pulsation
Exemple d'enregistrement de battement de cœur, indiquant que tous les enregistrements dont le code temporel est inférieur ou égal à celui de cet enregistrement ont été renvoyés:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
Enregistrements de partition enfant
Les enregistrements de partition enfant renvoient des informations sur les partitions enfants: leurs jetons de partition, les jetons de leurs partitions parents et l'start_timestamp
qui représente le code temporel le plus ancien pour lequel les partitions enfants contiennent des enregistrements de modification. Les enregistrements dont les codes temporels de validation sont immédiatement antérieurs à child_partitions_record.start_timestamp
sont renvoyés dans la partition actuelle. Après avoir renvoyé tous les enregistrements de partition enfant pour cette partition, cette requête renvoie un état de réussite, indiquant que tous les enregistrements ont été renvoyés pour cette partition.
Les champs d'un enregistrement de partition enfant incluent les éléments suivants:
GoogleSQL
Champ | Type | Description |
---|---|---|
start_timestamp |
TIMESTAMP |
Indique que les enregistrements de modification de données renvoyés par les partitions enfants de cet enregistrement de partition enfant ont un code temporel de validation supérieur ou égal à start_timestamp . Lorsqu'une requête interroge une partition enfant, elle doit spécifier le jeton de partition enfant et un start_timestamp supérieur ou égal à child_partitions_token.start_timestamp . Tous les enregistrements de partitions enfants renvoyés par une partition ont le même start_timestamp , et le code temporel se situe toujours entre le start_timestamp et le end_timestamp spécifiés dans la requête. |
record_sequence |
STRING |
Indique un numéro de séquence croissant de manière monotone qui peut être utilisé pour définir l'ordre des enregistrements de partition enfant lorsqu'il existe plusieurs enregistrements de partition enfant renvoyés avec le même start_timestamp dans une partition donnée. Le jeton de partition, start_timestamp et record_sequence identifient de manière unique un enregistrement de partition enfant.
|
child_partitions |
[ { "token" : "STRING", "parent_partition_tokens" : ["STRING"] } ] |
Renvoie un ensemble de partitions enfants et les informations associées. Cela inclut la chaîne de jeton de partition utilisée pour identifier la partition enfant dans les requêtes, ainsi que les jetons de ses partitions parentes. |
PostgreSQL
Champ | Type | Description |
---|---|---|
start_timestamp |
STRING |
Indique que les enregistrements de modification de données renvoyés par les partitions enfants de cet enregistrement de partition enfant ont un code temporel de validation supérieur ou égal à start_timestamp . Lorsqu'une requête interroge une partition enfant, elle doit spécifier le jeton de partition enfant et un start_timestamp supérieur ou égal à child_partitions_token.start_timestamp . Tous les enregistrements de partitions enfants renvoyés par une partition ont le même start_timestamp , et le code temporel se situe toujours entre les valeurs start_timestamp et end_timestamp spécifiées dans la requête.
|
record_sequence |
STRING |
Indique un numéro de séquence croissant de manière monotone qui peut être utilisé pour définir l'ordre des enregistrements de partition enfant lorsqu'il existe plusieurs enregistrements de partition enfant renvoyés avec le même start_timestamp dans une partition donnée. Le jeton de partition, start_timestamp et record_sequence identifient de manière unique un enregistrement de partition enfant.
|
child_partitions |
[ { "token": "STRING", "parent_partition_tokens": ["STRING"], }, [...] ] |
Renvoie un tableau de partitions enfants et des informations associées. Cela inclut la chaîne de jeton de partition utilisée pour identifier la partition enfant dans les requêtes, ainsi que les jetons de ses partitions parentes. |
Exemple d'enregistrement de partition enfant
Voici un exemple d'enregistrement de partition enfant:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
Workflow des requêtes de flux de modifications
Exécutez des requêtes de flux de modifications à l'aide de l'API ExecuteStreamingSql
, avec une transaction en lecture seule à usage unique et une limite de code temporel forte. La fonction de lecture du flux de modifications vous permet de spécifier les start_timestamp
et end_timestamp
pour la période d'intérêt. Tous les enregistrements de modifications au cours de la période de conservation sont accessibles à l'aide de la limite de code temporel en lecture seule.
Toutes les autres TransactionOptions
sont non valides pour les requêtes de flux de modifications. De plus, si TransactionOptions.read_only.return_read_timestamp
est défini sur true
, une valeur spéciale de kint64max - 1
est renvoyée dans le message Transaction
qui décrit la transaction, au lieu d'un code temporel de lecture valide. Cette valeur spéciale doit être supprimée et ne doit pas être utilisée pour les requêtes ultérieures.
Chaque requête de flux de modifications peut renvoyer un nombre illimité de lignes, chacune contenant un enregistrement de modification de données, un enregistrement de battement de cœur ou un enregistrement de partitions enfants. Vous n'avez pas besoin de définir d'échéance pour la demande.
Exemple de workflow d'interrogation de flux de modifications
Le workflow de requête de flux commence par l'émission de la toute première requête de flux de modifications en spécifiant partition_token
à NULL
. La requête doit spécifier la fonction de lecture du flux de modifications, le code temporel de début et de fin de l'intérêt, ainsi que l'intervalle de battement de cœur. Lorsque end_timestamp
est NULL
, la requête continue de renvoyer les modifications de données jusqu'à la fin de la partition.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
Traitez les enregistrements de données de cette requête jusqu'à ce que tous les enregistrements de partition enfant soient renvoyés. Dans l'exemple suivant, deux enregistrements de partition enfant et trois jetons de partition sont renvoyés, puis la requête se termine. Les enregistrements de partition enfant d'une requête spécifique partagent toujours le même start_timestamp
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012390,
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
Pour traiter les modifications après 2022-05-01T09:00:01Z
, créez trois nouvelles requêtes et exécutez-les en parallèle. Utilisées ensemble, les trois requêtes renvoient les modifications de données pour la même plage de clés que leur parent. Définissez toujours le start_timestamp
sur le start_timestamp
dans le même enregistrement de partition enfant et utilisez le même end_timestamp
et le même intervalle de battement cardiaque pour traiter les enregistrements de manière cohérente dans toutes les requêtes.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
La requête sur child_token_2
se termine après avoir renvoyé un autre enregistrement de partition enfant. Cet enregistrement indique qu'une nouvelle partition couvre les modifications pour child_token_2
et child_token_3
à partir de 2022-05-01T09:30:15Z
. La requête sur child_token_3
renvoie exactement le même enregistrement, car les deux sont les partitions parents du nouvel child_token_4
. Pour garantir un traitement strict des enregistrements de données pour une clé particulière, la requête sur child_token_4
doit commencer une fois que tous les parents ont terminé. Dans ce cas, les parents sont child_token_2
et child_token_3
. Ne créez qu'une seule requête pour chaque jeton de partition enfant. La conception du workflow de requête doit désigner un parent pour attendre et planifier la requête sur child_token_4
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": [child_token_2, child_token_3],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
Vous trouverez des exemples de gestion et d'analyse des enregistrements de flux de modifications dans le connecteur Dataflow SpannerIO d'Apache Beam sur GitHub.