Esta página explica como usar o conector Kafka para consumir e encaminhar os dados dos fluxos de alterações do Spanner.
Principais conceitos
Veja a seguir os principais conceitos do conector Kafka.
Debezium
O Debezium é um projeto de código aberto que fornece uma plataforma de streaming de dados de baixa latência para captura de dados alterados.
Conector Kafka
O conector Kafka fornece uma abstração sobre a API Spanner para publicar fluxo de alterações do Spanner no Kafka. Com esse conector, você não precisa gerenciar o ciclo de vida da partição dos fluxo de alterações, o que é necessário quando você usa a API Spanner diretamente.
O conector Kafka produz um evento de alteração para cada modificação de registro de alteração de dados e envia registros de eventos de alteração downstream em um tópico do Kafka separado para cada tabela rastreada pelo fluxo de alterações. Uma modificação de registro de alteração de dados representa uma única modificação (inserir, atualizar ou excluir) que foi capturada. Um único registro de mudança de dados pode conter mais de uma modificação.
Saída do conector do Kafka
O conector Kafka encaminha os registros de fluxo de alterações diretamente
em um tópico Kafka separado. O nome do tópico de saída precisa ser connector_name
.table_name
.
Se o tópico não existir, o conector do Kafka criará automaticamente um tópico com esse nome.
Também é possível configurar transformações de roteamento de tópicos para redirecionar registros em tópicos especificados por você. Se você quiser usar o roteamento por tópicos, desative a funcionalidade de marca d'água baixa.
Ordenação de registros
Os registros são ordenados por carimbo de data/hora de confirmação por chave primária em
tópicos do Kafka. Os registros pertencentes a diferentes chaves primárias não têm
de ordenação. Registros com a mesma chave primária são armazenados
na mesma partição de tópicos do Kafka. Se quiser processar transações inteiras, você poderá
use também o registro de alteração de dados
os campos server_transaction_id
e number_of_records_in_transaction
para
e montar uma transação do Spanner.
Alterar eventos
O conector Kafka gera um evento de alteração de dados para cada INSERT
, UPDATE
,
e operação DELETE
. Cada evento contém uma chave e valores para a linha alterada.
Você pode usar os conversores do Kafka Connect (em inglês) para produzir eventos de alteração de dados em
Formatos Protobuf
, AVRO
, JSON
ou JSON Schemaless
. Se você usa um
conversor do Kafka Connect que produz esquemas, o evento contém
esquemas separados para a chave e os valores. Caso contrário, o evento terá apenas
a chave e os valores.
O esquema da chave nunca muda. O esquema dos valores é um de todas as colunas que o fluxo de alterações rastreou desde o início ao horário de início do conector.
Se você configurar o conector para produzir eventos JSON, o evento de alteração de saída contém cinco campos:
O primeiro campo
schema
especifica um esquema do Kafka Connect que descreve o esquema de chave do Spanner.O primeiro campo
payload
tem a estrutura descrita pelo camposchema
anterior e contém a chave da linha que foi alterada.O segundo campo
schema
especifica o esquema do Kafka Connect que descreve o esquema da linha alterada.O segundo campo
payload
tem a estrutura descrita pelo camposchema
anterior e contém os dados reais da linha que foi alterada.O campo
source
é obrigatório e descreve os metadados de origem do evento.
Veja a seguir um exemplo de um evento de alteração de dados:
{ // The schema for the Spanner key. "schema": { "type": "struct", "name": "customers.Key", "optional": false, "fields": [ { "type": "int64", "optional": "false" "field": "false" } ] }, // The value of the Spanner key. "payload": { "id": "1" }, // The schema for the payload, which contains the before and after values // of the changed row. The schema for the payload contains all the // columns that the change stream has tracked since the connector start // time. "schema": { "type": "struct", "fields": [ { // The schema for the before values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "before" }, { // The schema for the after values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "after" }, { // The schema for the source metadata for the event. "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "sequence" }, { "type": "string", "optional": false, "field": "project_id" }, { "type": "string", "optional": false, "field": "instance_id" }, { "type": "string", "optional": false, "field": "database_id" }, { "type": "string", "optional": false, "field": "change_stream_name" }, { "type": "string", "optional": true, "field": "table" } { "type": "string", "optional": true, "field": "server_transaction_id" } { "type": "int64", "optional": true, "field": "low_watermark" } { "type": "int64", "optional": true, "field": "read_at_timestamp" } { "type": "int64", "optional": true, "field": "number_of_records_in_transaction" } { "type": "string", "optional": true, "field": "transaction_tag" } { "type": "boolean", "optional": true, "field": "system_transaction" } { "type": "string", "optional": true, "field": "value_capture_type" } { "type": "string", "optional": true, "field": "partition_token" } { "type": "int32", "optional": true, "field": "mod_number" } { "type": "boolean", "optional": true, "field": "is_last_record_in_transaction_in_partition" } { "type": "int64", "optional": true, "field": "number_of_partitions_in_transaction" } ], "optional": false, "name": "io.debezium.connector.spanner.Source", "field": "source" }, ] { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "connector_name.customers.Envelope" }, "payload": { // The values of the row before the event. "before": null, // The values of the row after the event. "after": { "id": 1, "first_name": "Anne", } }, // The source metadata. "source": { "version": "{debezium-version}", "connector": "spanner", "name": "spanner_connector", "ts_ms": 1670955531785, "snapshot": "false", "db": "database", "sequence": "1", "project_id": "project", "instance_id": "instance", "database_id": "database", "change_stream_name": "change_stream", "table": "customers", "server_transaction_id": "transaction_id", "low_watermark": 1670955471635, "read_at_timestamp": 1670955531791, "number_records_in_transaction": 2, "transaction_tag": "", "system_transaction": false, "value_capture_type": "OLD_AND_NEW_VALUES", "partition_token": "partition_token", "mod_number": 0, "is_last_record_in_transaction_in_partition": true, "number_of_partitions_in_transaction": 1 }, "op": "c", "ts_ms": 1559033904863 // }
Marca-d'água baixa
A marca-d'água baixa descreve o momento T em que o conector do Kafka tem a garantia de transmitir e publicar em um tópico Kafka todos os eventos com carimbo de data/hora < T)
É possível ativar a marca-d'água baixa no conector do Kafka usando
parâmetro gcp.spanner.low-watermark.enabled
. Esse parâmetro está desativado
por padrão. Se a marca-d'água baixa estiver ativada, o campo low_watermark
nos dados do fluxo de alterações
o registro de alteração é preenchido com a marca-d'água baixa atual do conector do Kafka
carimbo de data/hora.
Se não houver registros sendo produzidos, o conector Kafka envia marca-d'água "batimentos" aos tópicos de saída do Kafka detectados pelo conector.
Esses sinais de funcionamento de marca d'água são registros vazios, exceto o
low_watermark
. Você pode usar a marca-d'água baixa para realizar agregações com base no tempo.
Por exemplo, é possível usar a marca-d'água baixa para ordenar eventos por confirmação
carimbo de data/hora nas chaves primárias.
Tópicos de metadados
O conector Kafka e o framework do Kafka Connect criam vários tópicos de metadados para armazenar informações relacionadas ao conector. Não é aconselhável modificar a configuração ou o conteúdo desses tópicos de metadados.
Estes são os tópicos de metadados:
_consumer_offsets
: um tópico criado automaticamente pelo Kafka. Armazena as compensações dos consumidores criados no conector Kafka._kafka-connect-offsets
: um tópico criado automaticamente pelo Kafka Connect. Armazena os deslocamentos do conector._sync_topic_spanner_connector_connectorname
: um tópico criado automaticamente pelo conector. Armazena metadados sobre as partições do fluxo de alterações._rebalancing_topic_spanner_connector_connectorname
: um tópico criado automaticamente pelo conector. Usado para determinar a atividade de tarefas do conector._debezium-heartbeat.connectorname
: um tópico usado para processar os sinais de funcionamento do fluxo de alterações do Spanner.
Ambiente de execução do conector do Kafka
Veja a seguir a descrição do ambiente de execução do conector do Kafka.
Escalonabilidade
O conector Kafka é escalonável horizontalmente e é executado em uma ou mais tarefas distribuídas entre vários workers do Kafka Connect.
Garantias de entrega de mensagem
O conector Kafka oferece suporte a uma garantia de entrega pelo menos uma vez.
Tolerância a falhas
O conector Kafka é tolerante a falhas. Quando o conector Kafka lê as alterações e produz eventos, ele registra o último carimbo de data/hora de commit processado para cada alteração partição de stream. Se o conector Kafka for interrompido por qualquer motivo (incluindo falhas de comunicação, problemas de rede ou falhas de software), após a reinicialização o conector Kafka continua transmitindo registros de onde parou.
O conector Kafka lê o esquema de informações no início do conector do Kafka para recuperar informações do esquema. Por padrão, o Spanner não pode leia o esquema de informações em carimbos de data/hora de leitura antes do período de armazenamento de versões que tem como padrão uma hora. Se quiser iniciar o conector antes de há uma hora, é preciso aumentar a retenção da versão do banco de dados período
Configurar o conector Kafka
Criar um stream de alterações
Para detalhes sobre como criar um fluxo de alterações, consulte Criar um fluxo de alterações. Para continuar com as próximas etapas, é necessário ter uma instância do Spanner com um fluxo de alterações configurado.
Se você quiser que as colunas alteradas e inalteradas sejam retornadas em cada
evento de alteração de dados, use o tipo de captura de valor NEW_ROW
. Para mais informações, consulte o tipo de captura de valor.
Instale o JAR do conector do Kafka
Com o Zookeeper, o Kafka e o Kafka Connect instalados, as tarefas restantes para implantar um conector Kafka são fazer o download
arquivo de plug-in do conector, extraia os arquivos JAR no ambiente do Kafka Connect e adicione a
com os arquivos JAR no plugin.path
do Kafka Connect.
Em seguida, reinicie o processo do Kafka Connect para coletar os novos arquivos JAR.
Se você estiver trabalhando com contêineres imutáveis, poderá extrair imagens de Imagens de contêiner do Debezium para Zookeeper, Kafka e Kafka Connect. A imagem do Kafka Connect tem Conector do Spanner pré-instalado.
Para mais informações sobre como instalar JARs de conectores Kafka baseados em Debezium, consulte Como instalar o Debezium.
Configurar o conector Kafka
Este é um exemplo de configuração de um conector Kafka
que se conecta a um fluxo de mudanças chamado changeStreamAll
no
banco de dados users
na instância test-instance
e no projeto test-project
.
"name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{"client_id": user@example.com}", "gcp.spanner.database.role": "cdc-role", "tasks.max": "10" }
Essa configuração contém o seguinte:
O nome do conector quando registrado em um serviço do Kafka Connect.
O nome desta classe de conector do Spanner.
O ID do projeto.
O ID da instância do Spanner.
O ID do banco de dados do Spanner.
É o nome do fluxo de alterações.
O objeto JSON da chave da conta de serviço.
(Opcional) O papel de banco de dados do Spanner a ser usado.
O número máximo de tarefas.
Para uma lista completa de propriedades do conector, consulte as propriedades de configuração do conector do Kafka.
Adicionar a configuração do conector ao Kafka Connect
Para começar a executar um conector do Spanner:
Criar uma configuração para o conector do Spanner.
Use a API REST do Kafka Connect para adicionar isso. do conector ao cluster do Kafka Connect.
É possível enviar essa configuração com um comando POST
para um Kafka Connect em execução
serviço. Por padrão, o serviço Kafka Connect é executado na porta 8083
.
O serviço registra a configuração e inicia a tarefa do conector
que se conecta ao banco de dados do Spanner e transmite registros de eventos de alteração para
tópicos do Kafka.
Veja a seguir um exemplo de comando POST
:
POST /connectors HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "name": "spanner-connector" "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" } }
Exemplo de resposta bem-sucedida:
HTTP/1.1 201 Created Content-Type: application/json { "name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }, "tasks": [ { "connector": "spanner-connector", "task": 1 }, { "connector": "spanner-connector", "task": 2 }, { "connector": "spanner-connector", "task": 3 } ] }
Atualizar a configuração do conector do Kafka
Para atualizar a configuração do conector, envie um comando PUT
para a instância
serviço do Kafka Connect com o mesmo nome de conector.
Suponha que temos um conector em execução com a configuração de
na seção anterior. Veja a seguir um exemplo de comando PUT
:
PUT /connectors/spanner-connector/config HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Exemplo de resposta bem-sucedida:
HTTP/1.1 200 OK Content-Type: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "tasks.max": "10", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Como parar o conector Kafka
Para interromper o conector, envie um comando DELETE
para a instância
serviço do Kafka Connect com o mesmo nome de conector.
Suponha que temos um conector em execução com a configuração de
na seção anterior. Veja a seguir um exemplo de comando DELETE
:
DELETE /connectors/spanner-connector HTTP/1.1 Host: http://localhost:8083
Exemplo de resposta bem-sucedida:
HTTP/1.1 204 No Content
Monitorar o conector Kafka
Além das métricas padrão do Kafka Connect e do Debezium (em inglês), o conector Kafka exporta as próprias métricas:
MilliSecondsLowWatermark
: a marca-d'água baixa atual da tarefa do conector, em milissegundos. O marca d'água baixa descreve o tempo T em que o conector tem a garantia de transmitidos por streaming todos os eventos com carimbo de data/hora < EMilliSecondsLowWatermarkLag
: o atraso, em milissegundos, da marca-d'água baixa atrás do horário atual. transmitidos por streaming todos os eventos com carimbo de data/hora < ELatencyLowWatermark<Variant>MilliSeconds
: o atraso da marca-d'água baixa atrás do horário atual em milissegundos. As variantes P50, P95, P99, Média, Mín. e Máx. são fornecidas.LatencySpanner<Variant>MilliSeconds
: a latência do Spanner-commit-timestamp-to-connector-read. São fornecidas variantes P50, P95, P99, Média, Mín. e Máx..LatencyReadToEmit<Variant>MilliSeconds
: a latência do Spanner-read-timestamp-to-connector-emit. As variantes P50, P95, P99, Média, Mín. e Máx. são fornecidas.LatencyCommitToEmit<Variant>tMilliSeconds
: a latência do Spanner-commit-timestamp-to-connector-emit. As variantes P50, P95, P99, Média, Mín. e Máx. são fornecidas.LatencyCommitToPublish<Variant>MilliSeconds
: a latência do Spanner-commit-timestamp-para Kafka-publish-timestamp. São fornecidas variantes P50, P95, P99, Média, Mín. e Máx..NumberOfChangeStreamPartitionsDetected
: o número total de partições. detectado pela tarefa atual do conector.NumberOfChangeStreamQueriesIssued
: o número total de consultas do fluxo de alterações emitidos pela tarefa atual.NumberOfActiveChangeStreamQueries
: o número ativo de fluxo de alterações consultas detectadas pela tarefa atual do conector.SpannerEventQueueCapacity
: a capacidade total deStreamEventQueue
, uma fila que armazena elementos recebidos de consultas de fluxo de alterações.SpannerEventQueueCapacity
: a capacidade restante deStreamEventQueue
.TaskStateChangeEventQueueCapacity
: a capacidade total deTaskStateChangeEventQueue
, uma fila que armazena eventos que acontecem no conector.RemainingTaskStateChangeEventQueueCapacity
: a capacidade restante deTaskStateChangeEventQueue
.NumberOfActiveChangeStreamQueries
: o número ativo de fluxo de alterações consultas detectadas pela tarefa atual do conector.
Propriedades de configuração do conector do Kafka
Veja a seguir as propriedades de configuração obrigatórias do conector:
name
: nome exclusivo do conector. A tentativa de registro novamente com o mesmo nome causa uma falha. Essa propriedade é exigida por todos os conectores do Kafka Connect.connector.class
: o nome da classe Java do conector. Sempre use um valor deio.debezium.connector.spanner.SpannerConnector
para o conector Kafka.tasks.max
: o número máximo de tarefas que precisam ser criadas para esse conector.gcp.spanner.project.id
: o ID do projetogcp.spanner.instance.id
: o ID da instância do Spannergcp.spanner.database.id
: o ID do banco de dados do Spannergcp.spanner.change.stream
: o nome do fluxo de alterações do Spannergcp.spanner.credentials.json
: o objeto JSON da chave da conta de serviço.gcp.spanner.credentials.path
: o caminho do arquivo para o objeto JSON da chave da conta de serviço. Obrigatório se o campo acima não for fornecido.gcp.spanner.database.role
: o papel de banco de dados do Spanner para usar. Isso é necessário somente quando o fluxo de alterações está protegido um controle de acesso refinado. O papel de banco de dados precisa ter o privilégioSELECT
na fluxo de alterações e o privilégioEXECUTE
na leitura do fluxo de alterações função. Para mais informações, consulte Controle de acesso refinado para alterações córregos.
As propriedades de configuração avançada a seguir têm padrões que funcionam na maioria específicas e, portanto, raramente precisam ser especificados na configuração do conector:
gcp.spanner.low-watermark.enabled
: indica se a marca-d'água baixa está ativada para o conector. O padrão é "false".gcp.spanner.low-watermark.update-period.ms
: o intervalo em que a marca-d'água baixa é atualizada. O padrão é 1.000 ms.heartbeat.interval.ms
: o intervalo do sinal de funcionamento do Spanner. O padrão é 300000 (cinco minutos).gcp.spanner.start.time
: o horário de início do conector. O padrão é o horário atual.gcp.spanner.end.time
: o horário de término do conector. O padrão é infinito.tables.exclude.list
: as tabelas para excluir eventos de alteração. O padrão é vazio.tables.include.list
: as tabelas em que serão incluídos eventos de alteração. Se não for preenchido, todas as tabelas serão incluídas. O padrão é vazio.gcp.spanner.stream.event.queue.capacity
: a capacidade da fila de eventos do Spanner. O padrão é 10000.connector.spanner.task.state.change.event.queue.capacity
: a capacidade da fila de eventos de alteração de estado da tarefa. O valor padrão é 1000.connector.spanner.max.missed.heartbeats
: o número máximo de sinais de funcionamento perdidos para uma consulta de fluxo de alterações antes que uma exceção seja gerada. O valor padrão é 10.scaler.monitor.enabled
: indica se o escalonamento automático de tarefas está ativado. O padrão é "false".tasks.desired.partitions
: o número preferencial de fluxo de alterações de partições por tarefa. Esse parâmetro é necessário para o escalonamento automático de tarefas. O padrão é 2.tasks.min
: o número mínimo de tarefas. Esse parâmetro é necessário para o escalonamento automático de tarefas. O padrão é 1.connector.spanner.sync.topic
: o nome do tópico de sincronização, um tópico interno do conector usado para armazenar a comunicação entre tarefas. Se o usuário não fornecer um nome, o padrão será_sync_topic_spanner_connector_connectorname
.connector.spanner.sync.poll.duration
: a duração da enquete sobre o tópico de sincronização. O padrão é 500ms.connector.spanner.sync.request.timeout.ms
: o tempo limite das solicitações para o tópico de sincronização. O padrão é 5.000 ms.connector.spanner.sync.delivery.timeout.ms
: o tempo limite para publicar no tópico de sincronização. O padrão é 15.000 ms.connector.spanner.sync.commit.offsets.interval.ms
: o intervalo em que os deslocamentos são confirmados para o tópico de sincronização. O padrão é 60.000 ms.connector.spanner.sync.publisher.wait.timeout
: o intervalo em que as mensagens são publicadas no tópico de sincronização. O padrão é de 5 ms.connector.spanner.rebalancing.topic
: o nome do tópico de rebalanceamento. O tópico de rebalanceamento é um tópico de conector interno usado para determinar a atividade da tarefa. Se o usuário não fornecer um nome, o padrão será_rebalancing_topic_spanner_connector_connectorname
.connector.spanner.rebalancing.poll.duration
: a duração da enquete para o tópico de rebalanceamento. O padrão é 5.000 ms.connector.spanner.rebalancing.commit.offsets.timeout
: o tempo limite para confirmar deslocamentos para o tópico de rebalanceamento. O padrão é 5.000 ms.connector.spanner.rebalancing.commit.offsets.interval.ms
: o intervalo em que os deslocamentos são confirmados para o tópico de sincronização. O padrão é 60.000 ms.connector.spanner.rebalancing.task.waiting.timeout
: o tempo que uma tarefa aguarda antes de processar um evento de rebalanceamento. O padrão é 1.000 ms.
Para uma lista ainda mais detalhada das propriedades do conector configuráveis, consulte o repositório do GitHub (link em inglês).
Limitações
O conector não oferece suporte a eventos de snapshot de streaming.
Se a marca d'água estiver ativada no conector, não será possível configurar o tópico do Debezium transformações de roteamento.