Conetor de destino do Bigtable do Kafka Connect
Os conetores de destino são plug-ins para a framework Kafka Connect que pode usar para fazer stream de dados do Kafka diretamente para outros sistemas para armazenamento e processamento. O Kafka Connect Bigtable sink é um conetor dedicado concebido para transmitir dados para o Bigtable em tempo real com a menor latência possível.
Esta página descreve as funcionalidades e as limitações do conector. Também fornece exemplos de utilização para cenários avançados com transformações de mensagens únicas (SMTs) e criação automática de tabelas. Para ver instruções de instalação e documentação de referência completa, consulte o repositório do conetor Kafka Connect Bigtable Sink.
Funcionalidades
O conetor de destino do Bigtable subscreve os seus tópicos do Kafka, lê as mensagens recebidas nestes tópicos e, em seguida, escreve os dados nas tabelas do Bigtable. As secções seguintes oferecem uma vista geral de cada funcionalidade. Para ver detalhes de utilização, consulte a secção Configuração deste documento.
Mapeamento de teclas, SMTs e conversores
Para escrever dados numa tabela do Bigtable, tem de fornecer uma chave de linha, uma família de colunas e um nome de coluna únicos para cada operação.
Estas informações são inferidas a partir dos campos nas mensagens Kafka.
Pode criar todos os identificadores necessários com definições como:
row.key.definition
, row.key.delimiter
ou
default.column.family
.
Criação automática de tabelas
Pode usar as definições auto.create.tables
e auto.create.column.families
para criar automaticamente tabelas de destino e famílias de colunas, se não existirem no seu destino do Bigtable. Esta flexibilidade tem um determinado custo de desempenho, pelo que recomendamos geralmente que crie primeiro as tabelas para as quais quer transmitir dados.
Modos de escrita e eliminação de linhas
Quando escreve numa tabela, pode substituir completamente os dados
se já existir uma linha ou optar por abandonar a operação
com a definição insert.mode
. Pode tirar partido desta definição
em conjunto com o processamento de erros da DLQ para alcançar a garantia de entrega, pelo menos, uma vez.
Para emitir comandos DELETE
, configure a propriedade value.null.mode
. Pode usá-lo para eliminar linhas completas, famílias de colunas ou colunas individuais.
Fila de mensagens rejeitadas
Configure a propriedade errors.deadletterqueue.topic.name
e defina errors.tolerance=all
para publicar mensagens que não consigam ser processadas no tópico DLQ.
Compatibilidade com o conetor do Bigtable Sink da Confluent Platform
O conetor de destino do Bigtable Kafka Connect da Google Cloud
oferece paridade total com o
conetor de destino do Bigtable da plataforma Confluent autogerido.
Pode usar o ficheiro de configuração existente para o conector da Confluent Platform
ajustando a definição connector.class
para
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
.
Limitações
Aplicam-se as seguintes limitações:
Atualmente, o conetor de destino do Bigtable do Kafka Connect só é suportado para clusters do Kafka onde pode instalar conetores de forma independente (clusters do Kafka autogeridos ou no local). Atualmente, este conetor não é suportado para o Google Cloud Managed Service para Apache Kafka.
Este conetor pode criar famílias de colunas e colunas a partir de nomes de campos até dois níveis de aninhamento:
- As estruturas aninhadas com mais de dois níveis são convertidas em
JSON
e guardadas na respetiva coluna principal. - As estruturas de nível raiz são transformadas em famílias de colunas. Os campos nessas structs tornam-se nomes de colunas.
- Os valores primitivos ao nível da raiz são guardados por predefinição numa família de colunas que usa o tópico do Kafka como nome. As colunas nessa família têm nomes iguais aos nomes dos campos. Pode modificar este comportamento através das definições
default.column.family
edefault.column.qualifier
.
- As estruturas aninhadas com mais de dois níveis são convertidas em
Instalação
Para instalar este conetor, siga os passos de instalação padrão:
crie o projeto com o Maven, copie os ficheiros .jar
para o diretório de plug-ins do Kafka Connect e crie o ficheiro de configuração.
Para ver instruções passo a passo, consulte a secção
Executar o conector
no repositório.
Configuração
Para configurar os conetores do Kafka Connect, tem de escrever ficheiros de configuração. O conetor de destino do Bigtable Kafka Connect da Google Cloud suporta todas as propriedades básicas do conetor Kafka, bem como alguns campos adicionais personalizados para trabalhar com tabelas do Bigtable.
As secções seguintes fornecem exemplos detalhados para exemplos de utilização mais avançados, mas não descrevem todas as definições disponíveis. Para ver exemplos de utilização básica e a referência completa das propriedades, consulte o repositório do conetor de destino do Bigtable do Kafka Connect.
Exemplo: criação flexível de chave de linha e família de colunas
- Cenário de exemplo
-
As suas mensagens Kafka recebidas contêm detalhes de encomendas de compras com identificadores de utilizadores. Quer escrever cada encomenda numa linha com duas famílias de colunas: uma para os detalhes do utilizador e outra para os detalhes da encomenda.
- Formato de mensagem Kafka de origem
-
Formate as mensagens Kafka publicadas no tópico com o elemento
JsonConverter
para alcançar a seguinte estrutura:{ "user": "user123", "phone": "800‑555‑0199", "email": "business@example.com", "order": { id: "order123", items: ["itemUUID1", "itemUUID2"], discount: 0.2 } }
- Linha do Bigtable esperada
-
Quer escrever cada mensagem como uma linha do Bigtable com a seguinte estrutura:
Chave da linha contact_details order_details nome telemóvel email orderId itens desconto user123#order123
user123 800‑555‑0199 business@example.com order123 ["itemUUID1", "itemUUID2"] 0.2 - Configuração do conetor
-
Para alcançar o resultado esperado, escreve o seguinte ficheiro de configuração:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topic where shopping details are posted topics=shopping_topic # Settings for row key creation row.key.definition=user,order.id row.key.delimiter=# # All user identifiers are root level fields. # Use the default column family to aggregate them into a single family. default.column.family=contact_details # Use SMT to rename "orders" field into "order_details" for the new column family transforms=renameOrderField transforms.renameOrderField.type=org.apache.kafka.connect.transforms.ReplaceField$Key transforms.renameOrderField.renames=order:order_details
Os resultados da utilização deste ficheiro são os seguintes:
-
row.key.definition=user,order.id
é uma lista separada por vírgulas dos campos que quer usar para construir a chave da linha. Cada entrada é concatenada com o conjunto de carateres definido na definiçãorow.key.delimiter
.Quando usa
row.key.definition
, todas as suas mensagens têm de usar o mesmo esquema. Se precisar de processar mensagens com estruturas diferentes em colunas ou famílias de colunas diferentes, recomendamos que crie instâncias de conetores separadas. Para mais informações, consulte a secção Exemplo: escrever mensagens em várias tabelas deste documento. -
Os nomes das famílias de colunas do Bigtable baseiam-se nos nomes das estruturas de nível raiz não nulas. Como tal:
- Os valores dos detalhes de contacto são tipos de dados primitivos ao nível da raiz, pelo que os agrega numa família de colunas predefinida com a definição
default.column.family=contact_details
. - Os detalhes da encomenda já estão incluídos no objeto
order
, mas quer usarorder_details
como o nome da família de colunas. Para o conseguir, use o SMT ReplaceFields e mude o nome do campo.
- Os valores dos detalhes de contacto são tipos de dados primitivos ao nível da raiz, pelo que os agrega numa família de colunas predefinida com a definição
Exemplo: criação automática de tabelas e gravações idempotentes
- Cenário de exemplo
-
As suas mensagens Kafka recebidas contêm detalhes das encomendas de compras. Os clientes podem editar os respetivos cestos antes do processamento, pelo que espera receber mensagens de seguimento com encomendas alteradas que tem de guardar como atualizações na mesma linha. Também não pode garantir que a tabela de destino existe no momento da escrita, pelo que quer que o conector crie automaticamente a tabela se não existir.
- Configuração do conetor
-
Para alcançar o resultado esperado, escreve o seguinte ficheiro de configuração:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Settings for row key creation also omitted. # Refer to the Example: flexible row key and column family creation section. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topic where shopping details are posted topics=shopping_topic # Automatically create destination tables if they don't exist auto.create.tables=true # UPSERT causes subsequent writes to overwrite existing rows. # This way you can update the same order when customers change the contents # of their baskets. insert.mode=upsert
Exemplo: escrever mensagens em várias tabelas
- Cenário de exemplo
-
As suas mensagens Kafka recebidas contêm detalhes de encomendas de compras de diferentes canais de processamento. Estas mensagens são publicadas em tópicos diferentes e quer usar o mesmo ficheiro de configuração para as escrever em tabelas separadas.
- Configuração do conetor
-
Pode escrever as suas mensagens em várias tabelas, mas se usar um único ficheiro de configuração para a sua configuração, cada mensagem tem de usar o mesmo esquema. Se precisar de processar mensagens de diferentes tópicos em colunas ou famílias distintas, recomendamos que crie instâncias de conetores separadas.
Para alcançar o resultado esperado, escreve o seguinte ficheiro de configuração:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Settings for row key creation are also omitted. # Refer to the Example: flexible row key and column family creation section. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topics where shopping details are posted topics=shopping_topic_store1,shopping_topic_store2 # Use a dynamic table name based on the Kafka topic name. table.name.format=orders_${topic}
Nesta abordagem, usa a propriedade
table.name.format=orders_${topic}
para fazer referência dinamicamente a cada nome de tópico do Kafka. Quando configura vários nomes de tópicos com a definiçãotopics=shopping_topic_store1,
, cada mensagem é escrita numa tabela separada:shopping_topic_store2 - As mensagens do tópico
shopping_topic_store1
são escritas na tabelaorders_shopping_topic_store1
. - As mensagens do tópico
shopping_topic_store2
são escritas na tabelaorders_shopping_topic_store2
.
- As mensagens do tópico