Esta página explica como usar o conector Kafka para consumir e encaminhar dados de fluxos de alterações do Spanner.
Principais conceitos
A seguir, são descritos os conceitos principais do conector do Kafka.
Debezium
O Debezium é um projeto de código aberto que oferece uma plataforma de streaming de dados de baixa latência para captura de dados alterados.
Conector Kafka
O conector do Kafka fornece uma abstração sobre a API Spanner para publicar fluxo de alterações do Spanner no Kafka. Com esse conector, você não precisa gerenciar o ciclo de vida da partição fluxo de alterações, o que é necessário ao usar a API Spanner diretamente.
O conector do Kafka produz um evento de mudança para cada mod de registro de mudança de dados e envia registros de eventos de mudança para um tópico do Kafka separado para cada tabela rastreada por fluxo de mudança. Um registro de mudança de dados mod representa uma única modificação (inserção, atualização ou exclusão) que foi capturada. Um único registro de mudança de dados pode conter mais de uma modificação.
Saída do conector Kafka
O conector do Kafka encaminha os registros de fluxo de alterações diretamente
para um tópico do Kafka separado. O nome do tópico de saída precisa ser connector_name
.table_name
.
Se o tópico não existir, o conector do Kafka vai criar um automaticamente com esse nome.
Também é possível configurar transformações de roteamento de tópicos para redirecionar registros para tópicos especificados. Se você quiser usar o roteamento de tópicos, desative a funcionalidade marca-d'água baixa.
Ordem de registro
Os registros são ordenados por carimbo de data/hora de confirmação por chave primária nos
tópicos do Kafka. Os registros pertencentes a chaves primárias diferentes não têm
garantias de ordenação. Os registros com a mesma chave primária são armazenados na
mesma partição de tópico do Kafka. Se você quiser processar transações inteiras, também poderá
usar os campos registro de alteração de dados
server_transaction_id
e number_of_records_in_transaction
para
montar uma transação do Spanner.
Eventos de mudança
O conector do Kafka gera um evento de mudança de dados para cada operação INSERT
, UPDATE
e DELETE
. Cada evento contém uma chave e valores para a linha alterada.
É possível usar conversores do Kafka Connect para produzir eventos de mudança de dados nos formatos Protobuf
, AVRO
, JSON
ou JSON Schemaless
. Se você usar um
conversor do Kafka Connect que produz esquemas, o evento vai conter
esquemas separados para a chave e os valores. Caso contrário, o evento só contém
a chave e os valores.
O esquema da chave nunca muda. O esquema dos valores é uma amálgama de todas as colunas que o fluxo de mudanças rastreou desde o início do conector.
Se você configurar o conector para produzir eventos JSON, o evento de mudança de saída terá cinco campos:
O primeiro campo
schema
especifica um esquema do Kafka Connect que descreve o esquema de chave do Spanner.O primeiro campo
payload
tem a estrutura descrita pelo camposchema
anterior e contém a chave da linha que foi alterada.O segundo campo
schema
especifica o esquema do Kafka Connect que descreve o esquema da linha alterada.O segundo campo
payload
tem a estrutura descrita pelo camposchema
anterior e contém os dados reais da linha que foi alterada.O campo
source
é obrigatório e descreve os metadados de origem do evento.
Confira a seguir um exemplo de evento de alteração de dados:
{ // The schema for the Spanner key. "schema": { "type": "struct", "name": "customers.Key", "optional": false, "fields": [ { "type": "int64", "optional": "false" "field": "false" } ] }, // The value of the Spanner key. "payload": { "id": "1" }, // The schema for the payload, which contains the before and after values // of the changed row. The schema for the payload contains all the // columns that the change stream has tracked since the connector start // time. "schema": { "type": "struct", "fields": [ { // The schema for the before values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "before" }, { // The schema for the after values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "after" }, { // The schema for the source metadata for the event. "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "sequence" }, { "type": "string", "optional": false, "field": "project_id" }, { "type": "string", "optional": false, "field": "instance_id" }, { "type": "string", "optional": false, "field": "database_id" }, { "type": "string", "optional": false, "field": "change_stream_name" }, { "type": "string", "optional": true, "field": "table" } { "type": "string", "optional": true, "field": "server_transaction_id" } { "type": "int64", "optional": true, "field": "low_watermark" } { "type": "int64", "optional": true, "field": "read_at_timestamp" } { "type": "int64", "optional": true, "field": "number_of_records_in_transaction" } { "type": "string", "optional": true, "field": "transaction_tag" } { "type": "boolean", "optional": true, "field": "system_transaction" } { "type": "string", "optional": true, "field": "value_capture_type" } { "type": "string", "optional": true, "field": "partition_token" } { "type": "int32", "optional": true, "field": "mod_number" } { "type": "boolean", "optional": true, "field": "is_last_record_in_transaction_in_partition" } { "type": "int64", "optional": true, "field": "number_of_partitions_in_transaction" } ], "optional": false, "name": "io.debezium.connector.spanner.Source", "field": "source" }, ] { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "connector_name.customers.Envelope" }, "payload": { // The values of the row before the event. "before": null, // The values of the row after the event. "after": { "id": 1, "first_name": "Anne", } }, // The source metadata. "source": { "version": "{debezium-version}", "connector": "spanner", "name": "spanner_connector", "ts_ms": 1670955531785, "snapshot": "false", "db": "database", "sequence": "1", "project_id": "project", "instance_id": "instance", "database_id": "database", "change_stream_name": "change_stream", "table": "customers", "server_transaction_id": "transaction_id", "low_watermark": 1670955471635, "read_at_timestamp": 1670955531791, "number_records_in_transaction": 2, "transaction_tag": "", "system_transaction": false, "value_capture_type": "OLD_AND_NEW_VALUES", "partition_token": "partition_token", "mod_number": 0, "is_last_record_in_transaction_in_partition": true, "number_of_partitions_in_transaction": 1 }, "op": "c", "ts_ms": 1559033904863 // }
Marca d'água baixa
A marca d'água baixa descreve o tempo T em que o conector do Kafka garante ter transmitido e publicado em um tópico do Kafka todos os eventos com carimbo de data/hora < T.
É possível ativar a marca d'água baixa no conector do Kafka usando o parâmetro gcp.spanner.low-watermark.enabled
. Esse parâmetro fica desativado
por padrão. Se a marca-d'água baixa estiver ativada, o campo low_watermark
no registro de mudança de dados do fluxo de alterações será preenchido com o carimbo de data/hora da marca-d'água baixa atual do conector do Kafka.
Se não houver registros sendo produzidos, o conector do Kafka vai enviar "batimentos cardíacos" periódicos de marca d'água para os tópicos de saída do Kafka detectados pelo conector.
Esses batimentos cardíacos de marca d'água são registros vazios, exceto o
campo low_watermark
. Em seguida, use a marca d'água baixa para realizar agregações baseadas em tempo.
Por exemplo, é possível usar a marca d'água baixa para ordenar eventos por carimbo de data/hora
de confirmação em chaves primárias.
Tópicos de metadados
O conector Kafka, assim como o framework do Kafka Connect, cria vários tópicos de metadados para armazenar informações relacionadas ao conector. Não é recomendável modificar a configuração ou o conteúdo desses tópicos de metadados.
Confira os tópicos de metadados a seguir:
_consumer_offsets
: um tópico criado automaticamente pelo Kafka. Armazena os deslocamentos de consumidores para consumidores criados no conector do Kafka._kafka-connect-offsets
: um tópico criado automaticamente pelo Kafka Connect. Armazena os deslocamentos do conector._sync_topic_spanner_connector_connectorname
: um tópico criado automaticamente pelo conector. Armazena metadados sobre partições de fluxo de alterações._rebalancing_topic_spanner_connector_connectorname
: um tópico criado automaticamente pelo conector. Usado para determinar a atividade da tarefa do conector._debezium-heartbeat.connectorname
: um tópico usado para processar os batimentos cardíacos do fluxo de alterações do Spanner.
Ambiente de execução do conector Kafka
O ambiente de execução do conector Kafka é descrito a seguir.
Escalonabilidade
O conector do Kafka é escalonável horizontalmente e é executado em uma ou mais tarefas distribuídas entre vários workers do Kafka Connect.
Garantias de entrega de mensagem
O conector Kafka oferece suporte à garantia de entrega pelo menos uma vez.
Tolerância a falhas
O conector do Kafka tolera falhas. À medida que o conector do Kafka lê mudanças e produz eventos, ele registra o carimbo de data/hora da última confirmação processada para cada partição do fluxo de mudanças. Se o conector do Kafka for interrompido por qualquer motivo (incluindo falhas de comunicação, problemas de rede ou falhas de software), ao reiniciar, o conector do Kafka vai continuar transmitindo os registros de onde parou.
O conector do Kafka lê o esquema de informações no carimbo de data/hora de início do conector para extrair informações do esquema. Por padrão, o Spanner não pode ler o esquema de informações em carimbos de data/hora de leitura antes do período de retenção da versão, que é definido como uma hora. Se você quiser iniciar o conector antes de uma hora, aumente o período de armazenamento da versão do banco de dados.
Configurar o conector do Kafka
Criar um stream de alterações
Para saber como criar um fluxo de alterações, consulte Criar um fluxo de alterações. Para continuar com as próximas etapas, é necessária uma instância do Spanner com um fluxo de alterações configurado.
Se você quiser que as colunas alteradas e inalteradas sejam retornadas em cada
evento de mudança de dados, use o tipo de captura de valor NEW_ROW
. Para mais informações, consulte Tipo de captura de valor.
Instalar o JAR do conector do Kafka
Com o Zookeeper, o Kafka e o Kafka Connect instalados, as tarefas restantes para implantar um conector do Kafka são fazer o download do arquivo de plug-in do conector, extrair os arquivos JAR para o ambiente do Kafka Connect e adicionar o diretório com os arquivos JAR ao plugin.path
do Kafka Connect.
Em seguida, reinicie o processo do Kafka Connect para selecionar os novos arquivos JAR.
Se você estiver trabalhando com contêineres imutáveis, poderá extrair imagens das imagens de contêiner do Debezium para Zookeeper, Kafka e Kafka Connect. A imagem do Kafka Connect tem o conector Spanner pré-instalado.
Para mais informações sobre como instalar JARs de conector do Kafka com base no Debezium, consulte Como instalar o Debezium.
Configurar o conector do Kafka
Confira a seguir um exemplo de configuração de um conector do Kafka que se conecta a um fluxo de alterações chamado changeStreamAll
no banco de dados users
na instância test-instance
e no projeto test-project
.
"name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{"client_id": user@example.com}", "gcp.spanner.database.role": "cdc-role", "tasks.max": "10" }
Essa configuração contém o seguinte:
O nome do conector quando registrado com um serviço do Kafka Connect.
O nome dessa classe de conector do Spanner.
O ID do projeto.
O ID da instância do Spanner.
O ID do banco de dados do Spanner.
O nome do fluxo de alterações.
O objeto JSON da chave da conta de serviço.
(Opcional) O papel do banco de dados do Spanner a ser usado.
O número máximo de tarefas.
Para uma lista completa de propriedades do conector, consulte Propriedades de configuração do conector Kafka.
Adicionar a configuração do conector ao Kafka Connect
Para começar a executar um conector do Spanner:
Crie uma configuração para o conector do Spanner.
Use a API REST do Kafka Connect para adicionar essa configuração de conector ao cluster do Kafka Connect.
É possível enviar essa configuração com um comando POST
para um serviço do Kafka Connect em execução. Por padrão, o serviço do Kafka Connect é executado na porta 8083
.
O serviço registra a configuração e inicia a tarefa do conector
que se conecta ao banco de dados do Spanner e transmite registros de eventos de alteração para
tópicos do Kafka.
Confira a seguir um exemplo de comando POST
:
POST /connectors HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "name": "spanner-connector" "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" } }
Exemplo de resposta bem-sucedida:
HTTP/1.1 201 Created Content-Type: application/json { "name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }, "tasks": [ { "connector": "spanner-connector", "task": 1 }, { "connector": "spanner-connector", "task": 2 }, { "connector": "spanner-connector", "task": 3 } ] }
Atualizar a configuração do conector Kafka
Para atualizar a configuração do conector, envie um comando PUT
para o serviço
Kafka Connect em execução com o mesmo nome.
Suponha que tenhamos um conector em execução com a configuração da
seção anterior. Confira a seguir um exemplo de comando PUT
:
PUT /connectors/spanner-connector/config HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Exemplo de resposta bem-sucedida:
HTTP/1.1 200 OK Content-Type: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "tasks.max": "10", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Interromper o conector Kafka
Para interromper o conector, envie um comando DELETE
para o serviço
Kafka Connect em execução com o mesmo nome.
Suponha que tenhamos um conector em execução com a configuração da
seção anterior. Confira a seguir um exemplo de comando DELETE
:
DELETE /connectors/spanner-connector HTTP/1.1 Host: http://localhost:8083
Exemplo de resposta bem-sucedida:
HTTP/1.1 204 No Content
Monitorar o conector Kafka
Além das métricas padrão do Kafka Connect e do Debezium, o conector do Kafka exporta as próprias métricas:
MilliSecondsLowWatermark
: o limite mínimo atual da tarefa do conector em milissegundos. A marca d'água baixa descreve o tempo T em que o conector tem garantia de ter transmitido todos os eventos com carimbo de data/hora < T.MilliSecondsLowWatermarkLag
: o atraso da marca d'água baixa em relação ao tempo atual em milissegundos. Todos os eventos com carimbo de data/hora < T foram transmitidosLatencyLowWatermark<Variant>MilliSeconds
: o atraso da marca d'água baixa em relação ao tempo atual em milissegundos. As variantes P50, P95, P99, Média, Mínimo e Máximo são fornecidas.LatencySpanner<Variant>MilliSeconds
: a latência de leitura de carimbo de data/hora de confirmação do Spanner para o conector. As variantes P50, P95, P99, Média, Mínima e Máxima são fornecidas.LatencyReadToEmit<Variant>MilliSeconds
: a latência do carimbo de data/hora de leitura do Spanner para emissão do conector. As variantes P50, P95, P99, Média, Mínimo e Máximo são fornecidas.LatencyCommitToEmit<Variant>tMilliSeconds
: a latência de emissão de carimbo de data/hora de confirmação do Spanner para o conector. As variantes P50, P95, P99, Média, Mínimo e Máximo são fornecidas.LatencyCommitToPublish<Variant>MilliSeconds
: a latência do carimbo de data/hora de confirmação do Spanner para o carimbo de data/hora de publicação do Kafka. As variantes P50, P95, P99, Média, Mínima e Máxima são fornecidas.NumberOfChangeStreamPartitionsDetected
: o número total de partições detectadas pela tarefa de conector atual.NumberOfChangeStreamQueriesIssued
: o número total de consultas de fluxo de alterações emitido pela tarefa atual.NumberOfActiveChangeStreamQueries
: o número ativo de consultas de fluxo de alterações detectadas pela tarefa atual do conector.SpannerEventQueueCapacity
: a capacidade total deStreamEventQueue
, uma fila que armazena elementos recebidos de consultas de fluxo de mudanças.SpannerEventQueueCapacity
: a capacidade restante deStreamEventQueue
.TaskStateChangeEventQueueCapacity
: a capacidade total deTaskStateChangeEventQueue
, uma fila que armazena eventos que ocorrem no conector.RemainingTaskStateChangeEventQueueCapacity
: a capacidade restante deTaskStateChangeEventQueue
.NumberOfActiveChangeStreamQueries
: o número ativo de consultas de fluxo de alterações detectadas pela tarefa atual do conector.
Propriedades de configuração do conector do Kafka
As propriedades de configuração obrigatórias para o conector são:
name
: nome exclusivo do conector. Tentar fazer o registro novamente com o mesmo nome causa falha. Essa propriedade é obrigatória para todos os conectores do Kafka Connect.connector.class
: o nome da classe Java do conector. Sempre use um valor deio.debezium.connector.spanner.SpannerConnector
para o conector Kafka.tasks.max
: o número máximo de tarefas que precisam ser criadas para esse conector.gcp.spanner.project.id
: o ID do projetogcp.spanner.instance.id
: o ID da instância do Spannergcp.spanner.database.id
: o ID do banco de dados do Spannergcp.spanner.change.stream
: o nome do fluxo de alterações do Spannergcp.spanner.credentials.json
: o objeto JSON da chave da conta de serviço.gcp.spanner.credentials.path
: o caminho do arquivo para o objeto JSON da chave da conta de serviço. Obrigatório se o campo acima não for fornecido.gcp.spanner.database.role
: o papel do banco de dados do Spanner a ser usado. Isso é necessário apenas quando o fluxo de mudanças é protegido com controle de acesso detalhado. A função de banco de dados precisa ter o privilégioSELECT
no fluxo de alterações e o privilégioEXECUTE
na função de leitura do fluxo de alterações. Para mais informações, consulte Controle de acesso detalhado para fluxos de alteração.
As propriedades de configuração avançadas a seguir têm padrões que funcionam na maioria das situações e, portanto, raramente precisam ser especificados na configuração do conector:
gcp.spanner.low-watermark.enabled
: indica se a marca d'água baixa está ativada para o conector. O padrão é "false".gcp.spanner.low-watermark.update-period.ms
: o intervalo em que a marca d'água baixa é atualizada. O padrão é 1.000 ms.heartbeat.interval.ms
: o intervalo de batimentos do Spanner. O padrão é 300000 (cinco minutos).gcp.spanner.start.time
: o horário de início do conector. O padrão é o horário atual.gcp.spanner.end.time
: o horário de término do conector. O padrão é infinito.tables.exclude.list
: as tabelas para excluir eventos de mudança. O padrão é vazio.tables.include.list
: as tabelas para incluir eventos de mudança. Se não for preenchido, todas as tabelas serão incluídas. O padrão é vazio.gcp.spanner.stream.event.queue.capacity
: a capacidade da fila de eventos do Spanner. O padrão é 10000.connector.spanner.task.state.change.event.queue.capacity
: a capacidade da fila de eventos de mudança de estado da tarefa. O valor padrão é 1000.connector.spanner.max.missed.heartbeats
: o número máximo de batimentos cardíacos perdidos para uma consulta de fluxo de mudanças antes que uma exceção seja gerada. O valor padrão é 10.scaler.monitor.enabled
: indica se o escalonamento automático de tarefas está ativado. O padrão é "false".tasks.desired.partitions
: o número preferencial de partições de fluxo de alterações por tarefa. Esse parâmetro é necessário para o escalonamento automático de tarefas. O padrão é 2.tasks.min
: o número mínimo de tarefas. Esse parâmetro é necessário para o escalonamento automático de tarefas. O padrão é 1.connector.spanner.sync.topic
: o nome do tópico de sincronização, um tópico de conector interno usado para armazenar a comunicação entre tarefas. O padrão é_sync_topic_spanner_connector_connectorname
se o usuário não fornecer um nome.connector.spanner.sync.poll.duration
: a duração da pesquisa do tópico de sincronização. O padrão é 500 ms.connector.spanner.sync.request.timeout.ms
: o tempo limite para solicitações ao tópico de sincronização. O padrão é 5.000 ms.connector.spanner.sync.delivery.timeout.ms
: o tempo limite para publicação no tópico de sincronização. O padrão é 15.000 ms.connector.spanner.sync.commit.offsets.interval.ms
: o intervalo em que os deslocamentos são confirmados para o tema de sincronização. O padrão é 60.000 ms.connector.spanner.sync.publisher.wait.timeout
: o intervalo em que as mensagens são publicadas no tópico de sincronização. O padrão é 5 ms.connector.spanner.rebalancing.topic
: o nome do tópico de reequilíbrio. O tópico de reequilíbrio é um tópico de conector interno usado para determinar a atividade da tarefa. O padrão é_rebalancing_topic_spanner_connector_connectorname
se o usuário não fornecer um nome.connector.spanner.rebalancing.poll.duration
: a duração da pesquisa para o tópico de reequilíbrio. O padrão é 5.000 ms.connector.spanner.rebalancing.commit.offsets.timeout
: o tempo limite para confirmar os ajustes do tópico de reequilíbrio. O padrão é 5.000 ms.connector.spanner.rebalancing.commit.offsets.interval.ms
: o intervalo em que os deslocamentos são confirmados para o tema de sincronização. O padrão é 60.000 ms.connector.spanner.rebalancing.task.waiting.timeout
: a duração do tempo que uma tarefa aguarda antes de processar um evento de reequilíbrio. O padrão é 1000 ms.
Para uma lista ainda mais detalhada das propriedades configuráveis do conector, consulte o repositório do GitHub.
Limitações
O conector não é compatível com eventos de snapshot por streaming.
Se a marca d'água estiver ativada no conector, não será possível configurar transformações de roteamento de tópicos do Debezium.