Modifica partizioni, record e query dei flussi di dati

Questa pagina descrive in dettaglio i seguenti attributi degli stream di modifiche:

  • Il suo modello di partizionamento basato su suddivisione
  • Il formato e i contenuti dei record dello stream di modifiche
  • La sintassi di basso livello utilizzata per eseguire query su questi record
  • Esempio di flusso di lavoro delle query

Le informazioni riportate in questa pagina sono più pertinenti per l'utilizzo dell'API Spanner per eseguire query direttamente sui flussi di modifiche. Le applicazioni che invece utilizzano Dataflow per leggere i dati dei stream di modifiche non devono lavorare direttamente con il modello di dati descritto qui.

Per una guida introduttiva più ampia agli stream di modifiche, consulta la Panoramica degli stream di modifiche.

Partizioni delle modifiche in tempo reale

Quando si verifica una modifica in una tabella controllata da un flusso di modifiche, Spanner scrive un record di modifiche in tempo reale corrispondente nel database, in modo sincrono nella stessa transazione con la modifica dei dati. Questo garantisce che, se la transazione ha esito positivo, Spanner ha anche ha acquisito correttamente e ha mantenuto la modifica. Internamente, Spanner colloca nello stesso luogo il record del flusso di modifiche e la modifica dei dati in modo che vengano elaborati dallo stesso server per ridurre al minimo il sovraccarico di scrittura.

Nell'ambito del DML per una determinata suddivisione, Spanner appende la scrittura alla suddivisione dei dati dello stream di modifiche corrispondente nella stessa transazione. A causa di questa colocation, cambia I flussi di dati non aggiungono ulteriore coordinazione tra le risorse di distribuzione, il che riduce al minimo l'overhead del commit della transazione.

immagine

Spanner si espande suddividendo e unendo dinamicamente i dati in base al carico e alle dimensioni del database e distribuendo le suddivisioni tra le risorse di pubblicazione.

A abilita le scritture e le letture dei modifiche in tempo reale per la scalabilità, Spanner e unisce la memoria interna delle modifiche in tempo reale ai dati del database, evitando automaticamente gli hotspot. Per supportare la lettura dei record del flusso di modifiche in quasi tempo reale man mano che le scritture del database vengono scalate, l'API Spanner è progettata per consentire di eseguire query su un flusso di modifiche contemporaneamente utilizzando le partizioni del flusso di modifiche. Le partizioni del flusso di modifiche mappano le suddivisioni dei dati del flusso di modifiche che contengono i record del flusso di modifiche. Una modifica alle partizioni di una modifica in tempo reale in modo dinamico nel tempo e sono correlati al modo in cui Spanner divide e unisce dinamicamente i dati del database.

Una partizione del flusso di modifiche contiene record per un intervallo di chiavi immutabili per un intervallo di tempo specifico. Qualsiasi partizione di modifiche in tempo reale può essere suddivisa in una o più partizioni di modifiche in tempo reale oppure unite ad altre partizioni di flusso di modifiche. Quando questi che si verificano eventi di suddivisione o unione, le partizioni secondarie vengono create per acquisire le modifiche per i rispettivi intervalli di chiavi immutabili per l'intervallo di tempo successivo. Inoltre, ai record delle modifiche ai dati, una query di modifica in tempo reale restituisce i record di partizione figlio notifica ai lettori le nuove partizioni delle modifiche in tempo reale su cui è necessario eseguire query; come record di heartbeat per indicare l'avanzamento in avanti quando non sono state eseguite scritture di recente.

Quando si esegue una query su una particolare partizione di modifiche in tempo reale, i record delle modifiche vengono restituiti nell'ordine del timestamp di commit. Ogni record di modifica viene restituito esattamente una volta. Tra le partizioni di modifiche in tempo reale, non è garantito l'ordine delle modifiche record. I record delle modifiche per una determinata chiave primaria vengono restituiti solo in una per un determinato intervallo di tempo.

A causa della derivazione della partizione padre-figlio, al fine di elaborare le modifiche per un chiave specifica nell'ordine del timestamp di commit, i record restituiti devono essere elaborate solo dopo i record di tutte le partizioni padre partizioni sono state elaborate.

Modificare le funzioni di lettura delle modifiche in tempo reale e la sintassi delle query

GoogleSQL

Per eseguire query sulle modifiche in tempo reale, puoi utilizzare ExecuteStreamingSql tramite Google Cloud CLI o tramite l'API Compute Engine. Spanner crea automaticamente una funzione di lettura speciale con le modifiche in tempo reale. La funzione di lettura fornisce accesso alla modifica i record dello stream. La convenzione di denominazione delle funzioni di lettura READ_change_stream_name.

Presumendo che nel database esista un flusso di modifiche SingersNameStream, la sintassi delle query per GoogleSQL è la seguente:

SELECT ChangeRecord
FROM READ_SingersNameStream (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    read_options
)

La funzione di lettura accetta i seguenti argomenti:

Nome argomento Tipo Obbligatorio? Descrizione
start_timestamp TIMESTAMP Obbligatorio Specifica che i record con commit_timestamp maggiore o uguale a start_timestamp da restituire. Il valore deve rientrare nel periodo di conservazione del stream di modifiche, deve essere minore o uguale all'ora corrente e maggiore o uguale al timestamp della creazione dello stream di modifiche.
end_timestamp TIMESTAMP Facoltativo (valore predefinito: NULL) Specifica che devono essere restituiti i record con commit_timestamp minore o uguale a end_timestamp. Il valore deve rientrare nel periodo di conservazione del stream di modifiche e deve essere maggiore o uguale a start_timestamp. La query termina dopo aver restituito tutti i record ChangeRecord fino a end_timestamp o dopo che l'utente ha terminato la connessione. Se NULL o meno specificato, la query viene eseguita finché non vengono restituiti tutti i ChangeRecord o dall'utente termina la connessione.
partition_token STRING Facoltativo (valore predefinito: NULL) Specifica la partizione dello stream di modifiche su cui eseguire query in base ai contenuti dei record delle partizioni secondarie. Se NULL o non specificato, significa che Reader sta eseguendo una query sul flusso di modifiche per la prima volta e ha non hai ottenuto token di partizione specifici da cui eseguire la query.
heartbeat_milliseconds INT64 Obbligatorio Determina la frequenza con cui viene restituito un ChangeRecord di heartbeat nel caso in cui non ci siano transazioni con carico in questa partizione. Il valore deve essere compreso tra 1,000 (un secondo) e 300,000 (cinque minuti).
read_options ARRAY Facoltativo (valore predefinito: NULL) Opzioni di lettura aggiuntive riservate per l'uso futuro. Attualmente, l'unico valore consentito è NULL.

Ti consigliamo di creare un metodo di utilità per creare il testo della query della funzione di lettura e associarvi i parametri, come mostrato nell'esempio seguente.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT ChangeRecord FROM READ_SingersNameStream"
            + "("
            + "   start_timestamp => @startTimestamp,"
            + "   end_timestamp => @endTimestamp,"
            + "   partition_token => @partitionToken,"
            + "   heartbeat_milliseconds => @heartbeatMillis"
            + ")";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {
  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("startTimestamp")
                    .to(startTimestamp)
                    .bind("endTimestamp")
                    .to(endTimestamp)
                    .bind("partitionToken")
                    .to(partitionToken)
                    .bind("heartbeatMillis")
                    .to(heartbeatMillis)
                    .build();
}

PostgreSQL

Esegui query sui flussi di variazioni utilizzando l'API ExecuteStreamingSql. Spanner crea automaticamente una funzione di lettura speciale insieme allo stream delle modifiche. La funzione di lettura fornisce accesso alla modifica i record dello stream. La convenzione di denominazione della funzione di lettura è spanner.read_json_change_stream_name.

Presumendo che nel database esista un flusso di modifiche SingersNameStream, la sintassi delle query per PostgreSQL è la seguente:

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    null
)

La funzione di lettura accetta i seguenti argomenti:

Nome argomento Tipo Obbligatorio? Descrizione
start_timestamp timestamp with time zone Obbligatorio Specifica che i record di modifica con commit_timestamp maggiore o uguale a start_timestamp da restituire. Il valore deve rientrare nel periodo di conservazione del stream di modifiche, deve essere minore o uguale all'ora corrente e maggiore o uguale al timestamp della creazione dello stream di modifiche.
end_timestamp timestamp with timezone Facoltativo (valore predefinito: NULL) Specifica che devono essere restituiti i record di variazione con commit_timestamp minore o uguale a end_timestamp. Il valore deve rientrare nella conservazione delle modifiche in tempo reale e maggiore o uguale al start_timestamp. La query viene completata dopo aver restituito tutti i record delle modifiche fino a end_timestamp o l'utente interrompe la connessione. Se NULL, la query viene eseguita finché non vengono restituiti tutti i record delle modifiche o l'utente interrompe la connessione.
partition_token text Facoltativo (valore predefinito: NULL) Specifica la partizione di modifiche in tempo reale su cui eseguire la query, in base alla contenuto delle partizioni figlio record. Se è NULL o non è specificato, significa che il lettore sta eseguendo una query sul flusso di modifiche per la prima volta e non ha ottenuto alcun token di partizione specifico da cui eseguire query.
heartbeat_milliseconds bigint Obbligatorio Determina la frequenza con cui viene restituito un ChangeRecord di heartbeat nel caso in cui non ci siano transazioni con carico in questa partizione. Il valore deve essere compreso tra 1,000 (un secondo) e 300,000 (cinque minuti).
null null Obbligatorio Riservato per l'uso futuro

Consigliamo di utilizzare un metodo pratico per creare il testo del tag la funzione di lettura e i parametri di associazione, come mostrato di seguito esempio.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\""
            + "($1, $2, $3, $4, null)";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {

  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("p1")
                    .to(startTimestamp)
                    .bind("p2")
                    .to(endTimestamp)
                    .bind("p3")
                    .to(partitionToken)
                    .bind("p4")
                    .to(heartbeatMillis)
                    .build();
}

Formato di record delle modifiche in tempo reale

GoogleSQL

La funzione di lettura degli stream di variazioni restituisce una singola colonna ChangeRecord di tipo ARRAY<STRUCT<...>>. In ogni riga, questo array contiene sempre un singolo elemento.

Gli elementi dell'array hanno il seguente tipo:

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

Lo struct contiene tre campi: data_change_record, heartbeat_record e child_partitions_record, ciascuno del tipo ARRAY<STRUCT<...>>. In qualsiasi riga restituita dalla funzione di lettura dello stream di modifiche, solo uno di questi tre campi contiene un valore; gli altri due sono vuoti o NULL. Questi campi di array contengono al massimo un elemento.

Le sezioni seguenti esaminano ciascuno di questi tre tipi di record.

PostgreSQL

La funzione di lettura degli stream di variazioni restituisce una singola colonna ChangeRecord di tipo JSON con la seguente struttura:

{
  "data_change_record" : {},
  "heartbeat_record" : {},
  "child_partitions_record" : {}
}

Questo oggetto contiene tre possibili chiavi: data_change_record, heartbeat_record e child_partitions_record. Il tipo di valore corrispondente è JSON. In qualsiasi riga restituita dalla funzione di lettura del flusso di modifiche, esiste solo una di queste tre chiavi.

Le sezioni seguenti esaminano ciascuno di questi tre tipi di record.

Record delle modifiche dei dati

Un record di modifica dei dati contiene un insieme di modifiche a una tabella con lo stesso tipo di modifica (inserimento, aggiornamento o eliminazione) eseguito nello stesso timestamp di commit in una partizione dello stream di modifiche per la stessa transazione. È possibile restituire più record di variazione dei dati per la stessa transazione in più partizioni del flusso di variazioni.

Tutti i record delle modifiche ai dati hanno commit_timestamp, server_transaction_id, e record_sequence, che insieme determinano l'ordine della modifica "Stream" per registrare lo stream. Questi tre campi sono sufficienti per dedurre l'ordine delle modifiche e garantire la coerenza esterna.

Tieni presente che più transazioni possono avere lo stesso timestamp di commit se toccano dati non sovrapposti. Il campo server_transaction_id offre la possibilità di distinguere quale insieme di modifiche (potenzialmente tra le partizioni dello stream di modifiche) è stato emesso all'interno della stessa transazione. Accoppiandolo con record_sequence e I campi number_of_records_in_transaction consentono di eseguire il buffering e l'ordinamento tutti i record di una determinata transazione.

I campi di un record di modifica dei dati includono quanto segue:

GoogleSQL

Campo Tipo Descrizione
commit_timestamp TIMESTAMP Timestamp in cui è stato eseguito il commit della modifica.
record_sequence STRING Il numero di sequenza del record all'interno della transazione. I numeri di sequenza sono garantiti unici e in aumento monotonico (ma non necessariamente contigui) all'interno di una transazione. Ordina i record per lo stesso server_transaction_id in base a record_sequence per ricostruire l'ordine delle modifiche all'interno della transazione. Questo ordinamento potrebbe essere ottimizzato da Spanner per prestazioni migliori e potrebbe non corrispondere sempre a quello originale fornito dagli utenti.
server_transaction_id STRING Una stringa univoca a livello globale che rappresenta la transazione in cui è stata eseguita la modifica. Il valore deve essere nell'elaborazione dei record delle modifiche in tempo reale e non sono è correlato all'ID transazione nell'API Spanner.
is_last_record_in_transaction_in_partition BOOL Indica se si tratta dell'ultimo record di una transazione nella partizione corrente.
table_name STRING Nome della tabella interessata dalla modifica.
value_capture_type STRING

Descrive il tipo di acquisizione del valore specificato nel configurazione delle modifiche in tempo reale al momento dell'acquisizione di questa modifica.

Il tipo di acquisizione valore può essere "OLD_AND_NEW_VALUES", "NEW_ROW", "NEW_VALUES" o "NEW_ROW_AND_OLD_VALUES". Per impostazione predefinita, è "OLD_AND_NEW_VALUES". Per ulteriori informazioni, consulta i tipi di acquisizione dei valori.

column_types ARRAY<STRUCT<
name STRING,
 type JSON,
 is_primary_key BOOL,
 ordinal_position INT64
>>
Il nome della colonna, il tipo di colonna se si tratta di una chiave primaria e la posizione della colonna definita nello schema ("ordinal_position"). La prima colonna di una tabella nello schema avrebbe la posizione ordinale "1". Il tipo di colonna possono essere nidificati per le colonne di array. Il formato corrisponde alla struttura del tipo descritto nel riferimento per l'API Spanner.
mods ARRAY<STRUCT<
keys JSON,
 new_values JSON,
 old_values JSON
>>
Descrive le modifiche apportate, inclusa la chiave primaria valori, i vecchi valori e i nuovi valori delle colonne modificate o monitorate. La disponibilità e il contenuto dei valori vecchi e nuovi dipenderanno dal valore configurato value_capture_type. I campi new_values e old_values contengono solo le colonne non chiave.
mod_type STRING Descrive il tipo di modifica. Uno tra INSERT, UPDATE o DELETE.
number_of_records_in_transaction INT64 Il numero di record delle modifiche dei dati che fanno parte di questo in tutte le partizioni delle modifiche in tempo reale.
number_of_partitions_in_transaction INT64 Il numero di partizioni che restituirà i record delle modifiche dei dati per questa transazione.
transaction_tag STRING Tag transazione associato a questa transazione.
is_system_transaction BOOL Indica se la transazione è di sistema.

PostgreSQL

Campo Tipo Descrizione
commit_timestamp STRING Il timestamp in cui è stato eseguito il commit della modifica.
record_sequence STRING Il numero di sequenza del record all'interno della transazione. I numeri di sequenza sono garantiti unici e in aumento monotonico (ma non necessariamente contigui) all'interno di una transazione. Ordina i record per lo stesso 'attributo "server_transaction_id" in base a "record_sequence" per ricostruire l'ordine delle modifiche all'interno della transaction.
server_transaction_id STRING Una stringa univoca a livello globale che rappresenta la transazione in cui è stata eseguita la modifica. Il valore deve essere utilizzato solo nel contesto dell'elaborazione dei record dello stream di modifiche e non è correlato all'ID transazione nell'API Spanner
is_last_record_in_transaction_in_partition BOOLEAN Indica se si tratta dell'ultimo record di una transazione nella partizione corrente.
table_name STRING Nome della tabella interessata dalla modifica.
value_capture_type STRING

Descrive il tipo di acquisizione del valore specificato nel configurazione delle modifiche in tempo reale al momento dell'acquisizione di questa modifica.

Il tipo di acquisizione valore può essere "OLD_AND_NEW_VALUES", "NEW_ROW", "NEW_VALUES" o "NEW_ROW_AND_OLD_VALUES". Il valore predefinito è "OLD_AND_NEW_VALUES". Per ulteriori informazioni, consulta i tipi di acquisizione dei valori.

column_types
[
  {
      "name": <STRING>,
      "type": {
        "code": <STRING>
      },
      "is_primary_key": <BOOLEAN>,
      "ordinal_position": <NUMBER>
    },
    ...
]
Il nome della colonna, il tipo di colonna se si tratta di una chiave primaria e la posizione della colonna definita nello schema ("ordinal_position"). La prima colonna di una tabella nello schema avrebbe la posizione ordinale "1". Il tipo di colonna possono essere nidificati per le colonne di array. Il formato corrisponde alla struttura del tipo descritta nel riferimento all'API Spanner.
mods
[
  {
    "keys": {<STRING> : <STRING>},
    "new_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
    "old_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
  },
  [...]
]
Descrive le modifiche apportate, inclusi i valori della chiave primaria, i valori precedenti e i nuovi valori delle colonne modificate o monitorate. La disponibilità e il contenuto dei valori vecchi e nuovi dipenderanno sul tipo configurato value_capture_type. I campi new_values e old_values contengono solo le colonne non chiave.
mod_type STRING Descrive il tipo di modifica. Uno tra INSERT, UPDATE o DELETE.
number_of_records_in_transaction INT64 Il numero di record delle modifiche dei dati che fanno parte di questo in tutte le partizioni delle modifiche in tempo reale.
number_of_partitions_in_transaction NUMBER Il numero di partizioni che restituiranno i record di variazione dei dati per questa transazione.
transaction_tag STRING Tag transazione associato a questa transazione.
is_system_transaction BOOLEAN Indica se la transazione è una transazione di sistema.

Di seguito è riportata una coppia di record di modifica dei dati di esempio. Descrivono una singola transazione in cui è presente un trasferimento tra due account. Tieni presente che i due account si trovano in partizioni di stream di modifiche separate.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}

Il seguente record di modifiche dei dati è un esempio di record con il valore tipo di acquisizione "NEW_VALUES". Tieni presente che vengono compilati solo i nuovi valori. È stata modificata solo la colonna "LastUpdate", quindi solo quella colonna è stato restituito.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,
  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z"
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Il seguente record di modifiche dei dati è un esempio di record con il valore tipo di acquisizione "NEW_ROW". Solo "LastUpdate" è stata modificata, ma vengono restituite tutte le colonne monitorate.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Il seguente record di modifica dei dati è un esempio di record con il tipo di acquisizione del valore "NEW_ROW_AND_OLD_VALUES". Solo "LastUpdate" è stata modificata, ma vengono restituite tutte le colonne monitorate. Questo tipo di acquisizione del valore acquisisce il nuovo valore e il valore precedente di LastUpdate.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z"
      }
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW_AND_OLD_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Record heartbeat

Quando viene restituito un record heartbeat, indica che tutte le modifiche concommit_timestamp inferiore o uguale atimestamp del record heartbeat sono state restituite e che i record di dati futuri in questa partizione devono avere timestamp di commit superiori a quelli restituiti dal record heartbeat. I record di heartbeat vengono restituiti quando non sono disponibili dati modifiche scritte in una partizione. Quando sono presenti modifiche ai dati la partizione, puoi usare data_change_record.commit_timestamp di heartbeat_record.timestamp per indicare che il lettore sta progredendo l'avanzamento della lettura della partizione.

Puoi usare record di heartbeat restituiti sulle partizioni per eseguire la sincronizzazione lettori in tutte le partizioni. Una volta che tutti i lettori hanno ricevuto un heartbeat maggiore o uguale a un timestamp A o hanno ricevuto dati o record di partizioni secondarie maggiori o uguali al timestamp A, sanno di aver ricevuto tutti i record committati prima o in quel timestamp A e possono iniziare a elaborare i record in buffer, ad esempio ordinare i record tra partizioni in base al timestamp e raggrupparli per server_transaction_id.

Un record heartbeat contiene un solo campo:

GoogleSQL

Campo Tipo Descrizione
timestamp TIMESTAMP Il timestamp del record heartbeat.

PostgreSQL

Campo Tipo Descrizione
timestamp STRING Il timestamp del record heartbeat.

Un record di heartbeat di esempio, in cui tutti i record con timestamp è stato restituito un valore minore o uguale al timestamp di questo record:

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

Record di partizioni secondarie

Un record di partizioni figlio restituisce informazioni sulle partizioni figlio: i token di partizione, i token delle partizioni principali e start_timestamp che rappresenta il primo timestamp che l'account secondario contengono record di modifiche. I record i cui timestamp di commit sono immediatamente precedenti a child_partitions_record.start_timestamp vengono restituito nella partizione corrente. Dopo aver restituito tutti i record delle partizioni secondarie per questa partizione, questa query restituirà uno stato di esito positivo, indicando che tutti i record sono stati restituiti per questa partizione.

I campi di un record delle partizioni secondarie includono quanto segue:

GoogleSQL

Campo Tipo Descrizione
start_timestamp TIMESTAMP Record delle modifiche dei dati restituiti da un account bambino le partizioni di questo record di partizione figlio hanno un timestamp di commit maggiore o uguale a start_timestamp. Quando esegui una query su una partizione figlio, specifica il token di partizione figlio e un valore start_timestamp maggiore o uguale a child_partitions_token.start_timestamp. Tutti i record delle partizioni secondarie restituiti da una partizione hanno lo stesso start_timestamp e il timestamp rientra sempre tra start_timestamp e end_timestamp specificati nella query.
record_sequence STRING Una sequenza monotonica crescente numero che può essere utilizzato per definire l'ordine dei di partizioni figlio, quando sono presenti record delle partizioni secondarie restituiti con lo stesso start_timestamp in un particolare partizione. Il token partizione, start_timestamp e record_sequence identificano in modo univoco un record delle partizioni secondarie.
child_partitions ARRAY<STRUCT<
token STRING,
parent_partition_tokens
ARRAY<STRING>
>>
Restituisce un insieme di partizioni figlio e le informazioni associate. Include la stringa del token di partizione utilizzata per identificare l'elemento figlio partizionata nelle query, nonché i token del partizioni di Compute Engine.

PostgreSQL

Campo Tipo Descrizione
start_timestamp STRING Record delle modifiche dei dati restituiti da un account bambino le partizioni in questo record di partizioni figlio hanno un timestamp di commit maggiore o uguale a start_timestamp. Quando invii una query a un account bambino la partizione, la query deve specificare il token di partizione figlio e start_timestamp maggiore o uguale a child_partitions_token.start_timestamp. Tutte le partizioni figlio i record restituiti da una partizione hanno lo stesso start_timestamp e il timestamp è sempre compreso tra gli attributi start_timestamp e la durata della query end_timestamp.
record_sequence STRING Una sequenza monotonica crescente numero che può essere utilizzato per definire l'ordine dei di partizioni figlio, quando sono presenti record delle partizioni secondarie restituiti con lo stesso start_timestamp in un particolare partizione. Il token partizione, start_timestamp e record_sequence identificano in modo univoco un record delle partizioni secondarie.
child_partitions
[
  {
    "token": <STRING>,
    "parent_partition_tokens": [<STRING>],
  }, [...]
]
Restituisce un array di partizioni secondarie e le relative informazioni associate. Include la stringa del token di partizione utilizzata per identificare l'elemento figlio partizionata nelle query, nonché i token del partizioni di Compute Engine.

Di seguito è riportato un esempio di record della partizione secondaria:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"]
    }
  ],
}

Flusso di lavoro delle query sui flussi di modifiche

Esegui query sui flussi di variazioni utilizzando l'API ExecuteStreamingSql, con una transazione di sola lettura monouso e un limite di timestamp rigoroso. Il cambiamento la funzione di lettura del flusso consente di specificare start_timestamp e end_timestamp per l'intervallo di tempo che ti interessa. Tutti i record delle modifiche entro il periodo di conservazione sono accessibili tramite l'efficace al timestamp associato.

Tutti gli altri TransactionOptions non sono validi per le query sui flussi di variazioni. Inoltre, se TransactionOptions.read_only.return_read_timestamp è impostato su true, nel messaggio Transaction che descrive la transazione verrà restituito un valore speciale kint64max - 1 anziché un timestamp di lettura valido. Questo valore speciale deve essere scartato e non deve essere utilizzato per per le query successive.

Ogni query dello stream di modifiche può restituire un numero qualsiasi di righe, ciascuna contenente un record di modifica dei dati, un record di heartbeat o un record delle partizioni secondarie. Non è necessario impostare una scadenza per la richiesta.

Esempio:

Il flusso di lavoro delle query in streaming inizia con l'emissione della prima query sul flusso di modifiche specificando partition_token e NULL. La query deve specificare la funzione di lettura per il flusso di modifiche, il timestamp di inizio e di fine di interesse e l'intervallo di battito cardiaco. Quando il valore di end_timestamp è NULL, la query mantiene restituendo modifiche ai dati fino al termine della partizione.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:00Z",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:00Z',
  NULL,
  NULL,
  10000,
  NULL
) ;

Elabora i record di dati di questa query finché i record di partizione figlio non vengono restituito. Nell'esempio seguente, due record di partizione figlio e tre partizioni vengono restituiti i token, quindi la query viene terminata. I record della partizione secondaria di una query specifica condividono sempre lo stesso start_timestamp.

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
        // from the initial change stream queries.
      "parent_partition_tokens": [NULL]
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL]
    }
  ],
}
child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL]
    }
  ],
}

Per elaborare le modifiche future dopo il giorno 2022-05-01T09:00:01Z, crea tre nuove query ed eseguile in parallelo. Le tre query insieme restituiscono le modifiche future dei dati per lo stesso intervallo di chiavi coperto dalla query principale. Imposta sempre start_timestamp a start_timestamp nello stesso record di partizione figlio e utilizza lo stesso intervallo end_timestamp e heartbeat per elaborare i record in modo coerente in tutte le query.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_1',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_2',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_3',
  10000,
  NULL
);

Dopo un po' di tempo, la query su child_token_2 termina dopo aver restituito un altro record della partizione secondaria. Questi record indicano che una nuova partizione coverà le modifiche future sia per child_token_2 che per child_token_3 a partire da 2022-05-01T09:30:15Z. La query su child_token_3 restituirà lo stesso record esatto, perché entrambe sono le partizioni principali del nuovo child_token_4. Per garantire un trattamento rigoroso e ordinato dei record di dati per una determinata chiave, la query su child_token_4 deve iniziare solo al termine di tutti gli elementi padre, che in questo caso sono child_token_2 e child_token_3. Crea una sola query per ogni token della partizione secondaria. Il design del flusso di lavoro delle query deve designare un amministratore per attendere e pianificare la query su child_token_4.

child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:30:15Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01T09:30:15Z",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:30:15Z',
  NULL,
  'child_token_4',
  10000,
  NULL
);

Puoi trovare esempi di gestione e analisi dei record dei flussi di variazioni nel connettore Apache Beam SpannerIO Dataflow su GitHub.