Conector de receptor de Kafka Connect Bigtable
Los conectores de receptor son complementos del framework Kafka Connect que puedes usar para transmitir datos de Kafka directamente a otros sistemas para almacenarlos y procesarlos. El sumidero de Kafka Connect Bigtable es un conector específico diseñado para transmitir datos a Bigtable en tiempo real con la menor latencia posible.
En esta página se describen las funciones y las limitaciones del conector. También proporciona ejemplos de uso para casos avanzados con transformaciones de un solo mensaje (SMTs) y creación de tablas automatizada. Para obtener instrucciones de instalación y documentación de referencia completa, consulta el repositorio del conector de receptor de Kafka Connect Bigtable.
Funciones
El conector de sink de Bigtable se suscribe a tus temas de Kafka, lee los mensajes recibidos en estos temas y, a continuación, escribe los datos en las tablas de Bigtable. En las siguientes secciones se ofrece una descripción general de cada función. Para obtener información sobre el uso, consulta la sección Configuración de este documento.
Asignación de teclas, SMTs y convertidores
Para escribir datos en una tabla de Bigtable, debes proporcionar una clave de fila, una familia de columnas y un nombre de columna únicos para cada operación.
Esta información se deduce de los campos de los mensajes de Kafka.
Puedes crear todos los identificadores necesarios con ajustes como los siguientes:
row.key.definition
, row.key.delimiter
o
default.column.family
.
Creación automática de tablas
Puedes usar los ajustes auto.create.tables
y auto.create.column.families
para crear automáticamente tablas de destino y familias de columnas si no existen en tu destino de Bigtable. Esta flexibilidad tiene un coste de rendimiento, por lo que generalmente recomendamos que primero cree las tablas en las que quiera transmitir datos.
Modos de escritura y eliminación de filas
Cuando escribes en una tabla, puedes sobrescribir completamente los datos
si ya existe una fila o abandonar la operación
con el ajuste insert.mode
. Puedes usar este ajuste junto con la gestión de errores de DLQ para conseguir la garantía de entrega al menos una vez.
Para enviar comandos DELETE
, configura la propiedad value.null.mode
. Puedes usarlo para eliminar filas completas, familias de columnas o columnas individuales.
Cola de mensajes fallidos
Configure la propiedad errors.deadletterqueue.topic.name
y asigne el valor errors.tolerance=all
para publicar los mensajes que no se puedan procesar en el tema de la cola de mensajes fallidos.
Compatibilidad con el conector de receptor de Bigtable de Confluent Platform
El conector de sink de Bigtable Kafka Connect de Google Cloud
ofrece paridad total con el
conector de sink de Bigtable de Confluent Platform autogestionado.
Puedes usar el archivo de configuración que ya tengas para el conector de Confluent Platform. Para ello, cambia el valor del ajuste connector.class
a connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
.
Limitaciones
Se aplican las siguientes limitaciones:
Actualmente, el conector de sink de Kafka Connect Bigtable solo se admite en clústeres de Kafka en los que puedes instalar conectores de forma independiente (clústeres de Kafka autogestionados o locales). Actualmente, este conector no es compatible con Google Cloud Managed Service para Apache Kafka.
Este conector puede crear familias de columnas y columnas a partir de nombres de campo con hasta dos niveles de anidación:
- Las estructuras anidadas a más de dos niveles se convierten en
JSON
y se guardan en su columna principal. - Las estructuras de nivel raíz se transforman en familias de columnas. Los campos de esas estructuras se convierten en nombres de columna.
- Los valores primitivos de nivel raíz se guardan de forma predeterminada en una familia de columnas que usa el tema de Kafka como nombre. Las columnas de esa familia tienen nombres iguales a los nombres de los campos. Puedes modificar este comportamiento con los ajustes
default.column.family
ydefault.column.qualifier
.
- Las estructuras anidadas a más de dos niveles se convierten en
Instalación
Para instalar este conector, sigue los pasos de instalación estándar: compila el proyecto con Maven, copia los archivos .jar
en el directorio de complementos de Kafka Connect y crea el archivo de configuración.
Para ver instrucciones detalladas, consulta la sección Ejecutar el conector del repositorio.
Configuración
Para configurar los conectores de Kafka Connect, debes escribir archivos de configuración. El conector de sink de Bigtable Kafka Connect de Google Cloud admite todas las propiedades básicas de los conectores de Kafka, así como algunos campos adicionales diseñados para trabajar con tablas de Bigtable.
En las secciones siguientes se ofrecen ejemplos detallados de casos prácticos más avanzados, pero no se describen todos los ajustes disponibles. Para ver ejemplos de uso básico y la referencia completa de las propiedades, consulta el repositorio del conector de sink de Kafka Connect Bigtable.
Ejemplo: creación flexible de claves de fila y familias de columnas
- Situación de ejemplo
-
Tus mensajes de Kafka entrantes contienen detalles sobre pedidos de compra con identificadores de usuario. Quieres escribir cada pedido en una fila con dos familias de columnas: una para los detalles del usuario y otra para los detalles del pedido.
- Formato de mensaje de Kafka de origen
-
Para dar formato a los mensajes de Kafka publicados en el tema con la estructura siguiente, usa
JsonConverter
:{ "user": "user123", "phone": "800‑555‑0199", "email": "business@example.com", "order": { id: "order123", items: ["itemUUID1", "itemUUID2"], discount: 0.2 } }
- Fila de Bigtable esperada
-
Quieres escribir cada mensaje como una fila de Bigtable con la siguiente estructura:
Clave de fila contact_details order_details name teléfono correo electrónico orderId elementos descuento user123#order123
user123 800-555-0199 business@example.com order123 ["itemUUID1", "itemUUID2"] 0,2 - Configuración del conector
-
Para obtener el resultado esperado, escribe el siguiente archivo de configuración:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topic where shopping details are posted topics=shopping_topic # Settings for row key creation row.key.definition=user,order.id row.key.delimiter=# # All user identifiers are root level fields. # Use the default column family to aggregate them into a single family. default.column.family=contact_details # Use SMT to rename "orders" field into "order_details" for the new column family transforms=renameOrderField transforms.renameOrderField.type=org.apache.kafka.connect.transforms.ReplaceField$Key transforms.renameOrderField.renames=order:order_details
Estos son los resultados de usar este archivo:
-
row.key.definition=user,order.id
es una lista separada por comas de los campos que quieres usar para crear la clave de fila. Cada entrada se concatena con el conjunto de caracteres de la configuraciónrow.key.delimiter
.Cuando usas
row.key.definition
, todos tus mensajes deben usar el mismo esquema. Si necesitas procesar mensajes con estructuras diferentes en columnas o familias de columnas distintas, te recomendamos que crees instancias de conector independientes. Para obtener más información, consulta la sección Ejemplo: escribir mensajes en varias tablas de este documento. -
Los nombres de las familias de columnas de Bigtable se basan en los nombres de las estructuras de nivel raíz no nulas. Por tanto:
- Los valores de los detalles de contacto son tipos de datos primitivos de nivel raíz, por lo que se agregan en una familia de columnas predeterminada con el ajuste
default.column.family=contact_details
. - Los detalles del pedido ya están incluidos en el objeto
order
, pero quieres usarorder_details
como nombre de familia de columnas. Para ello, utiliza la SMT ReplaceFields y cambia el nombre del campo.
- Los valores de los detalles de contacto son tipos de datos primitivos de nivel raíz, por lo que se agregan en una familia de columnas predeterminada con el ajuste
Ejemplo: creación automática de tablas y escrituras idempotentes
- Situación de ejemplo
-
Tus mensajes de Kafka entrantes contienen detalles de los pedidos de compra. Los clientes pueden editar sus cestas antes de que se completen los pedidos, por lo que es habitual recibir mensajes de seguimiento con pedidos modificados que debe guardar como actualizaciones en la misma fila. Tampoco puedes garantizar que la tabla de destino exista en el momento de la escritura, por lo que quieres que el conector cree automáticamente la tabla si no existe.
- Configuración del conector
-
Para obtener el resultado esperado, escribe el siguiente archivo de configuración:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Settings for row key creation also omitted. # Refer to the Example: flexible row key and column family creation section. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topic where shopping details are posted topics=shopping_topic # Automatically create destination tables if they don't exist auto.create.tables=true # UPSERT causes subsequent writes to overwrite existing rows. # This way you can update the same order when customers change the contents # of their baskets. insert.mode=upsert
Ejemplo: escribir mensajes en varias tablas
- Situación de ejemplo
-
Tus mensajes de Kafka entrantes contienen detalles de pedidos de compra de diferentes canales de tramitación. Estos mensajes se publican en temas diferentes y quieres usar el mismo archivo de configuración para escribirlos en tablas independientes.
- Configuración del conector
-
Puedes escribir tus mensajes en varias tablas, pero si usas un solo archivo de configuración para tu configuración, cada mensaje debe usar el mismo esquema. Si necesitas procesar mensajes de diferentes temas en columnas o familias distintas, te recomendamos que crees instancias de conector independientes.
Para obtener el resultado esperado, escribe el siguiente archivo de configuración:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Settings for row key creation are also omitted. # Refer to the Example: flexible row key and column family creation section. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topics where shopping details are posted topics=shopping_topic_store1,shopping_topic_store2 # Use a dynamic table name based on the Kafka topic name. table.name.format=orders_${topic}
Con este método, se usa la propiedad
table.name.format=orders_${topic}
para hacer referencia de forma dinámica a cada nombre de tema de Kafka. Si configura varios nombres de temas con el ajustetopics=shopping_topic_store1,
, cada mensaje se escribe en una tabla independiente:shopping_topic_store2 - Los mensajes del tema
shopping_topic_store1
se escriben en la tablaorders_shopping_topic_store1
. - Los mensajes del tema
shopping_topic_store2
se escriben en la tablaorders_shopping_topic_store2
.
- Los mensajes del tema