Criar conexões de fluxo de alterações com o Kafka

Nesta página, explicamos como usar o conector Kafka para consumir e encaminhar dados dos fluxos de alterações do Spanner.

Principais conceitos

Veja a seguir os principais conceitos do conector Kafka.

Debezium

O Debezium é um projeto de código aberto que fornece uma plataforma de streaming de dados de baixa latência para captura de dados alterados.

Conector de Kafka

O conector do Kafka fornece uma abstração sobre a API Spanner para publicar fluxo de alterações do Spanner no Kafka. Com ele, você não precisa gerenciar o ciclo de vida da partição de fluxo de alterações, o que é necessário ao usar a API Spanner diretamente.

O conector do Kafka produz um evento de alteração para cada modificação de registro de alteração de dados e envia registros de eventos de alteração downstream para um tópico separado do Kafka para cada tabela acompanhada de fluxo de alterações. Uma modificação de registro de mudança de dados representa uma única modificação (inserir, atualizar ou excluir) que foi capturada. Um único registro de mudança de dados pode conter mais de uma modificação.

Saída do conector do Kafka

O conector do Kafka encaminha os registros de fluxo de alterações diretamente para um tópico separado do Kafka. O nome do tópico de saída deve ser connector_name.table_name. Se o tópico não existir, o conector do Kafka criará automaticamente um tópico com esse nome.

Também é possível configurar transformações de roteamento de tópicos para redirecionar os registros a tópicos especificados. Se você quiser usar o roteamento de tópicos, desative a funcionalidade de baixa marca-d'água.

Ordenação de registros

Os registros são ordenados por carimbo de data/hora de confirmação por chave primária nos tópicos do Kafka. Os registros pertencentes a diferentes chaves primárias não têm garantias de ordenação. Os registros com a mesma chave primária são armazenados na mesma partição de tópicos do Kafka. Se você quiser processar transações inteiras, também poderá usar os campos server_transaction_id e number_of_records_in_transaction do registro de alteração de dados para montar uma transação do Spanner.

Eventos de alteração

O conector do Kafka gera um evento de alteração de dados para cada operação INSERT, UPDATE e DELETE. Cada evento contém uma chave e valores para a linha alterada.

Você pode usar os conversores do Kafka Connect para produzir eventos de mudança de dados nos formatos Protobuf, AVRO, JSON ou JSON Schemaless. Se você usar um conversor do Kafka Connect que produz esquemas, o evento conterá esquemas separados para a chave e os valores. Caso contrário, o evento conterá apenas a chave e os valores.

O esquema da chave nunca muda. O esquema dos valores é uma fusão de todas as colunas que o fluxo de alterações rastreou desde o horário de início do conector.

Se você configurar o conector para produzir eventos JSON, o evento de alteração de saída conterá cinco campos:

  • O primeiro campo schema especifica um esquema do Kafka Connect que descreve o esquema de chave do Spanner.

  • O primeiro campo payload tem a estrutura descrita pelo campo schema anterior e contém a chave da linha que foi modificada.

  • O segundo campo schema especifica o esquema do Kafka Connect que descreve o esquema da linha alterada.

  • O segundo campo payload tem a estrutura descrita pelo campo schema anterior e contém os dados reais da linha que foi modificada.

  • O campo source é obrigatório e descreve os metadados de origem do evento.

Veja a seguir um exemplo de evento de alteração de dados:

{
  // The schema for the Spanner key.
  "schema": {
    "type": "struct",
    "name": "customers.Key",
    "optional": false,
    "fields": [
      {
        "type": "int64",
        "optional": "false"
        "field": "false"
      }
    ]
  },
  // The value of the Spanner key.
  "payload": {
      "id": "1"
  },
  // The schema for the payload, which contains the before and after values
  // of the changed row. The schema for the payload contains all the
  // columns that the change stream has tracked since the connector start
  // time.
  "schema": {
    "type": "struct",
    "fields": [
      {
        // The schema for the before values of the changed row.
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "first_name"
            }
        ],
        "optional": true,
        "name": "customers.Value",
        "field": "before"
      },
      {
        // The schema for the after values of the changed row.
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          }
        ],
          "optional": true,
          "name": "customers.Value",
          "field": "after"
        },
        {
          // The schema for the source metadata for the event.
          "type": "struct",
          "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "version"
            },
            {
                "type": "string",
                "optional": false,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "string",
                "optional": false,
                "field": "sequence"
            },
            {
                "type": "string",
                "optional": false,
                "field": "project_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "instance_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "database_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "change_stream_name"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
            {
                "type": "string",
                "optional": true,
                "field": "server_transaction_id"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "low_watermark"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "read_at_timestamp"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_records_in_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "transaction_tag"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "system_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "value_capture_type"
            }
            {
                "type": "string",
                "optional": true,
                "field": "partition_token"
            }
            {
                "type": "int32",
                "optional": true,
                "field": "mod_number"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "is_last_record_in_transaction_in_partition"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_partitions_in_transaction"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.spanner.Source",
          "field": "source"
        },
      ]
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "connector_name.customers.Envelope"
  },
  "payload": {
    // The values of the row before the event.
    "before": null,
    // The values of the row after the event.
    "after": {
        "id": 1,
        "first_name": "Anne",
    }
  },
  // The source metadata.
  "source": {
    "version": "{debezium-version}",
    "connector": "spanner",
    "name": "spanner_connector",
    "ts_ms": 1670955531785,
    "snapshot": "false",
    "db": "database",
    "sequence": "1",
    "project_id": "project",
    "instance_id": "instance",
    "database_id": "database",
    "change_stream_name": "change_stream",
    "table": "customers",
    "server_transaction_id": "transaction_id",
    "low_watermark": 1670955471635,
    "read_at_timestamp": 1670955531791,
    "number_records_in_transaction": 2,
    "transaction_tag": "",
    "system_transaction": false,
    "value_capture_type": "OLD_AND_NEW_VALUES",
    "partition_token": "partition_token",
    "mod_number": 0,
    "is_last_record_in_transaction_in_partition": true,
    "number_of_partitions_in_transaction": 1
  },
  "op": "c",
  "ts_ms": 1559033904863 //
}

Marca-d'água baixa

A marca-d'água baixa descreve o tempo T em que o conector do Kafka certamente terá transmitido e publicado em um tópico do Kafka todos os eventos com carimbo de data/hora < T.

É possível ativar a marca d'água baixa no conector do Kafka usando o parâmetro gcp.spanner.low-watermark.enabled. Esse parâmetro está desativado por padrão. Se a marca-d'água baixa estiver ativada, o campo low_watermark no registro de mudança de dados do fluxo de alterações será preenchido com o carimbo de data/hora de baixa marca d'água atual do conector do Kafka.

Se nenhum registro estiver sendo produzido, o conector do Kafka enviará "pulsões" periódicas da marca-d'água para os tópicos de saída do Kafka detectados pelo conector.

Esses sinais de funcionamento são registros vazios, exceto no campo low_watermark. Você pode usar a marca-d'água baixa para fazer agregações baseadas em tempo. Por exemplo, é possível usar a marca d'água baixa para ordenar eventos por carimbo de data/hora de confirmação nas chaves primárias.

Tópicos de metadados

O conector do Kafka, bem como o framework do Kafka Connect, criam vários tópicos de metadados para armazenar informações relacionadas ao conector. Não é aconselhável modificar a configuração nem o conteúdo desses tópicos de metadados.

Estes são os tópicos de metadados:

  • _consumer_offsets: um tópico criado automaticamente pelo Kafka. Armazena os deslocamentos dos consumidores criados no conector do Kafka.
  • _kafka-connect-offsets: um tópico criado automaticamente pelo Kafka Connect. Armazena os deslocamentos do conector.
  • _sync_topic_spanner_connector_connectorname: um tópico criado automaticamente pelo conector. Armazena metadados sobre as partições de fluxo de alterações.
  • _rebalancing_topic_spanner_connector_connectorname: um tópico criado automaticamente pelo conector. Usado para determinar a atividade da tarefa do conector.
  • _debezium-heartbeat.connectorname: um tópico usado para processar os sinais de funcionamento do fluxo de alterações do Spanner.

Ambiente de execução do conector do Kafka

Veja a seguir a descrição do tempo de execução do conector do Kafka.

Escalonabilidade

O conector do Kafka é escalonável horizontalmente e executado em uma ou mais tarefas distribuídas entre vários workers do Kafka Connect.

Garantias de entrega de mensagem

O conector de Kafka aceita a garantia de entrega pelo menos uma vez.

Tolerância a falhas

O conector de Kafka é tolerante a falhas. À medida que o conector do Kafka lê as alterações e produz eventos, ele registra o último carimbo de data/hora de confirmação processado para cada partição do fluxo de alterações. Se o conector do Kafka parar por qualquer motivo (incluindo falhas de comunicação, problemas de rede ou falhas de software), após a reinicialização, o conector do Kafka continuará fazendo streaming de registros de onde parou.

O conector do Kafka lê o esquema de informações no carimbo de data/hora de início do conector do Kafka para recuperar informações do esquema. Por padrão, o Spanner não pode ler o esquema de informações em carimbos de data/hora de leitura antes do período de armazenamento da versão, que tem como padrão uma hora. Se você quiser iniciar o conector há menos de uma hora, aumente o período de armazenamento da versão do banco de dados.

Configurar o conector do Kafka

Criar um stream de alterações

Para detalhes sobre como criar um fluxo de alterações, consulte Criar um fluxo de alterações. Para continuar nas próximas etapas, é necessário ter uma instância do Spanner com um fluxo de alterações configurado.

Se você quiser que as colunas alteradas e inalteradas sejam retornadas em cada evento de alteração de dados, use o tipo de captura de valor NEW_ROW. Para mais informações, consulte o tipo de captura de valor.

Instalar o JAR do conector do Kafka

Com o Zookeeper, o Kafka e o Kafka Connect instalados, as tarefas restantes para implantar um conector do Kafka são fazer o download do arquivo do plug-in do conector (links em inglês), extrair os arquivos JAR para o ambiente do Kafka Connect e adicionar o diretório com os arquivos JAR ao plugin.path do Kafka Connect. Em seguida, é necessário reiniciar o processo do Kafka Connect para coletar os novos arquivos JAR.

Se você está trabalhando com contêineres imutáveis, extraia imagens das imagens de contêiner do Debezium para Zookeeper, Kafka e Kafka Connect. A imagem do Kafka Connect tem o conector do Spanner pré-instalado.

Para mais informações sobre como instalar JARs do conector do Kafka baseados no Debezium, consulte Como instalar o Debezium.

Configurar o conector do Kafka

Veja a seguir um exemplo de configuração de um conector do Kafka que se conecta a um fluxo de alterações chamado changeStreamAll no banco de dados users na instância test-instance e no projeto test-project.

"name": "spanner-connector",
"config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{"client_id": user@example.com}",
    "gcp.spanner.database.role": "cdc-role",
    "tasks.max": "10"
}

Essa configuração contém o seguinte:

  • O nome do conector quando registrado com um serviço do Kafka Connect.

  • O nome desta classe do conector do Spanner.

  • O ID do projeto.

  • O ID da instância do Spanner.

  • O ID do banco de dados do Spanner.

  • O nome do fluxo de alterações.

  • O objeto JSON da chave da conta de serviço.

  • (Opcional) O papel do banco de dados do Spanner a ser usado.

  • O número máximo de tarefas.

Para uma lista completa das propriedades do conector, consulte Propriedades de configuração do conector do Kafka.

Adicionar a configuração do conector ao Kafka Connect

Para começar a executar um conector do Spanner:

  1. Criar uma configuração para o conector do Spanner.

  2. Use a API REST do Kafka Connect (em inglês) para adicionar essa configuração de conector ao cluster do Kafka Connect.

É possível enviar essa configuração com um comando POST para um serviço do Kafka Connect em execução. Por padrão, o serviço do Kafka Connect é executado na porta 8083. O serviço registra a configuração e inicia a tarefa do conector que se conecta ao banco de dados do Spanner e transmite registros de eventos de alteração para tópicos do Kafka.

Veja a seguir um exemplo de comando POST:

POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
  "name": "spanner-connector"
  "config": {
      "connector.class": "io.debezium.connector.spanner.SpannerConnector",
      "gcp.spanner.project.id": "test-project",
      "gcp.spanner.instance.id": "test-instance",
      "gcp.spanner.database.id": "users",
      "gcp.spanner.change.stream": "changeStreamAll",
      "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
      "heartbeat.interval.ms": "100",
      "tasks.max": "10"
  }
}

Exemplo de resposta bem-sucedida:

HTTP/1.1 201 Created
Content-Type: application/json
{
    "name": "spanner-connector",
    "config": {
        "connector.class": "io.debezium.connector.spanner.SpannerConnector",
        "gcp.spanner.project.id": "test-project",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "users",
        "gcp.spanner.change.stream": "changeStreamAll",
        "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
        "heartbeat.interval.ms": "100",
        "tasks.max": "10"
    },
    "tasks": [
        { "connector": "spanner-connector", "task": 1 },
        { "connector": "spanner-connector", "task": 2 },
        { "connector": "spanner-connector", "task": 3 }
    ]
}

Atualizar a configuração do conector do Kafka

Para atualizar a configuração do conector, envie um comando PUT para o serviço do Kafka Connect em execução com o mesmo nome de conector.

Vamos supor que temos um conector em execução com a configuração da seção anterior. Veja a seguir um exemplo de comando PUT:

PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Exemplo de resposta bem-sucedida:

HTTP/1.1 200 OK
Content-Type: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "tasks.max": "10",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Parar o conector do Kafka

Para interromper o conector, envie um comando DELETE ao serviço do Kafka Connect em execução com o mesmo nome de conector.

Vamos supor que temos um conector em execução com a configuração da seção anterior. Veja a seguir um exemplo de comando DELETE:

DELETE /connectors/spanner-connector HTTP/1.1
Host: http://localhost:8083

Exemplo de resposta bem-sucedida:

HTTP/1.1 204 No Content

Monitorar o conector do Kafka

Além das métricas padrão do Kafka Connect e do Debezium, o conector do Kafka exporta as próprias métricas:

  • MilliSecondsLowWatermark: a marca-d'água baixa atual da tarefa do conector em milissegundos. A marca-d'água baixa descreve o tempo T em que o conector tem a garantia de ter transmitido todos os eventos com carimbo de data/hora < T.

  • MilliSecondsLowWatermarkLag: o atraso da marca-d'água baixa em milissegundos em milissegundos. transmitidos todos os eventos com carimbo de data/hora < T.

  • LatencyLowWatermark<Variant>MilliSeconds: o atraso da marca-d'água baixa em relação ao tempo atual, em milissegundos. São fornecidas as variantes P50, P95, P99, média, mínima e máxima.

  • LatencySpanner<Variant>MilliSeconds: a latência do carimbo de data/hora de confirmação do Spanner para a leitura do conector. São fornecidas as variantes P50, P95, P99, média, mínima e máxima.

  • LatencyReadToEmit<Variant>MilliSeconds: a latência do carimbo de data/hora de leitura do Spanner para a emissão do conector. São fornecidas as variantes P50, P95, P99, média, mínima e máxima.

  • LatencyCommitToEmit<Variant>tMilliSeconds: a latência do carimbo de data/hora de confirmação do Spanner para emitir o conector. São fornecidas as variantes P50, P95, P99, média, mínima e máxima.

  • LatencyCommitToPublish<Variant>MilliSeconds: a latência do carimbo de data/hora de confirmação do Spanner para o carimbo de data/hora de publicação do Kafka. São fornecidas as variantes P50, P95, P99, média, mínima e máxima.

  • NumberOfChangeStreamPartitionsDetected: o número total de partições detectadas pela tarefa do conector atual.

  • NumberOfChangeStreamQueriesIssued: o número total de consultas de fluxo de alterações emitidas pela tarefa atual.

  • NumberOfActiveChangeStreamQueries: o número ativo de consultas de fluxo de alterações detectadas pela tarefa atual do conector.

  • SpannerEventQueueCapacity: a capacidade total de StreamEventQueue, uma fila que armazena elementos recebidos de consultas de fluxo de alterações.

  • SpannerEventQueueCapacity: a capacidade restante de StreamEventQueue.

  • TaskStateChangeEventQueueCapacity: a capacidade total de TaskStateChangeEventQueue, uma fila que armazena eventos que ocorrem no conector.

  • RemainingTaskStateChangeEventQueueCapacity: a capacidade restante de TaskStateChangeEventQueue.

  • NumberOfActiveChangeStreamQueries: o número ativo de consultas de fluxo de alterações detectadas pela tarefa atual do conector.

Propriedades de configuração do conector do Kafka

Estas são as propriedades de configuração obrigatórias para o conector:

  • name: nome exclusivo do conector. Tentar se registrar novamente com o mesmo nome causa falha. Esta propriedade é exigida por todos os conectores do Kafka Connect.

  • connector.class: o nome da classe Java do conector. Sempre use o valor io.debezium.connector.spanner.SpannerConnector para o conector do Kafka.

  • tasks.max: o número máximo de tarefas que devem ser criadas para esse conector.

  • gcp.spanner.project.id: o ID do projeto

  • gcp.spanner.instance.id: o ID da instância do Spanner

  • gcp.spanner.database.id: o ID do banco de dados do Spanner

  • gcp.spanner.change.stream: o nome do fluxo de alterações do Spanner

  • gcp.spanner.credentials.json: o objeto JSON da chave da conta de serviço.

  • gcp.spanner.credentials.path: o caminho do arquivo para o objeto JSON da chave da conta de serviço. Obrigatório se o campo acima não for fornecido.

  • gcp.spanner.database.role : o papel do banco de dados do Spanner a ser usado. Isso só é necessário quando o fluxo de alterações está protegido com controle de acesso detalhado. O papel de banco de dados precisa ter o privilégio SELECT no fluxo de alterações e o privilégio EXECUTE na função de leitura do fluxo. Para mais informações, consulte Controle de acesso refinado para fluxos de mudança.

As propriedades de configuração avançada a seguir têm padrões que funcionam na maioria das situações e, portanto, raramente precisam ser especificadas na configuração do conector:

  • gcp.spanner.low-watermark.enabled: indica se a marca-d'água baixa está ativada para o conector. O padrão é "false".

  • gcp.spanner.low-watermark.update-period.ms: o intervalo em que a marca-d'água baixa é atualizada. O padrão é 1.000 ms.

  • heartbeat.interval.ms: o intervalo do sinal de funcionamento do Spanner. O padrão é 300000 (cinco minutos).

  • gcp.spanner.start.time: o horário de início do conector. O padrão é a hora atual.

  • gcp.spanner.end.time: o horário de término do conector. O padrão é infinito.

  • tables.exclude.list: as tabelas em que os eventos de mudança serão excluídos. O padrão é vazio.

  • tables.include.list: as tabelas em que os eventos de alteração serão incluídos. Se não for preenchida, todas as tabelas serão incluídas. O padrão é vazio.

  • gcp.spanner.stream.event.queue.capacity: a capacidade da fila de eventos do Spanner. O padrão é 10000.

  • connector.spanner.task.state.change.event.queue.capacity: a capacidade da fila de eventos de alteração de estado da tarefa. O valor padrão é 1000.

  • connector.spanner.max.missed.heartbeats: o número máximo de batimentos de funcionamento perdidos para uma consulta de fluxo de alterações antes que uma exceção seja gerada. O valor padrão é 10.

  • scaler.monitor.enabled: indica se o escalonamento automático de tarefas está ativado. O padrão é "false".

  • tasks.desired.partitions: o número preferido de partições de fluxo de alterações por tarefa. Esse parâmetro é necessário para o escalonamento automático de tarefas. O padrão é 2.

  • tasks.min: o número mínimo de tarefas. Esse parâmetro é necessário para o escalonamento automático de tarefas. O padrão é 1.

  • connector.spanner.sync.topic: o nome do tópico de sincronização, um tópico de conector interno usado para armazenar a comunicação entre tarefas. O padrão será _sync_topic_spanner_connector_connectorname se o usuário não tiver fornecido um nome.

  • connector.spanner.sync.poll.duration: a duração da enquete sobre o tema de sincronização. O padrão é 500 ms.

  • connector.spanner.sync.request.timeout.ms: o tempo limite das solicitações para o tópico de sincronização. O padrão é 5.000 ms.

  • connector.spanner.sync.delivery.timeout.ms: o tempo limite para publicação no tópico de sincronização. O padrão é 15.000 ms.

  • connector.spanner.sync.commit.offsets.interval.ms: o intervalo em que os deslocamentos são confirmados no tópico de sincronização. O padrão é 60.000 ms.

  • connector.spanner.sync.publisher.wait.timeout: o intervalo em que as mensagens são publicadas no tópico de sincronização. O padrão é 5 ms.

  • connector.spanner.rebalancing.topic: o nome do tópico de rebalanceamento. O tópico de rebalanceamento é um tópico de conector interno usado para determinar a atividade da tarefa. O padrão será _rebalancing_topic_spanner_connector_connectorname se o usuário não tiver fornecido um nome.

  • connector.spanner.rebalancing.poll.duration: a duração da enquete para o tópico de rebalanceamento. O padrão é 5.000 ms.

  • connector.spanner.rebalancing.commit.offsets.timeout: o tempo limite para confirmar deslocamentos do tópico de rebalanceamento. O padrão é 5.000 ms.

  • connector.spanner.rebalancing.commit.offsets.interval.ms: o intervalo em que os deslocamentos são confirmados no tópico de sincronização. O padrão é 60.000 ms.

  • connector.spanner.rebalancing.task.waiting.timeout: o tempo que uma tarefa aguarda antes de processar um evento de rebalanceamento. O padrão é 1.000 ms.

Para obter uma lista ainda mais detalhada das propriedades configuráveis do conector, consulte o repositório do GitHub.

Limitações