In questa pagina vengono descritti in dettaglio i seguenti attributi delle modifiche in tempo reale:
- Il suo modello di partizionamento basato su suddivisione
- Il formato e il contenuto dei record delle modifiche in tempo reale
- La sintassi di basso livello utilizzata per eseguire query su questi record
- Esempio di flusso di lavoro delle query
Le informazioni su questa pagina sono più pertinenti per l'utilizzo dell'API Spanner per eseguire direttamente query sulle modifiche in tempo reale. Le applicazioni che invece utilizzano Dataflow per leggere i dati delle modifiche in tempo reale non devono lavorare direttamente con il modello dei dati descritto qui.
Per una guida introduttiva più ampia ai flussi di modifiche, consulta la Panoramica delle modifiche in tempo reale.
Partizioni delle modifiche in tempo reale
Quando si verifica una modifica in una tabella controllata da un flusso di modifiche, Spanner scrive un record di modifiche in tempo reale corrispondente nel database, in modo sincrono nella stessa transazione della modifica dei dati. Ciò garantisce che, se la transazione ha esito positivo, Spanner ha anche acquisito e mantenuto la modifica correttamente. Internamente, Spanner colloca il record di modifiche in tempo reale e le modifiche ai dati in modo che vengano elaborati dallo stesso server per ridurre al minimo l'overhead delle operazioni di scrittura.
Come parte del DML a una determinata suddivisione, Spanner aggiunge la scrittura ai corrispondenti dati di modifiche in tempo reale nella stessa transazione. A causa di questa colocation, i flussi di modifiche non aggiungono ulteriore coordinamento tra le risorse di gestione, riducendo al minimo l'overhead del commit della transazione.
Spanner offre scalabilità dividendo e unendo dinamicamente i dati in base al carico e alle dimensioni del database e distribuendo le suddivisioni tra le risorse di gestione.
Per consentire la scrittura e la lettura delle modifiche in tempo reale sulla scalabilità, Spanner divide e unisce lo spazio di archiviazione interno delle modifiche in tempo reale con i dati del database, evitando automaticamente gli hotspot. Per supportare la lettura dei record di modifiche in tempo reale quasi in tempo reale durante la scalabilità delle scritture del database, l'API Spanner è progettata per eseguire query su un flusso di modifiche contemporaneamente utilizzando le partizioni delle modifiche in tempo reale. Le partizioni delle modifiche in tempo reale corrispondono alle suddivisioni dei dati delle modifiche in tempo reale che contengono i record delle modifiche in tempo reale. Le partizioni di un flusso di modifiche cambiano dinamicamente nel tempo e sono correlate al modo in cui Spanner suddivide e unisce dinamicamente i dati del database.
Una partizione di modifiche in tempo reale contiene i record per un intervallo di chiavi immutabile per un intervallo di tempo specifico. Qualsiasi partizione di modifiche in tempo reale può essere suddivisa in una o più partizioni di modifiche in tempo reale oppure unite ad altre partizioni di flusso di modifiche. Quando si verificano questi eventi di suddivisione o unione, vengono create partizioni figlio per acquisire le modifiche per i rispettivi intervalli di chiavi immutabili per l'intervallo di tempo successivo. Oltre ai record delle modifiche ai dati, una query di modifiche in tempo reale restituisce record di partizione figlio per notificare ai lettori le nuove partizioni di modifiche in tempo reale su cui è necessario eseguire query, nonché record di heartbeat per indicare l'avanzamento del progresso quando di recente non sono state eseguite scritture.
Quando si esegue una query su una particolare partizione di modifiche in tempo reale, i record delle modifiche vengono restituiti in ordine di timestamp di commit. Ogni record di modifica viene restituito esattamente una volta. Tra le partizioni delle modifiche in tempo reale, l'ordine dei record delle modifiche non è garantito. I record delle modifiche per una determinata chiave primaria vengono restituiti solo su una partizione per un determinato intervallo di tempo.
A causa della derivazione della partizione padre-figlio, al fine di elaborare le modifiche per una particolare chiave nell'ordine del timestamp di commit, i record restituiti dalle partizioni figlio devono essere elaborati solo dopo che sono stati elaborati i record di tutte le partizioni padre.
Funzioni di lettura delle modifiche in tempo reale e sintassi delle query
GoogleSQL
Puoi eseguire query sulle modifiche in tempo reale utilizzando l'API ExecuteStreamingSql
. Spanner crea automaticamente una funzione
di lettura speciale insieme alle modifiche. La funzione di lettura fornisce accesso
ai record del flusso di modifiche. La convenzione di denominazione delle funzioni di lettura è READ_change_stream_name
.
Supponendo che nel database esista un flusso di modifiche SingersNameStream
, la sintassi della query per GoogleSQL è la seguente:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
La funzione di lettura accetta i seguenti argomenti:
Nome argomento | Tipo | Obbligatorio? | Descrizione |
---|---|---|---|
start_timestamp |
TIMESTAMP |
Obbligatorio | Specifica che devono essere restituiti i record con commit_timestamp maggiore o uguale a start_timestamp . Il valore deve trovarsi all'interno del periodo di conservazione delle modifiche in tempo reale, deve essere inferiore o uguale all'ora attuale e maggiore o uguale al timestamp della creazione della modifica in tempo reale. |
end_timestamp |
TIMESTAMP |
Facoltativo (valore predefinito: NULL ) |
Specifica che i record con commit_timestamp minore o uguale a end_timestamp devono essere restituiti. Il valore deve essere compreso nel periodo di conservazione delle modifiche ed essere maggiore o uguale a start_timestamp . La query
termina dopo aver restituito tutti i ChangeRecord fino a end_timestamp
o dopo che l'utente ha terminato la connessione. Se NULL o non
specificato, la query viene eseguita fino a quando non vengono restituiti tutti i ChangeRecord o
fino a quando l'utente non termina la connessione. |
partition_token |
STRING |
Facoltativo (valore predefinito: NULL ) |
Specifica la partizione di modifiche in tempo reale su cui eseguire la query, in base al contenuto dei record delle partizioni figlio. Se NULL o non specificato, significa che il lettore esegue query sul flusso di modifiche per la prima volta e non ha ottenuto alcun token di partizione specifico da cui eseguire la query. |
heartbeat_milliseconds |
INT64 |
Obbligatorio | Determina la frequenza con cui viene restituito un ChangeRecord di heartbeat nel caso in cui non ci siano transazioni impegnate in questa partizione.
Il valore deve essere compreso tra 1,000 (un secondo) e 30,0000 (cinque
minuti). |
read_options |
ARRAY |
Facoltativo (valore predefinito: NULL ) |
Ulteriori opzioni di lettura riservate per uso futuro. Al momento, l'unico valore consentito è NULL . |
Ti consigliamo di utilizzare un metodo pratico per creare il testo della query della funzione di lettura e di associarvi i parametri, come mostrato nell'esempio seguente.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
Puoi eseguire query sulle modifiche in tempo reale utilizzando
l'API ExecuteStreamingSql
.
Spanner crea automaticamente una funzione
di lettura speciale insieme alle modifiche. La funzione di lettura fornisce accesso
ai record del flusso di modifiche. La convenzione di denominazione delle funzioni di lettura è spanner.read_json_change_stream_name
.
Supponendo che nel database esista un flusso di modifiche SingersNameStream
, la sintassi delle query per PostgreSQL è la seguente:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
La funzione di lettura accetta i seguenti argomenti:
Nome argomento | Tipo | Obbligatorio? | Descrizione |
---|---|---|---|
start_timestamp |
timestamp with time zone |
Obbligatorio | Specifica che è necessario restituire i record di modifica con commit_timestamp maggiore o uguale a start_timestamp . Il valore deve trovarsi all'interno del periodo di conservazione delle modifiche in tempo reale, deve essere inferiore o uguale all'ora attuale e maggiore o uguale al timestamp della creazione della modifica in tempo reale. |
end_timestamp |
timestamp with timezone |
Facoltativo (valore predefinito: NULL ) |
Specifica che devono essere restituiti i record di modifica con commit_timestamp minore o uguale a end_timestamp . Il valore deve essere compreso nel periodo di conservazione delle modifiche ed essere maggiore o uguale a start_timestamp .
La query viene completata dopo la restituzione di tutti i record delle modifiche fino a
end_timestamp o dopo che l'utente ha terminato la connessione.
Se NULL , la query viene eseguita finché non vengono restituiti tutti i record delle modifiche o finché l'utente non termina la connessione. |
partition_token |
text |
Facoltativo (valore predefinito: NULL ) |
Specifica la partizione di modifiche in tempo reale su cui eseguire la query, in base al contenuto dei record delle partizioni figlio. Se NULL o non specificato, significa che il lettore esegue query sul flusso di modifiche per la prima volta e non ha ottenuto alcun token di partizione specifico da cui eseguire la query. |
heartbeat_milliseconds |
bigint |
Obbligatorio | Determina la frequenza con cui viene restituito un ChangeRecord heartbeat nel caso in cui non ci siano transazioni impegnate in questa partizione.
Il valore deve essere compreso tra 1,000 (un secondo) e 300,000 (cinque
minuti). |
null |
null |
Obbligatorio | Riservato per un uso futuro |
Consigliamo di utilizzare un metodo pratico per creare il testo della funzione di lettura e di associarvi i parametri, come mostrato nell'esempio seguente.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
Formato di record delle modifiche in tempo reale
GoogleSQL
La funzione di lettura delle modifiche in tempo reale restituisce una singola colonna ChangeRecord
di tipo
ARRAY<STRUCT<...>>
. In ogni riga, questo array contiene sempre un singolo elemento.
Gli elementi array hanno il seguente tipo:
STRUCT <
data_change_record ARRAY<STRUCT<...>>,
heartbeat_record ARRAY<STRUCT<...>>,
child_partitions_record ARRAY<STRUCT<...>>
>
Lo struct contiene tre campi: data_change_record
,
heartbeat_record
e child_partitions_record
, ciascuno di tipo
ARRAY<STRUCT<...>>
. In ogni riga restituita dalla funzione di lettura delle modifiche in tempo reale, solo uno di questi tre campi contiene un valore; gli altri due sono vuoti o NULL
. Questi campi di array contengono al massimo un elemento.
Le sezioni seguenti esaminano ciascuno di questi tre tipi di record.
PostgreSQL
La funzione di lettura delle modifiche in tempo reale restituisce una singola colonna ChangeRecord
di
tipo JSON
con la seguente struttura:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
Esistono tre possibili chiavi in questo oggetto: data_change_record
,
heartbeat_record
e child_partitions_record
, il tipo di valore corrispondente
è JSON
.
In ogni riga restituita dalla funzione di lettura delle modifiche in tempo reale, è presente solo una di queste tre chiavi.
Le sezioni seguenti esaminano ciascuno di questi tre tipi di record.
Record delle modifiche dei dati
Un record di modifiche dei dati contiene un insieme di modifiche a una tabella con lo stesso tipo di modifica (insert, update o delete) con commit nello stesso timestamp di commit in una partizione di modifiche in tempo reale per la stessa transazione. È possibile restituire più record delle modifiche dei dati per la stessa transazione in più partizioni di modifiche in tempo reale.
Tutti i record delle modifiche dei dati hanno campi commit_timestamp
, server_transaction_id
e record_sequence
, che insieme determinano l'ordine nel flusso di modifiche per un record di flusso. Questi tre campi sono sufficienti per ricavare
l'ordine delle modifiche e fornire coerenza esterna.
Tieni presente che più transazioni possono avere lo stesso timestamp di commit se
riguardano dati non sovrapposti. Il campo server_transaction_id
offre la possibilità di distinguere quale insieme di modifiche (potenzialmente
tra le partizioni di modifiche in tempo reale) è stata apportata all'interno della stessa
transazione. L'accoppiamento con i campi record_sequence
e
number_of_records_in_transaction
consente anche di eseguire il buffering e di ordinare
tutti i record di una determinata transazione.
I campi di un record di modifica dei dati includono quanto segue:
GoogleSQL
Campo | Tipo | Descrizione |
---|---|---|
commit_timestamp |
TIMESTAMP |
Timestamp in cui è stato eseguito il commit della modifica. |
record_sequence |
STRING |
Il numero di sequenza per il record all'interno della transazione. I numeri di sequenza sono garantiti come univoci e in aumento monotonico (ma non necessariamente contigui) all'interno di una transazione. Ordina i record per lo stesso server_transaction_id in base a record_sequence per ricostruire l'ordine delle modifiche all'interno della transazione. |
server_transaction_id |
STRING |
Una stringa univoca a livello globale che rappresenta la transazione in cui è stata eseguita la modifica. Il valore deve essere utilizzato solo nel contesto dell'elaborazione dei record delle modifiche in tempo reale e non è correlato all'ID transazione nell'API Spanner. |
is_last_record_in_transaction_in_partition |
BOOL |
Indica se questo è l'ultimo record per una transazione nella partizione corrente. |
table_name |
STRING |
Nome della tabella interessata dalla modifica. |
value_capture_type |
STRING |
Descrive il tipo di acquisizione del valore specificato nella configurazione del flusso di modifiche al momento dell'acquisizione della modifica. Il tipo di acquisizione valore può essere |
column_types |
ARRAY<STRUCT< |
Il nome della colonna, il tipo di colonna (se si tratta di una chiave primaria) e la posizione della colonna come definita nello schema ("ordinal_position"). La prima colonna di una tabella nello schema avrebbe la posizione ordinale "1". Il tipo di colonna può essere nidificato per le colonne di array. Il formato corrisponde alla struttura del tipo descritta nel riferimento API Spanner. |
mods |
ARRAY<STRUCT< |
Descrive le modifiche apportate, tra cui le coppie chiave-valore primarie, i vecchi valori e i nuovi valori delle colonne modificate o monitorate.
La disponibilità e il contenuto dei valori vecchi e nuovi dipenderanno dal valore configurato value_capture_type. I campi new_values e old_values contengono solo le colonne non chiave. |
mod_type |
STRING |
Descrive il tipo di modifica. Uno tra INSERT , UPDATE o
DELETE . |
number_of_records_in_transaction |
INT64 |
Il numero di record delle modifiche dei dati che fanno parte di questa transazione in tutte le partizioni delle modifiche in tempo reale. |
number_of_partitions_in_transaction |
INT64 |
Il numero di partizioni che restituirà i record delle modifiche dei dati per questa transazione. |
transaction_tag |
STRING |
Tag transazione associato a questa transazione. |
is_system_transaction |
BOOL |
Indica se la transazione è di sistema. |
PostgreSQL
Campo | Tipo | Descrizione |
---|---|---|
commit_timestamp |
STRING |
Il timestamp in cui è stato eseguito il commit della modifica. |
record_sequence |
STRING |
Il numero di sequenza per il record all'interno della transazione. I numeri di sequenza sono garantiti come univoci e in aumento monotonico (ma non necessariamente contigui) all'interno di una transazione. Ordina i record per lo stesso "server_transaction_id" in base a "record_Sequence" per ricostruire l'ordine delle modifiche all'interno della transazione. |
server_transaction_id |
STRING |
Una stringa univoca a livello globale che rappresenta la transazione in cui è stata eseguita la modifica. Il valore deve essere utilizzato solo nel contesto dell'elaborazione dei record delle modifiche in tempo reale e non è correlato all'ID transazione nell'API Spanner |
is_last_record_in_transaction_in_partition |
BOOLEAN |
Indica se questo è l'ultimo record per una transazione nella partizione corrente. |
table_name |
STRING |
Nome della tabella interessata dalla modifica. |
value_capture_type |
STRING |
Descrive il tipo di acquisizione del valore specificato nella configurazione del flusso di modifiche al momento dell'acquisizione della modifica. Il tipo di acquisizione valore può essere |
column_types |
[ { "name": <STRING>, "type": { "code": <STRING> }, "is_primary_key": <BOOLEAN>, "ordinal_position": <NUMBER> }, ... ] |
Il nome della colonna, il tipo di colonna (se si tratta di una chiave primaria) e la posizione della colonna come definita nello schema ("ordinal_position"). La prima colonna di una tabella nello schema avrebbe la posizione ordinale "1". Il tipo di colonna può essere nidificato per le colonne di array. Il formato corrisponde alla struttura del tipo descritta nel riferimento API Spanner. |
mods |
[ { "keys": {<STRING> : <STRING>}, "new_values": { <STRING> : <VALUE-TYPE>, [...] }, "old_values": { <STRING> : <VALUE-TYPE>, [...] }, }, [...] ] |
Descrive le modifiche apportate, tra cui le coppie chiave-valore primarie, i vecchi valori e i nuovi valori delle colonne modificate o monitorate. La disponibilità e il contenuto dei valori vecchi e nuovi dipenderanno dal valore configurato value_capture_type. I campi new_values e
old_values contengono solo le colonne non chiave.
|
mod_type |
STRING |
Descrive il tipo di modifica. Uno tra INSERT , UPDATE o
DELETE . |
number_of_records_in_transaction |
INT64 |
Il numero di record delle modifiche dei dati che fanno parte di questa transazione in tutte le partizioni delle modifiche in tempo reale. |
number_of_partitions_in_transaction |
NUMBER |
Il numero di partizioni che restituirà i record delle modifiche dei dati per questa transazione. |
transaction_tag |
STRING |
Tag transazione associato a questa transazione. |
is_system_transaction |
BOOLEAN |
Indica se la transazione è di sistema. |
Segue un paio di record di modifiche dei dati di esempio. Descrivono una singola transazione in cui si verifica un trasferimento tra due account. Tieni presente che i due account sono in partizioni di modifiche in tempo reale separate.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
Il seguente record di modifiche dati è un esempio di record con il tipo di acquisizione valori "NEW_VALUES"
. Tieni presente che vengono compilati solo i nuovi valori.
È stata modificata solo la colonna "LastUpdate"
, quindi è stata restituita solo questa colonna.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Il seguente record di modifiche dati è un esempio di record con il tipo di acquisizione valori "NEW_ROW"
. È stata modificata solo la colonna "LastUpdate"
, ma vengono restituite tutte le colonne monitorate.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Il seguente record di modifiche dati è un esempio di record con il tipo di acquisizione valori "NEW_ROW_AND_OLD_VALUES"
. È stata modificata solo la colonna "LastUpdate"
, ma vengono restituite tutte le colonne monitorate. Questo tipo di acquisizione del valore acquisisce il nuovo valore e il valore precedente di LastUpdate
.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Record di heartbeat
Quando viene restituito un record di heartbeat, indica che sono state restituite tutte le modifiche con
commit_timestamp
inferiore o uguale al
timestamp
del record di heartbeat e i record di dati futuri in questa
partizione devono avere timestamp di commit più elevati rispetto a quelli restituiti dal
record heartbeat. I record di heartbeat vengono restituiti quando non vengono apportate modifiche ai dati scritti in una partizione. Quando sono presenti modifiche ai dati scritte
nella partizione, è possibile utilizzare data_change_record.commit_timestamp
al posto di heartbeat_record.timestamp
per indicare che il lettore sta avanzando
nella lettura della partizione.
Puoi utilizzare i record di heartbeat restituiti sulle partizioni per sincronizzare i lettori in tutte le partizioni. Una volta che tutti i lettori hanno ricevuto un battito cardiaco maggiore o uguale a un timestamp A
o hanno ricevuto record di dati o di partizione secondaria maggiore o uguale al timestamp A
, i lettori sanno di aver ricevuto tutti i record confermati a quel timestamp A
o prima di quel timestamp e possono iniziare a elaborare i record presenti nel buffer, ad esempio ordinando i record interpartizionati per timestamp e raggruppandoli per server_transaction_id
.
Un record di heartbeat contiene un solo campo:
GoogleSQL
Campo | Tipo | Descrizione |
---|---|---|
timestamp |
TIMESTAMP |
Timestamp del record di heartbeat. |
PostgreSQL
Campo | Tipo | Descrizione |
---|---|---|
timestamp |
STRING |
Timestamp del record di heartbeat. |
Un record di heartbeat di esempio, in cui viene comunicato che sono stati restituiti tutti i record con timestamp inferiori o uguali al timestamp di questo record:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
Record di partizioni secondarie
Un record delle partizioni figlio restituisce informazioni sulle partizioni figlio: i token di partizione, i token delle partizioni padre e il start_timestamp
che rappresenta il primo timestamp per cui le partizioni figlio contengono record delle modifiche. I record i cui timestamp di commit sono immediatamente precedenti a child_partitions_record.start_timestamp
vengono restituiti nella partizione attuale. Dopo aver restituito tutti i record delle partizioni figlio per questa partizione, questa query restituirà uno stato di operazione riuscita, a indicare che tutti i record sono stati restituiti per questa partizione.
I campi di un record di partizioni figlio includono quanto segue:
GoogleSQL
Campo | Tipo | Descrizione |
---|---|---|
start_timestamp |
TIMESTAMP |
I record delle modifiche dei dati restituiti dalle partizioni figlio in questo record di partizione figlio hanno un timestamp di commit maggiore o uguale a start_timestamp . Quando esegui una query su una partizione secondaria, la query deve specificare il token di partizione figlio e un valore start_timestamp maggiore o uguale a child_partitions_token.start_timestamp . Tutti i record delle partizioni secondarie restituiti da una partizione hanno lo stesso start_timestamp e il timestamp è sempre compreso tra i valori start_timestamp e end_timestamp specificati della query. |
record_sequence |
STRING |
Un numero di sequenza crescente in modo monotonico che può essere utilizzato per definire l'ordine del record delle partizioni figlio quando vengono restituiti più record di partizioni figlio con lo stesso start_timestamp in una determinata partizione. Il token di partizione, start_timestamp e record_sequence identificano in modo univoco un record di partizioni figlio. |
child_partitions |
ARRAY<STRUCT< |
Restituisce un insieme di partizioni figlio e le informazioni associate. Ciò include la stringa del token di partizione utilizzata per identificare la partizione secondaria nelle query, nonché i token delle partizioni principali. |
PostgreSQL
Campo | Tipo | Descrizione |
---|---|---|
start_timestamp |
STRING |
I record delle modifiche dei dati restituiti dalle partizioni figlio in questo record delle partizioni figlio hanno un timestamp di commit maggiore o uguale a start_timestamp . Quando esegui una query su una partizione figlio, la query deve specificare il token di partizione figlio e un valore start_timestamp maggiore o uguale a child_partitions_token.start_timestamp . Tutti i record delle partizioni secondarie restituiti da una partizione hanno lo stesso start_timestamp e il timestamp è sempre compreso tra i valori start_timestamp e end_timestamp specificati della query.
|
record_sequence |
STRING |
Un numero di sequenza crescente in modo monotonico che può essere utilizzato per definire l'ordine del record delle partizioni figlio quando vengono restituiti più record di partizioni figlio con lo stesso start_timestamp in una determinata partizione. Il token di partizione, start_timestamp e record_sequence identificano in modo univoco un record di partizioni figlio. |
child_partitions |
[ { "token": <STRING>, "parent_partition_tokens": [<STRING>], }, [...] ] |
Restituisce un array di partizioni figlio e le informazioni associate. Ciò include la stringa del token di partizione utilizzata per identificare la partizione secondaria nelle query, nonché i token delle partizioni principali. |
Di seguito è riportato un esempio di record di partizione figlio:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
Flusso di lavoro delle query sulle modifiche in tempo reale
Esegui query di modifiche in tempo reale utilizzando l'API ExecuteStreamingSql
, con una transazione di sola lettura monouso e un lega di timestamp significativo. La funzione di lettura delle modifiche in tempo reale consente di specificare start_timestamp
e end_timestamp
per l'intervallo di tempo di interesse. Tutti i record delle modifiche
durante il periodo di conservazione sono accessibili tramite un vincolo di timestamp elevato
di sola lettura.
Tutti gli altri elementi
TransactionOptions
non sono validi per le query di modifiche in tempo reale. Inoltre, se TransactionOptions.read_only.return_read_timestamp
è impostato su true, nel messaggio Transaction
verrà restituito un valore speciale kint64max - 1
che descrive la transazione, anziché un timestamp di lettura valido. Questo valore speciale deve essere ignorato e non utilizzato per eventuali query successive.
Ogni query di modifiche in tempo reale può restituire un numero illimitato di righe, ciascuna contenente un record di modifiche dei dati, un record di heartbeat o un record di partizioni figlio. Non è necessario impostare una scadenza per la richiesta.
Esempio:
Il flusso di lavoro delle query in modalità flusso inizia con l'invio della primissima query
di modifiche in tempo reale, specificando partition_token
in NULL
. La query deve specificare la funzione di lettura per il flusso di modifiche, il timestamp di inizio e di fine di interesse e l'intervallo di heartbeat. Quando il valore end_timestamp
è NULL
, la query continua a restituire modifiche ai dati fino al termine della partizione.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
Elabora i record di dati di questa query finché i record di partizione figlio non vengono restituiti. Nell'esempio riportato di seguito, vengono restituiti due record di partizione figlio e tre token
di partizione, quindi la query viene terminata. I record di partizione figlio di una
query specifica condividono sempre lo stesso start_timestamp
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012390,
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
Per elaborare le modifiche future dopo il giorno 2022-05-01T09:00:01Z
, crea tre nuove query ed eseguile in parallelo. Insieme, tutte e tre le query restituiscono modifiche
ai dati future per lo stesso intervallo di chiavi coperto dall'elemento principale. Imposta sempre start_timestamp
su start_timestamp
nello stesso record di partizione figlio e utilizza lo stesso end_timestamp
e lo stesso intervallo heartbeat per elaborare i record in modo coerente in tutte le query.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
Dopo un po' di tempo, la query su child_token_2
viene completata dopo la restituzione di un altro record di partizione figlio. Questo record indica che una nuova partizione coprirà le modifiche future per child_token_2
e child_token_3
a partire dal giorno 2022-05-01T09:30:15Z
. La query su child_token_3
restituirà esattamente lo stesso record, perché entrambe sono le partizioni padre del nuovo child_token_4
.
Per garantire un'elaborazione ordinata rigorosa dei record di dati per una determinata chiave,
la query su child_token_4
deve iniziare solo al termine di tutti gli elementi padre,
che in questo caso sono child_token_2
e child_token_3
. Crea una sola query per ogni token di partizione figlio. La progettazione del flusso di lavoro delle query deve nominare un padre che attenda e pianifica la query su child_token_4
.
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": [child_token_2, child_token_3],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
Puoi trovare esempi di gestione e analisi dei record di modifiche in tempo reale nel connettore Dataflow di SpannerIO Apache Beam su GitHub.