Nesta página, descrevemos os seguintes atributos dos fluxo de alterações em detalhes:
- O modelo de particionamento baseado em divisão
- O formato e o conteúdo dos registros do fluxo de alterações
- a sintaxe de baixo nível usada para consultar esses registros
- Um exemplo do fluxo de trabalho de consulta
As informações nesta página são mais relevantes para usar a API Spanner para consultar fluxo de alterações diretamente. Aplicativos que, em vez disso, usam o Dataflow para ler o fluxo de alterações dados não precisam trabalhar diretamente com o modelo de dados descritas aqui.
Para conferir um guia introdutório mais amplo sobre fluxo de alterações, consulte Fluxos de alteração. geral do Google.
Mudar as partições do fluxo
Quando uma mudança ocorre em uma tabela monitorada por um fluxo de alterações, o Spanner grava um registro de fluxo de alterações correspondente no banco de dados, simultaneamente na mesma transação que a mudança de dados. Isso garante que, se a transação for bem-sucedida, o Spanner também capturou e manteve a mudança. Internamente, O Spanner coloca em conjunto o registro do fluxo de alterações e a alteração dos dados para que sejam processados pelo mesmo servidor a fim de minimizar a sobrecarga de gravação.
Como parte da DML para uma divisão específica, o Spanner anexa a gravação à divisão de dados de fluxo de alterações correspondente na mesma transação. Devido a essa colocalização, os fluxos de mudança não adicionam coordenação extra aos recursos de veiculação, o que minimiza a sobrecarga de confirmação de transação.
O Spanner escalona ao dividir e mesclar dinamicamente os dados com base com base na carga e no tamanho do banco de dados e na distribuição de divisões entre os recursos de veiculação.
Para ativar as gravações e leituras de fluxos de alterações para escalonamento, o Spanner divide e mescla o armazenamento interno do fluxo de alterações com os dados do banco de dados, evitando automaticamente os pontos de acesso. Para oferecer suporte à leitura de registros de fluxo de alterações em quase em tempo real à medida que as gravações no banco de dados são escalonadas, a API Spanner é projetada para que um fluxo de alterações seja consultado simultaneamente usando o fluxo de alterações partições diferentes. Mapa de partições do fluxo de alterações para alterar as divisões de dados dele que para conter os registros do fluxo de alteração. As partições de um fluxo de alterações mudam dinamicamente ao longo do tempo e estão relacionadas à forma como o Spanner divide e mescla dinamicamente os dados do banco de dados.
Uma partição de fluxo de alterações contém registros de um intervalo de chaves imutável para uma em um período específico. Qualquer partição de fluxo de alterações pode ser dividida em uma ou mais partições de fluxo de alterações ou mesclada com outras partições de fluxo de alterações. Quando esses eventos de divisão ou mesclagem acontecem, partições filhas são criadas para capturar as mudanças dos respectivos intervalos de chaves imutáveis para o próximo período. Além disso, aos registros de alteração de dados, uma consulta de fluxo de alterações retorna os registros de partição filhos notificar os leitores sobre novas partições do fluxo de alterações que precisam ser consultadas; como registros de sinal de funcionamento para indicar o progresso quando nenhuma gravação tiver ocorrido. recentemente.
Ao consultar uma partição de fluxo de alterações específica, os registros de alteração são retornados na ordem do carimbo de data/hora de commit. Cada registro de alteração é retornado exatamente uma vez. Nas partições do fluxo de alterações, não há garantia de ordem de mudança. registros. Os registros de mudança de uma chave primária específica são retornados apenas em uma partição para um determinado período.
Devido à linhagem da partição pai-filho, para processar alterações em uma chave específica na ordem do carimbo de data/hora de commit, os registros retornados do filho as partições só devem ser processadas após os registros de todas as partições foram processadas.
Funções de leitura e sintaxe de consulta do fluxo de alterações
GoogleSQL
Para consultar fluxos de mudanças, use a
API
ExecuteStreamingSql
. O Spanner cria automaticamente uma função de leitura especial junto
com o fluxo de alterações. A função de leitura fornece acesso à mudança
registros do stream. A convenção de nomenclatura da função de leitura é
READ_change_stream_name
.
Supondo que um fluxo de alterações SingersNameStream
exista no banco de dados, a
sintaxe de consulta do GoogleSQL é a seguinte:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
A função de leitura aceita os seguintes argumentos:
Nome do argumento | Tipo | Obrigatório? | Descrição |
---|---|---|---|
start_timestamp |
TIMESTAMP |
Obrigatório | Especifica que os registros com commit_timestamp maior ou igual a start_timestamp
deve ser retornado. O valor precisa estar dentro do fluxo de alterações
período de armazenamento, e deve ser menor ou igual ao tempo atual,
e maior ou igual ao carimbo de data/hora de criação do fluxo de alterações. |
end_timestamp |
TIMESTAMP |
Opcional (padrão: NULL ) |
Especifica que os registros com commit_timestamp a menos
que ou igual a end_timestamp deve
serão retornadas. O valor precisa estar dentro do período de retenção da mudança de fluxo
e ser maior ou igual a start_timestamp . A consulta
é concluída depois de retornar todos os ChangeRecords até end_timestamp
ou quando o usuário encerra a conexão. Se NULL ou não
especificado, a consulta é executada até que todos os ChangeRecords sejam retornados ou até que o
o usuário encerra a conexão. |
partition_token |
STRING |
Opcional (padrão: NULL ) |
Especifica qual partição do fluxo de alterações será consultada com base no
conteúdo dos registros de partições
filhas. Se NULL ou não for especificado, significa que o
leitor está consultando o fluxo de mudanças pela primeira vez e não
recebeu nenhum token de partição específico para consultar. |
heartbeat_milliseconds |
INT64 |
Obrigatório | Determina com que frequência um ChangeRecord de batimento é retornado
caso não haja transações confirmadas nessa partição.
O valor precisa estar entre 1,000 (um segundo) e 300,000 (cinco
minutos). |
read_options |
ARRAY |
Opcional (padrão: NULL ) |
Outras opções de leitura reservadas para uso futuro. No momento, o único valor permitido é NULL . |
Recomendamos criar um método conveniente para criar o texto da ler parâmetros de consulta e vinculação da função para ela, conforme mostrado exemplo.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
Você consulta os fluxo de alterações usando o
API ExecuteStreamingSql
.
O Spanner cria automaticamente uma função de leitura especial com o fluxo de alterações. A função de leitura fornece acesso aos registros do fluxo de mudanças. A convenção de nomenclatura da função de leitura é
spanner.read_json_change_stream_name
:
Supondo que haja um fluxo de alterações SingersNameStream
no banco de dados, o
A sintaxe de consulta do PostgreSQL é esta:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
A função de leitura aceita os seguintes argumentos:
Nome do argumento | Tipo | Obrigatório? | Descrição |
---|---|---|---|
start_timestamp |
timestamp with time zone |
Obrigatório | Especifica que os registros de mudança com commit_timestamp maior ou igual a start_timestamp
precisam ser retornados. O valor precisa estar dentro do período de retenção do fluxo de alterações e ser menor ou igual ao horário atual e maior ou igual ao carimbo de data/hora da criação do fluxo de alterações. |
end_timestamp |
timestamp with timezone |
Opcional (padrão: NULL ) |
Especifica que os registros de mudança com commit_timestamp
menor ou igual a end_timestamp deve
serão retornadas. O valor precisa estar dentro do período de retenção da mudança de fluxo
e ser maior ou igual a start_timestamp .
A consulta é concluída depois de retornar todos os registros de mudança até
end_timestamp ou quando o usuário encerra a conexão.
Se NULL , a consulta será executada até que todos os registros de alteração sejam retornados
ou o usuário encerra a conexão. |
partition_token |
text |
Opcional (padrão: NULL ) |
Especifica qual partição do fluxo de alterações será consultada, com base no
o conteúdo das partições filhas
.. Se NULL ou não for especificado, significa que o
leitor está consultando o fluxo de mudanças pela primeira vez e não
recebeu nenhum token de partição específico para consultar. |
heartbeat_milliseconds |
bigint |
Obrigatório | Determina a frequência com que um ChangeRecord de batimentos cardíacos será retornado.
caso não haja transações confirmadas nessa partição.
O valor precisa estar entre 1,000 (um segundo) e 300,000 (cinco
minutos). |
null |
null |
Obrigatório | Reservado para uso futuro |
Recomendamos criar um método de conveniência para criar o texto da função de leitura e vincular parâmetros a ele, conforme mostrado no exemplo a seguir.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
Mudar o formato de registro do fluxo de alterações
GoogleSQL
A função de leitura dos fluxo de alterações retorna uma única coluna ChangeRecord
do tipo.
ARRAY<STRUCT<...>>
. Em cada linha, essa matriz sempre contém um único elemento.
Os elementos da matriz têm o seguinte tipo:
STRUCT <
data_change_record ARRAY<STRUCT<...>>,
heartbeat_record ARRAY<STRUCT<...>>,
child_partitions_record ARRAY<STRUCT<...>>
>
Há três campos nessa estrutura: data_change_record
,
heartbeat_record
e child_partitions_record
, cada um do tipo
ARRAY<STRUCT<...>>
. Em qualquer linha em que a função de leitura do fluxo de alterações
retorna, somente um desses três campos contém um valor; Os outros dois
estejam vazios ou NULL
. Esses campos de matriz contêm, no máximo, um elemento.
As seções a seguir examinam cada um desses três tipos de registro.
PostgreSQL
A função de leitura dos fluxo de alterações retorna uma única coluna ChangeRecord
de
digite JSON
com a seguinte estrutura:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
Há três chaves possíveis neste objeto: data_change_record
,
heartbeat_record
e child_partitions_record
. O tipo de valor correspondente
é JSON
.
Em qualquer linha retornada pela função de leitura do fluxo de alterações, somente
existe uma dessas três chaves.
As seções a seguir examinam cada um desses três tipos de registro.
Registros de alteração de dados
Um registro de alteração de dados contém um conjunto de alterações feitas em uma tabela com as mesmo tipo de modificação (inserir, atualizar ou excluir) confirmada no mesmo o carimbo de data/hora de commit em uma partição do fluxo de alterações para o mesmo transação. Vários registros de alteração de dados podem ser retornados para a mesma transação em várias partições de fluxo de alterações.
Todos os registros de alteração de dados têm campos commit_timestamp
, server_transaction_id
e record_sequence
, que, juntos, determinam a ordem no fluxo de alterações de um registro de fluxo. Esses três campos são suficientes para derivar
a ordem das mudanças e fornecer consistência externa.
Várias transações podem ter o mesmo carimbo de data/hora de confirmação se
elas tocarem dados não sobrepostos. O campo server_transaction_id
a capacidade de distinguir qual conjunto de mudanças (possivelmente
em todas as partições do fluxo de alterações) foram emitidos no mesmo
transação. Pareá-lo com o record_sequence
e
Os campos number_of_records_in_transaction
permitem armazenar em buffer e ordenar
todos os registros de uma transação específica.
Os campos de um registro de alteração de dados incluem:
GoogleSQL
Campo | Tipo | Descrição |
---|---|---|
commit_timestamp |
TIMESTAMP |
O carimbo de data/hora em que a alteração foi confirmada. |
record_sequence |
STRING |
O número de sequência do registro na transação. Os números de sequência são exclusivos e aumentam de maneira uniforme (mas não necessariamente contíguos) em uma transação. Ordene os registros do mesmo
server_transaction_id por record_sequence para
reconstruir a ordem das mudanças na transação.
Essa ordem pode ser otimizada pelo Spanner para ter um melhor desempenho e nem sempre corresponder à ordem original fornecida pelos usuários. |
server_transaction_id |
STRING |
Uma string globalmente exclusiva que representa a transação em com o qual a alteração foi confirmada. O valor deve ser usada no contexto do processamento de registros de fluxo de alterações e não é correlacionados ao ID da transação na API Spanner. |
is_last_record_in_transaction_in_partition |
BOOL |
Indica se este é o último registro de uma transação na partição atual. |
table_name |
STRING |
Nome da tabela afetada pela alteração. |
value_capture_type |
STRING |
Descreve o tipo de captura de valor especificado na configuração do fluxo de mudanças quando a mudança foi capturada. O tipo de captura de valor pode ser |
column_types |
ARRAY<STRUCT< |
O nome e o tipo da coluna, se é uma chave primária e a posição da coluna como definida no esquema ("posição_ordinal"). A primeira coluna de uma tabela no esquema teria uma posição ordinal de "1". O tipo de coluna podem ser aninhados para colunas de matriz. O formato corresponde à estrutura de tipos descritos na referência da API Spanner. |
mods |
ARRAY<STRUCT< |
Descreve as alterações feitas, incluindo a chave primária
valores, os valores antigos e os novos valores das colunas alteradas ou rastreadas.
A disponibilidade e o conteúdo dos valores antigos e novos vão depender do value_capture_type configurado. Os campos new_values e old_values contêm apenas as colunas sem chave. |
mod_type |
STRING |
Descreve o tipo de alteração. INSERT , UPDATE ou
DELETE |
number_of_records_in_transaction |
INT64 |
O número de registros de alteração de dados que fazem parte dessa transação em todas as partições de fluxo de alterações. |
number_of_partitions_in_transaction |
INT64 |
O número de partições que retornarão registros de alteração de dados para para essa transação. |
transaction_tag |
STRING |
Tag da transação associada a essa transação. |
is_system_transaction |
BOOL |
Indica se a transação é do sistema. |
PostgreSQL
Campo | Tipo | Descrição |
---|---|---|
commit_timestamp |
STRING |
O carimbo de data/hora em que a alteração foi confirmada. |
record_sequence |
STRING |
O número de sequência do registro na transação. Os números de sequência são exclusivos e aumentam de maneira uniforme (mas não necessariamente contíguos) em uma transação. Ordene os registros do mesmo `server_transaction_id` por `record_sequence` para reconstruir a ordem das mudanças na transação. |
server_transaction_id |
STRING |
Uma string globalmente exclusiva que representa a transação em com o qual a alteração foi confirmada. O valor deve ser usada no contexto do processamento de registros de fluxo de alterações e não é correlacionado com o ID da transação na API Spanner |
is_last_record_in_transaction_in_partition |
BOOLEAN |
Indica se este é o último registro de uma transação na partição atual. |
table_name |
STRING |
Nome da tabela afetada pela alteração. |
value_capture_type |
STRING |
Descreve o tipo de captura de valor especificado na configuração do fluxo de mudanças quando a mudança foi capturada. O tipo de captura de valor pode ser |
column_types |
[ { "name": <STRING>, "type": { "code": <STRING> }, "is_primary_key": <BOOLEAN>, "ordinal_position": <NUMBER> }, ... ] |
O nome e o tipo da coluna, se é uma chave primária e a posição da coluna como definida no esquema ("posição_ordinal"). A primeira coluna de uma tabela no esquema teria uma posição ordinal de "1". O tipo de coluna podem ser aninhados para colunas de matriz. O formato corresponde à estrutura de tipo descrita na referência da API Spanner. |
mods |
[ { "keys": {<STRING> : <STRING>}, "new_values": { <STRING> : <VALUE-TYPE>, [...] }, "old_values": { <STRING> : <VALUE-TYPE>, [...] }, }, [...] ] |
Descreve as mudanças feitas, incluindo os valores da chave primária, os valores antigos e os novos valores das colunas alteradas ou rastreadas. A disponibilidade e o conteúdo dos valores antigos e novos dependerão
no value_capture_type configurado. Os campos new_values e
old_values contêm apenas as colunas sem chave.
|
mod_type |
STRING |
Descreve o tipo de alteração. Um de INSERT , UPDATE ou
DELETE . |
number_of_records_in_transaction |
INT64 |
O número de registros de alteração de dados que fazem parte dessa transação em todas as partições de fluxo de alterações. |
number_of_partitions_in_transaction |
NUMBER |
O número de partições que vão retornar registros de alteração de dados para essa transação. |
transaction_tag |
STRING |
Tag da transação associada a essa transação. |
is_system_transaction |
BOOLEAN |
Indica se é uma transação do sistema. |
Confira abaixo um par de exemplos de registros de alteração de dados. Eles descrevem uma única transação em que há uma transferência entre duas contas. As duas contas estão fluxo de alterações separado partições diferentes.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
O registro de alteração de dados a seguir é um exemplo de um registro com o valor
tipo de captura "NEW_VALUES"
. Apenas novos valores são preenchidos.
Somente a coluna "LastUpdate"
foi modificada, portanto, apenas essa coluna
foi retornado.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
O registro de alteração de dados a seguir é um exemplo de um registro com o valor
tipo de captura "NEW_ROW"
. Apenas o "LastUpdate"
coluna foi modificada, mas todas as colunas rastreadas são retornadas.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
O registro de mudança de dados a seguir é um exemplo de um registro com o tipo de captura de valor "NEW_ROW_AND_OLD_VALUES"
. Somente a coluna "LastUpdate"
foi modificada, mas todas as colunas rastreadas são retornadas. Esse tipo de captura de valor captura o novo e o valor antigo de LastUpdate
.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Registros de batimentos
Quando um registro de sinal de funcionamento é retornado, isso indica que todas as alterações com
commit_timestamp
menor ou igual ao registro do sinal de funcionamento
timestamp
foram retornados, e registros de dados futuros nesta
partição precisa ter carimbos de data/hora de confirmação maiores do que os retornados pela
gravação de batimentos cardíacos. Os registros de tic-tac são retornados quando não há mudanças de dados
gravadas em uma partição. Quando há alterações de dados gravadas
da partição, data_change_record.commit_timestamp
poderá ser usada no lugar
de heartbeat_record.timestamp
para informar que o leitor está avançando
o progresso da leitura da partição.
É possível usar registros de sinal de funcionamento retornados nas partições para sincronizar
leitores em todas as partições. Quando todos os leitores recebem um
batimento de coração maior ou igual a um carimbo de data/hora A
ou recebem dados ou registros de
partição filha maiores ou iguais a um carimbo de data/hora A
, os leitores sabem que receberam
todos os registros confirmados até ou antes desse carimbo de data/hora A
e podem começar
a processar os registros armazenados em buffer, por exemplo, classificando os registros de
partição cruzada por carimbo de data/hora e agrupando-os por server_transaction_id
.
Um registro de batimento cardíaco contém apenas um campo:
GoogleSQL
Campo | Tipo | Descrição |
---|---|---|
timestamp |
TIMESTAMP |
O carimbo de data/hora do registro de batimento cardíaco. |
PostgreSQL
Campo | Tipo | Descrição |
---|---|---|
timestamp |
STRING |
O carimbo de data/hora do registro de batimento cardíaco. |
Um exemplo de registro de batimento cardíaco, comunicando que todos os registros com carimbos de data/hora menores ou iguais ao carimbo de data/hora deste registro foram retornados:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
Registros de partições filhas
Um registro de partições filhas retorna informações sobre as partições filhas: os tokens de partições, os tokens das partições mães e o
start_timestamp
que representa o carimbo de data/hora mais antigo para o qual as partições
filhas contêm registros de mudança. Registros com carimbos de data/hora de commit
imediatamente antes dos child_partitions_record.start_timestamp
são
retornados na partição atual. Depois de retornar todas
registros de partições filhas desta partição, esta consulta retornará com
um status de sucesso, indicando que todos os registros foram retornados para esta
partição.
Os campos de um registro de partições filho incluem:
GoogleSQL
Campo | Tipo | Descrição |
---|---|---|
start_timestamp |
TIMESTAMP |
Registros de alteração de dados retornados do filho
as partições neste registro filho têm um carimbo de data/hora de confirmação
maior ou igual a start_timestamp . Ao consultar uma partição filha, a consulta precisa
especificar o token da partição filha e um start_timestamp maior ou igual a
child_partitions_token.start_timestamp . Todos os registros de partições filhas
retornados por uma partição têm os mesmos start_timestamp e o
o carimbo de data/hora sempre está entre o start_timestamp especificado da consulta
e end_timestamp . |
record_sequence |
STRING |
Um número de sequência
monotonicamente crescente que pode ser usado para definir a ordem do
registro de partições filho quando há vários
registros de partições filho retornados com o mesmo start_timestamp em uma
partição específica. O token de partição,
start_timestamp e
record_sequence identificam exclusivamente um
registro de partições filho. |
child_partitions |
ARRAY<STRUCT< |
Retorna um conjunto de partições filhas e as informações associadas a elas. Isso inclui a string de token de partição usada para identificar a partição filha em consultas, bem como os tokens das partições mãe. |
PostgreSQL
Campo | Tipo | Descrição |
---|---|---|
start_timestamp |
STRING |
Os registros de mudança de dados retornados de partições
filhas neste registro de partições filhas têm um carimbo de data/hora de confirmação
maior ou igual a start_timestamp . Ao consultar um filho
a consulta deve especificar o token da partição filha e uma
start_timestamp maior ou igual a
child_partitions_token.start_timestamp . Todos os registros de partições
filhas retornados por uma partição têm o mesmo
start_timestamp , e o carimbo de data/hora sempre fica entre o
start_timestamp e o
end_timestamp especificados pela consulta.
|
record_sequence |
STRING |
Uma sequência monotonicamente crescente
que pode ser usado para definir a ordem do
que as partições filhas
registros de partições filhas retornados com o mesmo start_timestamp em um
em uma partição específica. O token de partição,
start_timestamp e
record_sequence identificam exclusivamente um
registro de partições filho. |
child_partitions |
[ { "token": <STRING>, "parent_partition_tokens": [<STRING>], }, [...] ] |
Retorna uma matriz de partições filhas e as informações associadas a elas. Isso inclui a string de token de partição usada para identificar a partição filha em consultas, bem como os tokens das partições mãe. |
Confira a seguir um exemplo de registro de partição filha:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
Fluxo de trabalho de consulta de fluxos de alterações
Execute consultas de fluxo de alterações usando a
API ExecuteStreamingSql
, com uma
transação somente leitura
de uso único e uma
limitação de carimbo de data/hora forte. A função de leitura
do fluxo de mudança permite especificar start_timestamp
e
end_timestamp
para o período de tempo de interesse. Todos os registros de alteração
no período de armazenamento podem ser acessados usando o recurso
limite de carimbo de data/hora.
Todos os outros
TransactionOptions
são inválidos para consultas de fluxo de mudanças. Além disso,
se TransactionOptions.read_only.return_read_timestamp
for definido como verdadeiro,
um valor especial de kint64max - 1
será retornado na mensagem Transaction
que descreve a transação, em vez de um carimbo de data/hora de leitura
válido. Esse valor especial precisa ser descartado e não usado em consultas
posteriores.
Cada consulta de fluxo de alterações pode retornar qualquer número de linhas, cada uma contendo um registro de alteração de dados, de batimento cardíaco ou de partições filhas. Não é necessário definir um prazo para a solicitação.
Exemplo:
O fluxo de trabalho de consulta de streaming começa emitindo a primeira consulta de fluxo de mudanças
especificando partition_token
para NULL
. A consulta precisa especificar
a função de leitura do fluxo de alterações, o carimbo de data/hora de início e término de interesse e
o intervalo de batimentos. Quando o end_timestamp
é NULL
, a consulta continua
retornando mudanças de dados até que a partição termine.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
Processe os registros de dados dessa consulta até que os registros de partição filhos sejam
retornados. No exemplo abaixo, dois registros de partição filho e três tokens de partição
são retornados, e a consulta é encerrada. Os registros de partição filho de uma
consulta específica sempre compartilham o mesmo start_timestamp
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012390,
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
Para processar mudanças futuras após 2022-05-01T09:00:01Z
, crie três novas
consultas e executá-las em paralelo. As três consultas juntas retornam mudanças de dados futuras para o mesmo intervalo de chaves que o pai abrange. Sempre defina o
start_timestamp
como o start_timestamp
no mesmo registro de partição filho e
use o mesmo end_timestamp
e intervalo de batimentos para processar os registros
de forma consistente em todas as consultas.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
Depois de um tempo, a consulta em child_token_2
é concluída após retornar outro
registro de partição filho. Esse registro indica que uma nova partição vai
cobrir mudanças futuras para child_token_2
e child_token_3
, começando em
2022-05-01T09:30:15Z
. O mesmo registro será retornado pela consulta no
child_token_3
, porque ambas são as partições pai da nova child_token_4
.
Para garantir um processamento ordenado estrito de registros de dados para uma chave específica,
a consulta em child_token_4
só precisa começar depois que todos os pais terminarem,
que, neste caso, são child_token_2
e child_token_3
. Crie apenas uma consulta
para cada token de partição filha, o design do fluxo de trabalho da consulta deve indicar um
pai aguarde e programe a consulta em child_token_4
.
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": [child_token_2, child_token_3],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
Encontre exemplos de processamento e análise de registros de fluxo de mudanças no conector do Dataflow do Apache Beam SpannerIO no GitHub (em inglês).