En esta página se explica cómo usar el conector de Kafka para consumir y reenviar datos de flujos de cambios de Spanner.
Conceptos básicos
A continuación, se describen los conceptos básicos del conector de Kafka.
Debezium
Debezium es un proyecto de software libre que proporciona una plataforma de streaming de datos de baja latencia para la captura de datos de cambios.
Conector de Kafka
El conector de Kafka proporciona una abstracción sobre la API de Spanner para publicar flujos de cambios de Spanner en Kafka. Con este conector, no tienes que gestionar el ciclo de vida de la partición de los flujos de cambios, lo que es necesario cuando usas la API de Spanner directamente.
El conector de Kafka genera un evento de cambio por cada registro de cambio de datos mod y envía registros de eventos de cambio a un tema de Kafka independiente por cada tabla de la que se haga un seguimiento de los cambios. Un registro de modificación de datos representa una sola modificación (inserción, actualización o eliminación) que se ha capturado. Un único registro de cambio de datos puede contener más de una modificación.
Salida del conector de Kafka
El conector de Kafka reenvía los registros de los flujos de cambios directamente a un tema de Kafka independiente. El nombre del tema de salida debe ser connector_name
.table_name
.
Si el tema no existe, el conector de Kafka crea automáticamente un tema con ese nombre.
También puedes configurar transformaciones de enrutamiento de temas para redirigir registros a los temas que especifiques. Si quieres usar el enrutamiento por temas, inhabilita la función Marca de agua baja.
Ordenación de registros
Los registros se ordenan por la marca de tiempo de confirmación por clave principal en los temas de Kafka. Los registros que pertenecen a diferentes claves principales no tienen garantías de ordenación. Los registros con la misma clave principal se almacenan en la misma partición de temas de Kafka. Si quieres procesar transacciones completas, también puedes usar los campos registro de cambios de datos
server_transaction_id
y number_of_records_in_transaction
para
crear una transacción de Spanner.
Cambiar eventos
El conector de Kafka genera un evento de cambio de datos para cada operación INSERT
, UPDATE
y DELETE
. Cada evento contiene una clave y valores de la fila modificada.
Puedes usar convertidores de Kafka Connect para generar eventos de cambio de datos en formatos Protobuf
, AVRO
, JSON
o JSON Schemaless
. Si usas un convertidor de Kafka Connect que genera esquemas, el evento contiene esquemas independientes para la clave y los valores. De lo contrario, el evento solo contiene la clave y los valores.
El esquema de la clave nunca cambia. El esquema de los valores es una combinación de todas las columnas que ha monitorizado el flujo de cambios desde la hora de inicio del conector.
Si configura el conector para que genere eventos JSON, el evento de cambio de salida contendrá cinco campos:
El primer campo
schema
especifica un esquema de Kafka Connect que describe el esquema de clave de Spanner.El primer campo
payload
tiene la estructura descrita en el camposchema
anterior y contiene la clave de la fila que se ha modificado.El segundo campo
schema
especifica el esquema de Kafka Connect que describe el esquema de la fila modificada.El segundo campo
payload
tiene la estructura descrita por el camposchema
anterior y contiene los datos reales de la fila que se ha modificado.El campo
source
es obligatorio y describe los metadatos de origen del evento.
A continuación, se muestra un ejemplo de un evento de cambio de datos:
{ // The schema for the Spanner key. "schema": { "type": "struct", "name": "customers.Key", "optional": false, "fields": [ { "type": "int64", "optional": "false" "field": "false" } ] }, // The value of the Spanner key. "payload": { "id": "1" }, // The schema for the payload, which contains the before and after values // of the changed row. The schema for the payload contains all the // columns that the change stream has tracked since the connector start // time. "schema": { "type": "struct", "fields": [ { // The schema for the before values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "before" }, { // The schema for the after values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "after" }, { // The schema for the source metadata for the event. "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "sequence" }, { "type": "string", "optional": false, "field": "project_id" }, { "type": "string", "optional": false, "field": "instance_id" }, { "type": "string", "optional": false, "field": "database_id" }, { "type": "string", "optional": false, "field": "change_stream_name" }, { "type": "string", "optional": true, "field": "table" } { "type": "string", "optional": true, "field": "server_transaction_id" } { "type": "int64", "optional": true, "field": "low_watermark" } { "type": "int64", "optional": true, "field": "read_at_timestamp" } { "type": "int64", "optional": true, "field": "number_of_records_in_transaction" } { "type": "string", "optional": true, "field": "transaction_tag" } { "type": "boolean", "optional": true, "field": "system_transaction" } { "type": "string", "optional": true, "field": "value_capture_type" } { "type": "string", "optional": true, "field": "partition_token" } { "type": "int32", "optional": true, "field": "mod_number" } { "type": "boolean", "optional": true, "field": "is_last_record_in_transaction_in_partition" } { "type": "int64", "optional": true, "field": "number_of_partitions_in_transaction" } ], "optional": false, "name": "io.debezium.connector.spanner.Source", "field": "source" }, ] { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "connector_name.customers.Envelope" }, "payload": { // The values of the row before the event. "before": null, // The values of the row after the event. "after": { "id": 1, "first_name": "Anne", } }, // The source metadata. "source": { "version": "{debezium-version}", "connector": "spanner", "name": "spanner_connector", "ts_ms": 1670955531785, "snapshot": "false", "db": "database", "sequence": "1", "project_id": "project", "instance_id": "instance", "database_id": "database", "change_stream_name": "change_stream", "table": "customers", "server_transaction_id": "transaction_id", "low_watermark": 1670955471635, "read_at_timestamp": 1670955531791, "number_records_in_transaction": 2, "transaction_tag": "", "system_transaction": false, "value_capture_type": "OLD_AND_NEW_VALUES", "partition_token": "partition_token", "mod_number": 0, "is_last_record_in_transaction_in_partition": true, "number_of_partitions_in_transaction": 1 }, "op": "c", "ts_ms": 1559033904863 // }
Marca de agua baja
La marca de agua inferior describe el momento T en el que el conector de Kafka tiene la garantía de haber transmitido y publicado en un tema de Kafka todos los eventos con una marca de tiempo inferior a T.
Puedes habilitar la marca de agua mínima en el conector de Kafka mediante el parámetro gcp.spanner.low-watermark.enabled
. Este parámetro está inhabilitado de forma predeterminada. Si la marca de agua mínima está habilitada, el campo low_watermark
del registro de cambios de datos de la secuencia de cambios se rellena con la marca de tiempo mínima actual del conector Kafka.
Si no se generan registros, el conector de Kafka envía "latidos" de marca de agua periódicos a los temas de salida de Kafka detectados por el conector.
Estos latidos de marca de agua son registros que están vacíos, excepto el campo low_watermark
. Después, puedes usar la marca de agua mínima para realizar agregaciones basadas en el tiempo.
Por ejemplo, puede usar la marca de agua mínima para ordenar los eventos por marca de tiempo de confirmación en las claves principales.
Temas de metadatos
El conector de Kafka, así como el framework Kafka Connect, crea varios temas de metadatos para almacenar información relacionada con el conector. No es recomendable modificar la configuración ni el contenido de estos temas de metadatos.
Estos son los temas de metadatos:
_consumer_offsets
: un tema creado automáticamente por Kafka. Almacena los desplazamientos del consumidor de los consumidores creados en el conector de Kafka._kafka-connect-offsets
: un tema creado automáticamente por Kafka Connect. Almacena los desplazamientos del conector._sync_topic_spanner_connector_connectorname
: un tema creado automáticamente por el conector. Almacena metadatos sobre las particiones de la secuencia de cambios._rebalancing_topic_spanner_connector_connectorname
: un tema creado automáticamente por el conector. Se usa para determinar si la tarea del conector está activa._debezium-heartbeat.connectorname
: un tema que se usa para procesar los latidos de los flujos de cambios de Spanner.
Tiempo de ejecución del conector de Kafka
A continuación, se describe el tiempo de ejecución del conector de Kafka.
Escalabilidad
El conector de Kafka se puede escalar horizontalmente y se ejecuta en una o varias tareas distribuidas entre varios trabajadores de Kafka Connect.
Garantías de entrega de mensajes
El conector de Kafka admite la garantía de entrega al menos una vez.
Tolerancia a fallos
El conector de Kafka es tolerante a los fallos. A medida que el conector de Kafka lee los cambios y genera eventos, registra la marca de tiempo de la última confirmación procesada para cada partición de flujo de cambios. Si el conector de Kafka se detiene por cualquier motivo (incluidos los fallos de comunicación, los problemas de red o los fallos de software), al reiniciarse, el conector de Kafka seguirá transmitiendo registros desde el punto en el que lo dejó.
El conector de Kafka lee el esquema de información en la marca de tiempo de inicio del conector de Kafka para obtener información sobre el esquema. De forma predeterminada, Spanner no puede leer el esquema de información en marcas de tiempo de lectura anteriores al periodo de conservación de versiones, que es de una hora. Si quiere iniciar el conector antes de una hora, debe aumentar el periodo de conservación de versiones de la base de datos.
Configurar el conector de Kafka
Crear un flujo de cambios
Para obtener información sobre cómo crear un flujo de cambios, consulta el artículo Crear un flujo de cambios. Para continuar con los pasos siguientes, se necesita una instancia de Spanner con un flujo de cambios configurado.
Tenga en cuenta que, si quiere que se devuelvan las columnas modificadas y las que no se han modificado en cada evento de cambio de datos, utilice el tipo de captura de valor NEW_ROW
. Para obtener más información, consulte Tipos de captura de valor.
Instalar el archivo JAR del conector de Kafka
Una vez que hayas instalado Zookeeper, Kafka y Kafka Connect, las tareas que te quedan para implementar un conector de Kafka son descargar el archivo de complemento del conector, extraer los archivos JAR en tu entorno de Kafka Connect y añadir el directorio con los archivos JAR al plugin.path
de Kafka Connect.
A continuación, debes reiniciar el proceso de Kafka Connect para que se incluyan los nuevos archivos JAR.
Si trabajas con contenedores inmutables, puedes extraer imágenes de los contenedores de Debezium para Zookeeper, Kafka y Kafka Connect. La imagen de Kafka Connect tiene el conector de Spanner preinstalado.
Para obtener más información sobre cómo instalar archivos JAR del conector Kafka basado en Debezium, consulta Instalar Debezium.
Configurar el conector de Kafka
A continuación, se muestra un ejemplo de la configuración de un conector de Kafka que se conecta a un flujo de cambios llamado changeStreamAll
en la base de datos users
de la instancia test-instance
y el proyecto test-project
.
"name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{"client_id": user@example.com}", "gcp.spanner.database.role": "cdc-role", "tasks.max": "10" }
Esta configuración contiene lo siguiente:
Nombre del conector cuando se registra en un servicio de Kafka Connect.
Nombre de esta clase de conector de Spanner.
El ID del proyecto.
El ID de la instancia de Spanner.
El ID de la base de datos de Spanner.
Nombre del flujo de cambios.
El objeto JSON de la clave de la cuenta de servicio.
(Opcional) Rol de la base de datos de Spanner que se va a usar.
El número máximo de tareas.
Para ver una lista completa de las propiedades de los conectores, consulta Propiedades de configuración de los conectores de Kafka.
Añadir la configuración del conector a Kafka Connect
Para empezar a ejecutar un conector de Spanner, sigue estos pasos:
Crea una configuración para el conector de Spanner.
Usa la API REST de Kafka Connect para añadir esa configuración de conector a tu clúster de Kafka Connect.
Puedes enviar esta configuración con un comando POST
a un servicio de Kafka Connect en ejecución. De forma predeterminada, el servicio Kafka Connect se ejecuta en el puerto 8083
.
El servicio registra la configuración e inicia la tarea del conector que se conecta a la base de datos de Spanner y transmite registros de eventos de cambio a temas de Kafka.
A continuación se muestra un ejemplo de comando POST
:
POST /connectors HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "name": "spanner-connector" "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" } }
Ejemplo de respuesta correcta:
HTTP/1.1 201 Created Content-Type: application/json { "name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }, "tasks": [ { "connector": "spanner-connector", "task": 1 }, { "connector": "spanner-connector", "task": 2 }, { "connector": "spanner-connector", "task": 3 } ] }
Actualizar la configuración del conector de Kafka
Para actualizar la configuración del conector, envía un comando PUT
al servicio Kafka Connect en ejecución con el mismo nombre de conector.
Supongamos que tenemos un conector en ejecución con la configuración de la sección anterior. A continuación se muestra un ejemplo de comando PUT
:
PUT /connectors/spanner-connector/config HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Ejemplo de respuesta correcta:
HTTP/1.1 200 OK Content-Type: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "tasks.max": "10", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Detener el conector de Kafka
Para detener el conector, envía un comando DELETE
al servicio Kafka Connect en ejecución con el mismo nombre de conector.
Supongamos que tenemos un conector en ejecución con la configuración de la sección anterior. A continuación se muestra un ejemplo de comando DELETE
:
DELETE /connectors/spanner-connector HTTP/1.1 Host: http://localhost:8083
Ejemplo de respuesta correcta:
HTTP/1.1 204 No Content
Monitorizar el conector de Kafka
Además de las métricas estándar de Kafka Connect y Debezium, el conector de Kafka exporta sus propias métricas:
MilliSecondsLowWatermark
: marca de agua mínima actual de la tarea del conector en milisegundos. La marca de agua inferior describe el tiempo T en el que se garantiza que el conector ha transmitido todos los eventos con una marca de tiempo inferior a T.MilliSecondsLowWatermarkLag
: el retraso de la marca de agua mínima con respecto a la hora actual en milisegundos. Se han enviado todos los eventos con una marca de tiempo < T.LatencyLowWatermark<Variant>MilliSeconds
: el retraso de la marca de agua mínima con respecto a la hora actual en milisegundos. Se proporcionan las variantes P50, P95, P99, Media, Mín. y Máx.LatencySpanner<Variant>MilliSeconds
: latencia desde la marca de tiempo de confirmación de Spanner hasta la lectura del conector. Se proporcionan las variantes P50, P95, P99, Media, Mín. y Máx.LatencyReadToEmit<Variant>MilliSeconds
: latencia entre la marca de tiempo de lectura de Spanner y la emisión del conector. Se proporcionan las variantes P50, P95, P99, Media, Mín. y Máx.LatencyCommitToEmit<Variant>tMilliSeconds
: latencia entre la marca de tiempo de confirmación de Spanner y la emisión del conector. Se proporcionan las variantes P50, P95, P99, Media, Mín. y Máx.LatencyCommitToPublish<Variant>MilliSeconds
: latencia entre la marca de tiempo de confirmación de Spanner y la marca de tiempo de publicación de Kafka. Se proporcionan las variantes P50, P95, P99, Media, Mín. y Máx.NumberOfChangeStreamPartitionsDetected
: número total de particiones detectadas por la tarea del conector actual.NumberOfChangeStreamQueriesIssued
: número total de consultas de flujo de cambios emitidas por la tarea actual.NumberOfActiveChangeStreamQueries
: número activo de consultas de flujo de cambios detectadas por la tarea del conector actual.SpannerEventQueueCapacity
: capacidad total deStreamEventQueue
, una cola que almacena elementos recibidos de consultas de flujo de cambios.SpannerEventQueueCapacity
: la capacidadStreamEventQueue
restante.TaskStateChangeEventQueueCapacity
: capacidad total deTaskStateChangeEventQueue
, una cola que almacena los eventos que se producen en el conector.RemainingTaskStateChangeEventQueueCapacity
: la capacidadTaskStateChangeEventQueue
restante.NumberOfActiveChangeStreamQueries
: número activo de consultas de flujo de cambios detectadas por la tarea del conector actual.
Propiedades de configuración del conector de Kafka
Estas son las propiedades de configuración obligatorias del conector:
name
: nombre único del conector. Si intentas registrarte de nuevo con el mismo nombre, se producirá un error. Todos los conectores de Kafka Connect deben incluir esta propiedad.connector.class
: nombre de la clase de Java del conector. Utilice siempre el valorio.debezium.connector.spanner.SpannerConnector
para el conector de Kafka.tasks.max
: número máximo de tareas que se deben crear para este conector.gcp.spanner.project.id
: el ID del proyectogcp.spanner.instance.id
: el ID de la instancia de Spannergcp.spanner.database.id
: el ID de la base de datos de Spannergcp.spanner.change.stream
: el nombre del flujo de cambios de Spannergcp.spanner.credentials.json
: el objeto JSON de la clave de la cuenta de servicio.gcp.spanner.credentials.path
: la ruta de acceso al objeto JSON de la clave de cuenta de servicio. Obligatorio si no se proporciona el campo anterior.gcp.spanner.database.role
: el rol de la base de datos de Spanner que se va a usar. Solo es necesario cuando el flujo de cambios está protegido con un control de acceso detallado. El rol de la base de datos debe tener el privilegioSELECT
en el flujo de cambios y el privilegioEXECUTE
en la función de lectura del flujo de cambios. Para obtener más información, consulta Control de acceso pormenorizado para secuencias de cambios.
Las siguientes propiedades de configuración avanzada tienen valores predeterminados que funcionan en la mayoría de las situaciones y, por lo tanto, rara vez es necesario especificarlas en la configuración del conector:
gcp.spanner.low-watermark.enabled
: indica si la marca de agua baja está habilitada en el conector. El valor predeterminado es "false".gcp.spanner.low-watermark.update-period.ms
: intervalo en el que se actualiza la marca de agua mínima. El valor predeterminado es 1000 ms.heartbeat.interval.ms
: el intervalo de latido de Spanner. El valor predeterminado es 300000 (cinco minutos).gcp.spanner.start.time
: hora de inicio del conector. El valor predeterminado es la hora actual.gcp.spanner.end.time
: la hora de finalización del conector. El valor predeterminado es infinito.tables.exclude.list
: las tablas de las que se deben excluir los eventos de cambio. El valor predeterminado es una cadena vacía.tables.include.list
: las tablas de las que se deben incluir los eventos de cambio. Si no se rellena, se incluyen todas las tablas. El valor predeterminado es una cadena vacía.gcp.spanner.stream.event.queue.capacity
: capacidad de la cola de eventos de Spanner. El valor predeterminado es 10.000.connector.spanner.task.state.change.event.queue.capacity
: capacidad de la cola de eventos de cambio de estado de la tarea. El valor predeterminado es 1000.connector.spanner.max.missed.heartbeats
: número máximo de latidos perdidos en una consulta de flujo de cambios antes de que se genere una excepción. El valor predeterminado es 10.scaler.monitor.enabled
: indica si el autoescalado de tareas está habilitado. El valor predeterminado es "false".tasks.desired.partitions
: número preferido de particiones de flujo de cambios por tarea. Este parámetro es necesario para el autoescalado de tareas. El valor predeterminado es 2.tasks.min
: número mínimo de tareas. Este parámetro es necesario para el autoescalado de tareas. El valor predeterminado es 1.connector.spanner.sync.topic
: nombre del tema de sincronización, un tema de conector interno que se usa para almacenar la comunicación entre tareas. El valor predeterminado es_sync_topic_spanner_connector_connectorname
si el usuario no ha proporcionado un nombre.connector.spanner.sync.poll.duration
: duración de la encuesta del tema de sincronización. El valor predeterminado es 500 ms.connector.spanner.sync.request.timeout.ms
: tiempo de espera de las solicitudes al tema de sincronización. El valor predeterminado es 5000 ms.connector.spanner.sync.delivery.timeout.ms
: Tiempo de espera para publicar en el tema de sincronización. El valor predeterminado es 15.000 ms.connector.spanner.sync.commit.offsets.interval.ms
: intervalo en el que se confirman los desplazamientos del tema de sincronización. El valor predeterminado es 60.000 ms.connector.spanner.sync.publisher.wait.timeout
: intervalo en el que se publican mensajes en el tema de sincronización. El valor predeterminado es 5 ms.connector.spanner.rebalancing.topic
: el nombre del tema de reequilibrio. El tema de reequilibrado es un tema de conector interno que se usa para determinar si una tarea está activa. El valor predeterminado es_rebalancing_topic_spanner_connector_connectorname
si el usuario no ha proporcionado un nombre.connector.spanner.rebalancing.poll.duration
: duración de la encuesta sobre el tema del reequilibrio. El valor predeterminado es 5000 ms.connector.spanner.rebalancing.commit.offsets.timeout
: tiempo de espera para confirmar las compensaciones del tema de reequilibrio. El valor predeterminado es 5000 ms.connector.spanner.rebalancing.commit.offsets.interval.ms
: intervalo en el que se confirman los desplazamientos del tema de sincronización. El valor predeterminado es 60.000 ms.connector.spanner.rebalancing.task.waiting.timeout
: tiempo que espera una tarea antes de procesar un evento de reequilibrio. El valor predeterminado es 1000 ms.
Para ver una lista aún más detallada de las propiedades configurables del conector, consulta el repositorio de GitHub.
Limitaciones
El conector no admite el streaming de eventos de instantánea.
Si la marca de agua está habilitada en el conector, no puedes configurar transformaciones de enrutamiento de temas de Debezium.