Esta página descreve as streams de mudanças no Spanner para bancos de dados com dialetos do GoogleSQL e do PostgreSQL, incluindo:
- Modelo de particionamento baseado em divisão
- O formato e o conteúdo dos registros do fluxo de alterações
- A sintaxe de baixo nível usada para consultar esses registros
- Exemplo do fluxo de trabalho da consulta
Use a API Spanner para consultar fluxos de alterações diretamente. Os aplicativos que usam o Dataflow para ler dados do fluxo de alterações não precisam trabalhar diretamente com o modelo de dados descrito aqui.
Para um guia introdutório mais amplo sobre fluxos de alterações, consulte Visão geral dos fluxos de alterações.
Mudar as partições do fluxo
Quando uma mudança ocorre em uma tabela monitorada por um fluxo de alterações, o Spanner grava um registro de fluxo de alterações correspondente no banco de dados, de forma síncrona na mesma transação que a mudança de dados. Isso significa que, se a transação for bem-sucedida, o Spanner também capturou e persistiu a mudança. Internamente, o Spanner coloca o registro do fluxo de alterações e a mudança de dados juntos para que sejam processados pelo mesmo servidor e minimizem a sobrecarga de gravação.
Como parte da DML para uma divisão específica, o Spanner anexa a gravação à divisão de dados de fluxo de alterações correspondente na mesma transação. Devido a essa colocalização, os fluxos de mudanças não adicionam coordenação extra aos recursos de veiculação, o que minimiza a sobrecarga de confirmação de transação.
O Spanner é dimensionado dividindo e mesclando dados dinamicamente com base na carga e no tamanho do banco de dados e distribuindo divisões entre os recursos de serviço.
Para permitir que as gravações e leituras de fluxos de alterações sejam dimensionadas, o Spanner divide e mescla o armazenamento interno do fluxo de alterações com os dados do banco de dados, evitando automaticamente os pontos de acesso. Para oferecer suporte à leitura de registros de fluxo de alterações quase em tempo real à medida que as gravações do banco de dados são dimensionadas, a API Spanner foi projetada para que um fluxo de alterações seja consultado simultaneamente usando partições de fluxo de alterações. O mapa de partições de fluxo de alterações é usado para mudar as divisões de dados de fluxo de alterações que contêm os registros de fluxo de alterações. As partições de um fluxo de alterações mudam dinamicamente ao longo do tempo e estão relacionadas a como o Spanner divide e mescla dinamicamente os dados do banco de dados.
Uma partição de fluxo de alterações contém registros de um intervalo de chaves imutáveis para um intervalo de tempo específico. Qualquer partição do fluxo de alterações pode ser dividida em uma ou mais partições do fluxo de alterações ou ser mesclada a outras partições do fluxo de alterações. Quando esses eventos de divisão ou mesclagem acontecem, partições filhas são criadas para capturar as mudanças dos respectivos intervalos de chaves imutáveis para o próximo intervalo de tempo. Além dos registros de mudança de dados, uma consulta de fluxo de alterações retorna registros de partição filha para notificar os leitores sobre novas partições de fluxo de alterações que precisam ser consultadas, além de registros de batimento cardíaco para indicar o progresso quando nenhuma gravação ocorreu recentemente.
Ao consultar uma partição específica do fluxo de mudanças, os registros de mudança são retornados na ordem do carimbo de data/hora de confirmação. Cada registro de alteração é retornado exatamente uma vez. Não há garantia de ordenação de registros de alteração nas partições do fluxo de alterações. Os registros de mudança de uma chave primária específica são retornados apenas em uma partição para um determinado período.
Devido à linhagem de partição pai-filho, para processar mudanças de uma chave específica na ordem do carimbo de data/hora de confirmação, os registros retornados de partições filhas só podem ser processados depois que os registros de todas as partições mãe tiverem sido processados.
Mudanças nas funções de leitura de fluxo de alterações e na sintaxe de consulta
Para consultar fluxos de mudanças, use a
API
ExecuteStreamingSql
. O Spanner cria automaticamente uma função de leitura especial
com o fluxo de alterações. A função de leitura fornece acesso aos registros do fluxo de alterações. A convenção de nomenclatura da função de leitura é
READ_change_stream_name
.
Supondo que um fluxo de alterações SingersNameStream
exista no banco de dados, a
sintaxe de consulta do GoogleSQL é a seguinte:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
A função de leitura aceita os seguintes argumentos:
Nome do argumento | Tipo | Obrigatório? | Descrição |
---|---|---|---|
start_timestamp |
TIMESTAMP |
Obrigatório | Especifica que os registros com commit_timestamp maior
ou igual a start_timestamp precisam ser retornados. O valor precisa estar dentro do período de retenção do fluxo de alterações e ser menor ou igual ao horário atual e maior ou igual ao carimbo de data/hora da criação do fluxo de alterações. |
end_timestamp |
TIMESTAMP |
Opcional (padrão: NULL ) |
Especifica que os registros com um commit_timestamp menor
ou igual a end_timestamp precisam ser retornados. O valor precisa estar dentro do período de retenção da transmissão de mudanças e ser maior ou igual a start_timestamp . A
consulta é concluída depois de retornar todos os ChangeRecords
até o end_timestamp ou quando você encerra a
conexão. Se end_timestamp for definido como NULL
ou não for especificado, a consulta continuará a execução até que todas
ChangeRecords sejam retornadas ou até que você encerre a
conexão. |
partition_token |
STRING |
Opcional (padrão: NULL ) |
Especifica qual partição do fluxo de alterações será consultada com base no
conteúdo dos registros de partições
filhas. Se NULL ou não for especificado, significa que o
leitor está consultando o fluxo de mudanças pela primeira vez e não
recebeu nenhum token de partição específico para consultar. |
heartbeat_milliseconds |
INT64 |
Obrigatório | Determina com que frequência um ChangeRecord de batimento cardíaco é
retornado caso não haja transações confirmadas nessa
partição.
O valor precisa estar entre 1,000 (um segundo) e
300,000 (cinco minutos). |
read_options |
ARRAY |
Opcional (padrão: NULL ) |
Adiciona opções de leitura reservadas para uso futuro. O único
valor permitido é NULL . |
Recomendamos criar um método auxiliar para criar o texto da consulta da função de leitura e vincular parâmetros a ele, conforme mostrado no exemplo a seguir.
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
Para consultar fluxos de mudanças, use a
API
ExecuteStreamingSql
. O Spanner cria automaticamente uma função de leitura especial
com o fluxo de alterações. A função de leitura fornece acesso aos registros do fluxo de alterações. A convenção de nomenclatura da função de leitura é
spanner.read_json_change_stream_name
.
Supondo que um fluxo de alterações SingersNameStream
exista no banco de dados, a sintaxe de consulta do PostgreSQL é a seguinte:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
A função de leitura aceita os seguintes argumentos:
Nome do argumento | Tipo | Obrigatório? | Descrição |
---|---|---|---|
start_timestamp |
timestamp with time zone |
Obrigatório | Especifica que os registros de mudança com commit_timestamp
maior ou igual a start_timestamp
precisam ser retornados. O valor precisa estar dentro do período de retenção da transmissão de mudanças e ser menor ou igual ao horário atual e maior ou igual ao carimbo de data/hora da criação da transmissão de mudanças. |
end_timestamp |
timestamp with timezone |
Opcional (padrão: NULL ) |
Especifica que os registros de mudança com commit_timestamp
menor ou igual a end_timestamp precisam
ser retornados. O valor precisa estar dentro do período de retenção da mudança de fluxo e ser maior ou igual a start_timestamp .
A consulta é concluída depois de retornar todos os registros de mudança até
o end_timestamp ou até que você encerre a conexão.
Se for NULL , a consulta continuará a execução até que todos os registros de
mudança sejam retornados ou até que você encerre a conexão. |
partition_token |
text |
Opcional (padrão: NULL ) |
Especifica qual partição do fluxo de alterações será consultada com base no
conteúdo dos registros de partições
filhas. Se NULL ou não for especificado, significa que o
leitor está consultando o fluxo de mudanças pela primeira vez e não
recebeu nenhum token de partição específico para consultar. |
heartbeat_milliseconds |
bigint |
Obrigatório | Determina com que frequência um ChangeRecord de batimento cardíaco é
retornado quando não há transações confirmadas nesta
partição.
O valor precisa estar entre 1,000 (um segundo) e
300,000 (cinco minutos). |
null |
null |
Obrigatório | Reservado para uso futuro |
Recomendamos criar um método auxiliar para criar o texto da função de leitura e vincular parâmetros a ele, conforme mostrado no exemplo a seguir.
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
Mudar o formato de registro do fluxo de alterações
A função de leitura de fluxos de mudanças retorna uma única coluna ChangeRecord
do tipo ARRAY<STRUCT<...>>
. Em cada linha, essa matriz sempre contém um único
elemento.
Os elementos da matriz têm o seguinte tipo:
STRUCT < data_change_record ARRAY<STRUCT<...>>, heartbeat_record ARRAY<STRUCT<...>>, child_partitions_record ARRAY<STRUCT<...>> >
Há três campos neste STRUCT
: data_change_record
,
heartbeat_record
e child_partitions_record
, cada um do tipo
ARRAY<STRUCT<...>>
. Em qualquer linha retornada pela função de leitura do fluxo de alterações, apenas um desses três campos contém um valor. Os outros dois
estão vazios ou são NULL
. Esses campos de matriz contêm, no máximo, um elemento.
As seções a seguir examinam cada um desses três tipos de registro.
A função de leitura de fluxos de mudanças retorna uma única coluna ChangeRecord
do
tipo JSON
com a seguinte estrutura:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
Há três chaves possíveis neste objeto: data_change_record
,
heartbeat_record
e child_partitions_record
. O tipo de valor correspondente
é JSON
. Em qualquer linha retornada pela função de leitura do fluxo de alterações,
apenas uma dessas três chaves existe.
As seções a seguir examinam cada um desses três tipos de registro.
Registros de alteração de dados
Um registro de alteração de dados contém um conjunto de alterações em uma tabela com o mesmo tipo de modificação (inserir, atualizar ou excluir) confirmada no mesmo carimbo de data/hora em uma partição de fluxo de alterações para a mesma transação. Vários registros de alteração de dados podem ser retornados para a mesma transação em várias partições de fluxo de alterações.
Todos os registros de alteração de dados têm campos commit_timestamp
, server_transaction_id
e record_sequence
, que, juntos, determinam a ordem no fluxo de alterações de um registro de fluxo. Esses três campos são suficientes para derivar
a ordem das mudanças e fornecer consistência externa.
Várias transações podem ter o mesmo carimbo de data/hora de confirmação se
elas tocarem dados não sobrepostos. O campo server_transaction_id
oferece a capacidade de distinguir quais conjuntos de mudanças (potencialmente
em partições de fluxo de mudanças) foram emitidos na mesma
transação. O pareamento com os campos record_sequence
e
number_of_records_in_transaction
também permite armazenar em buffer e ordenar
todos os registros de uma transação específica.
Os campos de um registro de alteração de dados incluem:
Campo | Tipo | Descrição |
---|---|---|
commit_timestamp |
TIMESTAMP |
Indica o carimbo de data/hora em que a alteração foi confirmada. |
record_sequence |
STRING |
Indica o número de sequência do registro na transação.
Os números de sequência são exclusivos e aumentam de maneira uniforme (mas não
necessariamente contíguos) em uma transação. Ordene os registros do mesmo
server_transaction_id por record_sequence para
reconstruir a ordem das mudanças na transação.
O Spanner pode otimizar essa ordenação para melhorar a performance,
e ela nem sempre corresponde à ordem original que você fornece. |
server_transaction_id |
STRING |
Fornece uma string globalmente exclusiva que representa a transação em que a alteração foi executada. O valor só pode ser usado no contexto de processamento de registros de fluxo de alterações e não está correlacionado com o ID da transação na API do Spanner. |
is_last_record_in_transaction_in_partition |
BOOL |
Indica se este é o último registro de uma transação na partição atual. |
table_name |
STRING |
Nome da tabela afetada pela mudança. |
value_capture_type |
STRING |
Descreve o tipo de captura de valor especificado na configuração do fluxo de mudanças quando a mudança foi capturada. O tipo de captura de valor pode ser um dos seguintes:
Por padrão, ele é |
column_types |
[ { "name": " |
Indica o nome da coluna, o tipo de coluna, se ela é uma chave primária e a posição da coluna, conforme definido no esquema (ordinal_position ). A primeira coluna de uma tabela no esquema teria uma posição ordinal de 1 . O
tipo de coluna pode ser aninhado para colunas de matriz. O formato corresponde à estrutura
de tipo descrita na referência da API Spanner.
|
mods |
[ { "keys": {" |
Descreve as mudanças feitas, incluindo os valores da chave primária, os valores antigos e os novos valores das colunas alteradas ou rastreadas.
A disponibilidade e o conteúdo dos valores antigos e novos dependem do
value_capture_type configurado. Os campos new_values e
old_values contêm apenas as colunas sem chave. |
mod_type |
STRING |
Descreve o tipo de mudança. Um de INSERT ,
UPDATE ou DELETE . |
number_of_records_in_transaction |
INT64 |
Indica o número de registros de alteração de dados que fazem parte dessa transação em todas as partições de fluxo de alterações. |
number_of_partitions_in_transaction |
INT64 |
Indica o número de partições que retornam registros de alteração de dados para essa transação. |
transaction_tag |
STRING |
Indica a tag da transação associada a essa transação. |
is_system_transaction |
BOOL |
Indica se a transação é do sistema. |
Campo | Tipo | Descrição |
---|---|---|
commit_timestamp |
STRING |
Indica o carimbo de data/hora em que a alteração foi confirmada. |
record_sequence |
STRING |
Indica o número de sequência do registro na transação.
Os números de sequência são exclusivos e aumentam de maneira uniforme (mas não
necessariamente contíguos) em uma transação. Ordene os registros do mesmo
server_transaction_id por record_sequence para
reconstruir a ordem das mudanças na transação. |
server_transaction_id |
STRING |
Fornece uma string globalmente exclusiva que representa a transação em que a alteração foi executada. O valor só pode ser usado no contexto de processamento de registros de fluxo de alterações e não está correlacionado com o ID da transação na API do Spanner. |
is_last_record_in_transaction_in_partition |
BOOLEAN |
Indica se este é o último registro de uma transação na partição atual. |
table_name |
STRING |
Indica o nome da tabela afetada pela mudança. |
value_capture_type |
STRING |
Descreve o tipo de captura de valor especificado na configuração do fluxo de mudanças quando a mudança foi capturada. O tipo de captura de valor pode ser um dos seguintes:
Por padrão, ele é |
column_types |
[ { "name": " |
Indica o nome da coluna, o tipo de coluna, se ela é uma chave primária e a posição da coluna, conforme definido no esquema (ordinal_position ). A primeira coluna de uma tabela no esquema teria uma posição ordinal de 1 . O
tipo de coluna pode ser aninhado para colunas de matriz. O formato corresponde à estrutura
de tipo descrita na
referência da API Spanner.
|
mods |
[ { "keys": {" |
Descreve as mudanças feitas, incluindo os valores da chave primária, os valores antigos e os novos valores das colunas alteradas ou rastreadas. A disponibilidade e o conteúdo dos valores antigos e novos dependem
do value_capture_type configurado. Os
campos new_values e old_values contêm apenas as
colunas não-chave.
|
mod_type |
STRING |
Descreve o tipo de mudança. Um de INSERT ,
UPDATE ou DELETE . |
number_of_records_in_transaction |
INT64 |
Indica o número de registros de alteração de dados que fazem parte dessa transação em todas as partições de fluxo de alterações. |
number_of_partitions_in_transaction |
NUMBER |
Indica o número de partições que retornam registros de alteração de dados para essa transação. |
transaction_tag |
STRING |
Indica a tag da transação associada a essa transação. |
is_system_transaction |
BOOLEAN |
Indica se a transação é do sistema. |
Exemplo de registro de alteração de dados
Confira abaixo um par de exemplos de registros de alteração de dados. Eles descrevem uma única transação em que há uma transferência entre duas contas. As duas contas estão em partições de fluxo de mudanças separadas.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
O registro de mudança de dados a seguir é um exemplo de um registro com o tipo de captura de valor NEW_VALUES
. Somente os novos valores são preenchidos.
Apenas a coluna LastUpdate
foi modificada, portanto, apenas essa coluna
foi retornada.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
O registro de mudança de dados a seguir é um exemplo de um registro com o tipo de captura de valor NEW_ROW
. Apenas a coluna LastUpdate
foi modificada, mas todas as colunas rastreadas são retornadas.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
O registro de mudança de dados a seguir é um exemplo de um registro com o tipo de captura de valor NEW_ROW_AND_OLD_VALUES
. Apenas a coluna LastUpdate
foi
modificada, mas todas as colunas rastreadas são retornadas. Esse tipo de captura de valor captura
o novo valor e o valor antigo de LastUpdate
.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Registros de batimentos
Quando um registro de batimento cardíaco é retornado, ele indica que todas as mudanças com
commit_timestamp
menor ou igual ao timestamp
do registro de batimento cardíaco
foram retornadas, e os registros de dados futuros nessa
partição precisam ter carimbos de data/hora de confirmação mais altos do que o retornado pelo
registro de batimento cardíaco. Os registros de tiques são retornados quando não há mudanças de
dados gravadas em uma partição. Quando há mudanças de dados gravadas na
partição, data_change_record.commit_timestamp
pode ser usado em vez de
heartbeat_record.timestamp
para informar que o leitor está avançando
na leitura da partição.
É possível usar registros de batimento cardíaco retornados em partições para sincronizar
leitores em todas as partições. Quando todos os leitores recebem um
batimento de coração maior ou igual a um carimbo de data/hora A
ou recebem dados ou
registros de partição filho maiores ou iguais ao carimbo de data/hora A
, os leitores sabem
que receberam todos os registros confirmados até ou no carimbo de data/hora A
e podem
começar a processar os registros armazenados em buffer, por exemplo, classificando os registros de
partição cruzada por carimbo de data/hora e agrupando-os por server_transaction_id
.
Um registro de batimento cardíaco contém apenas um campo:
Campo | Tipo | Descrição |
---|---|---|
timestamp |
TIMESTAMP |
Indica o carimbo de data/hora do registro de Heartbeat. |
Campo | Tipo | Descrição |
---|---|---|
timestamp |
STRING |
Indica o carimbo de data/hora do registro de Heartbeat. |
Exemplo de registro de batimento cardíaco
Um exemplo de registro de batimento cardíaco, comunicando que todos os registros com carimbos de data/hora menores ou iguais ao carimbo de data/hora deste registro foram retornados:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
Registros de partição filha
Os registros de partição filha retornam informações sobre as partições filhas: os
tokens de partição, os tokens das partições mãe e o
start_timestamp
que representa o carimbo de data/hora mais antigo para o qual as partições
filhas contêm registros de mudança. Os registros cujos carimbos de data/hora de confirmação
são imediatamente anteriores ao child_partitions_record.start_timestamp
são
retornados na partição atual. Depois de retornar todos os
registros de partição filho para essa partição, a consulta retorna com
um status de sucesso, indicando que todos os registros foram retornados para essa
partição.
Os campos de um registro de partição filho incluem o seguinte:
Campo | Tipo | Descrição |
---|---|---|
start_timestamp |
TIMESTAMP |
Indica que os registros de alteração de dados retornados de partições
filhas neste registro de partição filha têm um carimbo de data/hora de confirmação
maior ou igual a start_timestamp . Ao consultar uma partição
filho, a consulta precisa especificar o token da partição filho e um
start_timestamp maior ou igual a
child_partitions_token.start_timestamp . Todos os registros de partições
filhas retornados por uma partição têm o mesmo start_timestamp
e o carimbo de data/hora sempre fica entre o start_timestamp e o end_timestamp
especificados da consulta. |
record_sequence |
STRING |
Indica um número de sequência crescente monotônico que pode ser usado para
definir a ordem dos registros de partição filha quando há vários
registros de partição filha retornados com o mesmo start_timestamp
em uma partição específica. O token de partição, start_timestamp
e record_sequence identificam exclusivamente um registro de partição filho.
|
child_partitions |
[ { "token" : " |
Retorna um conjunto de partições filhas e as informações associadas a elas. Isso inclui a string de token de partição usada para identificar a partição filha em consultas, bem como os tokens das partições mãe. |
Campo | Tipo | Descrição |
---|---|---|
start_timestamp |
STRING |
Indica que os registros de alteração de dados retornados de partições
filhas neste registro de partição filha têm um carimbo de data/hora de confirmação
maior ou igual a start_timestamp . Ao consultar uma partição
filho, a consulta precisa especificar o token da partição filho e um
start_timestamp maior ou igual a
child_partitions_token.start_timestamp . Todos os registros de partições
filhas retornados por uma partição têm o mesmo start_timestamp
e o carimbo de data/hora sempre fica entre o start_timestamp e o end_timestamp
especificados da consulta.
|
record_sequence |
STRING |
Indica um número de sequência crescente monotônico que pode ser usado para
definir a ordem dos registros de partição filha quando há vários
registros de partição filha retornados com o mesmo start_timestamp
em uma partição específica. O token de partição, start_timestamp
e record_sequence identificam exclusivamente um registro de partição filho.
|
child_partitions |
[ { "token": " |
Retorna uma matriz de partições filhas e as informações associadas a elas. Isso inclui a string de token de partição usada para identificar a partição filha em consultas, bem como os tokens das partições mãe. |
Exemplo de registro de partição secundária
Confira a seguir um exemplo de registro de partição filha:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
Fluxo de trabalho de consulta de fluxos de alterações
Execute consultas de fluxo de alterações usando a
API ExecuteStreamingSql
, com uma
transação somente leitura
de uso único e uma
limitação de carimbo de data/hora forte. A função de leitura
do fluxo de mudança permite especificar start_timestamp
e
end_timestamp
para o período de tempo de interesse. Todos os registros de mudança
no período de retenção são acessíveis usando a forte restrição de carimbo de data/hora
somente leitura.
Todos os outros
TransactionOptions
são inválidos para consultas de fluxo de mudanças. Além disso,
se TransactionOptions.read_only.return_read_timestamp
for definido como true
,
um valor especial de kint64max - 1
será retornado na mensagem Transaction
que descreve a transação, em vez de um carimbo de data/hora de leitura
válido. Esse valor especial precisa ser descartado e não usado em consultas
posteriores.
Cada consulta de fluxo de alterações pode retornar qualquer número de linhas, cada uma contendo um registro de alteração de dados, de batimento cardíaco ou de partições filhas. Não é necessário definir um prazo para a solicitação.
Exemplo de fluxo de trabalho de consulta de fluxo de alterações
O fluxo de trabalho de consulta de streaming começa emitindo a primeira consulta de fluxo de mudanças
especificando partition_token
para NULL
. A consulta precisa especificar
a função de leitura do fluxo de alterações, o carimbo de data/hora de início e de término de interesse
e o intervalo de batimentos. Quando o end_timestamp
é NULL
, a consulta continua
retornando mudanças de dados até o fim da partição.
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
Processe os registros de dados dessa consulta até que todos os registros de partição filhos sejam
retornados. No exemplo abaixo, dois registros de partição filho e três
tokens de partição são retornados, e a consulta é encerrada. Os registros de partição
filha de uma consulta específica sempre compartilham o mesmo start_timestamp
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012390,
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
Para processar as alterações após 2022-05-01T09:00:01Z
, crie três novas consultas e
execute-as em paralelo. Usadas em conjunto, as três consultas retornam mudanças de dados para
o mesmo intervalo de chaves que as principais. Sempre defina o start_timestamp
como o
start_timestamp
no mesmo registro de partição filho e use o mesmo
end_timestamp
e o intervalo de batimentos para processar os registros de forma consistente
em todas as consultas.
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
A consulta em child_token_2
é concluída depois de retornar outro registro de partição
filha. Esse registro indica que uma nova partição está cobrindo as mudanças de
child_token_2
e child_token_3
, começando em 2022-05-01T09:30:15Z
. O
mesmo registro é retornado pela consulta em child_token_3
, porque ambos são
as partições mãe do novo child_token_4
. Para garantir um processamento ordenado
de registros de dados para uma chave específica, a consulta em child_token_4
precisa começar depois que todos os pais terminarem. Nesse caso, os pais são
child_token_2
e child_token_3
. Crie apenas uma consulta para cada token de partição
filha. O design do fluxo de trabalho da consulta precisa designar um pai para aguardar e
programar a consulta em child_token_4
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": [child_token_2, child_token_3],
}
],
}
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
Encontre exemplos de como processar e analisar registros de fluxo de alterações no conector do Dataflow SpannerIO do Apache Beam no GitHub.