Esta página descreve as streams de alterações no Spanner para bases de dados com dialeto GoogleSQL e bases de dados com dialeto PostgreSQL, incluindo:
- O modelo de particionamento baseado na divisão
- O formato e o conteúdo dos registos da stream de alterações
- A sintaxe de baixo nível usada para consultar esses registos
- Um exemplo do fluxo de trabalho de consulta
Usa a API Spanner para consultar streams de alterações diretamente. As aplicações que, em alternativa, usam o Dataflow para ler dados de fluxo de alterações não precisam de trabalhar diretamente com o modelo de dados descrito aqui.
Para um guia de introdução mais abrangente aos fluxos de alterações, consulte a vista geral dos fluxos de alterações.
Altere as partições da stream
Quando ocorre uma alteração numa tabela monitorizada por um fluxo de alterações, o Spanner escreve um registo de fluxo de alterações correspondente na base de dados, de forma síncrona na mesma transação que a alteração de dados. Isto significa que, se a transação for bem-sucedida, o Spanner também capturou e persistiu a alteração com êxito. Internamente, o Spanner localiza conjuntamente o registo da stream de alterações e a alteração de dados, para que sejam processados pelo mesmo servidor, de modo a minimizar a sobrecarga de escrita.
Como parte da DML para uma divisão específica, o Spanner anexa a gravação à divisão de dados do fluxo de alterações correspondente na mesma transação. Devido a esta colocação conjunta, as streams de alterações não adicionam coordenação adicional nos recursos de publicação, o que minimiza a sobrecarga de confirmação de transações.
O Spanner é dimensionado dividindo e unindo dinamicamente os dados com base na carga e no tamanho da base de dados, e distribuindo as divisões pelos recursos de publicação.
Para permitir que as escritas e as leituras de streams de alterações sejam dimensionadas, o Spanner divide e une o armazenamento interno de streams de alterações juntamente com os dados da base de dados, evitando automaticamente hotspots. Para suportar a leitura de registos de streams de alterações quase em tempo real à medida que as escritas na base de dados são dimensionadas, a API Spanner foi concebida para que uma stream de alterações seja consultada em simultâneo através de partições de streams de alterações. Altere o mapeamento das partições da stream para alterar as divisões de dados da stream que contêm os registos da stream de alterações. As partições de um fluxo de alterações mudam dinamicamente ao longo do tempo e estão correlacionadas com a forma como o Spanner divide e une dinamicamente os dados da base de dados.
Uma partição de stream de alterações contém registos para um intervalo de chaves imutável para um intervalo de tempo específico. Qualquer partição da stream de alterações pode ser dividida numa ou mais partições da stream de alterações ou ser unida com outras partições da stream de alterações. Quando ocorrem estes eventos de divisão ou união, são criadas partições secundárias para captar as alterações dos respetivos intervalos de chaves imutáveis para o intervalo de tempo seguinte. Além dos registos de alteração de dados, uma consulta de stream de alterações devolve registos de partição subordinada para notificar os leitores de novas partições de stream de alterações que precisam de ser consultadas, bem como registos de batimentos cardíacos para indicar o progresso quando não ocorreram gravações recentemente.
Quando consulta uma partição específica da stream de alterações, os registos de alterações são devolvidos por ordem da data/hora de confirmação. Cada registo de alterações é devolvido exatamente uma vez. Nas partições da stream de alterações, a ordenação dos registos de alterações não está garantida. Os registos de alterações de uma determinada chave primária são devolvidos apenas numa partição para um determinado intervalo de tempo.
Devido à linhagem de partições principal-secundária, para processar alterações para uma chave específica por ordem de data/hora de confirmação, os registos devolvidos das partições secundárias devem ser processados apenas após o processamento dos registos de todas as partições principais.
Altere as funções de leitura de streams e a sintaxe de consulta
GoogleSQL
Para consultar streams de alterações, use a API
ExecuteStreamingSql
. O Spanner cria automaticamente uma função de leitura especial
juntamente com a stream de alterações. A função de leitura dá acesso aos registos da stream de alterações. A convenção de nomenclatura da função de leitura é
READ_change_stream_name
.
Partindo do princípio de que existe uma stream de alterações SingersNameStream
na base de dados, a sintaxe de consulta para o GoogleSQL é a seguinte:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
A função de leitura aceita os seguintes argumentos:
Nome do argumento | Tipo | Obrigatório? | Descrição |
---|---|---|---|
start_timestamp |
TIMESTAMP |
Obrigatória | Especifica que devem ser devolvidos os registos com commit_timestamp superior ou igual a start_timestamp . O valor tem de estar dentro do período de retenção do fluxo de alterações e deve ser inferior ou igual à hora atual e superior ou igual à data/hora de criação do fluxo de alterações. |
end_timestamp |
TIMESTAMP |
Opcional (predefinição: NULL ) |
Especifica que devem ser devolvidos os registos com um valor commit_timestamp inferior ou igual a end_timestamp . O valor tem de estar dentro do período de retenção do fluxo de alterações e ser superior ou igual a start_timestamp . A consulta termina após devolver todos os resultados até ao limite de end_timestamp ou quando termina a ligação.ChangeRecords Se end_timestamp estiver definido como NULL
ou não estiver especificado, a consulta continua a execução até serem devolvidos todos os
ChangeRecords ou até terminar a ligação. |
partition_token |
STRING |
Opcional (predefinição: NULL ) |
Especifica que partição do fluxo de alterações deve ser consultada, com base no conteúdo dos registos de partições secundárias. Se for NULL ou não for especificado, significa que o leitor está a consultar o fluxo de alterações pela primeira vez e não obteve tokens de partição específicos a partir dos quais consultar. |
heartbeat_milliseconds |
INT64 |
Obrigatória | Determina a frequência com que um sinal de pulsação ChangeRecord é devolvido caso não existam transações confirmadas nesta partição.
O valor tem de estar entre 1,000 (um segundo) e
300,000 (cinco minutos). |
read_options |
ARRAY |
Opcional (predefinição: NULL ) |
Adiciona opções de leitura reservadas para utilização futura. O único valor permitido é NULL . |
Recomendamos que crie um método auxiliar para criar o texto da consulta da função de leitura e associar parâmetros ao mesmo, conforme mostrado no exemplo seguinte.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
Para consultar streams de alterações, use a API
ExecuteStreamingSql
. O Spanner cria automaticamente uma função de leitura especial
juntamente com a stream de alterações. A função de leitura dá acesso aos registos da stream de alterações. A convenção de nomenclatura da função de leitura é
spanner.read_json_change_stream_name
.
Partindo do princípio de que existe uma stream de alterações SingersNameStream
na base de dados, a sintaxe de consulta para o PostgreSQL é a seguinte:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
A função de leitura aceita os seguintes argumentos:
Nome do argumento | Tipo | Obrigatório? | Descrição |
---|---|---|---|
start_timestamp |
timestamp with time zone |
Obrigatória | Especifica que devem ser devolvidos os registos de alterações com commit_timestamp
superior ou igual a start_timestamp . O valor tem de estar dentro do período de retenção do fluxo de alterações e deve ser inferior ou igual à hora atual e superior ou igual à data/hora de criação do fluxo de alterações. |
end_timestamp |
timestamp with timezone |
Opcional (predefinição: NULL ) |
Especifica que devem ser devolvidos os registos de alterações com commit_timestamp
inferior ou igual a end_timestamp . O valor tem de estar dentro do período de retenção do fluxo de alterações e ser superior ou igual a start_timestamp .
A consulta termina depois de devolver todos os registos de alterações até à data/hora end_timestamp ou até terminar a ligação.
Se NULL , a consulta continua a execução até serem devolvidos todos os registos de alterações ou até terminar a ligação. |
partition_token |
text |
Opcional (predefinição: NULL ) |
Especifica que partição do fluxo de alterações deve ser consultada, com base no conteúdo dos registos de partições secundárias. Se for NULL ou não for especificado, significa que o leitor está a consultar o fluxo de alterações pela primeira vez e não obteve tokens de partição específicos a partir dos quais consultar. |
heartbeat_milliseconds |
bigint |
Obrigatória | Determina a frequência com que um heartbeat ChangeRecord é
devolvido quando não existem transações comprometidas nesta
partição.
O valor tem de estar entre 1,000 (um segundo) e
300,000 (cinco minutos). |
null |
null |
Obrigatória | Reservado para utilização futura |
Recomendamos que crie um método auxiliar para criar o texto da função de leitura e associar-lhe parâmetros, conforme mostrado no exemplo seguinte.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
Altere o formato de registo de streams
GoogleSQL
A função de leitura de streams de alterações devolve uma única coluna do tipo ARRAY<STRUCT<...>>
.ChangeRecord
Em cada linha, esta matriz contém sempre um único elemento.
Os elementos da matriz têm o seguinte tipo:
STRUCT < data_change_record ARRAY<STRUCT<...>>, heartbeat_record ARRAY<STRUCT<...>>, child_partitions_record ARRAY<STRUCT<...>> >
Existem três campos neste STRUCT
: data_change_record
,
heartbeat_record
e child_partitions_record
, cada um do tipo
ARRAY<STRUCT<...>>
. Em qualquer linha que a função de leitura do fluxo de alterações devolva, apenas um destes três campos contém um valor; os outros dois estão vazios ou são NULL
. Estes campos de matriz contêm, no máximo, um elemento.
As secções seguintes analisam cada um destes três tipos de registos.
PostgreSQL
A função de leitura de streams de alterações devolve uma única coluna ChangeRecord
do tipo JSON
com a seguinte estrutura:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
Existem três chaves possíveis neste objeto: data_change_record
,
heartbeat_record
e child_partitions_record
. O tipo de valor
correspondente é JSON
. Em qualquer linha que a função de leitura do fluxo de alterações devolva, existe apenas uma destas três chaves.
As secções seguintes analisam cada um destes três tipos de registos.
Registos de alterações de dados
Um registo de alteração de dados contém um conjunto de alterações a uma tabela com o mesmo tipo de modificação (inserção, atualização ou eliminação) confirmado na mesma data/hora de confirmação numa partição de fluxo de alterações para a mesma transação. Podem ser devolvidos vários registos de alterações de dados para a mesma transação em várias partições do fluxo de alterações.
Todos os registos de alterações de dados têm os campos commit_timestamp
, server_transaction_id
e record_sequence
, que, em conjunto, determinam a ordem na stream de alterações de um registo de stream. Estes três campos são suficientes para obter a ordem das alterações e fornecer consistência externa.
Tenha em atenção que várias transações podem ter a mesma data/hora de confirmação se
afetarem dados não sobrepostos. O campo server_transaction_id
oferece a capacidade de distinguir que conjunto de alterações (potencialmente em partições do fluxo de alterações) foram emitidas na mesma transação. A associação aos campos record_sequence
e number_of_records_in_transaction
também permite armazenar em buffer e ordenar todos os registos de uma transação específica.
Os campos de um registo de alteração de dados incluem o seguinte:
GoogleSQL
Campo | Tipo | Descrição |
---|---|---|
commit_timestamp |
TIMESTAMP |
Indica a data/hora em que a alteração foi confirmada. |
record_sequence |
STRING |
Indica o número de sequência do registo na transação.
Os números de sequência são únicos e aumentam monotonicamente (mas não são necessariamente contíguos) numa transação. Ordene os registos para o mesmo
server_transaction_id por record_sequence para
reconstruir a ordem das alterações na transação.
O Spanner pode otimizar esta ordenação para um melhor desempenho e pode nem sempre corresponder à ordenação original que fornece. |
server_transaction_id |
STRING |
Fornece uma string exclusiva a nível global que representa a transação na qual a alteração foi confirmada. O valor só deve ser usado no contexto do processamento de registos de fluxo de alterações e não está correlacionado com o ID da transação na API do Spanner. |
is_last_record_in_transaction_in_partition |
BOOL |
Indica se este é o último registo de uma transação na partição atual. |
table_name |
STRING |
Nome da tabela afetada pela alteração. |
value_capture_type |
STRING |
Descreve o tipo de captura de valor especificado na configuração do fluxo de alterações quando esta alteração foi capturada. O tipo de captura de valor pode ser um dos seguintes:
Por predefinição, é |
column_types |
[ { "name": "STRING", "type": { "code": "STRING" }, "is_primary_key": BOOLEAN "ordinal_position": NUMBER }, ... ] |
Indica o nome da coluna, o tipo de coluna, se é uma chave primária e a posição da coluna, conforme definido no esquema (ordinal_position ). A primeira coluna de uma tabela no esquema teria uma posição ordinal de 1 . O tipo de coluna pode ser aninhado para colunas de matriz. O formato corresponde à estrutura de tipo descrita na referência da API Spanner.
|
mods |
[ { "keys": {"STRING" : "STRING"}, "new_values": { "STRING" : "VALUE-TYPE", [...] }, "old_values": { "STRING" : "VALUE-TYPE", [...] }, }, [...] ] |
Descreve as alterações feitas, incluindo os valores da chave principal, os valores antigos e os novos valores das colunas alteradas ou monitorizadas.
A disponibilidade e o conteúdo dos valores antigos e novos dependem do value_capture_type configurado. Os campos new_values e old_values contêm apenas as colunas não principais. |
mod_type |
STRING |
Descreve o tipo de alteração. Uma das seguintes opções: INSERT ,
UPDATE ou DELETE . |
number_of_records_in_transaction |
INT64 |
Indica o número de registos de alterações de dados que fazem parte desta transação em todas as partições do fluxo de alterações. |
number_of_partitions_in_transaction |
INT64 |
Indica o número de partições que devolvem registos de alterações de dados para esta transação. |
transaction_tag |
STRING |
Indica a etiqueta de transação associada a esta transação. |
is_system_transaction |
BOOL |
Indica se a transação é uma transação do sistema. |
PostgreSQL
Campo | Tipo | Descrição |
---|---|---|
commit_timestamp |
STRING |
Indica a data/hora em que a alteração foi confirmada. |
record_sequence |
STRING |
Indica o número de sequência do registo na transação.
Os números de sequência são únicos e aumentam monotonicamente (mas não são necessariamente contíguos) numa transação. Ordene os registos para o mesmo
server_transaction_id por record_sequence para
reconstruir a ordem das alterações na transação. |
server_transaction_id |
STRING |
Fornece uma string exclusiva a nível global que representa a transação na qual a alteração foi confirmada. O valor só deve ser usado no contexto do processamento de registos de fluxo de alterações e não está correlacionado com o ID da transação na API do Spanner |
is_last_record_in_transaction_in_partition |
BOOLEAN |
Indica se este é o último registo de uma transação na partição atual. |
table_name |
STRING |
Indica o nome da tabela afetada pela alteração. |
value_capture_type |
STRING |
Descreve o tipo de captura de valor especificado na configuração do fluxo de alterações quando esta alteração foi capturada. O tipo de captura de valor pode ser um dos seguintes:
Por predefinição, é |
column_types |
[ { "name": "STRING", "type": { "code": "STRING" }, "is_primary_key": BOOLEAN "ordinal_position": NUMBER }, ... ] |
Indica o nome da coluna, o tipo de coluna, se é uma chave primária e a posição da coluna, conforme definido no esquema (ordinal_position ). A primeira coluna de uma tabela no esquema teria uma posição ordinal de 1 . O tipo de coluna pode ser aninhado para colunas de matriz. O formato corresponde à estrutura de tipo descrita na referência da API Spanner.
|
mods |
[ { "keys": {"STRING" : "STRING"}, "new_values": { "STRING" : "VALUE-TYPE", [...] }, "old_values": { "STRING" : "VALUE-TYPE", [...] }, }, [...] ] |
Descreve as alterações feitas, incluindo os valores da chave principal, os valores antigos e os novos valores das colunas alteradas ou monitorizadas. A disponibilidade e o conteúdo dos valores antigos e novos dependem
do value_capture_type configurado. Os campos new_values e old_values contêm apenas as colunas não principais.
|
mod_type |
STRING |
Descreve o tipo de alteração. Uma das seguintes opções: INSERT ,
UPDATE ou DELETE . |
number_of_records_in_transaction |
INT64 |
Indica o número de registos de alterações de dados que fazem parte desta transação em todas as partições do fluxo de alterações. |
number_of_partitions_in_transaction |
NUMBER |
Indica o número de partições que devolvem registos de alterações de dados para esta transação. |
transaction_tag |
STRING |
Indica a etiqueta de transação associada a esta transação. |
is_system_transaction |
BOOLEAN |
Indica se a transação é uma transação do sistema. |
Exemplo de registo de alteração de dados
Segue-se um par de registos de alterações de dados de exemplo. Descrevem uma única transação em que existe uma transferência entre duas contas. As duas contas estão em partições de streams de alterações separadas.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
O registo de alteração de dados seguinte é um exemplo de um registo com o tipo de captura NEW_VALUES
. Tenha em atenção que apenas os novos valores são preenchidos.
Apenas a coluna LastUpdate
foi modificada, pelo que apenas essa coluna
foi devolvida.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
O registo de alteração de dados seguinte é um exemplo de um registo com o tipo de captura NEW_ROW
. Apenas a coluna LastUpdate
foi modificada, mas todas as colunas acompanhadas são devolvidas.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
O registo de alteração de dados seguinte é um exemplo de um registo com o tipo de captura NEW_ROW_AND_OLD_VALUES
. Apenas a coluna LastUpdate
foi modificada, mas todas as colunas monitorizadas são devolvidas. Este tipo de captura de valor captura o novo valor e o valor antigo de LastUpdate
.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Registos de pulsação
Quando é devolvido um registo de pulsação, indica que todas as alterações com
commit_timestamp
inferior ou igual ao
timestamp
do registo de pulsação foram devolvidas e os registos de dados futuros nesta
partição têm de ter carimbos de data/hora de confirmação superiores aos devolvidos pelo
registo de pulsação. Os registos de pulsação são devolvidos quando não existem alterações de dados escritas numa partição. Quando existem alterações de dados escritas na partição, pode usar data_change_record.commit_timestamp
em vez de heartbeat_record.timestamp
para indicar que o leitor está a fazer progressos na leitura da partição.
Pode usar registos de pulsação devolvidos em partições para sincronizar leitores em todas as partições. Assim que todos os leitores receberem um sinal de
atividade superior ou igual a uma determinada data/hora A
ou receberem dados ou
registos de partições secundárias superiores ou iguais à data/hora A
, os leitores sabem
que receberam todos os registos confirmados nessa data/hora ou antes A
e podem
começar a processar os registos em buffer, por exemplo, ordenando os registos
de várias partições por data/hora e agrupando-os por server_transaction_id
.
Um registo de sinal de batimento cardíaco contém apenas um campo:
GoogleSQL
Campo | Tipo | Descrição |
---|---|---|
timestamp |
TIMESTAMP |
Indica a data/hora do registo de batimentos cardíacos. |
PostgreSQL
Campo | Tipo | Descrição |
---|---|---|
timestamp |
STRING |
Indica a data/hora do registo de batimentos cardíacos. |
Exemplo de registo de heartbeat
Um exemplo de registo de sinal de pulsação, que comunica que todos os registos com datas/horas inferiores ou iguais à data/hora deste registo foram devolvidos:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
Registos de partições secundárias
Os registos de partições secundárias devolvem informações sobre as partições secundárias: os respetivos tokens de partição, os tokens das respetivas partições principais e o start_timestamp
que representa a data/hora mais antiga para a qual as partições secundárias contêm registos de alterações. Os registos cujas datas/horas de confirmação sejam imediatamente anteriores à child_partitions_record.start_timestamp
são devolvidos na partição atual. Depois de devolver todos os registos de partição filho desta partição, esta consulta é devolvida com um estado de êxito, o que indica que todos os registos foram devolvidos para esta partição.
Os campos de um registo de partição secundária incluem o seguinte:
GoogleSQL
Campo | Tipo | Descrição |
---|---|---|
start_timestamp |
TIMESTAMP |
Indica que os registos de alterações de dados devolvidos das partições secundárias neste registo de partição secundária têm uma data/hora de confirmação superior ou igual a start_timestamp . Quando consulta uma partição secundária, a consulta deve especificar o token da partição secundária e um valor start_timestamp igual ou superior a child_partitions_token.start_timestamp . Todos os registos de partições secundárias devolvidos por uma partição têm o mesmo start_timestamp e a data/hora está sempre entre o start_timestamp e o end_timestamp especificados na consulta. |
record_sequence |
STRING |
Indica um número de sequência que aumenta monotonicamente e que pode ser usado para definir a ordem dos registos de partições secundárias quando existem vários registos de partições secundárias devolvidos com o mesmo start_timestamp numa determinada partição. O token de partição, start_timestamp
e record_sequence , identifica de forma exclusiva um registo de partição secundário.
|
child_partitions |
[ { "token" : "STRING", "parent_partition_tokens" : ["STRING"] } ] |
Devolve um conjunto de partições secundárias e as respetivas informações associadas. Isto inclui a string do token de partição usada para identificar a partição secundária nas consultas, bem como os tokens das respetivas partições principais. |
PostgreSQL
Campo | Tipo | Descrição |
---|---|---|
start_timestamp |
STRING |
Indica que os registos de alterações de dados devolvidos das partições secundárias neste registo de partição secundária têm uma data/hora de confirmação superior ou igual a start_timestamp . Quando consulta uma partição secundária, a consulta deve especificar o token da partição secundária e um valor start_timestamp igual ou superior a child_partitions_token.start_timestamp . Todos os registos de partições secundárias devolvidos por uma partição têm o mesmo start_timestamp e a data/hora está sempre entre o start_timestamp e o end_timestamp especificados na consulta.
|
record_sequence |
STRING |
Indica um número de sequência que aumenta monotonicamente e que pode ser usado para definir a ordem dos registos de partições secundárias quando existem vários registos de partições secundárias devolvidos com o mesmo start_timestamp numa determinada partição. O token de partição, start_timestamp
e record_sequence , identifica de forma exclusiva um registo de partição secundário.
|
child_partitions |
[ { "token": "STRING", "parent_partition_tokens": ["STRING"], }, [...] ] |
Devolve uma matriz de partições secundárias e as respetivas informações associadas. Isto inclui a string do token de partição usada para identificar a partição secundária nas consultas, bem como os tokens das respetivas partições principais. |
Exemplo de registo de partição secundária
Segue-se um exemplo de um registo de partição secundária:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
Fluxo de trabalho de consulta de streams de alterações
Execute consultas de fluxo de alterações através da API
ExecuteStreamingSql
, com uma transação
só de leitura
de utilização única e um
limite de data/hora forte. A função de leitura de fluxo de alterações permite-lhe especificar o start_timestamp
e o end_timestamp
para o intervalo de tempo de interesse. Todos os registos de alterações
no período de retenção são acessíveis através do limite de tempo
de leitura forte.
Todas as outras
TransactionOptions
são inválidas para consultas de streams de alterações. Além disso, se TransactionOptions.read_only.return_read_timestamp
estiver definido como true
, é devolvido um valor especial de kint64max - 1
na mensagem Transaction
que descreve a transação, em vez de uma data/hora válida de leitura. Este valor especial deve ser rejeitado e não usado para consultas subsequentes.
Cada consulta de stream de alterações pode devolver qualquer número de linhas, cada uma contendo um registo de alteração de dados, um registo de batimento cardíaco ou um registo de partições secundárias. Não é necessário definir um prazo para o pedido.
Exemplo de fluxo de trabalho de consulta de stream de alterações
O fluxo de trabalho de consulta de streaming começa com a emissão da primeira consulta de stream de alterações, especificando o partition_token
para NULL
. A consulta tem de especificar a função de leitura para a stream de alterações, a data/hora de início e de fim de interesse e o intervalo de sinal de pulsação. Quando end_timestamp
é NULL
, a consulta continua a devolver alterações de dados até a partição terminar.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
Processar registos de dados desta consulta até que todos os registos de partições secundárias sejam devolvidos. No exemplo seguinte, são devolvidos dois registos de partição secundária e três tokens de partição. Em seguida, a consulta termina. Os registos de partição secundária de uma consulta específica partilham sempre o mesmo start_timestamp
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": "1000012389",
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": "1000012390",
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
Para processar alterações após 2022-05-01T09:00:01Z
, crie três novas consultas e execute-as em paralelo. Usadas em conjunto, as três consultas devolvem alterações de dados para o mesmo intervalo de chaves que o respetivo elemento principal abrange. Defina sempre o start_timestamp
como o
start_timestamp
no mesmo registo de partição secundária e use o mesmo
end_timestamp
e intervalo de sinal de pulsação para processar os registos de forma consistente
em todas as consultas.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
A consulta em child_token_2
termina após devolver outro registo de partição secundária. Este registo indica que uma nova partição está a abranger alterações para child_token_2
e child_token_3
a partir de 2022-05-01T09:30:15Z
. O mesmo registo é devolvido pela consulta em child_token_3
, porque ambos são as partições principais da nova child_token_4
. Para garantir um processamento estritamente ordenado dos registos de dados para uma chave específica, a consulta em child_token_4
tem de começar depois de todos os pais terem terminado. Neste caso, os elementos principais são child_token_2
e child_token_3
. Crie apenas uma consulta para cada símbolo de partição filho. A conceção do fluxo de trabalho de consulta deve atribuir um pai para aguardar e agendar a consulta em child_token_4
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": "1000012389",
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": ["child_token_2", "child_token_3"],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
Encontre exemplos de processamento e análise de registos de streams de alterações no conector SpannerIO Dataflow no GitHub.