Conetor de destino do Bigtable do Kafka Connect


Os conetores de destino são plug-ins para a framework Kafka Connect que pode usar para fazer stream de dados do Kafka diretamente para outros sistemas para armazenamento e processamento. O Kafka Connect Bigtable sink é um conetor dedicado concebido para transmitir dados para o Bigtable em tempo real com a menor latência possível.

Esta página descreve as funcionalidades e as limitações do conector. Também fornece exemplos de utilização para cenários avançados com transformações de mensagens únicas (SMTs) e criação automática de tabelas. Para ver instruções de instalação e documentação de referência completa, consulte o repositório do conetor Kafka Connect Bigtable Sink.

Funcionalidades

O conetor de destino do Bigtable subscreve os seus tópicos do Kafka, lê as mensagens recebidas nestes tópicos e, em seguida, escreve os dados nas tabelas do Bigtable. As secções seguintes oferecem uma vista geral de cada funcionalidade. Para ver detalhes de utilização, consulte a secção Configuração deste documento.

Mapeamento de teclas, SMTs e conversores

Para escrever dados numa tabela do Bigtable, tem de fornecer uma chave de linha, uma família de colunas e um nome de coluna únicos para cada operação. Estas informações são inferidas a partir dos campos nas mensagens Kafka. Pode criar todos os identificadores necessários com definições como: row.key.definition, row.key.delimiter ou default.column.family.

Criação automática de tabelas

Pode usar as definições auto.create.tables e auto.create.column.families para criar automaticamente tabelas de destino e famílias de colunas, se não existirem no seu destino do Bigtable. Esta flexibilidade tem um determinado custo de desempenho, pelo que recomendamos geralmente que crie primeiro as tabelas para as quais quer transmitir dados.

Modos de escrita e eliminação de linhas

Quando escreve numa tabela, pode substituir completamente os dados se já existir uma linha ou optar por abandonar a operação com a definição insert.mode. Pode tirar partido desta definição em conjunto com o processamento de erros da DLQ para alcançar a garantia de entrega, pelo menos, uma vez.

Para emitir comandos DELETE, configure a propriedade value.null.mode. Pode usá-lo para eliminar linhas completas, famílias de colunas ou colunas individuais.

Fila de mensagens rejeitadas

Configure a propriedade errors.deadletterqueue.topic.name e defina errors.tolerance=all para publicar mensagens que não consigam ser processadas no tópico DLQ.

Compatibilidade com o conetor do Bigtable Sink da Confluent Platform

O conetor de destino do Bigtable Kafka Connect da Google Cloud oferece paridade total com o conetor de destino do Bigtable da plataforma Confluent autogerido. Pode usar o ficheiro de configuração existente para o conector da Confluent Platform ajustando a definição connector.class para connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector.

Limitações

Aplicam-se as seguintes limitações:

  • Atualmente, o conetor de destino do Bigtable do Kafka Connect só é suportado para clusters do Kafka onde pode instalar conetores de forma independente (clusters do Kafka autogeridos ou no local). Atualmente, este conetor não é suportado para o Google Cloud Managed Service para Apache Kafka.

  • Este conetor pode criar famílias de colunas e colunas a partir de nomes de campos até dois níveis de aninhamento:

    • As estruturas aninhadas com mais de dois níveis são convertidas em JSON e guardadas na respetiva coluna principal.
    • As estruturas de nível raiz são transformadas em famílias de colunas. Os campos nessas structs tornam-se nomes de colunas.
    • Os valores primitivos ao nível da raiz são guardados por predefinição numa família de colunas que usa o tópico do Kafka como nome. As colunas nessa família têm nomes iguais aos nomes dos campos. Pode modificar este comportamento através das definições default.column.family e default.column.qualifier.

Instalação

Para instalar este conetor, siga os passos de instalação padrão: crie o projeto com o Maven, copie os ficheiros .jar para o diretório de plug-ins do Kafka Connect e crie o ficheiro de configuração. Para ver instruções passo a passo, consulte a secção Executar o conector no repositório.

Configuração

Para configurar os conetores do Kafka Connect, tem de escrever ficheiros de configuração. O conetor de destino do Bigtable Kafka Connect da Google Cloud suporta todas as propriedades básicas do conetor Kafka, bem como alguns campos adicionais personalizados para trabalhar com tabelas do Bigtable.

As secções seguintes fornecem exemplos detalhados para exemplos de utilização mais avançados, mas não descrevem todas as definições disponíveis. Para ver exemplos de utilização básica e a referência completa das propriedades, consulte o repositório do conetor de destino do Bigtable do Kafka Connect.

Exemplo: criação flexível de chave de linha e família de colunas

Cenário de exemplo

As suas mensagens Kafka recebidas contêm detalhes de encomendas de compras com identificadores de utilizadores. Quer escrever cada encomenda numa linha com duas famílias de colunas: uma para os detalhes do utilizador e outra para os detalhes da encomenda.

Formato de mensagem Kafka de origem

Formate as mensagens Kafka publicadas no tópico com o elemento JsonConverter para alcançar a seguinte estrutura:

{
  "user": "user123",
  "phone": "800‑555‑0199",
  "email": "business@example.com",
  "order": {
    id: "order123",
    items: ["itemUUID1", "itemUUID2"],
    discount: 0.2
  }
}
Linha do Bigtable esperada

Quer escrever cada mensagem como uma linha do Bigtable com a seguinte estrutura:

Chave da linha contact_details order_details
nome telemóvel email orderId itens desconto
user123#order123 user123 800‑555‑0199 business@example.com order123 ["itemUUID1", "itemUUID2"] 0.2
Configuração do conetor
Para alcançar o resultado esperado, escreve o seguinte ficheiro de configuração:
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Settings for row key creation
row.key.definition=user,order.id
row.key.delimiter=#

# All user identifiers are root level fields.
# Use the default column family to aggregate them into a single family.
default.column.family=contact_details

# Use SMT to rename "orders" field into "order_details" for the new column family
transforms=renameOrderField
transforms.renameOrderField.type=org.apache.kafka.connect.transforms.ReplaceField$Key
transforms.renameOrderField.renames=order:order_details
Os resultados da utilização deste ficheiro são os seguintes:
  • row.key.definition=user,order.id é uma lista separada por vírgulas dos campos que quer usar para construir a chave da linha. Cada entrada é concatenada com o conjunto de carateres definido na definição row.key.delimiter.

    Quando usa row.key.definition, todas as suas mensagens têm de usar o mesmo esquema. Se precisar de processar mensagens com estruturas diferentes em colunas ou famílias de colunas diferentes, recomendamos que crie instâncias de conetores separadas. Para mais informações, consulte a secção Exemplo: escrever mensagens em várias tabelas deste documento.

  • Os nomes das famílias de colunas do Bigtable baseiam-se nos nomes das estruturas de nível raiz não nulas. Como tal:

    • Os valores dos detalhes de contacto são tipos de dados primitivos ao nível da raiz, pelo que os agrega numa família de colunas predefinida com a definição default.column.family=contact_details.
    • Os detalhes da encomenda já estão incluídos no objeto order, mas quer usar order_details como o nome da família de colunas. Para o conseguir, use o SMT ReplaceFields e mude o nome do campo.

Exemplo: criação automática de tabelas e gravações idempotentes

Cenário de exemplo

As suas mensagens Kafka recebidas contêm detalhes das encomendas de compras. Os clientes podem editar os respetivos cestos antes do processamento, pelo que espera receber mensagens de seguimento com encomendas alteradas que tem de guardar como atualizações na mesma linha. Também não pode garantir que a tabela de destino existe no momento da escrita, pelo que quer que o conector crie automaticamente a tabela se não existir.

Configuração do conetor
Para alcançar o resultado esperado, escreve o seguinte ficheiro de configuração:
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Settings for row key creation also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Automatically create destination tables if they don't exist
auto.create.tables=true

# UPSERT causes subsequent writes to overwrite existing rows.
# This way you can update the same order when customers change the contents
# of their baskets.
insert.mode=upsert

Exemplo: escrever mensagens em várias tabelas

Cenário de exemplo

As suas mensagens Kafka recebidas contêm detalhes de encomendas de compras de diferentes canais de processamento. Estas mensagens são publicadas em tópicos diferentes e quer usar o mesmo ficheiro de configuração para as escrever em tabelas separadas.

Configuração do conetor

Pode escrever as suas mensagens em várias tabelas, mas se usar um único ficheiro de configuração para a sua configuração, cada mensagem tem de usar o mesmo esquema. Se precisar de processar mensagens de diferentes tópicos em colunas ou famílias distintas, recomendamos que crie instâncias de conetores separadas.

Para alcançar o resultado esperado, escreve o seguinte ficheiro de configuração:

# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Settings for row key creation are also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topics where shopping details are posted
topics=shopping_topic_store1,shopping_topic_store2

# Use a dynamic table name based on the Kafka topic name.
table.name.format=orders_${topic}

Nesta abordagem, usa a propriedade table.name.format=orders_${topic} para fazer referência dinamicamente a cada nome de tópico do Kafka. Quando configura vários nomes de tópicos com a definição topics=shopping_topic_store1,shopping_topic_store2, cada mensagem é escrita numa tabela separada:

  • As mensagens do tópico shopping_topic_store1 são escritas na tabela orders_shopping_topic_store1.
  • As mensagens do tópico shopping_topic_store2 são escritas na tabela orders_shopping_topic_store2.

O que se segue?