Conector de receptor de Kafka Connect Bigtable


Los conectores de receptor son complementos del framework Kafka Connect que puedes usar para transmitir datos de Kafka directamente a otros sistemas para almacenarlos y procesarlos. El sumidero de Kafka Connect Bigtable es un conector específico diseñado para transmitir datos a Bigtable en tiempo real con la menor latencia posible.

En esta página se describen las funciones y las limitaciones del conector. También proporciona ejemplos de uso para casos avanzados con transformaciones de un solo mensaje (SMTs) y creación de tablas automatizada. Para obtener instrucciones de instalación y documentación de referencia completa, consulta el repositorio del conector de receptor de Kafka Connect Bigtable.

Funciones

El conector de sink de Bigtable se suscribe a tus temas de Kafka, lee los mensajes recibidos en estos temas y, a continuación, escribe los datos en las tablas de Bigtable. En las siguientes secciones se ofrece una descripción general de cada función. Para obtener información sobre el uso, consulta la sección Configuración de este documento.

Asignación de teclas, SMTs y convertidores

Para escribir datos en una tabla de Bigtable, debes proporcionar una clave de fila, una familia de columnas y un nombre de columna únicos para cada operación. Esta información se deduce de los campos de los mensajes de Kafka. Puedes crear todos los identificadores necesarios con ajustes como los siguientes: row.key.definition, row.key.delimiter o default.column.family.

Creación automática de tablas

Puedes usar los ajustes auto.create.tables y auto.create.column.families para crear automáticamente tablas de destino y familias de columnas si no existen en tu destino de Bigtable. Esta flexibilidad tiene un coste de rendimiento, por lo que generalmente recomendamos que primero cree las tablas en las que quiera transmitir datos.

Modos de escritura y eliminación de filas

Cuando escribes en una tabla, puedes sobrescribir completamente los datos si ya existe una fila o abandonar la operación con el ajuste insert.mode. Puedes usar este ajuste junto con la gestión de errores de DLQ para conseguir la garantía de entrega al menos una vez.

Para enviar comandos DELETE, configura la propiedad value.null.mode. Puedes usarlo para eliminar filas completas, familias de columnas o columnas individuales.

Cola de mensajes fallidos

Configure la propiedad errors.deadletterqueue.topic.name y asigne el valor errors.tolerance=all para publicar los mensajes que no se puedan procesar en el tema de la cola de mensajes fallidos.

Compatibilidad con el conector de receptor de Bigtable de Confluent Platform

El conector de sink de Bigtable Kafka Connect de Google Cloud ofrece paridad total con el conector de sink de Bigtable de Confluent Platform autogestionado. Puedes usar el archivo de configuración que ya tengas para el conector de Confluent Platform. Para ello, cambia el valor del ajuste connector.class a connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector.

Limitaciones

Se aplican las siguientes limitaciones:

  • Actualmente, el conector de sink de Kafka Connect Bigtable solo se admite en clústeres de Kafka en los que puedes instalar conectores de forma independiente (clústeres de Kafka autogestionados o locales). Actualmente, este conector no es compatible con Google Cloud Managed Service para Apache Kafka.

  • Este conector puede crear familias de columnas y columnas a partir de nombres de campo con hasta dos niveles de anidación:

    • Las estructuras anidadas a más de dos niveles se convierten en JSON y se guardan en su columna principal.
    • Las estructuras de nivel raíz se transforman en familias de columnas. Los campos de esas estructuras se convierten en nombres de columna.
    • Los valores primitivos de nivel raíz se guardan de forma predeterminada en una familia de columnas que usa el tema de Kafka como nombre. Las columnas de esa familia tienen nombres iguales a los nombres de los campos. Puedes modificar este comportamiento con los ajustes default.column.family y default.column.qualifier.

Instalación

Para instalar este conector, sigue los pasos de instalación estándar: compila el proyecto con Maven, copia los archivos .jar en el directorio de complementos de Kafka Connect y crea el archivo de configuración. Para ver instrucciones detalladas, consulta la sección Ejecutar el conector del repositorio.

Configuración

Para configurar los conectores de Kafka Connect, debes escribir archivos de configuración. El conector de sink de Bigtable Kafka Connect de Google Cloud admite todas las propiedades básicas de los conectores de Kafka, así como algunos campos adicionales diseñados para trabajar con tablas de Bigtable.

En las secciones siguientes se ofrecen ejemplos detallados de casos prácticos más avanzados, pero no se describen todos los ajustes disponibles. Para ver ejemplos de uso básico y la referencia completa de las propiedades, consulta el repositorio del conector de sink de Kafka Connect Bigtable.

Ejemplo: creación flexible de claves de fila y familias de columnas

Situación de ejemplo

Tus mensajes de Kafka entrantes contienen detalles sobre pedidos de compra con identificadores de usuario. Quieres escribir cada pedido en una fila con dos familias de columnas: una para los detalles del usuario y otra para los detalles del pedido.

Formato de mensaje de Kafka de origen

Para dar formato a los mensajes de Kafka publicados en el tema con la estructura siguiente, usa JsonConverter:

{
  "user": "user123",
  "phone": "800‑555‑0199",
  "email": "business@example.com",
  "order": {
    id: "order123",
    items: ["itemUUID1", "itemUUID2"],
    discount: 0.2
  }
}
Fila de Bigtable esperada

Quieres escribir cada mensaje como una fila de Bigtable con la siguiente estructura:

Clave de fila contact_details order_details
name teléfono correo electrónico orderId elementos descuento
user123#order123 user123 800-555-0199 business@example.com order123 ["itemUUID1", "itemUUID2"] 0,2
Configuración del conector
Para obtener el resultado esperado, escribe el siguiente archivo de configuración:
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Settings for row key creation
row.key.definition=user,order.id
row.key.delimiter=#

# All user identifiers are root level fields.
# Use the default column family to aggregate them into a single family.
default.column.family=contact_details

# Use SMT to rename "orders" field into "order_details" for the new column family
transforms=renameOrderField
transforms.renameOrderField.type=org.apache.kafka.connect.transforms.ReplaceField$Key
transforms.renameOrderField.renames=order:order_details
Estos son los resultados de usar este archivo:
  • row.key.definition=user,order.id es una lista separada por comas de los campos que quieres usar para crear la clave de fila. Cada entrada se concatena con el conjunto de caracteres de la configuración row.key.delimiter.

    Cuando usas row.key.definition, todos tus mensajes deben usar el mismo esquema. Si necesitas procesar mensajes con estructuras diferentes en columnas o familias de columnas distintas, te recomendamos que crees instancias de conector independientes. Para obtener más información, consulta la sección Ejemplo: escribir mensajes en varias tablas de este documento.

  • Los nombres de las familias de columnas de Bigtable se basan en los nombres de las estructuras de nivel raíz no nulas. Por tanto:

    • Los valores de los detalles de contacto son tipos de datos primitivos de nivel raíz, por lo que se agregan en una familia de columnas predeterminada con el ajuste default.column.family=contact_details.
    • Los detalles del pedido ya están incluidos en el objeto order, pero quieres usar order_details como nombre de familia de columnas. Para ello, utiliza la SMT ReplaceFields y cambia el nombre del campo.

Ejemplo: creación automática de tablas y escrituras idempotentes

Situación de ejemplo

Tus mensajes de Kafka entrantes contienen detalles de los pedidos de compra. Los clientes pueden editar sus cestas antes de que se completen los pedidos, por lo que es habitual recibir mensajes de seguimiento con pedidos modificados que debe guardar como actualizaciones en la misma fila. Tampoco puedes garantizar que la tabla de destino exista en el momento de la escritura, por lo que quieres que el conector cree automáticamente la tabla si no existe.

Configuración del conector
Para obtener el resultado esperado, escribe el siguiente archivo de configuración:
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Settings for row key creation also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Automatically create destination tables if they don't exist
auto.create.tables=true

# UPSERT causes subsequent writes to overwrite existing rows.
# This way you can update the same order when customers change the contents
# of their baskets.
insert.mode=upsert

Ejemplo: escribir mensajes en varias tablas

Situación de ejemplo

Tus mensajes de Kafka entrantes contienen detalles de pedidos de compra de diferentes canales de tramitación. Estos mensajes se publican en temas diferentes y quieres usar el mismo archivo de configuración para escribirlos en tablas independientes.

Configuración del conector

Puedes escribir tus mensajes en varias tablas, pero si usas un solo archivo de configuración para tu configuración, cada mensaje debe usar el mismo esquema. Si necesitas procesar mensajes de diferentes temas en columnas o familias distintas, te recomendamos que crees instancias de conector independientes.

Para obtener el resultado esperado, escribe el siguiente archivo de configuración:

# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Settings for row key creation are also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topics where shopping details are posted
topics=shopping_topic_store1,shopping_topic_store2

# Use a dynamic table name based on the Kafka topic name.
table.name.format=orders_${topic}

Con este método, se usa la propiedad table.name.format=orders_${topic} para hacer referencia de forma dinámica a cada nombre de tema de Kafka. Si configura varios nombres de temas con el ajuste topics=shopping_topic_store1,shopping_topic_store2, cada mensaje se escribe en una tabla independiente:

  • Los mensajes del tema shopping_topic_store1 se escriben en la tabla orders_shopping_topic_store1.
  • Los mensajes del tema shopping_topic_store2 se escriben en la tabla orders_shopping_topic_store2.

Siguientes pasos