Questa pagina descrive modifiche in tempo reale in Spanner per i database con dialetto Google SQL e per i database con dialetto PostgreSQL, tra cui:
- Il modello di partizione basato su suddivisione
- Il formato e i contenuti dei record dello stream di modifiche
- La sintassi di basso livello utilizzata per eseguire query su questi record
- Un esempio di flusso di lavoro delle query
Utilizza l'API Spanner per eseguire modifiche in tempo reale di modifiche. Le applicazioni che invece utilizzano Dataflow per leggere i dati dei stream di modifiche non devono lavorare direttamente con il modello dei dati descritto qui.
Per una guida introduttiva più ampia agli modifiche in tempo reale, consulta la Panoramica degli stream di modifiche.
Partizioni del flusso di modifiche
Quando si verifica una modifica in una tabella monitorata da uno stream di modifiche, Spanner scrive un record dello stream di modifiche corrispondente nel database in modo sincrono nella stessa transazione della modifica dei dati. Ciò significa che, se la transazione va a buon fine, Spanner ha anche acquisito e mantenuto la modifica. Internamente, Spanner colloca nello stesso luogo il record del flusso di modifiche e la modifica dei dati in modo che vengano elaborati dallo stesso server per ridurre al minimo il sovraccarico di scrittura.
Nell'ambito del DML per una determinata suddivisione, Spanner appende la scrittura alla suddivisione dei dati dello stream di modifiche corrispondente nella stessa transazione. A causa di questa co-localizzazione, gli stream di modifiche non richiedono un coordinamento aggiuntivo tra le risorse di pubblicazione, il che consente di ridurre al minimo il sovraccarico del commit delle transazioni.
Spanner esegue il ridimensionamento dividendo e unendo dinamicamente i dati in base al carico e alle dimensioni del database e distribuendo le suddivisioni tra le risorse di pubblicazione.
Per consentire la scalabilità delle letture e delle scritture delle modifiche in tempo reale, Spanner suddivide e unisce lo spazio di archiviazione interno delle modifiche in tempo reale insieme ai dati del database, evitando automaticamente gli hotspot. Per supportare la lettura dei record delle modifiche in tempo quasi reale man mano che le scritture del database vengono scalate, l'API Spanner è progettata per consentire di eseguire query su un flusso di modifiche contemporaneamente utilizzando le relative partizioni. Le partizioni del flusso di modifiche mappano le suddivisioni dei dati del flusso di modifiche che contengono i record del flusso di modifiche. Le partizioni di un flusso di modifiche cambiano dinamicamente nel tempo e sono correlate al modo in cui Spanner suddivide e unisce dinamicamente i dati del database.
Una partizione del flusso di modifiche contiene record per un intervallo di chiavi immutabili per un intervallo di tempo specifico. Qualsiasi partizione dello stream di modifiche può essere suddivisa in una o più partizioni dello stream di modifiche o essere unita ad altre partizioni dello stream di modifiche. Quando si verificano questi eventi di suddivisione o unione, vengono create partizioni secondarie per acquisire le modifiche relative ai rispettivi intervalli di chiavi immutabili per l'intervallo di tempo successivo. Oltre ai record di variazione dei dati, una query sul flusso di variazioni restituisce record di partizione secondaria per notificare ai lettori le nuove partizioni del flusso di variazioni su cui eseguire query, nonché record di heartbeat per indicare l'avanzamento quando non sono state eseguite scritture di recente.
Quando esegui una query su una determinata partizione dello stream di modifiche, i record di modifica vengono resi nell'ordine del timestamp del commit. Ogni record di modifica viene restituito esattamente una volta. L'ordinamento dei record delle modifiche non è garantito tra le partizioni dello stream delle modifiche. I record di modifica per una determinata chiave primaria vengono restituiti solo in una partizione per un determinato intervallo di tempo.
A causa della struttura delle partizioni principali e secondarie, per elaborare le modifiche per una determinata chiave in ordine di timestamp del commit, i record restituiti dalle partizioni secondarie devono essere elaborati solo dopo aver elaborato i record di tutte le partizioni principali.
Modificare le funzioni di lettura delle modifiche in tempo reale e la sintassi delle query
GoogleSQL
Per eseguire query sui modifiche in tempo reale, utilizza l'API
ExecuteStreamingSql
. Spanner crea automaticamente una funzione di lettura speciale insieme allo stream delle modifiche. La funzione di lettura fornisce l'accesso ai record dello stream di modifiche. La convenzione di denominazione della funzione di lettura è
READ_change_stream_name
.
Supponendo che nel database esista uno stream di modifiche SingersNameStream
, la sintassi delle query per GoogleSQL è la seguente:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
La funzione read accetta i seguenti argomenti:
Nome argomento | Tipo | Obbligatorio? | Descrizione |
---|---|---|---|
start_timestamp |
TIMESTAMP |
Obbligatorio | Specifica che devono essere restituiti i record con commit_timestamp maggiore o uguale a start_timestamp . Il
valore deve rientrare nel periodo di conservazione del flusso di modifiche e deve essere
inferiore o uguale all'ora corrente e maggiore o uguale al
timestamp della creazione del flusso di modifiche. |
end_timestamp |
TIMESTAMP |
Facoltativo (valore predefinito: NULL ) |
Specifica che devono essere restituiti i record con un valore commit_timestamp minore o uguale a end_timestamp . Il valore deve rientrare nel periodo di conservazione del flusso di modifiche e deve essere maggiore o uguale a start_timestamp . La
query termina dopo aver restituito tutti i ChangeRecords
fino al end_timestamp o quando termini la
connessione. Se end_timestamp è impostato su NULL
o non è specificato, l'esecuzione della query continua fino a quando non vengono restituiti tutti i
ChangeRecords o fino a quando non interrompi la
connessione. |
partition_token |
STRING |
Facoltativo (valore predefinito: NULL ) |
Specifica la partizione dello stream di modifiche su cui eseguire query in base ai contenuti dei record delle partizioni secondarie. Se è NULL o non è specificato, significa che il lettore sta eseguendo una query sul flusso di variazioni per la prima volta e non ha ottenuto token di partizione specifici da cui eseguire query. |
heartbeat_milliseconds |
INT64 |
Obbligatorio | Determina la frequenza con cui viene fornito un heartbeat ChangeRecord nel caso in cui non siano state effettuate transazioni in questa partizione.
Il valore deve essere compreso tra 1,000 (un secondo) e
300,000 (cinque minuti). |
read_options |
ARRAY |
Facoltativo (valore predefinito: NULL ) |
Aggiunge opzioni di lettura riservate per uso futuro. L'unico
valore consentito è NULL . |
Ti consigliamo di creare un metodo di supporto per creare il testo della query della funzione di lettura e associarvi i parametri, come mostrato nell'esempio seguente.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
Per eseguire query sui modifiche in tempo reale, utilizza l'API
ExecuteStreamingSql
. Spanner crea automaticamente una funzione di lettura speciale insieme allo stream delle modifiche. La funzione di lettura fornisce l'accesso ai record dello stream di modifiche. La convenzione di denominazione della funzione di lettura è
spanner.read_json_change_stream_name
.
Supponendo che nel database esista uno stream di modifiche SingersNameStream
, la sintassi della query per PostgreSQL è la seguente:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
La funzione read accetta i seguenti argomenti:
Nome argomento | Tipo | Obbligatorio? | Descrizione |
---|---|---|---|
start_timestamp |
timestamp with time zone |
Obbligatorio | Specifica che devono essere restituiti i record di variazione con commit_timestamp
maggiore o uguale a start_timestamp . Il valore deve rientrare nel periodo di conservazione del stream di modifiche, deve essere minore o uguale all'ora corrente e maggiore o uguale al timestamp della creazione dello stream di modifiche. |
end_timestamp |
timestamp with timezone |
Facoltativo (valore predefinito: NULL ) |
Specifica che devono essere restituiti i record di modifica con commit_timestamp
minore o uguale a end_timestamp . Il valore deve rientrare nel periodo di conservazione del stream di modifiche
e deve essere maggiore o uguale a start_timestamp .
La query termina dopo aver restituito tutti i record di variazione fino al valore end_timestamp o fino a quando non termini la connessione.
Se è NULL , l'esecuzione della query continua finché non vengono restituiti tutti i record di variazione o fino a quando non termini la connessione. |
partition_token |
text |
Facoltativo (valore predefinito: NULL ) |
Specifica la partizione dello stream di modifiche su cui eseguire query in base ai contenuti dei record delle partizioni secondarie. Se è NULL o non è specificato, significa che il lettore sta eseguendo una query sul flusso di variazioni per la prima volta e non ha ottenuto token di partizione specifici da cui eseguire query. |
heartbeat_milliseconds |
bigint |
Obbligatorio | Determina la frequenza con cui viene fornito un heartbeat ChangeRecord quando non sono presenti transazioni committate in questa partizione.
Il valore deve essere compreso tra 1,000 (un secondo) e
300,000 (cinque minuti). |
null |
null |
Obbligatorio | Riservato per l'uso futuro |
Ti consigliamo di creare un metodo di supporto per creare il testo della funzione di lettura e associarvi i parametri, come mostrato nell'esempio seguente.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
Modificare il formato dei record dei flussi di modifiche
GoogleSQL
La funzione di lettura degli modifiche in tempo reale restituisce una singola colonna ChangeRecord
di tipo ARRAY<STRUCT<...>>
. In ogni riga, questo array contiene sempre un singolo elemento.
Gli elementi dell'array hanno il seguente tipo:
STRUCT < data_change_record ARRAY<STRUCT<...>>, heartbeat_record ARRAY<STRUCT<...>>, child_partitions_record ARRAY<STRUCT<...>> >
Questo STRUCT
contiene tre campi: data_change_record
,
heartbeat_record
e child_partitions_record
, ciascuno di tipo
ARRAY<STRUCT<...>>
. In qualsiasi riga restituita dalla funzione di lettura dello stream di modifiche, solo uno di questi tre campi contiene un valore; gli altri due sono vuoti o NULL
. Questi campi di array contengono al massimo un elemento.
Le sezioni seguenti esaminano ciascuno di questi tre tipi di record.
PostgreSQL
La funzione di lettura degli modifiche in tempo reale restituisce una singola colonna ChangeRecord
di tipo JSON
con la seguente struttura:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
Questo oggetto contiene tre possibili chiavi: data_change_record
,
heartbeat_record
e child_partitions_record
. Il tipo di valore corrispondente è JSON
. In qualsiasi riga restituita dalla funzione di lettura del flusso di modifiche, esiste solo una di queste tre chiavi.
Le sezioni seguenti esaminano ciascuno di questi tre tipi di record.
Record di modifica dei dati
Un record di modifica dei dati contiene un insieme di modifiche a una tabella con lo stesso tipo di modifica (inserimento, aggiornamento o eliminazione) eseguito nello stesso timestamp di commit in una partizione dello stream di modifiche per la stessa transazione. È possibile restituire più record di variazione dei dati per la stessa transazione in più partizioni del flusso di variazioni.
Tutti i record di modifica dei dati hanno i campi commit_timestamp
, server_transaction_id
e record_sequence
, che insieme determinano l'ordine nel flusso di modifiche per un record dello stream. Questi tre campi sono sufficienti per dedurre
l'ordine delle modifiche e garantire la coerenza esterna.
Tieni presente che più transazioni possono avere lo stesso timestamp di commit se riguardano dati non sovrapposti. Il campo server_transaction_id
offre la possibilità di distinguere quale insieme di modifiche (potenzialmente tra le partizioni dello stream di modifiche) è stato emesso all'interno della stessa transazione. Se lo abbini ai campi record_sequence
e
number_of_records_in_transaction
, puoi anche mettere in coda e ordinare tutti i record di una determinata transazione.
I campi di un record di modifica dei dati includono quanto segue:
GoogleSQL
Campo | Tipo | Descrizione |
---|---|---|
commit_timestamp |
TIMESTAMP |
Indica il timestamp in cui è stata eseguita la commit della modifica. |
record_sequence |
STRING |
Indica il numero di sequenza del record all'interno della transazione.
I numeri di sequenza sono univoci e aumentano in modo monotonico (ma non necessariamente contigui) all'interno di una transazione. Ordina i record per lo stesso
server_transaction_id in base a record_sequence per
ricostruire l'ordine delle modifiche all'interno della transazione.
Spanner potrebbe ottimizzare questo ordinamento per migliorare il rendimento
e potrebbe non corrispondere sempre all'ordinamento originale fornito. |
server_transaction_id |
STRING |
Fornisce una stringa univoca a livello globale che rappresenta la transazione in cui è stata eseguita la modifica. Il valore deve essere utilizzato solo nel contesto dell'elaborazione dei record dello stream di modifiche e non è correlato all'ID transazione nell'API di Spanner. |
is_last_record_in_transaction_in_partition |
BOOL |
Indica se si tratta dell'ultimo record di una transazione nella partizione corrente. |
table_name |
STRING |
Nome della tabella interessata dalla modifica. |
value_capture_type |
STRING |
Descrive il tipo di acquisizione dei valori specificato nella configurazione del stream di modifiche al momento dell'acquisizione di questa modifica. Il tipo di acquisizione del valore può essere uno dei seguenti:
Il valore predefinito è |
column_types |
[ { "name": "STRING", "type": { "code": "STRING" }, "is_primary_key": BOOLEAN "ordinal_position": NUMBER }, ... ] |
Indica il nome della colonna, il tipo di colonna, se si tratta di una chiave primaria e la posizione della colonna come definita nello schema (ordinal_position ). La prima colonna di una tabella nello schema avrà una posizione ordinale pari a 1 . Il
tipo di colonna può essere nidificato per le colonne di array. Il formato corrisponde alla struttura del tipo descritta nella documentazione di riferimento dell'API Spanner.
|
mods |
[ { "keys": {"STRING" : "STRING"}, "new_values": { "STRING" : "VALUE-TYPE", [...] }, "old_values": { "STRING" : "VALUE-TYPE", [...] }, }, [...] ] |
Descrive le modifiche apportate, inclusi i valori della chiave primaria, i vecchi valori e i nuovi valori delle colonne modificate o monitorate.
La disponibilità e i contenuti dei valori precedenti e nuovi dipendono dal valore value_capture_type configurato. I campi new_values e
old_values contengono solo le colonne non chiave. |
mod_type |
STRING |
Descrive il tipo di modifica. Uno dei valori INSERT ,
UPDATE o DELETE . |
number_of_records_in_transaction |
INT64 |
Indica il numero di record di modifica dei dati che fanno parte di questa transazione in tutte le partizioni del flusso di modifiche. |
number_of_partitions_in_transaction |
INT64 |
Indica il numero di partizioni che restituiscono record di variazione dei dati per questa transazione. |
transaction_tag |
STRING |
Indica il tag transazione associato a questa transazione. |
is_system_transaction |
BOOL |
Indica se la transazione è una transazione di sistema. |
PostgreSQL
Campo | Tipo | Descrizione |
---|---|---|
commit_timestamp |
STRING |
Indica il timestamp in cui è stata eseguita la commit della modifica. |
record_sequence |
STRING |
Indica il numero di sequenza del record all'interno della transazione.
I numeri di sequenza sono univoci e aumentano in modo monotonico (ma non necessariamente contigui) all'interno di una transazione. Ordina i record per lo stesso
server_transaction_id in base a record_sequence per
ricostruire l'ordine delle modifiche all'interno della transazione. |
server_transaction_id |
STRING |
Fornisce una stringa univoca a livello globale che rappresenta la transazione in cui è stata eseguita la modifica. Il valore deve essere utilizzato solo nel contesto dell'elaborazione dei record dello stream di modifiche e non è correlato all'ID transazione nell'API Spanner |
is_last_record_in_transaction_in_partition |
BOOLEAN |
Indica se si tratta dell'ultimo record di una transazione nella partizione corrente. |
table_name |
STRING |
Indica il nome della tabella interessata dalla modifica. |
value_capture_type |
STRING |
Descrive il tipo di acquisizione dei valori specificato nella configurazione del stream di modifiche quando questa modifica è stata acquisita. Il tipo di acquisizione del valore può essere uno dei seguenti:
Il valore predefinito è |
column_types |
[ { "name": "STRING", "type": { "code": "STRING" }, "is_primary_key": BOOLEAN "ordinal_position": NUMBER }, ... ] |
Indica il nome della colonna, il tipo di colonna, se si tratta di una chiave primaria e la posizione della colonna come definita nello schema (ordinal_position ). La prima colonna di una tabella nello schema avrà una posizione ordinale pari a 1 . Il
tipo di colonna può essere nidificato per le colonne di array. Il formato corrisponde alla struttura del tipo descritta nel riferimento all'API Spanner.
|
mods |
[ { "keys": {"STRING" : "STRING"}, "new_values": { "STRING" : "VALUE-TYPE", [...] }, "old_values": { "STRING" : "VALUE-TYPE", [...] }, }, [...] ] |
Descrive le modifiche apportate, inclusi i valori della chiave primaria, i valori precedenti e i nuovi valori delle colonne modificate o monitorate. La disponibilità e i contenuti dei valori precedenti e nuovi dipendono dal value_capture_type configurato. I campi new_values e old_values contengono solo le colonne non chiave.
|
mod_type |
STRING |
Descrive il tipo di modifica. Uno dei valori INSERT ,
UPDATE o DELETE . |
number_of_records_in_transaction |
INT64 |
Indica il numero di record di modifica dei dati che fanno parte di questa transazione in tutte le partizioni del flusso di modifiche. |
number_of_partitions_in_transaction |
NUMBER |
Indica il numero di partizioni che restituiscono record di variazione dei dati per questa transazione. |
transaction_tag |
STRING |
Indica il tag transazione associato a questa transazione. |
is_system_transaction |
BOOLEAN |
Indica se la transazione è una transazione di sistema. |
Esempio di record di modifica dei dati
Di seguito è riportata una coppia di record di modifica dei dati di esempio. Descrivono una singola transazione in cui è presente un trasferimento tra due account. I due account si trovano in partizioni di stream di variazioni separate.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
Il seguente record di modifica dei dati è un esempio di record con il tipo di acquisizione del valore NEW_VALUES
. Tieni presente che vengono compilati solo i nuovi valori.
È stata modificata solo la colonna LastUpdate
, pertanto è stata restituita solo quella colonna.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Il seguente record di modifica dei dati è un esempio di record con il tipo di acquisizione del valore NEW_ROW
. È stata modificata solo la colonna LastUpdate
, ma vengono restituite tutte le colonne monitorate.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Il seguente record di modifica dei dati è un esempio di record con il tipo di acquisizione del valore NEW_ROW_AND_OLD_VALUES
. È stata modificata solo la colonna LastUpdate
, ma vengono restituite tutte le colonne monitorate. Questo tipo di acquisizione del valore acquisisce
il nuovo valore e il valore precedente di LastUpdate
.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Record heartbeat
Quando viene restituito un record heartbeat, indica che tutte le modifiche concommit_timestamp
inferiore o uguale atimestamp
del record heartbeat sono state restituite e che i record di dati futuri in questa
partizione devono avere timestamp di commit superiori a quelli restituiti dal
record heartbeat. I record heartbeat vengono restituiti quando non sono presenti modifiche ai dati scritte in una partizione. Quando vengono scritte modifiche ai dati nella partizione, è possibile utilizzare data_change_record.commit_timestamp
anziché heartbeat_record.timestamp
per indicare che il lettore sta procedendo nella lettura della partizione.
Puoi utilizzare i record heartbeat restituiti nelle partizioni per sincronizzare
i lettori in tutte le partizioni. Una volta che tutti i lettori hanno ricevuto un heartbeat maggiore o uguale a un timestamp A
o hanno ricevuto dati o record delle partizioni secondarie maggiori o uguali al timestamp A
, sanno di aver ricevuto tutti i record committati prima o in quel timestamp A
e possono iniziare a elaborare i record in buffer, ad esempio ordinare i record tra partizioni in base al timestamp e raggrupparli per server_transaction_id
.
Un record heartbeat contiene un solo campo:
GoogleSQL
Campo | Tipo | Descrizione |
---|---|---|
timestamp |
TIMESTAMP |
Indica il timestamp del record heartbeat. |
PostgreSQL
Campo | Tipo | Descrizione |
---|---|---|
timestamp |
STRING |
Indica il timestamp del record heartbeat. |
Record heartbeat di esempio
Un esempio di record heartbeat che comunica che sono stati restituiti tutti i record con timestamp inferiori o uguali a quello di questo record:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
Record delle partizioni secondarie
I record delle partizioni secondarie restituiscono informazioni sulle partizioni secondarie: i relativi token di partizione, i token delle partizioni principali e il valore start_timestamp
che rappresenta il timestamp precedente per cui le partizioni secondarie contengono record di modifica. I record i cui timestamp di commit
sono immediatamente precedenti a child_partitions_record.start_timestamp
vengono
restituito nella partizione corrente. Dopo aver restituito tutti i record delle partizioni secondarie per questa partizione, questa query restituisce uno stato di successo, che indica che tutti i record sono stati restituiti per questa partizione.
I campi di un record della partizione secondaria includono quanto segue:
GoogleSQL
Campo | Tipo | Descrizione |
---|---|---|
start_timestamp |
TIMESTAMP |
Indica che i record di modifica dei dati restituiti dalle partizioni secondarie in questo record della partizione secondaria hanno un timestamp di commit maggiore o uguale a start_timestamp . Quando esegui una query su una
partizione secondaria, la query deve specificare il token della partizione secondaria e un
start_timestamp maggiore o uguale a
child_partitions_token.start_timestamp . Tutti i record delle partizioni secondarie
retitrati da una partizione hanno lo stesso start_timestamp
e il timestamp rientra sempre tra start_timestamp e end_timestamp specificati dalla query. |
record_sequence |
STRING |
Indica un numero di sequenza in aumento monotonico che può essere utilizzato per definire l'ordinamento dei record della partizione secondaria quando sono presenti più record della partizione secondaria restituiti con lo stesso start_timestamp in una determinata partizione. Il token partizione, start_timestamp
e record_sequence identificano in modo univoco un record della partizione secondaria.
|
child_partitions |
[ { "token" : "STRING", "parent_partition_tokens" : ["STRING"] } ] |
Restituisce un insieme di partizioni secondarie e le relative informazioni associate. Sono inclusi la stringa del token della partizione utilizzata per identificare la partizione secondaria nelle query, nonché i token delle relative partizioni principali. |
PostgreSQL
Campo | Tipo | Descrizione |
---|---|---|
start_timestamp |
STRING |
Indica che i record di modifica dei dati restituiti dalle partizioni secondarie in questo record della partizione secondaria hanno un timestamp di commit maggiore o uguale a start_timestamp . Quando esegui una query su una
partizione secondaria, la query deve specificare il token della partizione secondaria e un valore
start_timestamp maggiore o uguale a
child_partitions_token.start_timestamp . Tutti i record delle partizioni secondarie
retituti da una partizione hanno lo stesso start_timestamp
e il timestamp rientra sempre tra start_timestamp e end_timestamp specificati dalla query.
|
record_sequence |
STRING |
Indica un numero di sequenza in aumento monotonico che può essere utilizzato per definire l'ordinamento dei record della partizione secondaria quando sono presenti più record della partizione secondaria restituiti con lo stesso start_timestamp in una determinata partizione. Il token partizione, start_timestamp
e record_sequence identificano in modo univoco un record della partizione secondaria.
|
child_partitions |
[ { "token": "STRING", "parent_partition_tokens": ["STRING"], }, [...] ] |
Restituisce un array di partizioni secondarie e le relative informazioni associate. Sono inclusi la stringa del token della partizione utilizzata per identificare la partizione secondaria nelle query, nonché i token delle relative partizioni principali. |
Esempio di record della partizione secondaria
Di seguito è riportato un esempio di record della partizione secondaria:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
Flusso di lavoro delle query sui flussi di modifiche
Esegui query sui flussi di variazioni utilizzando l'API
ExecuteStreamingSql
, con una
transazione di sola lettura
monouso e un
limite di timestamp rigoroso. La funzione di lettura del stream di variazioni consente di specificare start_timestamp
e
end_timestamp
per l'intervallo di tempo di interesse. Tutti i record di modifica
all'interno del periodo di conservazione sono accessibili utilizzando il vincolo di timestamp di sola lettura.
Tutti gli altri
TransactionOptions
non sono validi per le query sugli stream di modifiche. Inoltre, se TransactionOptions.read_only.return_read_timestamp
è impostato su true
, nel messaggio Transaction
che descrive la transazione viene restituito un valore speciale kint64max - 1
anziché un timestamp di lettura valido. Questo valore speciale deve essere ignorato e non deve essere utilizzato per le query successive.
Ogni query dello stream di modifiche può restituire un numero qualsiasi di righe, ciascuna contenente un record di modifica dei dati, un record di heartbeat o un record delle partizioni secondarie. Non è necessario impostare una scadenza per la richiesta.
Esempio di flusso di lavoro di query sul flusso di modifiche
Il flusso di lavoro delle query in streaming inizia con l'emissione della prima query sul flusso di modifiche specificando partition_token
e NULL
. La query deve specificare la funzione di lettura per lo stream di modifiche, i timestamp di inizio e di fine di interesse e l'intervallo di heartbeat. Quando end_timestamp
è NULL
, la query continua a restituire le modifiche ai dati fino al termine della partizione.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
Elabora i record di dati di questa query fino a quando non vengono restituiti tutti i record della partizione secondaria. Nell'esempio seguente vengono restituiti due record di partizione secondaria e tre token di partizione, dopodiché la query termina. I record della partizione secondaria
di una query specifica condividono sempre lo stesso start_timestamp
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012390,
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
Per elaborare le modifiche dopo il giorno 2022-05-01T09:00:01Z
, crea tre nuove query e
eseguile in parallelo. Se utilizzate insieme, le tre query restituiscono le modifiche ai dati per lo stesso intervallo di chiavi coperto dalla query principale. Imposta sempre start_timestamp
su
start_timestamp
nello stesso record della partizione secondaria e utilizza lo stesso
end_timestamp
e lo stesso intervallo di heartbeat per elaborare i record in modo coerente
in tutte le query.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
La query su child_token_2
termina dopo aver restituito un altro record della partizione secondaria. Questo record indica che una nuova partizione copre le modifiche sia per child_token_2
sia per child_token_3
a partire da 2022-05-01T09:30:15Z
. La query su child_token_3
restituisce lo stesso record esatto, perché entrambe sono le partizioni principali del nuovo child_token_4
. Per garantire un'elaborazione rigorosamente ordinata dei record di dati per una determinata chiave, la query su child_token_4
deve essere avviata dopo il completamento di tutti i record principali. In questo caso, i gruppi principali sono
child_token_2
e child_token_3
. Crea una sola query per ogni
token di partizione secondaria. Il design del flusso di lavoro delle query deve designare un'entità principale in attesa e
pianificare la query su child_token_4
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": [child_token_2, child_token_3],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
Puoi trovare esempi di gestione e analisi dei record dello stream di modifiche nel connettore Apache Beam SpannerIO Dataflow su GitHub.