Alterar partições, registros e consultas de streams

Esta página descreve em detalhes os seguintes atributos dos streams de alteração:

  • O modelo de particionamento baseado em divisão
  • O formato e o conteúdo dos registros de stream de alterações
  • a sintaxe de baixo nível usada para consultar esses registros;
  • Um exemplo do fluxo de trabalho de consulta

As informações nesta página são mais relevantes para usar a API Spanner para consultar streams de alteração diretamente. Os aplicativos que usam o Dataflow para ler dados do fluxo de alterações não precisam trabalhar diretamente com o modelo de dados descrito aqui.

Para ver um guia introdutório mais amplo para fazer mudanças nos streams, consulte Visão geral dos fluxos de alterações.

Alterar partições de stream

Quando uma alteração ocorre em uma tabela observada por um fluxo de alteração, o Cloud Spanner grava um registro do stream de alterações correspondente no banco de dados, de maneira síncrona na mesma transação da alteração de dados. Isso garante que, se a transação for bem-sucedida, o Spanner também capturará e manterá a alteração. Internamente, o Spanner colocaliza o registro do stream de alteração e a alteração de dados para que eles sejam processados pelo mesmo servidor para minimizar a sobrecarga de gravação.

Como parte da DML para uma divisão específica, o Spanner anexa a gravação à divisão de dados de fluxo de alterações correspondente na mesma transação. Devido a essa colocation, os streams de alteração não adicionam coordenação extra aos recursos de veiculação, o que minimiza a sobrecarga de confirmação de transação.

imagem

O Spanner escalona dividindo e mesclando dados dinamicamente com base na carga e no tamanho do banco de dados e distribuindo divisões entre os recursos de disponibilização.

Para permitir que as gravações e leituras de streams de alteração sejam escalonadas, o Spanner divide e mescla o armazenamento interno do stream de alterações com os dados do banco de dados, evitando pontos de acesso automaticamente. Para oferecer suporte à leitura de registros de stream de alterações quase em tempo real conforme a gravação das gravações do banco de dados, a API Spanner é projetada para que um stream de alteração seja consultado simultaneamente usando partições de stream de alteração. Altere as partições do stream do stream para alterar as divisões de dados do stream que contêm os registros de stream de alteração. As partições de um stream de alterações são alteradas dinamicamente ao longo do tempo e estão relacionadas à forma como o Spanner divide e mescla dinamicamente os dados do banco de dados.

Uma partição de stream de alteração contém registros para um intervalo de chaves imutável para um período específico. Qualquer partição de stream de alteração pode ser dividida em uma ou mais partições de stream de alteração ou ser mesclada com outras partições de stream de alteração. Quando esses eventos de divisão ou mesclagem acontecem, partições partições são criadas para capturar as alterações nos respectivos intervalos de chaves imutáveis para o próximo período. Além dos registros de alteração de dados, uma consulta de stream de alteração retorna registros de partição filhos para notificar os leitores sobre novas partições de stream de alteração que precisam ser consultadas, assim como os registros de batimentos cardíacos para indicar o progresso quando nenhuma gravação ocorreu recentemente.

Ao consultar uma partição de stream de alteração específica, os registros de alteração são retornados em ordem de carimbo de data/hora de confirmação. Cada registro de alteração é retornado exatamente uma vez. Nas partições do stream de alterações, não há garantia de ordem dos registros de mudança. Os registros de alteração de uma chave primária específica são retornados em apenas uma partição para um período específico.

Devido à linhagem de partição pai-filho, para processar alterações em uma chave específica na ordem do carimbo de data/hora de confirmação, os registros retornados de partições filhas precisam ser processados apenas depois que os registros de todas as partições pais forem processados.

Alterar sintaxe de consulta de stream

Os streams de alteração são consultados usando a API ExecuteStreamingSql. Uma função com valor de tabela especial (TVF, na sigla em inglês) é criada automaticamente com o fluxo de alteração. Ela fornece acesso aos registros do stream de alteração. A convenção de nomenclatura do TVF é READ_change_stream_name.

Supondo que um fluxo de alterações SingersNameStream exista no banco de dados, a sintaxe de consulta é a seguinte:

SELECT ChangeRecord
    FROM READ_SingersNameStream (
        start_timestamp,
        end_timestamp,
        partition_token,
        heartbeat_milliseconds
    )

A função aceita os seguintes argumentos:

Nome do argumento Tipo Obrigatória? Descrição
start_timestamp TIMESTAMP Obrigatório Especifica que registros com commit_timestamp maior ou igual a start_timestamp precisam ser retornados. O valor precisa estar dentro do período de armazenamento do stream de alteração e ser menor ou igual ao horário atual e maior ou igual ao carimbo de data/hora da criação do stream de alteração.
end_timestamp TIMESTAMP Opcional (padrão: NULL) Especifica que registros com commit_timestamp menor ou igual a end_timestamp precisam ser retornados. O valor precisa estar dentro do período de armazenamento do stream de alteração e maior ou igual ao start_timestamp. A consulta será concluída depois de retornar todos os ChangeRecords até end_timestamp ou um conjunto de registros de partição filho. Se NULL ou não for especificado, a consulta será executada até a partição atual ser concluída e todos os ChangeRecords com os campos child_partition_record definidos serem retornados. Especificar NULL para end_timestamp indica que todas as alterações mais recentes serão lidas quando ocorrerem.
partition_token STRING Opcional (padrão: NULL) Especifica qual partição de stream de stream deve ser consultada com base no conteúdo de registros de partições filhas. Se NULL ou não for especificado, isso significa que o leitor está consultando o fluxo de alterações pela primeira vez e não recebeu nenhum token de partição específico para consultar.
heartbeat_milliseconds INT64 Obrigatório Determina com que frequência um ChangeRecord do sinal de funcionamento será retornado caso não haja transações confirmadas nessa partição. O valor precisa estar entre 1000 (um segundo) e 300000 (cinco minutos).

Recomendamos criar um método de conveniência para criar o texto da consulta de TVF e os parâmetros de vinculação a ela, conforme mostrado no exemplo a seguir.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT ChangeRecord FROM READ_SingersNameStream"
            + "("
            + "   start_timestamp => @startTimestamp,"
            + "   end_timestamp => @endTimestamp,"
            + "   partition_token => @partitionToken,"
            + "   heartbeat_milliseconds => @heartbeatMillis"
            + ")";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {
    return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("startTimestamp")
                    .to(startTimestamp)
                    .bind("endTimestamp")
                    .to(endTimestamp)
                    .bind("partitionToken")
                    .to(partitionToken)
                    .bind("heartbeatMillis")
                    .to(heartbeatMillis)
                    .build();
}

Mudar o formato do registro da transmissão

O stream de alteração TVF retorna uma única coluna ChangeRecord do tipo ARRAY<STRUCT<...>>. Em cada linha, essa matriz sempre contém um único elemento.

Os elementos da matriz têm o seguinte tipo:

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

Há três campos nesse struct: data_change_record, heartbeat_record e child_partitions_record, cada um do tipo ARRAY<STRUCT<...>>. Em qualquer linha retornada do stream de alteração TVF, apenas um desses três campos contém um valor; os outros dois estão vazios ou NULL. Esses campos de matriz contêm, no máximo, um elemento.

As seções a seguir examinam cada um desses três tipos de registro.

Registros de alterações de dados

Um registro de alteração de dados contém um conjunto de alterações em uma tabela com o mesmo tipo de modificação (inserir, atualizar ou excluir) confirmado no mesmo carimbo de data/hora de confirmação em uma partição de fluxo de alterações para a mesma transação. Vários registros de alteração de dados podem ser retornados para a mesma transação em várias partições de stream de alteração.

Todos os registros de mudança de dados têm os campos commit_timestamp, server_transaction_id e record_sequence, que determinam a ordem no stream de alterações de um registro de stream. Esses três campos são suficientes para derivar a ordem das mudanças e fornecer consistência externa.

Observe que várias transações podem ter o mesmo carimbo de data/hora de confirmação se tocarem em dados não sobrepostos. O campo server_transaction_id permite distinguir qual conjunto de mudanças (possivelmente em todas as partições do fluxo de alterações) foi emitido na mesma transação. Pareá-lo com os campos record_sequence e number_of_records_in_transaction também permite armazenar em buffer e ordenar todos os registros de uma transação específica.

Os campos de um registro de alteração de dados incluem o seguinte:

Campo Tipo Descrição
commit_timestamp TIMESTAMP O carimbo de data/hora em que a alteração foi confirmada.
record_sequence STRING O número da sequência do registro dentro da transação. Os números de sequência são garantidos e são monotonicamente crescentes, mas não necessariamente contíguos, em uma transação. Classifique os registros do mesmo "server_transaction_id" por "record_Sequence" para reconstruir a ordem das alterações dentro da transação.
server_transaction_id STRING Uma string globalmente exclusiva que representa a transação em que a alteração foi confirmada. O valor deve ser usado apenas no contexto de processamento de registros de stream de alteração e não está relacionado ao ID da transação na API do Spanner, por exemplo, "TransactionSelector.id". Ambos identificam uma transação de maneira exclusiva em comparação com outros valores dentro do mesmo contexto (ou seja, fluxo de dados "data_change_records" ou a API do Spanner).
is_last_record_in_transaction_in_partition BOOL Indica se este é o último registro de uma transação na partição atual.
table_name STRING Nome da tabela afetada pela alteração.
value_capture_type STRING

Descreve o tipo de captura de valor que foi especificado na configuração do fluxo de alterações quando essa mudança foi capturada.

No momento, sempre "OLD_AND_NEW_VALUES".

column_types ARRAY<STRUCT<
name STRING,
 type JSON,
 is_primary_key BOOL,
 ordinal_position INT64
>>
O nome da coluna, o tipo de coluna, se ela é uma chave primária e a posição da coluna, conforme definido no esquema ("ordinal_position"). A primeira coluna de uma tabela no esquema teria uma posição ordinal de "1". O tipo de coluna pode ser aninhado para colunas de matriz. O formato corresponde à estrutura de tipos descrita na referência da API Spanner.
mods ARRAY<STRUCT<
keys JSON,
 new_values JSON,
 old_values JSON
>>
Descreve as alterações que foram feitas, incluindo os valores de chave primária e os valores antigos e novos das colunas alteradas se o fluxo de alteração estiver configurado com "value_capture_type". Os campos new_values e old_values contêm apenas as colunas que não são de chave.
mod_type STRING Descreve o tipo de alteração. Um INSERT, UPDATE ou DELETE.
number_of_records_in_transaction INT64 O número de registros de alterações de dados que fazem parte dessa transação em todas as partições do fluxo de alterações.
number_of_partitions_in_transaction INT64 O número de partições que retornarão os registros de alteração de dados para essa transação.

Veja a seguir um exemplo de registros de alterações de dados. Elas descrevem uma única transação em que há uma transferência entre duas contas. Observe que as duas contas estão em partições de stream de alteração separadas.

data_change_record: {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
         "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
}
data_change_record: {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
}

Recordes de batimentos cardíacos

Quando um registro de sinal de funcionamento é retornado, ele indica que todas as mudanças com commit_timestamp menor que ou igual ao registro de sinal de funcionamento foram retornadas, e os registros de dados futuros nessa partição precisam ter carimbos de data/hora de confirmação maiores do que os retornados pelo registro de sinal de funcionamento. Os registros de sinal de funcionamento são retornados quando não há alterações de dados gravadas em uma partição. Quando há mudanças de dados gravadas na partição, data_change_record.commit_timestamp pode ser usada em vez de heartbeat_record.timestamp para informar que o leitor está progredindo na leitura da partição.

É possível usar registros de sinal de funcionamento retornados em partições para sincronizar leitores em todas as partições. Depois que todos os leitores tiverem recebido um batimento cardíaco maior ou igual a algum carimbo de data/hora A ou recebido dados ou registros de partição filhos maiores ou iguais ao carimbo de data/hora A, eles saberão que receberam todos os registros confirmados no carimbo de data/hora A ou antes dele e poderão começar a processar os registros armazenados em buffer, por exemplo, classificar os registros entre partições por carimbo de data/hora e agrupá-los por server_transaction_id.

Um registro de batimentos cardíacos contém apenas um campo:

Campo Tipo Descrição
timestamp TIMESTAMP Carimbo de data/hora do registro de pulsação.

Um exemplo de registro de pulsação, comunicando que todos os registros com carimbos de data/hora menores ou iguais ao carimbo de data/hora desse registro foram retornados:

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

Registros de partições filhas

Um registro de partições filhas retorna informações sobre partições filhas: os tokens de partição delas, os tokens das partições mães e o start_timestamp que representa o carimbo de data/hora mais antigo em que as partições filhas contêm registros de alterações. Os registros com carimbos de data/hora de confirmação imediatamente anteriores à child_partitions_record.start_timestamp são retornados na partição atual. Depois de retornar todos os registros dessa partição de consulta, ela retornará com um status de sucesso, indicando que todos eles foram retornados.

Os campos de um registro de partições filhas incluem o seguinte:

Campo Tipo Descrição
start_timestamp TIMESTAMP Os registros de alteração de dados retornados das partições filhas nesse registro terão um carimbo de data/hora de confirmação maior ou igual a start_timestamp. Ao consultar uma partição filha, a consulta precisa especificar o token da partição filha e uma start_timestamp maior ou igual a child_partitions_token.start_timestamp. Todos os registros de partições filhas retornados por uma partição terão o mesmo start_timestamp e o carimbo de data/hora sempre estará entre as consultas especificadas start_timestamp e end_timestamp.
record_sequence STRING Um número de sequência monotonicamente crescente, que pode ser usado para definir a ordem do registro das partições filhas quando há vários registros desse tipo retornados com o mesmo start_timestamp em uma partição específica. O token de partição, start_timestamp e record_sequence, identificam exclusivamente um registro de partições filhas.
child_partitions ARRAY<STRUCT<
token STRING,
parent_partition_tokens
ARRAY<STRING>
>>
Retorna um conjunto de partições filhas e as informações associadas a elas. Isso inclui a string do token de partição usada para identificar a partição filha em consultas, bem como os tokens das partições pais.

Um exemplo de registro de partições filhas:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"],
    }
  ],
}

Alterar fluxo de trabalho de consultas de streams

Execute consultas de stream de alterações usando a API ExecuteStreamingSql com uma transação somente leitura de uso único e uma limitação de carimbo de data/hora forte. O TVF de stream de alterações permite que os usuários especifiquem start_timestamp e end_timestamp para o intervalo de interesse. Todos os registros de alterações dentro do período de armazenamento podem ser acessados usando o forte limite de carimbo de data/hora somente leitura.

Todos os outros TransactionOptions são inválidos para consultas de stream de alterações. Além disso, se TransactionOptions.read_only.return_read_timestamp for definido como verdadeiro, um valor especial de kint64max - 1 será retornado na mensagem Transaction que descreve a transação, em vez de um carimbo de data/hora de leitura válido. Esse valor especial deve ser descartado e não usado para consultas subsequentes.

Cada consulta de stream de alteração pode retornar qualquer número de linhas, cada uma contendo um registro de alteração de dados, um registro de sinal de funcionamento ou um registro de partições filhas. Não é necessário definir um prazo para a solicitação.

Exemplo:

O fluxo de trabalho de consulta de streaming começa com a emissão da primeira consulta de stream de alteração especificando partition_token para NULL. A consulta precisará especificar a função TVF para o stream de alteração, o carimbo de data/hora de início e término do interesse e o intervalo de sinal de funcionamento. Quando o end_timestamp for NULL, a consulta continuará retornando alterações de dados até o nascimento das partições filhas.

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:00-00",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

Processe os registros de dados dessa consulta até que os registros de partição filha sejam retornados. No exemplo abaixo, dois registros de partição filhos e três tokens de partição são retornados. Depois, a consulta é encerrada. Os registros de partição filha de uma consulta específica sempre compartilharão o mesmo start_timestamp.

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01 09:00:01-00",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
      // from the initial change stream queries.
      "parent_partition_tokens": [NULL],
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL],
    }
  ],
}
child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01 09:00:01-00",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL],
    }
  ],
}

Para processar mudanças futuras após 2022-05-01 09:00:01-00, crie três novas consultas e execute-as em paralelo. Juntas, as três retornarão alterações de dados futuras para o mesmo intervalo de chaves abrangido pelo pai. Sempre defina start_timestamp como start_timestamp no mesmo registro de partição filho e use o mesmo intervalo de end_timestamp e intervalo de frequência cardíaca para processar os registros de forma consistente em todas as consultas.

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:01-00",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000);
SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:01-00",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000);
SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:01-00",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000);

Após algum tempo, a consulta em child_token_2 termina após retornar outro registro de partição filho, esses registros indicam que uma nova partição cobrirá as mudanças futuras de child_token_2 e child_token_3 a partir de 2022-05-01 09:30:15-00. O mesmo registro será retornado pela consulta em child_token_3, porque ambos são as partições pai do novo child_token_4. Para garantir um processamento ordenado de registros de dados para uma chave específica, a consulta em child_token_4 precisa começar apenas depois que todos os pais forem concluídos, que neste caso são child_token_2 e child_token_3. Crie apenas uma consulta para cada token de partição filho. O design do fluxo de trabalho de consulta apontará um pai para aguardar e programar a consulta em child_token_4.

child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01 09:30:15-00",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}
SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:30:15-00",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

Veja exemplos de gerenciamento e análise de registros de stream de alterações no conector do Dataflow Beam para Apache Beam no GitHub.