Alterar partições, registros e consultas de streams

Nesta página, descrevemos os seguintes atributos dos fluxo de alterações em detalhes:

  • O modelo de particionamento baseado em divisão
  • O formato e o conteúdo dos registros de fluxo de alterações
  • a sintaxe de baixo nível usada para consultar esses registros
  • Um exemplo do fluxo de trabalho de consulta

As informações nesta página são mais relevantes para usar a API Spanner para consultar fluxo de alterações diretamente. Aplicativos que, em vez disso, usam o Dataflow para ler o fluxo de alterações dados não precisam trabalhar diretamente com o modelo de dados descritas aqui.

Para conferir um guia introdutório mais amplo sobre fluxo de alterações, consulte Fluxos de alteração. geral.

Alterar partições do fluxo

Quando ocorre uma alteração em uma tabela monitorada por um fluxo de alterações, O Spanner grava um registro de fluxo de alterações correspondente no banco de dados. de maneira síncrona na mesma transação que os dados mudam. Isso garante que, se a transação for bem-sucedida, o Spanner também capturou e manteve a mudança. Internamente, O Spanner coloca em conjunto o registro do fluxo de alterações e a alteração dos dados para que sejam processados pelo mesmo servidor a fim de minimizar a sobrecarga de gravação.

Como parte da DML para uma divisão específica, o Spanner anexa a gravação aos dados do fluxo de alterações correspondentes. na mesma transação. Por causa dessa colocation, você pode mudar de streaming não acrescentam coordenação extra entre os recursos de veiculação, o que minimiza a sobrecarga de confirmação da transação.

imagem

O Spanner escalona ao dividir e mesclar dinamicamente os dados com base com base na carga e no tamanho do banco de dados e na distribuição de divisões entre os recursos de veiculação.

Para permitir que as leituras e gravações de fluxo de alterações sejam escalonadas, o Spanner divide e mescla o armazenamento interno do fluxo de alterações com os dados do banco de dados, evitando pontos de acesso automaticamente. Para oferecer suporte à leitura de registros de fluxo de alterações em quase em tempo real à medida que as gravações no banco de dados são escalonadas, a API Spanner é projetada para que um fluxo de alterações seja consultado simultaneamente usando o fluxo de alterações partições diferentes. Mapa de partições do fluxo de alterações para alterar as divisões de dados dele que para conter os registros do fluxo de alterações. As partições de um fluxo de alterações mudam de maneira dinâmica ao longo do tempo e estão correlacionados à forma como o Spanner divide e mescla dinamicamente os dados do banco de dados.

Uma partição de fluxo de alterações contém registros de um intervalo de chaves imutável para uma em um período específico. Qualquer partição de fluxo de alterações pode ser dividida em uma ou mais partições de fluxo de alterações ou mesclada com outras partições de fluxo de alterações. Quando esses as partições filhas são criadas para capturar as mudanças, para os respectivos intervalos de chaves imutáveis para o próximo intervalo de tempo. Além disso, aos registros de alteração de dados, uma consulta de fluxo de alterações retorna os registros de partição filhos notificar os leitores sobre novas partições do fluxo de alterações que precisam ser consultadas; como registros de sinal de funcionamento para indicar o progresso quando nenhuma gravação tiver ocorrido. recentemente.

Ao consultar uma partição de fluxo de alterações específica, os registros de alteração são retornados na ordem do carimbo de data/hora de commit. Cada registro de alteração é retornado exatamente uma vez. Nas partições do fluxo de alterações, não há garantia de ordem de mudança. registros. Os registros de alteração de uma determinada chave primária são retornados somente partição para um determinado intervalo de tempo.

Devido à linhagem de partições pai-filho, para processar alterações em um chave específica na ordem do carimbo de data/hora de commit, os registros retornados do filho as partições só devem ser processadas após os registros de todas as partições foram processadas.

Funções de leitura e sintaxe de consulta do fluxo de alterações

GoogleSQL

Você consulta os fluxo de alterações usando o ExecuteStreamingSql API. O Spanner cria automaticamente uma função de leitura especial junto com o fluxo de alterações. A função de leitura dá acesso à mudança registros do stream. A convenção de nomenclatura da função de leitura é READ_change_stream_name:

Supondo que haja um fluxo de alterações SingersNameStream no banco de dados, o a sintaxe de consulta do GoogleSQL é a seguinte:

SELECT ChangeRecord
FROM READ_SingersNameStream (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    read_options
)

A função de leitura aceita os seguintes argumentos:

Nome do argumento Tipo Obrigatório? Descrição
start_timestamp TIMESTAMP Obrigatório Especifica que os registros com commit_timestamp maior ou igual a start_timestamp deve ser retornado. O valor precisa estar dentro do fluxo de alterações período de armazenamento, e deve ser menor ou igual ao tempo atual, e maior ou igual ao carimbo de data/hora de criação do fluxo de alterações.
end_timestamp TIMESTAMP Opcional (padrão: NULL) Especifica que os registros com commit_timestamp a menos que ou igual a end_timestamp deve serão retornadas. O valor precisa estar dentro da retenção do fluxo de alterações e maior ou igual a start_timestamp. A consulta termina após retornar todos os ChangeRecords até end_timestamp ou o usuário encerra a conexão. Se NULL ou não especificado, a consulta é executada até que todos os ChangeRecords sejam retornados ou até que o o usuário encerra a conexão.
partition_token STRING Opcional (padrão: NULL) Especifica qual partição do fluxo de alterações será consultada, com base no o conteúdo das partições filhas .. Se NULL ou não for especificado, isso significa que leitor está consultando o fluxo de alterações pela primeira vez e tem não obteve nenhum token de partição específico para consultar.
heartbeat_milliseconds INT64 Obrigatório Determina a frequência com que um ChangeRecord de batimentos cardíacos é retornado. caso não haja transações confirmadas nessa partição. O valor precisa estar entre 1,000 (um segundo) e 30,0000 (cinco) minutos).
read_options ARRAY Opcional (padrão: NULL) Outras opções de leitura reservadas para uso futuro. Atualmente, o único valor permitido é NULL.

Recomendamos criar um método conveniente para criar o texto da ler parâmetros de consulta e vinculação da função para ela, conforme mostrado exemplo.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT ChangeRecord FROM READ_SingersNameStream"
            + "("
            + "   start_timestamp => @startTimestamp,"
            + "   end_timestamp => @endTimestamp,"
            + "   partition_token => @partitionToken,"
            + "   heartbeat_milliseconds => @heartbeatMillis"
            + ")";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {
  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("startTimestamp")
                    .to(startTimestamp)
                    .bind("endTimestamp")
                    .to(endTimestamp)
                    .bind("partitionToken")
                    .to(partitionToken)
                    .bind("heartbeatMillis")
                    .to(heartbeatMillis)
                    .build();
}

PostgreSQL

Você consulta os fluxo de alterações usando o API ExecuteStreamingSql. O Spanner cria automaticamente uma função de leitura especial junto com o fluxo de alterações. A função de leitura dá acesso à mudança registros do stream. A convenção de nomenclatura da função de leitura é spanner.read_json_change_stream_name:

Supondo que haja um fluxo de alterações SingersNameStream no banco de dados, o A sintaxe de consulta do PostgreSQL é esta:

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    null
)

A função de leitura aceita os seguintes argumentos:

Nome do argumento Tipo Obrigatório? Descrição
start_timestamp timestamp with time zone Obrigatório Especifica que os registros de mudança com commit_timestamp maior ou igual a start_timestamp deve ser retornado. O valor precisa estar dentro do fluxo de alterações período de armazenamento, e deve ser menor ou igual ao tempo atual, e maior ou igual ao carimbo de data/hora de criação do fluxo de alterações.
end_timestamp timestamp with timezone Opcional (padrão: NULL) Especifica que os registros de mudança com commit_timestamp menor ou igual a end_timestamp deve serão retornadas. O valor precisa estar dentro da retenção do fluxo de alterações e maior ou igual a start_timestamp. A consulta termina após retornar todos os registros de alteração até end_timestamp ou o usuário encerrar a conexão. Se NULL, a consulta será executada até que todos os registros de alteração sejam retornados ou o usuário encerra a conexão.
partition_token text Opcional (padrão: NULL) Especifica qual partição do fluxo de alterações será consultada, com base no o conteúdo das partições filhas .. Se NULL ou não for especificado, isso significa que leitor está consultando o fluxo de alterações pela primeira vez e tem não obteve nenhum token de partição específico para consultar.
heartbeat_milliseconds bigint Obrigatório Determina com que frequência um ChangeRecord de batimentos cardíacos será retornado. caso não haja transações confirmadas nessa partição. O valor precisa estar entre 1,000 (um segundo) e 300,000 (cinco) minutos).
null null Obrigatório Reservado para uso futuro

Recomendamos criar um método conveniente para criar o texto da ler parâmetros de função e vinculação a ela, conforme mostrado abaixo exemplo.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\""
            + "($1, $2, $3, $4, null)";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {

  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("p1")
                    .to(startTimestamp)
                    .bind("p2")
                    .to(endTimestamp)
                    .bind("p3")
                    .to(partitionToken)
                    .bind("p4")
                    .to(heartbeatMillis)
                    .build();
}

Mudar formato de registro dos streams

GoogleSQL

A função de leitura dos fluxo de alterações retorna uma única coluna ChangeRecord do tipo. ARRAY<STRUCT<...>>. Em cada linha, essa matriz sempre contém um único elemento.

Os elementos da matriz têm o seguinte tipo:

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

Há três campos nesse struct: data_change_record, heartbeat_record e child_partitions_record, cada um do tipo ARRAY<STRUCT<...>>. Em qualquer linha em que a função de leitura do fluxo de alterações retorna, somente um desses três campos contém um valor; os outros dois estejam vazios ou NULL. Esses campos de matriz contêm no máximo um elemento.

As seções a seguir examinam cada um desses três tipos de registro.

PostgreSQL

A função de leitura dos fluxo de alterações retorna uma única coluna ChangeRecord de digite JSON com a seguinte estrutura:

{
  "data_change_record" : {},
  "heartbeat_record" : {},
  "child_partitions_record" : {}
}

Há três chaves possíveis neste objeto: data_change_record, heartbeat_record e child_partitions_record, o valor correspondente o tipo é JSON. Em qualquer linha retornada pela função de leitura do fluxo de alterações, somente existe uma dessas três chaves.

As seções a seguir examinam cada um desses três tipos de registro.

Registros de alteração de dados

Um registro de alteração de dados contém um conjunto de alterações feitas em uma tabela com as mesmo tipo de modificação (inserir, atualizar ou excluir) confirmada no mesmo o carimbo de data/hora de commit em uma partição do fluxo de alterações para o mesmo transação. Vários registros de alteração de dados podem ser retornados para o mesmo em várias partições do fluxo de alterações.

Todos os registros de alteração de dados têm commit_timestamp, server_transaction_id, e record_sequence, que juntos determinam a ordem na mudança stream para um registro de stream. Esses três campos são suficientes para obter a ordem das mudanças e fornecer consistência externa.

Várias transações podem ter o mesmo carimbo de data/hora de confirmação elas tocam em dados não sobrepostos. O campo server_transaction_id a capacidade de distinguir qual conjunto de mudanças (possivelmente em todas as partições do fluxo de alterações) foram emitidos no mesmo transação. Pareá-lo com o record_sequence e Os campos number_of_records_in_transaction permitem armazenar em buffer e ordenar todos os registros de uma transação específica.

Os campos de um registro de alteração de dados incluem:

GoogleSQL

Campo Tipo Descrição
commit_timestamp TIMESTAMP O carimbo de data/hora em que a alteração foi confirmada.
record_sequence STRING O número de sequência do registro dentro da transação. Os números de sequência têm a garantia ser único e crescente monotonicamente (mas não necessariamente contíguos) dentro de uma transação. Classifique os registros para os mesmos server_transaction_id enviado por record_sequence a reconstruir a ordem das alterações dentro da transação.
server_transaction_id STRING Uma string globalmente exclusiva que representa a transação em com o qual a alteração foi confirmada. O valor deve ser usada no contexto do processamento de registros de fluxo de alterações e não é correlacionados ao ID da transação na API Spanner.
is_last_record_in_transaction_in_partition BOOL Indica se este é o último registro de uma transação na partição atual.
table_name STRING Nome da tabela afetada pela alteração.
value_capture_type STRING

Descreve o tipo de captura de valor que foi especificado no configuração do fluxo de alterações quando a alteração foi capturada.

O tipo de captura de valor pode ser "OLD_AND_NEW_VALUES", "NEW_ROW", "NEW_VALUES" ou "NEW_ROW_AND_OLD_VALUES". Por padrão, ele é "OLD_AND_NEW_VALUES". Para mais informações, consulte os tipos de captura de valor.

column_types ARRAY<STRUCT<
name STRING,
 type JSON,
 is_primary_key BOOL,
 ordinal_position INT64
>>
O nome e o tipo da coluna, se é uma chave primária e a posição da coluna como definida no esquema ("posição_ordinal"). A primeira coluna de uma tabela no esquema teria uma posição ordinal de "1". O tipo de coluna podem ser aninhados para colunas de matriz. O formato corresponde à estrutura de tipos descritos na referência da API Spanner.
mods ARRAY<STRUCT<
keys JSON,
 new_values JSON,
 old_values JSON
>>
Descreve as alterações feitas, incluindo a chave primária valores, os valores antigos e os novos valores das colunas alteradas ou rastreadas. A disponibilidade e o conteúdo dos valores antigos e novos vão depender do value_capture_type configurado. Os campos new_values e old_values contêm apenas as colunas sem chave.
mod_type STRING Descreve o tipo de alteração. INSERT, UPDATE ou DELETE
number_of_records_in_transaction INT64 O número de registros de alteração de dados que fazem parte em todas as partições do fluxo de alterações.
number_of_partitions_in_transaction INT64 O número de partições que retornarão registros de alteração de dados para para essa transação.
transaction_tag STRING Tag da transação associada a essa transação.
is_system_transaction BOOL Indica se é uma transação do sistema.

PostgreSQL

Campo Tipo Descrição
commit_timestamp STRING O carimbo de data/hora em que a alteração foi confirmada.
record_sequence STRING O número de sequência do registro dentro da transação. Os números de sequência têm a garantia ser único e crescente monotonicamente (mas não necessariamente contíguos) dentro de uma transação. Classifique os registros para os mesmos `server_transaction_id` por `record_sequência` para reconstruir a ordem das alterações dentro da transação.
server_transaction_id STRING Uma string globalmente exclusiva que representa a transação em com o qual a alteração foi confirmada. O valor deve ser usada no contexto do processamento de registros de fluxo de alterações e não é correlacionado com o ID da transação na API Spanner
is_last_record_in_transaction_in_partition BOOLEAN Indica se este é o último registro de uma transação na partição atual.
table_name STRING Nome da tabela afetada pela alteração.
value_capture_type STRING

Descreve o tipo de captura de valor que foi especificado no configuração do fluxo de alterações quando a alteração foi capturada.

O tipo de captura de valor pode ser "OLD_AND_NEW_VALUES", "NEW_ROW", "NEW_VALUES" ou "NEW_ROW_AND_OLD_VALUES". Por padrão, ele é "OLD_AND_NEW_VALUES". Para mais informações, consulte os tipos de captura de valor.

column_types
[
  {
      "name": <STRING>,
      "type": {
        "code": <STRING>
      },
      "is_primary_key": <BOOLEAN>,
      "ordinal_position": <NUMBER>
    },
    ...
]
O nome e o tipo da coluna, se é uma chave primária e a posição da coluna como definida no esquema ("posição_ordinal"). A primeira coluna de uma tabela no esquema teria uma posição ordinal de "1". O tipo de coluna podem ser aninhados para colunas de matriz. O formato corresponde à estrutura de tipos descritos na referência da API Spanner.
mods
[
  {
    "keys": {<STRING> : <STRING>},
    "new_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
    "old_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
  },
  [...]
]
Descreve as alterações feitas, incluindo a chave primária os valores antigos e novos dos valores alterados ou acompanhados colunas. A disponibilidade e o conteúdo dos valores antigos e novos dependerão no value_capture_type configurado. Os métodos new_values e Os campos old_values contêm apenas as colunas sem chave.
mod_type STRING Descreve o tipo de alteração. INSERT, UPDATE ou DELETE
number_of_records_in_transaction INT64 O número de registros de alteração de dados que fazem parte em todas as partições do fluxo de alterações.
number_of_partitions_in_transaction NUMBER O número de partições que retornarão registros de alteração de dados para para essa transação.
transaction_tag STRING Tag da transação associada a essa transação.
is_system_transaction BOOLEAN Indica se é uma transação do sistema.

Confira a seguir dois exemplos de registros de alteração de dados. Eles descrevem uma única transação em que há um uma transferência entre duas contas. As duas contas estão fluxo de alterações separado partições diferentes.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}

O registro de alteração de dados a seguir é um exemplo de um registro com o valor tipo de captura "NEW_VALUES". Apenas novos valores são preenchidos. Somente a coluna "LastUpdate" foi modificada, portanto, apenas essa coluna foi retornado.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,
  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z"
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

O registro de alteração de dados a seguir é um exemplo de um registro com o valor tipo de captura "NEW_ROW". Apenas o "LastUpdate" coluna foi modificada, mas todas as colunas rastreadas são retornadas.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

O registro de alteração de dados a seguir é um exemplo de um registro com o valor tipo de captura "NEW_ROW_AND_OLD_VALUES". Apenas o "LastUpdate" coluna foi modificada, mas todas as colunas rastreadas são retornadas. Esta captura de valor tipo captura os valores novos e antigos de LastUpdate.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z"
      }
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW_AND_OLD_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Registros do sinal de funcionamento

Quando um registro de sinal de funcionamento é retornado, isso indica que todas as alterações com commit_timestamp menor ou igual ao registro do sinal de funcionamento timestamp foram retornados, e registros de dados futuros nesta partição precisa ter carimbos de data/hora de confirmação maiores do que os retornados pela gravação de batimentos cardíacos. Os registros de sinal de funcionamento são retornados quando não há dados. gravadas em uma partição. Quando há alterações de dados gravadas da partição, data_change_record.commit_timestamp poderá ser usada no lugar de heartbeat_record.timestamp para informar que o leitor está avançando o progresso da leitura da partição.

É possível usar registros de sinal de funcionamento retornados nas partições para sincronizar leitores em todas as partições. Depois que todos os leitores tiverem recebido uma sinal de funcionamento maior ou igual a algum carimbo de data/hora A ou recebeu dados ou filhos registros de partição maiores ou iguais ao carimbo de data/hora A, os leitores sabem que receberam todos os registros confirmados nesse carimbo de data/hora A ou antes dele e podem começar processar os registros em buffer, por exemplo, classificar as partições registros por carimbo de data/hora e agrupando-os por server_transaction_id.

Um registro de sinal de funcionamento contém apenas um campo:

GoogleSQL

Campo Tipo Descrição
timestamp TIMESTAMP O carimbo de data/hora do registro do sinal de funcionamento.

PostgreSQL

Campo Tipo Descrição
timestamp STRING O carimbo de data/hora do registro do sinal de funcionamento.

Um exemplo de registro do sinal de funcionamento, comunicando que todos os registros com carimbos de data/hora menores ou iguais ao carimbo de data/hora desse registro foram retornados:

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

Registros de partições filhas

Um registro de partição filho retorna informações sobre as partições filhas: os tokens de partição, os tokens das partições pai e o start_timestamp que representa o carimbo de data/hora mais antigo do filho partições contêm registros de alteração. Registros com carimbos de data/hora de commit imediatamente antes dos child_partitions_record.start_timestamp são retornados na partição atual. Depois de retornar todas registros de partições filhas desta partição, esta consulta retornará com um status de sucesso, indicando que todos os registros foram retornados para esta partição.

Os campos de um registro de partições filhas incluem:

GoogleSQL

Campo Tipo Descrição
start_timestamp TIMESTAMP Registros de alteração de dados retornados do filho as partições neste registro filho têm um carimbo de data/hora de confirmação maior ou igual a start_timestamp. Ao consultar uma partição filha, a consulta precisa especifique o token da partição filha e um start_timestamp maior ou igual a child_partitions_token.start_timestamp. Todos os registros de partições filhas retornados por uma partição têm os mesmos start_timestamp e o o carimbo de data/hora sempre está entre o start_timestamp especificado da consulta e end_timestamp.
record_sequence STRING Uma sequência monotonicamente crescente que pode ser usado para definir a ordem do que as partições filhas registros de partições filhas retornados com o mesmo start_timestamp em um em uma partição específica. O token de partição, start_timestamp e record_sequence identificam exclusivamente um de partições filhas.
child_partitions ARRAY<STRUCT<
token STRING,
parent_partition_tokens
ARRAY<STRING>
>>
Retorna um conjunto de partições filhas e as informações associadas a elas. Isso inclui a string do token de partição usada para identificar o em consultas, bem como os tokens da instância pai partições diferentes.

PostgreSQL

Campo Tipo Descrição
start_timestamp STRING Registros de alteração de dados retornados do filho as partições neste registro de partições filhas têm um carimbo de data/hora de confirmação maior ou igual a start_timestamp. Ao consultar um filho a consulta deve especificar o token da partição filha e uma start_timestamp maior ou igual a child_partitions_token.start_timestamp. Todas as partições filhas registros retornados por uma partição têm a mesma start_timestamp e o carimbo de data/hora sempre está entre o start_timestamp especificado da consulta, e end_timestamp.
record_sequence STRING Uma sequência monotonicamente crescente que pode ser usado para definir a ordem do que as partições filhas registros de partições filhas retornados com o mesmo start_timestamp em um em uma partição específica. O token de partição, start_timestamp e record_sequence identificam exclusivamente um de partições filhas.
child_partitions
[
  {
    "token": <STRING>,
    "parent_partition_tokens": [<STRING>],
  }, [...]
]
Retorna uma matriz de partições filhas e as informações associadas a elas. Isso inclui a string do token de partição usada para identificar o em consultas, bem como os tokens da instância pai partições diferentes.

Este é um exemplo de registro de partição filho:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"]
    }
  ],
}

Fluxo de trabalho de consulta de fluxos de alterações

Execute consultas de fluxo de alterações usando o API ExecuteStreamingSql, com um único uso somente leitura transação e uma limite de carimbo de data/hora forte. A mudança a função de leitura de stream permite especificar start_timestamp e end_timestamp para o período de interesse. Todos os registros de alteração no período de armazenamento podem ser acessados usando o recurso limite de carimbo de data/hora.

Todos os outros TransactionOptions são inválidos para consultas de fluxo de alterações. Além disso, se TransactionOptions.read_only.return_read_timestamp for definido como verdadeiro, um valor especial de kint64max - 1 será retornado no Transaction que descreve a transação, em vez de uma mensagem de leitura válida carimbo de data/hora. Este valor especial deve ser descartado e não deve ser usado para nenhum consultas subsequentes.

Cada consulta de fluxo de alterações pode retornar qualquer número de linhas, cada uma contendo um registro de alteração de dados, um registro de sinal de funcionamento ou partições filhas registro. Não é necessário definir um prazo para a solicitação.

Exemplo:

O fluxo de trabalho da consulta de streaming começa com a emissão do primeiro fluxo de alterações consulta especificando partition_token a NULL. A consulta precisa especificar a função de leitura do fluxo de alterações, o carimbo de data/hora de início e término de interesse; o intervalo do sinal de funcionamento. Quando o end_timestamp é NULL, a consulta mantém os dados retornados vão mudar até o fim da partição.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:00Z",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:00Z',
  NULL,
  NULL,
  10000,
  NULL
) ;

Processar os registros de dados dessa consulta até que os registros da partição filho sejam retornados. No exemplo abaixo, dois registros de partição filhos e três registros são retornados, a consulta é encerrada. Registros de partição filhos de uma consulta específica sempre compartilha o mesmo start_timestamp.

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
        // from the initial change stream queries.
      "parent_partition_tokens": [NULL]
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL]
    }
  ],
}
child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL]
    }
  ],
}

Para processar mudanças futuras após 2022-05-01T09:00:01Z, crie três novas consultas e executá-las em paralelo. As três consultas juntas retornam resultados mudanças de dados para o mesmo intervalo de chaves coberto pelo pai. Defina start_timestamp para start_timestamp no mesmo registro da partição filha e usar a mesma end_timestamp e o mesmo intervalo de sinal de funcionamento para processar os registros. de forma consistente em todas as consultas.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_1',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_2',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_3',
  10000,
  NULL
);

Depois de um tempo, a consulta em child_token_2 é concluída após retornar outro registro da partição filho, isso indica que será criada uma nova partição cobrindo mudanças futuras no child_token_2 e no child_token_3 a partir de 2022-05-01T09:30:15Z O mesmo registro será retornado pela consulta no child_token_3, porque ambas são as partições pai da nova child_token_4. Para garantir um processamento ordenado estrito de registros de dados para uma chave específica, a consulta em child_token_4 só precisa começar depois que todos os pais terminarem, que, neste caso, são child_token_2 e child_token_3. Crie apenas uma consulta para cada token de partição filha, o design do fluxo de trabalho da consulta deve indicar um pai aguarde e programe a consulta em child_token_4.

child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:30:15Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01T09:30:15Z",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:30:15Z',
  NULL,
  'child_token_4',
  10000,
  NULL
);

Encontre exemplos de como lidar e analisar registros de fluxo de alterações no SpannerIO do Apache Beam Conector do Dataflow ativado GitHub.