Alterar partições, registros e consultas de streams

Nesta página, descrevemos os seguintes atributos dos fluxo de alterações em detalhes:

  • O modelo de particionamento baseado em divisão
  • O formato e o conteúdo dos registros de fluxo de alterações
  • A sintaxe de baixo nível usada para consultar esses registros
  • Um exemplo do fluxo de trabalho de consulta

As informações nesta página são mais relevantes para usar a API Spanner para consultar fluxo de alterações diretamente. Em vez disso, os aplicativos que usam o Dataflow para ler dados de fluxo de alterações não precisam trabalhar diretamente com o modelo de dados descrito aqui.

Para ver um guia introdutório mais amplo sobre os fluxo de alterações, consulte Visão geral dos fluxos de alterações.

Mudar partições de stream

Quando ocorre uma alteração em uma tabela que é observada por um fluxo de alterações, o Spanner grava um registro de fluxo de alterações correspondente no banco de dados, de maneira síncrona, na mesma transação que a alteração de dados. Isso garante que, se a transação for bem-sucedida, o Spanner também vai capturar e manter a alteração. Internamente, o Spanner colocaliza o registro do fluxo de alterações e a alteração dos dados para que eles sejam processados pelo mesmo servidor a fim de minimizar a sobrecarga de gravação.

Como parte da DML a uma divisão específica, o Spanner anexa a gravação à divisão de dados do fluxo de alterações correspondente na mesma transação. Por causa dessa colocation, os fluxos de alteração não adicionam coordenação extra entre os recursos de exibição, o que minimiza a sobrecarga de confirmação da transação.

imagem

O Spanner é escalonado dividindo e mesclando dinamicamente os dados com base na carga e no tamanho do banco de dados e distribuindo divisões entre os recursos de exibição.

Para permitir o escalonamento de gravações e leituras de fluxos de alterações, o Spanner divide e mescla o armazenamento interno do fluxo de alterações com os dados do banco de dados, evitando automaticamente os pontos de acesso. Para oferecer suporte à leitura de registros de fluxo de alterações quase em tempo real à medida que as gravações do banco de dados são escalonadas, a API Spanner é projetada para que um fluxo de alterações seja consultado simultaneamente usando partições de fluxo de alterações. As partições do fluxo de alterações são mapeadas para as divisões de dados do fluxo de alterações que contêm os registros do fluxo de alterações. As partições de um fluxo de alterações mudam dinamicamente ao longo do tempo e estão correlacionadas com a forma como o Spanner divide e mescla os dados do banco de dados dinamicamente.

Uma partição do fluxo de alterações contém registros para um intervalo de chaves imutável para um intervalo de tempo específico. Qualquer partição do fluxo de alterações pode ser dividida em uma ou mais partições ou mesclada com outras. Quando esses eventos de divisão ou mesclagem acontecem, as partições filhas são criadas para capturar as alterações nos respectivos intervalos de chaves imutáveis para o próximo intervalo. Além dos registros de alteração de dados, uma consulta de fluxo de alterações retorna registros de partição filha para notificar os leitores sobre novas partições de fluxo de alterações que precisam ser consultadas, bem como registros de pulsação para indicar o progresso quando nenhuma gravação tiver ocorrido recentemente.

Ao consultar uma partição de fluxo de alterações específica, os registros de alterações são retornados na ordem do carimbo de data/hora de confirmação. Cada registro de alterações é retornado exatamente uma vez. Nas partições de fluxo de alterações, não há ordem garantida dos registros de mudanças. Os registros de alterações de uma chave primária específica são retornados apenas em uma partição por um determinado intervalo de tempo.

Devido à linhagem da partição pai-filho, para processar alterações de uma chave específica na ordem do carimbo de data/hora de confirmação, os registros retornados das partições filhas precisam ser processados somente depois que os registros de todas as partições pai forem processados.

Alterar as funções de leitura do fluxo e a sintaxe de consulta

GoogleSQL

Consulte os fluxo de alterações usando a API ExecuteStreamingSql. O Spanner cria automaticamente uma função de leitura especial junto com o fluxo de alterações. A função de leitura fornece acesso aos registros do fluxo de alterações. A convenção de nomenclatura da função de leitura é READ_change_stream_name.

Supondo que haja um fluxo de alterações SingersNameStream no banco de dados, a sintaxe de consulta do GoogleSQL é a seguinte:

SELECT ChangeRecord
FROM READ_SingersNameStream (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    read_options
)

A função de leitura aceita os seguintes argumentos:

Nome do argumento Tipo Obrigatório? Descrição
start_timestamp TIMESTAMP Obrigatório Especifica que registros com commit_timestamp maior ou igual a start_timestamp precisam ser retornados. O valor precisa estar dentro do período de armazenamento do fluxo de alterações e ser menor ou igual à hora atual e maior ou igual ao carimbo de data/hora da criação do fluxo de alterações.
end_timestamp TIMESTAMP Opcional (padrão: NULL) Especifica que registros com commit_timestamp menor ou igual a end_timestamp precisam ser retornados. O valor precisa estar dentro do período de retenção do fluxo de alterações e ser maior ou igual ao start_timestamp. A consulta termina depois de retornar todos os ChangeRecords até end_timestamp ou o usuário encerra a conexão. Se NULL ou não for especificado, a consulta será executada até que todos os ChangeRecords sejam retornados ou o usuário encerre a conexão.
partition_token STRING Opcional (padrão: NULL) Especifica qual partição de fluxo de alterações será consultada, com base no conteúdo dos registros de partições filhas. Se for NULL ou não for especificado, isso significa que o leitor está consultando o fluxo de alterações pela primeira vez e não recebeu tokens de partição específicos para fazer a consulta.
heartbeat_milliseconds INT64 Obrigatório Determina a frequência com que um sinal de funcionamento ChangeRecord é retornado caso não haja transações confirmadas nessa partição. O valor precisa estar entre 1,000 (um segundo) e 30,0000 (cinco minutos).
read_options ARRAY Opcional (padrão: NULL) Outras opções de leitura reservadas para uso futuro. Atualmente, o único valor permitido é NULL.

Recomendamos criar um método de conveniência para criar o texto da consulta de função de leitura e vincular parâmetros a ele, conforme mostrado no exemplo a seguir.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT ChangeRecord FROM READ_SingersNameStream"
            + "("
            + "   start_timestamp => @startTimestamp,"
            + "   end_timestamp => @endTimestamp,"
            + "   partition_token => @partitionToken,"
            + "   heartbeat_milliseconds => @heartbeatMillis"
            + ")";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {
  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("startTimestamp")
                    .to(startTimestamp)
                    .bind("endTimestamp")
                    .to(endTimestamp)
                    .bind("partitionToken")
                    .to(partitionToken)
                    .bind("heartbeatMillis")
                    .to(heartbeatMillis)
                    .build();
}

PostgreSQL

Consulte os fluxo de alterações usando a API ExecuteStreamingSql. O Spanner cria automaticamente uma função de leitura especial junto com o fluxo de alterações. A função de leitura fornece acesso aos registros do fluxo de alterações. A convenção de nomenclatura da função de leitura é spanner.read_json_change_stream_name.

Supondo que há um fluxo de alterações SingersNameStream no banco de dados, a sintaxe de consulta do PostgreSQL é a seguinte:

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    null
)

A função de leitura aceita os seguintes argumentos:

Nome do argumento Tipo Obrigatório? Descrição
start_timestamp timestamp with time zone Obrigatório Especifica que registros de mudança com commit_timestamp maior ou igual a start_timestamp precisam ser retornados. O valor precisa estar dentro do período de armazenamento do fluxo de alterações e ser menor ou igual à hora atual e maior ou igual ao carimbo de data/hora da criação do fluxo de alterações.
end_timestamp timestamp with timezone Opcional (padrão: NULL) Especifica que registros de mudança com commit_timestamp menor ou igual a end_timestamp precisam ser retornados. O valor precisa estar dentro do período de retenção do fluxo de alterações e ser maior ou igual ao start_timestamp. A consulta termina depois de retornar todos os registros de mudanças até end_timestamp ou o usuário encerra a conexão. Se NULL, a consulta será executada até que todos os registros de mudanças sejam retornados ou o usuário encerre a conexão.
partition_token text Opcional (padrão: NULL) Especifica qual partição de fluxo de alterações será consultada, com base no conteúdo dos registros de partições filhas. Se for NULL ou não for especificado, isso significa que o leitor está consultando o fluxo de alterações pela primeira vez e não recebeu tokens de partição específicos para fazer a consulta.
heartbeat_milliseconds bigint Obrigatório Determina com que frequência um ChangeRecord de sinal de funcionamento será retornado caso não haja transações confirmadas nessa partição. O valor precisa estar entre 1,000 (um segundo) e 300,000 (cinco minutos).
null null Obrigatório Reservado para uso futuro

Recomendamos criar um método conveniente para criar o texto da função de leitura e vincular parâmetros a ele, conforme mostrado no exemplo abaixo.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\""
            + "($1, $2, $3, $4, null)";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {

  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("p1")
                    .to(startTimestamp)
                    .bind("p2")
                    .to(endTimestamp)
                    .bind("p3")
                    .to(partitionToken)
                    .bind("p4")
                    .to(heartbeatMillis)
                    .build();
}

Alterar formato de registro dos streams

GoogleSQL

A função de leitura dos fluxo de alterações retorna uma única coluna ChangeRecord do tipo ARRAY<STRUCT<...>>. Em cada linha, essa matriz sempre contém um único elemento.

Os elementos da matriz são do seguinte tipo:

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

Há três campos nesse struct: data_change_record, heartbeat_record e child_partitions_record, cada um do tipo ARRAY<STRUCT<...>>. Em qualquer linha que a função de leitura do fluxo de alterações retornar, apenas um desses três campos contém um valor. Os outros dois estão vazios ou NULL. Esses campos de matriz contêm, no máximo, um elemento.

As seções a seguir examinam cada um desses três tipos de registro.

PostgreSQL

A função de leitura dos fluxo de alterações retorna uma única coluna ChangeRecord do tipo JSON com a seguinte estrutura:

{
  "data_change_record" : {},
  "heartbeat_record" : {},
  "child_partitions_record" : {}
}

Há três chaves possíveis nesse objeto: data_change_record, heartbeat_record e child_partitions_record. O tipo de valor correspondente é JSON. Em qualquer linha que a função de leitura do fluxo de alterações retornar, existe apenas uma dessas três chaves.

As seções a seguir examinam cada um desses três tipos de registro.

Registros de alteração de dados

Um registro de alteração de dados contém um conjunto de alterações em uma tabela com o mesmo tipo de modificação (inserir, atualizar ou excluir) confirmadas no mesmo carimbo de data/hora de confirmação em uma partição de fluxo de alterações para a mesma transação. Vários registros de alteração de dados podem ser retornados para a mesma transação em várias partições de fluxo de alterações.

Todos os registros de alteração de dados têm os campos commit_timestamp, server_transaction_id e record_sequence, que, juntos, determinam a ordem no fluxo de alterações de um registro de fluxo. Esses três campos são suficientes para derivar a ordem das alterações e fornecer consistência externa.

Várias transações poderão ter o mesmo carimbo de data/hora de confirmação se tocarem em dados não sobrepostos. O campo server_transaction_id oferece a capacidade de distinguir qual conjunto de alterações (possivelmente em partições de fluxo de alterações) foi emitido na mesma transação. Pareá-lo com os campos record_sequence e number_of_records_in_transaction permite armazenar em buffer e ordenar todos os registros de uma transação específica.

Os campos de um registro de alteração de dados incluem o seguinte:

GoogleSQL

Campo Tipo Descrição
commit_timestamp TIMESTAMP O carimbo de data/hora em que a alteração foi confirmada.
record_sequence STRING O número de sequência do registro na transação. Os números de sequência têm a garantia de serem únicos e aumentar monotonicamente (mas não necessariamente contíguos) dentro de uma transação. Classifique os registros do mesmo server_transaction_id por record_sequence para reconstruir a ordem das mudanças na transação.
server_transaction_id STRING Uma string globalmente exclusiva que representa a transação em que a alteração foi confirmada. O valor só deve ser usado no contexto do processamento de registros de fluxo de alterações e não está relacionado ao código da transação na API do Spanner.
is_last_record_in_transaction_in_partition BOOL Indica se este é o último registro de uma transação na partição atual.
table_name STRING Nome da tabela afetada pela alteração.
value_capture_type STRING

Descreve o tipo de captura de valor especificado na configuração do fluxo de alterações quando essa alteração foi capturada.

O tipo de captura do valor pode ser "OLD_AND_NEW_VALUES", "NEW_ROW", "NEW_VALUES" ou "NEW_ROW_AND_OLD_VALUES". Por padrão, ele é "OLD_AND_NEW_VALUES". Para mais informações, consulte os tipos de captura de valor.

column_types ARRAY<STRUCT<
name STRING,
 type JSON,
 is_primary_key BOOL,
 ordinal_position INT64
>>
O nome, o tipo da coluna, se é uma chave primária, e a posição da coluna conforme definido no esquema ("ordinal_position"). A primeira coluna de uma tabela no esquema teria uma posição ordinal de "1". O tipo de coluna pode ser aninhado para colunas de matriz. O formato corresponde à estrutura de tipo descrita na referência da API Spanner.
mods ARRAY<STRUCT<
keys JSON,
 new_values JSON,
 old_values JSON
>>
Descreve as alterações feitas, incluindo as chaves-valor primárias, os valores antigos e os novos valores das colunas alteradas ou rastreadas. A disponibilidade e o conteúdo dos valores antigos e novos dependerão do value_capture_type configurado. Os campos new_values e old_values contêm apenas as colunas sem chave.
mod_type STRING Descreve o tipo de alteração. Será INSERT, UPDATE ou DELETE.
number_of_records_in_transaction INT64 O número de registros de alteração de dados que fazem parte dessa transação em todas as partições de fluxo de alterações.
number_of_partitions_in_transaction INT64 O número de partições que retornarão registros de alteração de dados para esta transação.
transaction_tag STRING Tag da transação associada a essa transação.
is_system_transaction BOOL Indica se a transação é do sistema.

PostgreSQL

Campo Tipo Descrição
commit_timestamp STRING O carimbo de data/hora em que a alteração foi confirmada.
record_sequence STRING O número de sequência do registro na transação. Os números de sequência têm a garantia de serem únicos e aumentar monotonicamente (mas não necessariamente contíguos) dentro de uma transação. Classifique os registros do mesmo "server_transaction_id" por "record_sequence" para reconstruir a ordem das alterações na transação.
server_transaction_id STRING Uma string globalmente exclusiva que representa a transação em que a alteração foi confirmada. O valor só deve ser usado no contexto do processamento de registros de fluxo de alterações e não está relacionado ao ID da transação na API do Spanner.
is_last_record_in_transaction_in_partition BOOLEAN Indica se este é o último registro de uma transação na partição atual.
table_name STRING Nome da tabela afetada pela alteração.
value_capture_type STRING

Descreve o tipo de captura de valor especificado na configuração do fluxo de alterações quando essa alteração foi capturada.

O tipo de captura do valor pode ser "OLD_AND_NEW_VALUES", "NEW_ROW", "NEW_VALUES" ou "NEW_ROW_AND_OLD_VALUES". Por padrão, ele é "OLD_AND_NEW_VALUES". Para mais informações, consulte os tipos de captura de valor.

column_types

[
  {
      "name": <STRING>,
      "type": {
        "code": <STRING>
      },
      "is_primary_key": <BOOLEAN>,
      "ordinal_position": <NUMBER>
    },
    ...
]
O nome, o tipo da coluna, se é uma chave primária, e a posição da coluna conforme definido no esquema ("ordinal_position"). A primeira coluna de uma tabela no esquema teria uma posição ordinal de "1". O tipo de coluna pode ser aninhado para colunas de matriz. O formato corresponde à estrutura de tipo descrita na referência da API Spanner.
mods

[
  {
    "keys": {<STRING> : <STRING>},
    "new_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
    "old_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
  },
  [...]
]
Descreve as alterações que foram feitas, incluindo as chaves-valor primárias, os valores antigos e os novos valores das colunas alteradas ou rastreadas. A disponibilidade e o conteúdo dos valores antigos e novos dependerão do value_capture_type configurado. Os campos new_values e old_values contêm apenas as colunas sem chave.
mod_type STRING Descreve o tipo de alteração. Será INSERT, UPDATE ou DELETE.
number_of_records_in_transaction INT64 O número de registros de alteração de dados que fazem parte dessa transação em todas as partições de fluxo de alterações.
number_of_partitions_in_transaction NUMBER O número de partições que retornarão registros de alteração de dados para esta transação.
transaction_tag STRING Tag da transação associada a essa transação.
is_system_transaction BOOLEAN Indica se a transação é do sistema.

Confira a seguir dois exemplos de registros de mudança de dados. Elas descrevem uma única transação em que há uma transferência entre duas contas. Observe que as duas contas estão em partições de fluxo de alterações separadas.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}

O registro de alteração de dados a seguir é um exemplo de registro com o tipo de captura de valor "NEW_VALUES". Observe que apenas os novos valores são preenchidos. Somente a coluna "LastUpdate" foi modificada, então apenas essa coluna foi retornada.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,
  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z"
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

O registro de alteração de dados a seguir é um exemplo de registro com o tipo de captura de valor "NEW_ROW". Somente a coluna "LastUpdate" foi modificada, mas todas as colunas rastreadas são retornadas.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

O registro de alteração de dados a seguir é um exemplo de registro com o tipo de captura de valor "NEW_ROW_AND_OLD_VALUES". Somente a coluna "LastUpdate" foi modificada, mas todas as colunas rastreadas são retornadas. Esse tipo de captura de valor captura o novo valor e o antigo valor de LastUpdate.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z"
      }
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW_AND_OLD_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Registros de batimentos cardíacos

Quando um registro de sinal de funcionamento é retornado, isso indica que todas as alterações com commit_timestamp menor ou igual ao timestamp do registro de sinal de funcionamento foram retornadas e que os registros de dados futuros nessa partição precisam ter carimbos de data/hora de confirmação maiores do que aqueles retornados pelo registro de sinal de funcionamento. Registros de sinal de funcionamento são retornados quando não há alterações de dados gravadas em uma partição. Quando há alterações de dados gravados na partição, data_change_record.commit_timestamp pode ser usado em vez de heartbeat_record.timestamp para dizer que o leitor está progredindo na leitura da partição.

É possível usar registros de sinal de funcionamento retornados nas partições para sincronizar leitores em todas as partições. Quando todos os leitores tiverem recebido um sinal de funcionamento maior ou igual a algum carimbo de data/hora A ou tiverem recebido dados ou registros de partição A maiores ou iguais ao carimbo de data/hora A, os leitores saberão que receberam todos os registros confirmados no carimbo de data/hora A e podem começar a processar os registros em buffer, por exemplo, classificando os registros de partição cruzada por carimbo de data/hora e agrupando-os por server_transaction_id.

Um registro de sinal de funcionamento contém apenas um campo:

GoogleSQL

Campo Tipo Descrição
timestamp TIMESTAMP Carimbo de data/hora do registro do sinal de funcionamento.

PostgreSQL

Campo Tipo Descrição
timestamp STRING Carimbo de data/hora do registro do sinal de funcionamento.

Um exemplo de registro de sinal de funcionamento, comunicando que todos os registros com carimbos de data/hora menores ou iguais ao do registro foram retornados:

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

Registros de partições filhas

Um registro de partições filhas retorna informações sobre partições filhas: os tokens de partição, os tokens das partições pai e o start_timestamp que representa o carimbo de data/hora mais antigo de que as partições filhas contêm registros de alteração. Registros com carimbos de data/hora de confirmação imediatamente anteriores ao child_partitions_record.start_timestamp são retornados na partição atual. Depois de retornar todos os registros de partições filhas dessa partição, a consulta retornará com um status de sucesso, indicando que todos os registros foram retornados para ela.

Os campos de um registro de partições filhos incluem o seguinte:

GoogleSQL

Campo Tipo Descrição
start_timestamp TIMESTAMP Os registros de alteração de dados retornados das partições filhas nesse registro têm um carimbo de data/hora de confirmação maior ou igual a start_timestamp. Ao consultar uma partição filha, a consulta precisa especificar o token da partição filha e um start_timestamp maior ou igual a child_partitions_token.start_timestamp. Todos os registros de partições filhas retornados por uma partição têm o mesmo start_timestamp e o carimbo de data/hora sempre fica entre o start_timestamp e o end_timestamp especificados da consulta.
record_sequence STRING Um número de sequência monotonicamente crescente que pode ser usado para definir a ordem do registro de partições filhas quando há vários registros de partições filhas retornados com o mesmo start_timestamp em uma partição específica. Os tokens de partição, start_timestamp e record_sequence, identificam exclusivamente um registro de partições filhas.
child_partitions ARRAY<STRUCT<
token STRING,
parent_partition_tokens
ARRAY<STRING>
>>
Retorna um conjunto de partições filhas e as informações associadas a elas. Isso inclui a string do token de partição usada para identificar a partição filha nas consultas, bem como os tokens das partições pai.

PostgreSQL

Campo Tipo Descrição
start_timestamp STRING Os registros de alteração de dados retornados das partições filhas nesse registro têm um carimbo de data/hora de confirmação maior ou igual a start_timestamp. Ao consultar uma partição filha, ela precisa especificar o token da partição filha e um start_timestamp maior ou igual a child_partitions_token.start_timestamp. Todos os registros de partições filhas retornados por uma partição têm o mesmo start_timestamp e o carimbo de data/hora sempre fica entre o start_timestamp especificado da consulta e o end_timestamp.
record_sequence STRING Um número de sequência monotonicamente crescente que pode ser usado para definir a ordem do registro de partições filhas quando há vários registros de partições filhas retornados com o mesmo start_timestamp em uma partição específica. Os tokens de partição, start_timestamp e record_sequence, identificam exclusivamente um registro de partições filhas.
child_partitions

[
  {
    "token": <STRING>,
    "parent_partition_tokens": [<STRING>],
  }, [...]
]
Retorna uma matriz de partições filhas e as informações associadas a elas. Isso inclui a string do token de partição usada para identificar a partição filha nas consultas, bem como os tokens das partições pai.

Este é um exemplo de registro de partição filha:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"]
    }
  ],
}

Alterar o fluxo de trabalho da consulta de streams

Execute consultas de fluxo de alterações usando a API ExecuteStreamingSql, com uma transação somente leitura de uso único e um limite de carimbo de data/hora de uso único. A função de leitura do fluxo de alterações permite especificar start_timestamp e end_timestamp para o intervalo de tempo de interesse. Todos os registros de alterações dentro do período de armazenamento podem ser acessados usando o forte limite de carimbo de data/hora somente leitura.

Todos os outros TransactionOptions são inválidos para consultas de fluxo de alterações. Além disso, se TransactionOptions.read_only.return_read_timestamp for definido como verdadeiro, um valor especial de kint64max - 1 será retornado na mensagem Transaction que descreve a transação, em vez de um carimbo de data/hora de leitura válido. Esse valor especial precisa ser descartado e não usado para consultas subsequentes.

Cada consulta de fluxo de alterações pode retornar qualquer número de linhas, cada uma contendo um registro de alteração de dados, de pulsação ou de partições filhas. Não é necessário definir um prazo para a solicitação.

Exemplo:

O fluxo de trabalho da consulta de streaming começa com a emissão da primeira consulta de fluxo de alterações especificando o partition_token para NULL. A consulta precisa especificar a função de leitura do fluxo de alterações, o carimbo de data/hora de início e término de interesse e o intervalo do sinal de funcionamento. Quando end_timestamp é NULL, a consulta continua retornando dados de alterações até que a partição termine.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:00Z",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:00Z',
  NULL,
  NULL,
  10000,
  NULL
) ;

Processe os registros de dados dessa consulta até que os registros da partição filha sejam retornados. No exemplo abaixo, dois registros de partição filha e três tokens de partição são retornados e a consulta é encerrada. Os registros de partição filhos de uma consulta específica sempre compartilham o mesmo start_timestamp.

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
        // from the initial change stream queries.
      "parent_partition_tokens": [NULL]
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL]
    }
  ],
}
child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL]
    }
  ],
}

Para processar alterações futuras após 2022-05-01T09:00:01Z, crie três novas consultas e execute-as em paralelo. As três consultas juntas retornam alterações futuras de dados para o mesmo intervalo de chaves que o pai abrange. Sempre defina start_timestamp como start_timestamp no mesmo registro de partição filha e use o mesmo end_timestamp e intervalo de pulsação para processar os registros de maneira consistente em todas as consultas.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_1',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_2',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_3',
  10000,
  NULL
);

Depois de um tempo, a consulta em child_token_2 é concluída após retornar outro registro de partição filho. Esses registros indicam que uma nova partição abrangerá mudanças futuras para child_token_2 e child_token_3 a partir de 2022-05-01T09:30:15Z. O mesmo registro será retornado pela consulta em child_token_3, porque ambas são as partições pai do novo child_token_4. Para garantir um processamento ordenado rigoroso dos registros de dados de uma chave específica, a consulta em child_token_4 só precisa começar depois que todos os pais terminarem, que nesse caso são child_token_2 e child_token_3. Crie apenas uma consulta para cada token de partição filha. O design do fluxo de trabalho da consulta precisa indicar um pai para aguardar e programar a consulta em child_token_4.

child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:30:15Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01T09:30:15Z",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:30:15Z',
  NULL,
  'child_token_4',
  10000,
  NULL
);

Veja exemplos de processamento e análise de registros de fluxo de alterações no conector do Dataflow para Apache Beam SpannerIO no GitHub.