Partizioni, record e query dei flussi di modifiche

In questa pagina vengono descritti in dettaglio i seguenti attributi delle modifiche in tempo reale:

  • Il suo modello di partizionamento diviso
  • Il formato e i contenuti dei record di modifiche in tempo reale
  • La sintassi di basso livello utilizzata per eseguire query su questi record
  • Esempio del flusso di lavoro per le query

Le informazioni contenute in questa pagina sono quelle più pertinenti per l'utilizzo dell'API Spanner per eseguire modifiche in tempo reale di modifiche. Le applicazioni che invece utilizzano Dataflow per leggere i dati delle modifiche in tempo reale non devono lavorare direttamente con il modello dei dati descritto qui.

Per una guida introduttiva più completa alle modifiche in tempo reale, consulta Panoramica delle modifiche in tempo reale.

Partizioni delle modifiche in tempo reale

Quando si verifica una modifica in una tabella controllata da un flusso di modifiche, Spanner scrive un record di modifiche in tempo reale corrispondente nel database, in modo sincrono nella stessa transazione della modifica dei dati. Ciò garantisce che, se la transazione ha esito positivo, Spanner ha anche acquisito e mantenuto correttamente la modifica. Internamente, Spanner posiziona il record di modifiche in tempo reale e la modifica dei dati in modo che vengano elaborati dallo stesso server al fine di ridurre al minimo l'overhead di scrittura.

Come parte del codice DML a una determinata suddivisione, Spanner aggiunge la scrittura ai dati del flusso di modifiche corrispondenti suddivisi nella stessa transazione. A causa di questa colocation, i flussi di modifiche non aggiungono ulteriore coordinamento tra le risorse di gestione, il che riduce al minimo l'overhead del commit della transazione.

immagine

Spanner scala mediante la suddivisione e l'unione dinamica dei dati in base al carico e alle dimensioni del database e la distribuzione delle suddivisioni tra le risorse di gestione.

Per abilitare le operazioni di scrittura e lettura per la scalabilità delle modifiche in tempo reale, Spanner suddivide e unisce lo spazio di archiviazione interno delle modifiche in tempo reale ai dati del database, evitando automaticamente gli hotspot. Per supportare la lettura dei record delle modifiche in tempo reale quasi in tempo reale durante la scrittura del database sulla scala, l'API Spanner è progettata per eseguire query su un flusso di modifiche contemporaneamente utilizzando le partizioni delle modifiche in tempo reale. Le partizioni delle modifiche in tempo reale corrispondono alle suddivisioni dei dati delle modifiche in tempo reale che contengono i record delle modifiche in tempo reale. Le partizioni di un flusso di modifiche cambiano dinamicamente nel tempo e sono correlate al modo in cui Spanner suddivide e unisce dinamicamente i dati del database.

Una partizione di modifiche in tempo reale contiene i record per un intervallo di chiavi immutabili per un intervallo di tempo specifico. Qualsiasi partizione di modifiche in tempo reale può essere suddivisa in una o più partizioni di modifiche in tempo reale oppure unite ad altre partizioni di modifiche in tempo reale. Quando si verificano questi eventi di suddivisione o unione, vengono create partizioni figlio per acquisire le modifiche per i rispettivi intervalli di chiavi immutabili per l'intervallo di tempo successivo. Oltre ai record delle modifiche ai dati, una query di modifiche in tempo reale restituisce i record delle partizioni figlio per comunicare ai lettori le nuove partizioni di modifiche in tempo reale su cui è necessario eseguire una query, nonché i record Heartbeat per indicare l'avanzamento dell'avanzamento quando di recente non sono state eseguite scritture.

Quando esegui una query su una determinata partizione di modifiche in tempo reale, i record delle modifiche vengono restituiti in ordine di timestamp di commit. Ogni record di modifica viene restituito esattamente una volta. Nelle partizioni delle modifiche in tempo reale, l'ordinamento dei record delle modifiche non è garantito. I record delle modifiche per una determinata chiave primaria vengono restituiti solo su una partizione per un determinato intervallo di tempo.

A causa della derivazione della partizione padre-figlio, per elaborare le modifiche per una determinata chiave nell'ordine del timestamp di commit, i record restituiti dalle partizioni figlio devono essere elaborati solo dopo che sono stati elaborati i record di tutte le partizioni padre.

Funzioni di lettura delle modifiche in tempo reale e sintassi delle query

GoogleSQL

Puoi eseguire query sulle modifiche in tempo reale utilizzando l'API ExecuteStreamingSql. Spanner crea automaticamente una funzione di lettura speciale insieme al flusso di modifiche. La funzione di lettura consente di accedere ai record del flusso di modifiche. La convenzione di denominazione delle funzioni di lettura è READ_change_stream_name.

Se nel database è presente un flusso di modifiche SingersNameStream, la sintassi delle query per GoogleSQL è la seguente:

SELECT ChangeRecord
FROM READ_SingersNameStream (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    read_options
)

La funzione di lettura accetta i seguenti argomenti:

Nome argomento Tipo Obbligatorio? Descrizione
start_timestamp TIMESTAMP Obbligatorio Specifica che devono essere restituiti i record con commit_timestamp maggiore o uguale a start_timestamp. Il valore deve essere compreso nel periodo di conservazione delle modifiche in tempo reale, deve essere inferiore o uguale all'ora attuale e maggiore o uguale al timestamp di creazione del flusso di modifiche.
end_timestamp TIMESTAMP Facoltativo (valore predefinito: NULL) Specifica che devono essere restituiti i record con commit_timestamp minore o uguale a end_timestamp. Il valore deve rientrare nel periodo di conservazione delle modifiche in tempo reale ed essere maggiore o uguale a start_timestamp. La query viene completata dopo la restituzione di tutti i ChangeRecords fino a end_timestamp o l'utente termina la connessione. Se NULL o non è specificato, la query viene eseguita fino a quando non vengono restituiti tutti i ChangeRecord o fino a quando l'utente non termina la connessione.
partition_token STRING Facoltativo (valore predefinito: NULL) Specifica la partizione di modifiche in tempo reale su cui eseguire la query, in base ai contenuti dei record delle partizioni figlio. Se NULL o non è specificato, significa che il lettore sta eseguendo una query sul flusso di modifiche per la prima volta e non ha ottenuto alcun token di partizione specifico da cui eseguire la query.
heartbeat_milliseconds INT64 Obbligatorio Determina la frequenza con cui viene restituito un ChangeRecord Heartbeat nel caso in cui non siano presenti transazioni impegnate in questa partizione. Il valore deve essere compreso tra 1,000 (un secondo) e 30,0000 (cinque minuti).
read_options ARRAY Facoltativo (valore predefinito: NULL) Opzioni di lettura aggiuntive riservate per uso futuro. Attualmente, l'unico valore consentito è NULL.

È consigliabile creare un metodo pratico per creare il testo della query della funzione di lettura e dei parametri di associazione al testo, come mostrato nell'esempio che segue.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT ChangeRecord FROM READ_SingersNameStream"
            + "("
            + "   start_timestamp => @startTimestamp,"
            + "   end_timestamp => @endTimestamp,"
            + "   partition_token => @partitionToken,"
            + "   heartbeat_milliseconds => @heartbeatMillis"
            + ")";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {
  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("startTimestamp")
                    .to(startTimestamp)
                    .bind("endTimestamp")
                    .to(endTimestamp)
                    .bind("partitionToken")
                    .to(partitionToken)
                    .bind("heartbeatMillis")
                    .to(heartbeatMillis)
                    .build();
}

PostgreSQL

Puoi eseguire query sulle modifiche in tempo reale utilizzando l'API ExecuteStreamingSql. Spanner crea automaticamente una funzione di lettura speciale insieme al flusso di modifiche. La funzione di lettura consente di accedere ai record del flusso di modifiche. La convenzione di denominazione delle funzioni di lettura è spanner.read_json_change_stream_name.

Supponendo che nel database esista un flusso di modifiche SingersNameStream, la sintassi delle query per PostgreSQL è la seguente:

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    null
)

La funzione di lettura accetta i seguenti argomenti:

Nome argomento Tipo Obbligatorio? Descrizione
start_timestamp timestamp with time zone Obbligatorio Specifica che devono essere restituiti i record delle modifiche con commit_timestamp maggiore o uguale a start_timestamp. Il valore deve essere compreso nel periodo di conservazione delle modifiche in tempo reale, deve essere inferiore o uguale all'ora attuale e maggiore o uguale al timestamp di creazione del flusso di modifiche.
end_timestamp timestamp with timezone Facoltativo (valore predefinito: NULL) Specifica che devono essere restituiti i record delle modifiche con commit_timestamp minore o uguale a end_timestamp. Il valore deve rientrare nel periodo di conservazione delle modifiche in tempo reale ed essere maggiore o uguale a start_timestamp. La query termina dopo aver restituito tutti i record delle modifiche fino al giorno end_timestamp o l'utente termina la connessione. Se NULL, la query viene eseguita fino a quando non vengono restituiti tutti i record delle modifiche o fino a quando l'utente non termina la connessione.
partition_token text Facoltativo (valore predefinito: NULL) Specifica la partizione di modifiche in tempo reale su cui eseguire la query, in base ai contenuti dei record delle partizioni figlio. Se NULL o non è specificato, significa che il lettore sta eseguendo una query sul flusso di modifiche per la prima volta e non ha ottenuto alcun token di partizione specifico da cui eseguire la query.
heartbeat_milliseconds bigint Obbligatorio Determina la frequenza con cui verrà restituito un ChangeRecord Heartbeat nel caso in cui non siano presenti transazioni impegnate in questa partizione. Il valore deve essere compreso tra 1,000 (un secondo) e 300,000 (cinque minuti).
null null Obbligatorio Riservato per uso futuro

È consigliabile creare un metodo pratico per creare il testo della funzione di lettura e i parametri di associazione al testo, come mostrato nell'esempio che segue.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\""
            + "($1, $2, $3, $4, null)";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {

  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("p1")
                    .to(startTimestamp)
                    .bind("p2")
                    .to(endTimestamp)
                    .bind("p3")
                    .to(partitionToken)
                    .bind("p4")
                    .to(heartbeatMillis)
                    .build();
}

Formato di record delle modifiche in tempo reale

GoogleSQL

La funzione di lettura delle modifiche in tempo reale restituisce una singola colonna ChangeRecord di tipo ARRAY<STRUCT<...>>. In ogni riga, questo array contiene sempre un singolo elemento.

Gli elementi array hanno il tipo seguente:

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

Questo struct contiene tre campi: data_change_record, heartbeat_record e child_partitions_record, ciascuno di tipo ARRAY<STRUCT<...>>. In ogni riga restituita dalla funzione di lettura di modifiche in tempo reale, solo uno di questi tre campi contiene un valore; gli altri due sono vuoti o NULL. Questi campi array contengono al massimo un elemento.

Le sezioni seguenti esaminano ciascuno di questi tre tipi di record.

PostgreSQL

La funzione di lettura delle modifiche in tempo reale restituisce una singola colonna ChangeRecord di tipo JSON con la seguente struttura:

{
  "data_change_record" : {},
  "heartbeat_record" : {},
  "child_partitions_record" : {}
}

In questo oggetto sono presenti tre chiavi possibili: data_change_record, heartbeat_record e child_partitions_record; il tipo di valore corrispondente è JSON. In ogni riga restituita dalla funzione di lettura di modifiche in tempo reale, è presente solo una di queste tre chiavi.

Le sezioni seguenti esaminano ciascuno di questi tre tipi di record.

Record di modifiche dei dati

Un record di modifica dei dati contiene un insieme di modifiche a una tabella con lo stesso tipo di modifica (inserimento, aggiornamento o eliminazione) impegnata con lo stesso timestamp del commit in una partizione di modifiche in tempo reale per la stessa transazione. È possibile restituire più record di modifiche ai dati per la stessa transazione tra più partizioni di modifiche in tempo reale.

Tutti i record delle modifiche dei dati hanno campi commit_timestamp, server_transaction_id e record_sequence, che insieme determinano l'ordine nel flusso di modifiche per un record di stream. Questi tre campi sono sufficienti per ricavare l'ordine delle modifiche e fornire coerenza esterna.

Tieni presente che più transazioni possono avere lo stesso timestamp di commit se si riferiscono a dati non sovrapposti. Il campo server_transaction_id offre la possibilità di distinguere quale insieme di modifiche (potenzialmente nelle partizioni delle modifiche in tempo reale) è stato applicato nella stessa transazione. L'accoppiamento con i campi record_sequence e number_of_records_in_transaction consente anche di eseguire il buffering e l'ordine di tutti i record di una determinata transazione.

I campi di un record di modifica dei dati includono:

GoogleSQL

Campo Tipo Descrizione
commit_timestamp TIMESTAMP Il timestamp in cui è stato eseguito il commit della modifica.
record_sequence STRING Il numero di sequenza per il record all'interno della transazione. I numeri di sequenza sono garantiti in modo univoco e in aumento monotonico (ma non necessariamente contigui) all'interno di una transazione. Ordina i record per lo stesso server_transaction_id per record_sequence per ricostruire l'ordine delle modifiche nella transazione.
server_transaction_id STRING Una stringa univoca a livello globale che rappresenta la transazione in cui è stato eseguito il commit della modifica. Il valore deve essere utilizzato solo nel contesto dell'elaborazione dei record di modifiche in tempo reale e non è correlato all'ID transazione nell'API Spanner.
is_last_record_in_transaction_in_partition BOOL Indica se questo è l'ultimo record di una transazione nella partizione corrente.
table_name STRING Il nome della tabella interessata dalla modifica.
value_capture_type STRING

Descrive il tipo di acquisizione del valore specificato nella configurazione delle modifiche in tempo reale quando è stata acquisita questa modifica.

Il tipo di acquisizione del valore può essere "OLD_AND_NEW_VALUES", "NEW_ROW", "NEW_VALUES" o "NEW_ROW_AND_OLD_VALUES". Il valore predefinito è "OLD_AND_NEW_VALUES". Per maggiori informazioni, consulta la sezione sui tipi di acquisizione del valore.

column_types ARRAY<STRUCT<
name STRING,
 type JSON,
 is_primary_key BOOL,
 ordinal_position INT64
>>
Nome della colonna, tipo di colonna (se si tratta di una chiave primaria) e posizione della colonna come definita nello schema ("ordinal_position"). La prima colonna di una tabella nello schema avrà una posizione ordinale "1". Il tipo di colonna può essere nidificato per le colonne array. Il formato corrisponde alla struttura dei tipi descritta nel riferimento per l'API Spanner.
mods ARRAY<STRUCT<
keys JSON,
 new_values JSON,
 old_values JSON
>>
Descrive le modifiche apportate, incluse le coppie chiave-valore primarie, i valori precedenti e i nuovi valori delle colonne modificate o monitorate. La disponibilità e i contenuti dei valori vecchi e nuovi dipenderanno dal valore value_capture_type configurato. I campi new_values e old_values contengono solo le colonne non chiave.
mod_type STRING Descrive il tipo di cambiamento. Uno tra INSERT, UPDATE o DELETE.
number_of_records_in_transaction INT64 Il numero di record di modifica dei dati che fanno parte di questa transazione in tutte le partizioni di modifiche in tempo reale.
number_of_partitions_in_transaction INT64 Il numero di partizioni che restituiranno i record delle modifiche ai dati per questa transazione.
transaction_tag STRING Tag Transaction associato a questa transazione.
is_system_transaction BOOL Indica se si tratta di una transazione di sistema.

PostgreSQL

Campo Tipo Descrizione
commit_timestamp STRING Il timestamp in cui è stato eseguito il commit della modifica.
record_sequence STRING Il numero di sequenza per il record all'interno della transazione. I numeri di sequenza sono garantiti in modo univoco e in aumento monotonico (ma non necessariamente contigui) all'interno di una transazione. Ordina i record per lo stesso "server_transaction_id" per "record_phrase" per ricostruire l'ordine delle modifiche all'interno della transazione.
server_transaction_id STRING Una stringa univoca a livello globale che rappresenta la transazione in cui è stato eseguito il commit della modifica. Il valore deve essere utilizzato solo nel contesto dell'elaborazione dei record di modifiche in tempo reale e non è correlato all'ID transazione nell'API Spanner
is_last_record_in_transaction_in_partition BOOLEAN Indica se questo è l'ultimo record di una transazione nella partizione corrente.
table_name STRING Il nome della tabella interessata dalla modifica.
value_capture_type STRING

Descrive il tipo di acquisizione del valore specificato nella configurazione delle modifiche in tempo reale quando è stata acquisita questa modifica.

Il tipo di acquisizione del valore può essere "OLD_AND_NEW_VALUES", "NEW_ROW", "NEW_VALUES" o "NEW_ROW_AND_OLD_VALUES". Il valore predefinito è "OLD_AND_NEW_VALUES". Per maggiori informazioni, consulta la sezione sui tipi di acquisizione del valore.

column_types

[
  {
      "name": <STRING>,
      "type": {
        "code": <STRING>
      },
      "is_primary_key": <BOOLEAN>,
      "ordinal_position": <NUMBER>
    },
    ...
]
Nome della colonna, tipo di colonna (se si tratta di una chiave primaria) e posizione della colonna come definita nello schema ("ordinal_position"). La prima colonna di una tabella nello schema avrà una posizione ordinale "1". Il tipo di colonna può essere nidificato per le colonne array. Il formato corrisponde alla struttura dei tipi descritta nel riferimento per l'API Spanner.
mods

[
  {
    "keys": {<STRING> : <STRING>},
    "new_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
    "old_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
  },
  [...]
]
Descrive le modifiche apportate, incluse le coppie chiave-valore primarie, i valori precedenti e i nuovi valori delle colonne modificate o monitorate. La disponibilità e i contenuti dei valori vecchi e nuovi dipenderanno dal valore value_capture_type configurato. I campi new_values e old_values contengono solo le colonne non chiave.
mod_type STRING Descrive il tipo di cambiamento. Uno tra INSERT, UPDATE o DELETE.
number_of_records_in_transaction INT64 Il numero di record di modifica dei dati che fanno parte di questa transazione in tutte le partizioni di modifiche in tempo reale.
number_of_partitions_in_transaction NUMBER Il numero di partizioni che restituiranno i record delle modifiche ai dati per questa transazione.
transaction_tag STRING Tag Transaction associato a questa transazione.
is_system_transaction BOOLEAN Indica se si tratta di una transazione di sistema.

Di seguito sono riportati alcuni esempi di record di modifiche dei dati. Descrivono una singola transazione in cui avviene un trasferimento tra due account. Tieni presente che i due account si trovano in partizioni di modifiche in tempo reale separate.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}

Il seguente record di modifica dei dati è un esempio di record con tipo di acquisizione di valori "NEW_VALUES". Tieni presente che vengono compilati solo i nuovi valori. È stata modificata solo la colonna "LastUpdate" e di conseguenza è stata restituita solo quella colonna.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,
  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z"
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Il seguente record di modifica dei dati è un esempio di record con tipo di acquisizione di valori "NEW_ROW". È stata modificata solo la colonna "LastUpdate", ma vengono restituite tutte le colonne monitorate.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Il seguente record di modifica dei dati è un esempio di record con tipo di acquisizione di valori "NEW_ROW_AND_OLD_VALUES". È stata modificata solo la colonna "LastUpdate", ma vengono restituite tutte le colonne monitorate. Questo tipo di acquisizione valore acquisisce il nuovo valore e il valore precedente di LastUpdate.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z"
      }
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW_AND_OLD_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Record Heartbeat

Quando viene restituito un record Heartbeat, questo indica che sono state restituite tutte le modifiche con commit_timestamp minore o uguale al valore timestamp del record Heartbeat e che i record di dati futuri in questa partizione devono avere timestamp di commit più elevati rispetto a quelli restituiti dal record heartbeat. I record Heartbeat vengono restituiti se non vengono scritte modifiche ai dati di una partizione. Se vengono scritte modifiche ai dati nella partizione, è possibile utilizzare data_change_record.commit_timestamp anziché heartbeat_record.timestamp per indicare che il lettore sta avanzando nella lettura della partizione.

Puoi utilizzare record Heartbeat restituiti sulle partizioni per sincronizzare i lettori tra tutte le partizioni. Una volta che tutti i lettori hanno ricevuto un battito cardiaco maggiore o uguale a un timestamp A o hanno ricevuto record di partizioni figlio o dati maggiori o uguali al timestamp A, i lettori sanno di aver ricevuto tutti i record impegnati prima di quel timestamp A e possono iniziare a elaborare i record nel buffer, ad esempio ordinando i record tra partizioni per timestamp e raggruppandoli per server_transaction_id.

Un record heartbeat contiene un solo campo:

GoogleSQL

Campo Tipo Descrizione
timestamp TIMESTAMP Il timestamp dell'heartbeat.

PostgreSQL

Campo Tipo Descrizione
timestamp STRING Il timestamp dell'heartbeat.

Un esempio di record Heartbeat, che comunica che sono stati restituiti tutti i record con timestamp minori o uguali al timestamp di questo record:

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

Record di partizioni figlio

Un record delle partizioni figlio restituisce informazioni sulle partizioni figlio: i token di partizione, i token delle partizioni padre e start_timestamp, che rappresenta il primo timestamp per cui le partizioni figlio contengono i record delle modifiche. I record con timestamp di commit immediatamente precedenti a child_partitions_record.start_timestamp vengono restituiti nella partizione attuale. Dopo aver restituito tutti i record delle partizioni figlio per questa partizione, questa query restituirà uno stato riuscito, a indicare che tutti i record sono stati restituiti per questa partizione.

I campi di un record di partizioni figlio includono quanto segue:

GoogleSQL

Campo Tipo Descrizione
start_timestamp TIMESTAMP I record di modifiche dei dati restituiti dalle partizioni figlio in questo record della partizione figlio hanno un timestamp di commit maggiore o uguale a start_timestamp. Quando esegui una query su una partizione figlio, la query deve specificare il token della partizione figlio e un valore start_timestamp maggiore o uguale a child_partitions_token.start_timestamp. Tutti i record delle partizioni figlio restituiti da una partizione hanno lo stesso start_timestamp e il timestamp rientra sempre tra i valori start_timestamp specificati della query e end_timestamp.
record_sequence STRING Un numero di sequenza crescente che può essere utilizzato per definire l'ordine del record delle partizioni figlio quando vengono restituiti più record di partizioni figlio con lo stesso start_timestamp in una particolare partizione. Il token di partizione, start_timestamp e record_sequence, identificano in modo univoco un record delle partizioni figlio.
child_partitions ARRAY<STRUCT<
token STRING,
parent_partition_tokens
ARRAY<STRING>
>>
Restituisce un insieme di partizioni figlio e le informazioni associate. Questo include la stringa del token di partizione utilizzata per identificare la partizione figlio nelle query, nonché i token delle partizioni padre.

PostgreSQL

Campo Tipo Descrizione
start_timestamp STRING I record di modifiche dei dati restituiti dalle partizioni figlio in questo record delle partizioni figlio hanno un timestamp di commit maggiore o uguale a start_timestamp. Quando si esegue una query su una partizione figlio, la query deve specificare il token della partizione figlio e un valore start_timestamp maggiore o uguale a child_partitions_token.start_timestamp. Tutti i record delle partizioni figlio restituiti da una partizione hanno lo stesso start_timestamp e il timestamp rientra sempre tra i valori start_timestamp e end_timestamp specificati della query.
record_sequence STRING Un numero di sequenza crescente che può essere utilizzato per definire l'ordine del record delle partizioni figlio quando vengono restituiti più record di partizioni figlio con lo stesso start_timestamp in una particolare partizione. Il token di partizione, start_timestamp e record_sequence, identificano in modo univoco un record delle partizioni figlio.
child_partitions

[
  {
    "token": <STRING>,
    "parent_partition_tokens": [<STRING>],
  }, [...]
]
Restituisce un array di partizioni figlio e le relative informazioni associate. Questo include la stringa del token di partizione utilizzata per identificare la partizione figlio nelle query, nonché i token delle partizioni padre.

Di seguito è riportato un esempio di record di partizione figlio:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"]
    }
  ],
}

Flusso di lavoro delle query relative alle modifiche in tempo reale

Esegui query di modifiche in tempo reale utilizzando l'API ExecuteStreamingSql, con una transazione di sola lettura monouso e un timestamp vincolato elevato. La funzione di lettura delle modifiche in tempo reale consente di specificare start_timestamp e end_timestamp per l'intervallo di tempo che ti interessa. Tutti i record delle modifiche all'interno del periodo di conservazione sono accessibili mediante il timestamp di sola lettura efficace.

Tutti gli altri TransactionOptions non sono validi per le query di modifiche in tempo reale. Inoltre, se TransactionOptions.read_only.return_read_timestamp è impostato su true, nel messaggio Transaction che descrive la transazione verrà restituito un valore speciale pari a kint64max - 1, anziché un timestamp di lettura valido. Questo valore speciale deve essere ignorato e non utilizzato per eventuali query successive.

Ogni query in tempo reale può restituire un numero qualsiasi di righe, ognuna delle quali contiene un record di modifiche dei dati, un record di heartbeat o un record delle partizioni figlio. Non è necessario impostare una scadenza per la richiesta.

Esempio:

Il flusso di lavoro delle query in modalità flusso inizia con l'invio della prima query del flusso di modifiche specificando partition_token in NULL. La query deve specificare la funzione di lettura per il flusso di modifiche, il timestamp di inizio e di fine di interesse e l'intervallo di battito cardiaco. Quando end_timestamp è NULL, la query continua a restituire le modifiche ai dati fino al termine della partizione.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:00Z",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:00Z',
  NULL,
  NULL,
  10000,
  NULL
) ;

Elabora i record di dati di questa query finché i record delle partizioni figlio non vengono restituiti. Nell'esempio seguente, vengono restituiti due record di partizioni figlio e tre token di partizione, quindi la query termina. I record di partizioni figlio di una query specifica condividono sempre lo stesso start_timestamp.

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
        // from the initial change stream queries.
      "parent_partition_tokens": [NULL]
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL]
    }
  ],
}
child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL]
    }
  ],
}

Per elaborare le modifiche future dopo il giorno 2022-05-01T09:00:01Z, crea tre nuove query ed eseguile in parallelo. Insieme, le tre query restituiscono le modifiche future ai dati per lo stesso intervallo di chiavi coperto dall'elemento padre. Imposta sempre start_timestamp su start_timestamp nello stesso record di partizione figlio e utilizza lo stesso intervallo di end_timestamp e di heartbeat per elaborare i record in modo coerente su tutte le query.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_1',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_2',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_3',
  10000,
  NULL
);

Dopo un po' di tempo, la query su child_token_2 termina dopo aver restituito un altro record di partizione figlio. Questo record indica che una nuova partizione coprirà le modifiche future sia per child_token_2 che per child_token_3 a partire dal giorno 2022-05-01T09:30:15Z. La query su child_token_3 restituirà lo stesso identico record, perché entrambe sono le partizioni padre del nuovo child_token_4. Per garantire un trattamento ordinato rigoroso dei record di dati per una determinata chiave, la query su child_token_4 deve iniziare solo al termine di tutti gli elementi principali, ovvero in questo caso child_token_2 e child_token_3. Crea una sola query per ogni token di partizione figlio. La progettazione del flusso di lavoro delle query deve nominare un elemento padre che attenda e pianifica la query su child_token_4.

child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:30:15Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01T09:30:15Z",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:30:15Z',
  NULL,
  'child_token_4',
  10000,
  NULL
);

Puoi trovare esempi di gestione e analisi dei record di modifiche in tempo reale nel connettore Dataflow SpannerIO di Apache Beam su GitHub.