Nesta página, descrevemos os seguintes atributos dos fluxo de alterações em detalhes:
- O modelo de particionamento baseado em divisão
- O formato e o conteúdo dos registros de fluxo de alterações
- a sintaxe de baixo nível usada para consultar esses registros
- Um exemplo do fluxo de trabalho de consulta
As informações nesta página são mais relevantes para usar a API Spanner para consultar fluxo de alterações diretamente. Os aplicativos que usam o Dataflow para ler dados de fluxo de alterações não precisam trabalhar diretamente com o modelo de dados descrito aqui.
Para conferir um guia introdutório mais amplo sobre os fluxo de alterações, consulte Visão geral dos fluxos de mudanças.
Alterar partições do fluxo
Quando ocorre uma alteração em uma tabela observada por um fluxo de alterações, o Spanner grava um registro de fluxo de alterações correspondente no banco de dados, de maneira síncrona na mesma transação que a alteração de dados. Isso garante que, se a transação for bem-sucedida, o Spanner também terá capturado e mantido a alteração. Internamente, o Spanner coloca em conjunto o registro do fluxo de alterações e a alteração de dados para que eles sejam processados pelo mesmo servidor a fim de minimizar a sobrecarga de gravação.
Como parte do DML a uma divisão específica, o Spanner anexa a gravação à divisão de dados do fluxo de alterações correspondente na mesma transação. Por causa dessa colocation, os streams de alteração não adicionam coordenação extra entre os recursos de disponibilização, o que minimiza a sobrecarga de confirmação da transação.
O Spanner escalona ao dividir e mesclar dinamicamente os dados com base na carga e no tamanho do banco de dados e distribuir divisões nos recursos de disponibilização.
Para ativar as leituras e gravações dos fluxos de alterações para escalonar, o Spanner divide e mescla o armazenamento interno do fluxo de alterações com os dados do banco de dados, evitando automaticamente os pontos de acesso. Para aceitar a leitura de registros de fluxo de alterações quase em tempo real à medida que as gravações do banco de dados são escalonadas, a API Spanner foi projetada para que um fluxo de alterações seja consultado simultaneamente usando partições de fluxo de alterações. Mapeamento das partições do fluxo de alterações para alterar as divisões de dados do fluxo que contêm os registros dele. As partições de um fluxo de alterações mudam dinamicamente ao longo do tempo e estão correlacionadas à forma como o Spanner divide e mescla dinamicamente os dados do banco de dados.
Uma partição de fluxo de alterações contém registros para um intervalo de chaves imutável para um período específico. Qualquer partição de fluxo de alterações pode ser dividida em uma ou mais partições de fluxo de alterações ou mesclada com outras partições de fluxo de alterações. Quando esses eventos de divisão ou mesclagem acontecem, as partições filhas são criadas para capturar as alterações dos respectivos intervalos de chaves imutáveis para o próximo intervalo de tempo. Além dos registros de alteração de dados, uma consulta de fluxo de alterações retorna registros de partição filhas para notificar os leitores sobre novas partições do fluxo de alterações que precisam ser consultadas, bem como registros de sinal de funcionamento para indicar o progresso quando nenhuma gravação ocorreu recentemente.
Ao consultar uma partição de fluxo de alterações específica, os registros de alteração são retornados na ordem do carimbo de data/hora de confirmação. Cada registro de alteração é retornado exatamente uma vez. Nas partições do fluxo de alterações, não há garantia de ordem dos registros de alteração. Os registros de alteração de uma chave primária específica são retornados somente em uma partição para um determinado intervalo de tempo.
Devido à linhagem da partição pai-filho, para processar alterações de uma chave específica na ordem do carimbo de data/hora de confirmação, os registros retornados de partições filhas devem ser processados somente depois que os registros de todas as partições pai tiverem sido processados.
Funções de leitura e sintaxe de consulta do fluxo de alterações
GoogleSQL
Para consultar os fluxo de alterações, use a
API
ExecuteStreamingSql
. O Spanner cria automaticamente uma função de leitura especial junto com o fluxo de alterações. A função de leitura fornece acesso aos registros do fluxo de alterações. A convenção de nomenclatura da função de leitura é
READ_change_stream_name
.
Supondo que haja um fluxo de alterações SingersNameStream
no banco de dados, a
sintaxe da consulta do GoogleSQL será a seguinte:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
A função de leitura aceita os seguintes argumentos:
Nome do argumento | Tipo | Obrigatório? | Descrição |
---|---|---|---|
start_timestamp |
TIMESTAMP |
Obrigatório | Especifica que os registros com commit_timestamp maior ou igual a start_timestamp
precisam ser retornados. O valor precisa estar dentro do período de armazenamento do fluxo de alterações e ser menor ou igual ao horário atual e maior ou igual ao carimbo de data/hora da criação do fluxo de alterações. |
end_timestamp |
TIMESTAMP |
Opcional (padrão: NULL ) |
Especifica que os registros com commit_timestamp menor
ou igual a end_timestamp precisam
ser retornados. O valor precisa estar dentro do período de armazenamento do fluxo de alterações e ser maior ou igual a start_timestamp . A consulta é concluída após retornar todos os ChangeRecords até end_timestamp ou quando o usuário encerra a conexão. Se NULL ou não for especificado, a consulta será executada até que todos os ChangeRecords sejam retornados ou o usuário encerre a conexão. |
partition_token |
STRING |
Opcional (padrão: NULL ) |
Especifica qual partição do fluxo de alterações consultar, com base no
conteúdo dos registros de partições
filhas. Se NULL ou não for especificado, isso significa que o
leitor está consultando o fluxo de alterações pela primeira vez e não
recebeu nenhum token de partição específico para consultar. |
heartbeat_milliseconds |
INT64 |
Obrigatório | Determina com que frequência um ChangeRecord de sinal de funcionamento é retornado
caso não haja transações confirmadas nessa partição.
O valor precisa estar entre 1,000 (um segundo) e 30,0000 (cinco
minutos). |
read_options |
ARRAY |
Opcional (padrão: NULL ) |
Outras opções de leitura reservadas para uso futuro. Atualmente, o único valor permitido é NULL . |
Recomendamos criar um método conveniente para criar o texto da consulta da função de leitura e vincular parâmetros a ela, conforme mostrado no exemplo a seguir.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
Para consultar os fluxo de alterações, use a
API ExecuteStreamingSql
.
O Spanner cria automaticamente uma função de leitura especial junto com o fluxo de alterações. A função de leitura fornece acesso aos registros do fluxo de alterações. A convenção de nomenclatura da função de leitura é
spanner.read_json_change_stream_name
.
Supondo que haja um fluxo de alterações SingersNameStream
no banco de dados, a
sintaxe da consulta do PostgreSQL será a seguinte:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
A função de leitura aceita os seguintes argumentos:
Nome do argumento | Tipo | Obrigatório? | Descrição |
---|---|---|---|
start_timestamp |
timestamp with time zone |
Obrigatório | Especifica que os registros de mudança com commit_timestamp maior ou igual a start_timestamp
devem ser retornados. O valor precisa estar dentro do período de armazenamento do fluxo de alterações e ser menor ou igual ao horário atual e maior ou igual ao carimbo de data/hora da criação do fluxo de alterações. |
end_timestamp |
timestamp with timezone |
Opcional (padrão: NULL ) |
Especifica que os registros de mudança com commit_timestamp
menor ou igual a end_timestamp precisam
ser retornados. O valor precisa estar dentro do período de armazenamento do fluxo de alterações e ser maior ou igual a start_timestamp .
A consulta é concluída depois de retornar todos os registros de alteração até
end_timestamp ou o usuário encerra a conexão.
Se NULL , a consulta será executada até que todos os registros de alteração sejam retornados ou o usuário encerre a conexão. |
partition_token |
text |
Opcional (padrão: NULL ) |
Especifica qual partição do fluxo de alterações consultar, com base no
conteúdo dos registros de partições
filhas. Se NULL ou não for especificado, isso significa que o
leitor está consultando o fluxo de alterações pela primeira vez e não
recebeu nenhum token de partição específico para consultar. |
heartbeat_milliseconds |
bigint |
Obrigatório | Determina com que frequência um ChangeRecord de sinal de funcionamento será retornado
caso não haja transações confirmadas nessa partição.
O valor precisa estar entre 1,000 (um segundo) e 300,000 (cinco
minutos). |
null |
null |
Obrigatório | Reservado para uso futuro |
Recomendamos criar um método prático para criar o texto da função de leitura e vincular parâmetros a ela, conforme mostrado no exemplo a seguir.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
Mudar formato de registro dos streams
GoogleSQL
A função de leitura de fluxo de alterações retorna uma única coluna ChangeRecord
do tipo
ARRAY<STRUCT<...>>
. Em cada linha, a matriz sempre contém um único elemento.
Os elementos da matriz têm o seguinte tipo:
STRUCT <
data_change_record ARRAY<STRUCT<...>>,
heartbeat_record ARRAY<STRUCT<...>>,
child_partitions_record ARRAY<STRUCT<...>>
>
Há três campos nesse struct: data_change_record
, heartbeat_record
e child_partitions_record
, cada um do tipo ARRAY<STRUCT<...>>
. Em qualquer linha que a função de leitura do fluxo de alterações
retorne, apenas um desses três campos contém um valor. Os outros dois
estão vazios ou NULL
. Esses campos de matriz contêm no máximo um elemento.
As seções a seguir examinam cada um desses três tipos de registro.
PostgreSQL
A função de leitura de fluxo de alterações retorna uma única coluna ChangeRecord
do
tipo JSON
com a seguinte estrutura:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
Há três chaves possíveis nesse objeto: data_change_record
,
heartbeat_record
e child_partitions_record
. O tipo de valor
correspondente é JSON
.
Em qualquer linha retornada pela função de leitura do fluxo de alterações, só
uma dessas três chaves existe.
As seções a seguir examinam cada um desses três tipos de registro.
Registros de alteração de dados
Um registro de alteração de dados contém um conjunto de alterações em uma tabela com o mesmo tipo de modificação (inserir, atualizar ou excluir) confirmado no mesmo carimbo de data/hora de confirmação em uma partição de fluxo de alterações para a mesma transação. Vários registros de alteração de dados podem ser retornados para a mesma transação em várias partições de fluxo de alterações.
Todos os registros de alteração de dados têm campos commit_timestamp
, server_transaction_id
e record_sequence
, que juntos determinam a ordem no fluxo de alterações de um registro de fluxo. Esses três campos são suficientes para derivar
a ordem das alterações e fornecer consistência externa.
Várias transações podem ter o mesmo carimbo de data/hora de confirmação se
tocarem em dados não sobrepostos. O campo server_transaction_id
oferece a capacidade de distinguir qual conjunto de mudanças (possivelmente
em partições de fluxo de alterações) foi emitido na mesma
transação. Pareá-lo com os campos record_sequence
e
number_of_records_in_transaction
permite armazenar em buffer e ordenar
todos os registros de uma transação específica.
Os campos de um registro de alteração de dados incluem:
GoogleSQL
Campo | Tipo | Descrição |
---|---|---|
commit_timestamp |
TIMESTAMP |
O carimbo de data/hora em que a alteração foi confirmada. |
record_sequence |
STRING |
O número de sequência do registro dentro da transação. Os números de sequência são únicos e aumentam monotonicamente em uma transação, mas não necessariamente contíguos. Classifique os registros do mesmo
server_transaction_id por record_sequence para
reconstruir a ordem das mudanças dentro da transação. |
server_transaction_id |
STRING |
Uma string globalmente exclusiva que representa a transação em que a alteração foi confirmada. O valor só deve ser usado no contexto do processamento de registros de fluxo de alterações e não está correlacionado com o código da transação na API do Spanner. |
is_last_record_in_transaction_in_partition |
BOOL |
Indica se este é o último registro de uma transação na partição atual. |
table_name |
STRING |
Nome da tabela afetada pela alteração. |
value_capture_type |
STRING |
Descreve o tipo de captura de valor especificado na configuração do fluxo de alterações quando essa alteração foi capturada. O tipo de captura de valor pode ser |
column_types |
ARRAY<STRUCT< |
O nome e o tipo da coluna, seja ela uma chave primária, e a posição da coluna, conforme definido no esquema ("ordinal_position"). A primeira coluna de uma tabela no esquema teria uma posição ordinal de "1". O tipo de coluna pode ser aninhado para colunas de matriz. O formato corresponde à estrutura de tipo descrita na referência da API Spanner. |
mods |
ARRAY<STRUCT< |
Descreve as alterações feitas, incluindo os valores de chave primária, os valores antigos e os novos valores das colunas alteradas ou rastreadas.
A disponibilidade e o conteúdo dos valores antigos e novos vão depender do value_capture_type configurado. Os campos new_values e old_values contêm apenas as colunas sem chave. |
mod_type |
STRING |
Descreve o tipo de alteração. Um de INSERT , UPDATE ou DELETE . |
number_of_records_in_transaction |
INT64 |
O número de registros de alteração de dados que fazem parte dessa transação em todas as partições do fluxo de alterações. |
number_of_partitions_in_transaction |
INT64 |
O número de partições que retornarão registros de alteração de dados para esta transação. |
transaction_tag |
STRING |
Tag associada a essa transação. |
is_system_transaction |
BOOL |
Indica se é uma transação do sistema. |
PostgreSQL
Campo | Tipo | Descrição |
---|---|---|
commit_timestamp |
STRING |
O carimbo de data/hora em que a alteração foi confirmada. |
record_sequence |
STRING |
O número de sequência do registro dentro da transação. Os números de sequência são únicos e aumentam monotonicamente em uma transação, mas não necessariamente contíguos. Classifique os registros do mesmo "server_transaction_id" por "record_array" para reconstruir a ordem das mudanças dentro da transação. |
server_transaction_id |
STRING |
Uma string globalmente exclusiva que representa a transação em que a alteração foi confirmada. O valor só deve ser usado no contexto do processamento de registros de fluxo de alterações e não está correlacionado com o código da transação na API do Spanner. |
is_last_record_in_transaction_in_partition |
BOOLEAN |
Indica se este é o último registro de uma transação na partição atual. |
table_name |
STRING |
Nome da tabela afetada pela alteração. |
value_capture_type |
STRING |
Descreve o tipo de captura de valor especificado na configuração do fluxo de alterações quando essa alteração foi capturada. O tipo de captura de valor pode ser |
column_types |
[ { "name": <STRING>, "type": { "code": <STRING> }, "is_primary_key": <BOOLEAN>, "ordinal_position": <NUMBER> }, ... ] |
O nome e o tipo da coluna, seja ela uma chave primária, e a posição da coluna, conforme definido no esquema ("ordinal_position"). A primeira coluna de uma tabela no esquema teria uma posição ordinal de "1". O tipo de coluna pode ser aninhado para colunas de matriz. O formato corresponde à estrutura de tipo descrita na referência da API Spanner. |
mods |
[ { "keys": {<STRING> : <STRING>}, "new_values": { <STRING> : <VALUE-TYPE>, [...] }, "old_values": { <STRING> : <VALUE-TYPE>, [...] }, }, [...] ] |
Descreve as alterações feitas, incluindo os valores de chave primária, os valores antigos e os novos valores das colunas alteradas ou rastreadas. A disponibilidade e o conteúdo dos valores antigos e novos dependerão do value_capture_type configurado. Os campos new_values e old_values contêm apenas as colunas sem chave.
|
mod_type |
STRING |
Descreve o tipo de alteração. Um de INSERT , UPDATE ou DELETE . |
number_of_records_in_transaction |
INT64 |
O número de registros de alteração de dados que fazem parte dessa transação em todas as partições do fluxo de alterações. |
number_of_partitions_in_transaction |
NUMBER |
O número de partições que retornarão registros de alteração de dados para esta transação. |
transaction_tag |
STRING |
Tag associada a essa transação. |
is_system_transaction |
BOOLEAN |
Indica se é uma transação do sistema. |
Confira a seguir dois exemplos de registros de alteração de dados. Eles descrevem uma única transação em que há uma transferência entre duas contas. Observe que as duas contas estão em partições de fluxo de alterações separadas.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
O registro de alteração de dados a seguir é um exemplo de registro com o tipo de captura de valor "NEW_VALUES"
. Apenas novos valores são preenchidos.
Somente a coluna "LastUpdate"
foi modificada, portanto, apenas essa coluna foi retornada.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
O registro de alteração de dados a seguir é um exemplo de registro com o tipo de captura de valor "NEW_ROW"
. Somente a coluna "LastUpdate"
foi modificada, mas todas as colunas rastreadas são retornadas.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
O registro de alteração de dados a seguir é um exemplo de registro com o tipo de captura de valor "NEW_ROW_AND_OLD_VALUES"
. Somente a coluna "LastUpdate"
foi modificada, mas todas as colunas rastreadas são retornadas. Esse tipo de captura
de valor captura o novo valor e o valor antigo de LastUpdate
.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Registros do sinal de funcionamento
Quando um registro de sinal de funcionamento é retornado, isso indica que todas as alterações com
commit_timestamp
menores ou iguais a timestamp
do registro do sinal de funcionamento
foram retornadas, e os registros de dados futuros nessa
partição precisam ter carimbos de data/hora de confirmação maiores do que os retornados pelo
registro de pulsação. Os registros de sinal de funcionamento são retornados quando não há alterações de dados
gravadas em uma partição. Quando há alterações de dados gravadas na partição, data_change_record.commit_timestamp
pode ser usado em vez de heartbeat_record.timestamp
para informar que o leitor está avançando
na leitura da partição.
É possível usar registros de sinal de funcionamento retornados nas partições para sincronizar
leitores em todas as partições. Depois que todos os leitores tiverem recebido um sinal de funcionamento maior ou igual a algum carimbo de data/hora A
ou dados ou registros de partição filhas superiores ou iguais ao carimbo de data/hora A
, os leitores saberão que receberam todos os registros confirmados até esse carimbo de data/hora A
e poderão começar a processar os registros em buffer. Por exemplo, classificando os registros entre partições por carimbo de data/hora e agrupando-os por server_transaction_id
.
Um registro de sinal de funcionamento contém apenas um campo:
GoogleSQL
Campo | Tipo | Descrição |
---|---|---|
timestamp |
TIMESTAMP |
O carimbo de data/hora do registro do sinal de funcionamento. |
PostgreSQL
Campo | Tipo | Descrição |
---|---|---|
timestamp |
STRING |
O carimbo de data/hora do registro do sinal de funcionamento. |
Exemplo de registro do sinal de funcionamento que informa que todos os registros com carimbos de data/hora menores ou iguais ao carimbo de data/hora desse registro foram retornados:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
Registros de partições filhas
Um registro de partições filhas retorna informações sobre as partições filhas: os tokens de partição, as partições pai e o
start_timestamp
que representa o carimbo de data/hora mais antigo do qual as partições filhas
contêm registros de alteração. Registros com carimbos de data/hora de confirmação
logo antes de child_partitions_record.start_timestamp
são
retornados na partição atual. Depois de retornar todos os registros de partições filhas desta partição, a consulta retornará um status de sucesso, indicando que todos os registros foram retornados para ela.
Os campos de um registro de partições filhas incluem:
GoogleSQL
Campo | Tipo | Descrição |
---|---|---|
start_timestamp |
TIMESTAMP |
Os registros de alteração de dados retornados das partições filhas nesse registro da partição filha têm um carimbo de data/hora de confirmação maior ou igual a start_timestamp . Ao consultar uma partição filha, a consulta precisa especificar o token dela e um start_timestamp maior ou igual a child_partitions_token.start_timestamp . Todos os registros de partições filhas
retornados por uma partição têm o mesmo start_timestamp , e o
carimbo de data/hora sempre está entre o start_timestamp
e o end_timestamp especificados da consulta. |
record_sequence |
STRING |
Um número de sequência monotonicamente crescente que pode ser usado para definir a ordem do registro das partições filhas quando há vários registros de partições filhas retornados com o mesmo start_timestamp em uma partição específica. O token de partição start_timestamp e record_sequence identificam exclusivamente um registro de partição filho. |
child_partitions |
ARRAY<STRUCT< |
Retorna um conjunto de partições filhas e as informações associadas a elas. Isso inclui a string do token de partição usada para identificar a partição filha nas consultas, bem como os tokens das partições pai. |
PostgreSQL
Campo | Tipo | Descrição |
---|---|---|
start_timestamp |
STRING |
Os registros de alteração de dados retornados de partições filhas nesse registro de partições filhas têm um carimbo de data/hora de confirmação maior ou igual a start_timestamp . Ao consultar uma partição filha, a consulta precisa especificar o token dela e um
start_timestamp maior ou igual a
child_partitions_token.start_timestamp . Todos os registros de partições filhas retornados por uma partição têm o mesmo start_timestamp , e o carimbo de data/hora sempre fica entre o start_timestamp e o end_timestamp especificados da consulta.
|
record_sequence |
STRING |
Um número de sequência monotonicamente crescente que pode ser usado para definir a ordem do registro das partições filhas quando há vários registros de partições filhas retornados com o mesmo start_timestamp em uma partição específica. O token de partição start_timestamp e record_sequence identificam exclusivamente um registro de partição filho. |
child_partitions |
[ { "token": <STRING>, "parent_partition_tokens": [<STRING>], }, [...] ] |
Retorna uma matriz de partições filhas e as informações associadas a elas. Isso inclui a string do token de partição usada para identificar a partição filha nas consultas, bem como os tokens das partições pai. |
Este é um exemplo de registro de partição filho:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
Fluxo de trabalho de consulta de fluxos de alterações
Execute consultas de fluxo de alterações usando a API ExecuteStreamingSql
, com uma transação somente leitura de uso único e um limite de carimbo de data/hora forte. A função de leitura
do fluxo de alterações permite especificar start_timestamp
e
end_timestamp
para o período de interesse. Todos os registros de alteração
no período de armazenamento podem ser acessados usando o limite forte de carimbo de data/hora
somente leitura.
Todos os outros
TransactionOptions
são inválidos para consultas de fluxo de alterações. Além disso, se TransactionOptions.read_only.return_read_timestamp
for definido como verdadeiro, um valor especial de kint64max - 1
será retornado na mensagem Transaction
que descreve a transação, em vez de um carimbo de data/hora de leitura válido. Esse valor especial precisa ser descartado e não usado para consultas subsequentes.
Cada consulta de fluxo de alterações pode retornar qualquer número de linhas, cada uma contendo um registro de alteração de dados, um registro de sinal de funcionamento ou um registro de partições filhas. Não é necessário definir um prazo para a solicitação.
Exemplo:
O fluxo de trabalho da consulta de streaming começa com a emissão da primeira consulta de fluxo de alterações especificando partition_token
para NULL
. A consulta precisa especificar
a função de leitura do fluxo de alterações, o carimbo de data/hora de início e término de interesse e
o intervalo do sinal de funcionamento. Quando end_timestamp
é NULL
, a consulta continua retornando alterações de dados até que a partição termine.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
Processe os registros de dados dessa consulta até que os registros da partição filha sejam
retornados. No exemplo abaixo, dois registros de partição filhos e três tokens de partição são retornados, e a consulta é encerrada. Os registros de partição filhas de uma
consulta específica sempre compartilham o mesmo start_timestamp
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012390,
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
Para processar alterações futuras após 2022-05-01T09:00:01Z
, crie três novas consultas e execute-as em paralelo. As três consultas juntas retornam alterações de dados futuras para o mesmo intervalo de chaves coberto pelo pai. Sempre defina o
start_timestamp
como start_timestamp
no mesmo registro de partição filho e
use o mesmo end_timestamp
e o mesmo intervalo de sinal de funcionamento para processar os registros
de maneira consistente em todas as consultas.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
Depois de um tempo, a consulta em child_token_2
é concluída após retornar outro registro de partição filha. Isso indica que uma nova partição abrangerá alterações futuras para child_token_2
e child_token_3
a partir de 2022-05-01T09:30:15Z
. O mesmo registro será retornado pela consulta em child_token_3
, porque ambos são as partições pai do novo child_token_4
.
Para garantir um processamento ordenado estrito de registros de dados para uma chave específica, a consulta em child_token_4
só pode começar depois que todos os pais terminarem, que neste caso são child_token_2
e child_token_3
. Crie apenas uma consulta
para cada token de partição filha. O design do fluxo de trabalho da consulta precisa indicar um
pai para aguardar e programar a consulta em child_token_4
.
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": [child_token_2, child_token_3],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
Encontre exemplos de processamento e análise de registros de fluxo de alterações no conector do Dataflow para SpannerIO do Apache Beam no GitHub.