En esta página, se explica cómo usar el conector de Kafka para consumir y reenviar datos de cambios de transmisiones de Cloud Spanner.
Conceptos básicos
A continuación, se describen los conceptos básicos del conector de Kafka.
Debezium
Debezium es un proyecto de código abierto que proporciona una plataforma de transmisión de datos de baja latencia para la captura de datos modificados.
Conector de Kafka
El conector de Kafka proporciona una abstracción sobre la API de Spanner para publicar transmisiones de cambios de Spanner en Kafka. Con este conector, no tienes que administrar el ciclo de vida de la partición de las transmisiones de cambios, lo cual es necesario cuando usas la API de Spanner de forma directa.
El conector de Kafka produce un evento de cambio por cada modificación de registro de cambio de datos y envía registros de evento de cambio de forma descendente en un tema de Kafka independiente para cada tabla con seguimiento de transmisión de cambios. Una modificación del registro de cambios de datos representa una sola modificación (insert, update o delete) que se capturó. Un registro de cambio de datos único puede contener más de una modificación.
Salida del conector de Kafka
El conector de Kafka reenvía los registros de transmisiones directamente a un tema de Kafka independiente. El nombre del tema de salida debería ser connector_name
.table_name
. Si el tema no existe, el conector de Kafka crea automáticamente un tema con ese nombre.
También puede configurar transformaciones de enrutamiento de temas para redirigir registros a los temas que especifique. Si deseas usar el enrutamiento de temas, inhabilita la funcionalidad de marca de agua baja.
Orden de los registros
Los registros se ordenan por marca de tiempo de confirmación por clave primaria en los temas de Kafka. Los registros que pertenecen a diferentes claves primarias no tienen garantías de ordenamiento. Los registros con la misma clave primaria se almacenan en la misma partición de tema de Kafka. Si deseas procesar transacciones completas, también puedes usar los campos server_transaction_id
y number_of_records_in_transaction
del registro de cambio de datos para ensamblar una transacción de Spanner.
Cambiar eventos
El conector de Kafka genera un evento de cambio de datos para cada operación INSERT
, UPDATE
y DELETE
. Cada evento contiene una clave y valores para la fila modificada.
Puedes usar los convertidores de Kafka Connect para producir eventos de cambio de datos en formatos Protobuf
, AVRO
, JSON
o JSON Schemaless
. Si usas un convertidor de Kafka Connect que produce esquemas, el evento contiene esquemas separados para la clave y los valores. De lo contrario, el evento solo contiene la clave y los valores.
El esquema para la clave nunca cambia. El esquema de los valores es una combinación de todas las columnas que la cadena de cambios realizó de un seguimiento desde la hora de inicio del conector.
Si configuras el conector para producir eventos JSON, el evento de cambio de salida contiene cinco campos:
El primer campo
schema
especifica un esquema de Kafka Connect que describe el esquema de clave de Spanner.El primer campo
payload
tiene la estructura descrita en el camposchema
anterior y contiene la clave de la fila que se modificó.El segundo campo
schema
especifica el esquema de Kafka Connect que describe el esquema de la fila modificada.El segundo campo
payload
tiene la estructura descrita en el camposchema
anterior y contiene los datos reales de la fila que se modificó.El campo
source
es obligatorio y describe los metadatos de origen del evento.
El siguiente es un ejemplo de un evento de cambio de datos:
{ // The schema for the Spanner key. "schema": { "type": "struct", "name": "customers.Key", "optional": false, "fields": [ { "type": "int64", "optional": "false" "field": "false" } ] }, // The value of the Spanner key. "payload": { "id": "1" }, // The schema for the payload, which contains the before and after values // of the changed row. The schema for the payload contains all the // columns that the change stream has tracked since the connector start // time. "schema": { "type": "struct", "fields": [ { // The schema for the before values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "before" }, { // The schema for the after values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "after" }, { // The schema for the source metadata for the event. "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "sequence" }, { "type": "string", "optional": false, "field": "project_id" }, { "type": "string", "optional": false, "field": "instance_id" }, { "type": "string", "optional": false, "field": "database_id" }, { "type": "string", "optional": false, "field": "change_stream_name" }, { "type": "string", "optional": true, "field": "table" } { "type": "string", "optional": true, "field": "server_transaction_id" } { "type": "int64", "optional": true, "field": "low_watermark" } { "type": "int64", "optional": true, "field": "read_at_timestamp" } { "type": "int64", "optional": true, "field": "number_of_records_in_transaction" } { "type": "string", "optional": true, "field": "transaction_tag" } { "type": "boolean", "optional": true, "field": "system_transaction" } { "type": "string", "optional": true, "field": "value_capture_type" } { "type": "string", "optional": true, "field": "partition_token" } { "type": "int32", "optional": true, "field": "mod_number" } { "type": "boolean", "optional": true, "field": "is_last_record_in_transaction_in_partition" } { "type": "int64", "optional": true, "field": "number_of_partitions_in_transaction" } ], "optional": false, "name": "io.debezium.connector.spanner.Source", "field": "source" }, ] { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "connector_name.customers.Envelope" }, "payload": { // The values of the row before the event. "before": null, // The values of the row after the event. "after": { "id": 1, "first_name": "Anne", } }, // The source metadata. "source": { "version": "{debezium-version}", "connector": "spanner", "name": "spanner_connector", "ts_ms": 1670955531785, "snapshot": "false", "db": "database", "sequence": "1", "project_id": "project", "instance_id": "instance", "database_id": "database", "change_stream_name": "change_stream", "table": "customers", "server_transaction_id": "transaction_id", "low_watermark": 1670955471635, "read_at_timestamp": 1670955531791, "number_records_in_transaction": 2, "transaction_tag": "", "system_transaction": false, "value_capture_type": "OLD_AND_NEW_VALUES", "partition_token": "partition_token", "mod_number": 0, "is_last_record_in_transaction_in_partition": true, "number_of_partitions_in_transaction": 1 }, "op": "c", "ts_ms": 1559033904863 // }
Marca de agua baja
La marca de agua baja describe el tiempo en el que se garantiza que el conector de Kafka transmitió y publicó en un tema de Kafka todos los eventos con marca de tiempo < T.
Puedes habilitar la marca de agua baja en el conector de Kafka mediante el parámetro gcp.spanner.low-watermark.enabled
. Este parámetro está inhabilitado de forma predeterminada. Si la marca de agua baja está habilitada, el campo low_watermark
en el registro de cambios de datos del flujo de cambios se propaga con la marca de tiempo de marca de agua baja actual del conector de Kafka.
Si no se producen registros, el conector de Kafka envía marcas de marca de agua periódicas a los temas de salida de Kafka que detecta el conector.
Estas señales de monitoreo de funcionamiento son registros que están vacíos, excepto el campo low_watermark
. Luego, puedes usar la marca de agua baja para realizar agregaciones basadas en el tiempo.
Por ejemplo, puedes usar la marca de agua baja para ordenar los eventos por marca de tiempo de confirmación en las claves primarias.
Temas de metadatos
El conector de Kafka y el marco de trabajo de Kafka Connect crean varios temas de metadatos para almacenar información relacionada con el conector. No es recomendable modificar la configuración ni el contenido de estos temas de metadatos.
Los siguientes son los temas de metadatos:
_consumer_offsets
: un tema creado automáticamente por Kafka Almacena las compensaciones de consumidor para los consumidores creados en el conector de Kafka._kafka-connect-offsets
: un tema creado automáticamente por Kafka Connect Almacena las compensaciones del conector._sync_topic_spanner_connector_connectorname
: Un tema que el conector crea automáticamente Almacena metadatos sobre las particiones de transmisión de cambios._rebalancing_topic_spanner_connector_connectorname
: Un tema que el conector crea automáticamente Se utiliza para determinar la actividad de las tareas del conector._debezium-heartbeat.connectorname
: Es un tema que se usa para procesar las señales de monitoreo de funcionamiento de las transmisiones de cambios de Spanner.
Entorno de ejecución del conector de Kafka
A continuación, se describe el entorno de ejecución del conector de Kafka.
Escalabilidad
El conector de Kafka es escalable de manera horizontal y se ejecuta en una o más tareas distribuidas entre varios trabajadores de Kafka Connect.
Garantías de entrega de mensajes
El conector de Kafka es compatible con la garantía de entrega de al menos una vez.
Tolerancia a errores
El conector de Kafka tolera los errores. A medida que el conector de Kafka lee los cambios y produce eventos, registra la última marca de tiempo de confirmación procesada para cada partición del flujo de cambios. Si el conector de Kafka se detiene por cualquier motivo (incluidas las fallas de comunicación, los problemas de red o las fallas de software), cuando se reinicie, el conector de Kafka continuará transmitiendo registros donde se detuvo.
El conector de Kafka lee el esquema de información en la marca de tiempo de inicio del conector de Kafka para recuperar la información del esquema. De forma predeterminada, Spanner no puede leer el esquema de información en marcas de tiempo de lectura antes del período de retención de la versión, cuyo valor predeterminado es de una hora. Si quieres iniciar el conector desde antes de una hora hacia el pasado, debes aumentar el período de retención de la versión de la base de datos.
Configure el conector de Kafka
Crear transmisión de cambios
Para obtener detalles sobre cómo crear un flujo de cambios, consulte Cómo crear un flujo de cambios. Para continuar con los pasos siguientes, se necesita una instancia de Spanner con una transmisión de cambios configurada.
Ten en cuenta que, si deseas que se muestren las columnas modificadas y las no modificadas en cada evento de cambio de datos, usa el tipo de captura de valor NEW_ROW
. Para obtener más información, consulta el tipo de captura de valor.
Instalar el JAR del conector de Kafka
Con Zookeeper, Kafka y Kafka Connect instalados, las tareas restantes para implementar un conector Kafka son descargar el archivo del complemento del conector, extraer los archivos JAR en tu entorno de Kafka Connect y agregar el directorio con los archivos JAR a Kafka Connect plugin.path
.
Luego, debe reiniciar el proceso de Kafka Connect para obtener los archivos JAR nuevos.
Si trabajas con contenedores inmutables, puedes extraer imágenes de las imágenes de contenedor de Debezium para Zookeeper, Kafka y Kafka Connect. La imagen de Kafka Connect tiene preinstalado el conector de Cloud Spanner.
Para obtener más información sobre cómo instalar los archivos JAR del conector de Kafka basado en Debezium, consulta Instala Debezium.
Configure el conector de Kafka
El siguiente es un ejemplo de la configuración de un conector de Kafka que se conecta a un flujo de cambio llamado changeStreamAll
en la base de datos users
en la instancia test-instance
y el proyecto test-project
.
"name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{"client_id": user@example.com}", "tasks.max": "10" }
Esta configuración contiene lo siguiente:
El nombre del conector cuando se registra con un servicio de Kafka Connect.
El nombre de esta clase de conector de Spanner.
El ID del proyecto
El ID de la instancia de Spanner.
El ID de la base de datos de Spanner.
El nombre del flujo de cambios.
El objeto JSON para la clave de la cuenta de servicio.
La cantidad máxima de tareas.
Para obtener una lista completa de las propiedades del conector, consulta Propiedades de la configuración del conector de Kafka.
Agregar la configuración del conector a Kafka Connect
Para comenzar a ejecutar un conector de Spanner, haz lo siguiente:
Crea una configuración para el conector de Spanner.
Usa la API de REST de Kafka Connect para agregar esa configuración del conector a tu clúster de Kafka Connect.
Puedes enviar esta configuración con un comando POST
a un servicio de Kafka Connect en ejecución. De forma predeterminada, el servicio de Kafka Connect se ejecuta en el puerto 8083
.
El servicio registra la configuración y, luego, inicia la tarea del conector que se conecta a la base de datos de Spanner y transmite los registros de eventos a los temas de Kafka.
El siguiente es un ejemplo del comando POST
:
POST /connectors HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "name": "spanner-connector" "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" } }
Ejemplo de respuesta exitosa:
HTTP/1.1 201 Created Content-Type: application/json { "name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }, "tasks": [ { "connector": "spanner-connector", "task": 1 }, { "connector": "spanner-connector", "task": 2 }, { "connector": "spanner-connector", "task": 3 } ] }
Actualice la configuración del conector de Kafka
Para actualizar la configuración del conector, envía un comando PUT
al servicio de Kafka en ejecución en ejecución con el mismo nombre del conector.
Supongamos que tenemos un conector que se ejecuta con la configuración de la sección anterior. El siguiente es un ejemplo del comando PUT
:
PUT /connectors/spanner-connector/config HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Ejemplo de respuesta exitosa:
HTTP/1.1 200 OK Content-Type: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "tasks.max": "10", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Detén el conector de Kafka
Para detener el conector, envía un comando DELETE
al servicio de Kafka en ejecución en ejecución con el mismo nombre del conector.
Supongamos que tenemos un conector que se ejecuta con la configuración de la sección anterior. El siguiente es un ejemplo del comando DELETE
:
DELETE /connectors/spanner-connector HTTP/1.1 Host: http://localhost:8083
Ejemplo de respuesta exitosa:
HTTP/1.1 204 No Content
Supervise el conector de Kafka
Además de las métricas estándar de Kafka Connect y Debezium, el conector de Kafka exporta sus propias métricas:
MilliSecondsLowWatermark
: La marca de agua baja actual de la tarea del conector en milisegundos. La marca de agua baja describe el tiempo en el que se garantiza que el conector haya transmitido todos los eventos con marca de tiempo < T.MilliSecondsLowWatermarkLag
: El retraso de la marca de agua baja detrás de la hora actual en milisegundos. Se transmitió por todos los eventos con marca de tiempo < T.LatencyLowWatermark<Variant>MilliSeconds
: Es el retraso de la marca de agua baja detrás de la hora actual en milisegundos. Se proporcionan las variantes P50, P95, P99, Promedio, Mín. y Máx.LatencySpanner<Variant>MilliSeconds
: Es la latencia de Spanner-commit-timestamp-to-connector-read. Se proporcionan las variantes P50, P95, P99, Promedio, Mín. y Máx.LatencyReadToEmit<Variant>MilliSeconds
: Es la latencia de lectura, marca de tiempo, conector yemit. Se proporcionan las variantes P50, P95, P99, Promedio, Mín. y Máx.LatencyCommitToEmit<Variant>tMilliSeconds
: Es la latencia de Spanner-commit-timestamp-to-connector-emit. Se proporcionan las variantes P50, P95, P99, Promedio, Mín. y Máx.LatencyCommitToPublish<Variant>MilliSeconds
: La latencia de Spanner-commit-timestamp-to a Kafka-publish-timestamp. Se proporcionan las variantes P50, P95, P99, Promedio, Mín. y Máx.NumberOfChangeStreamPartitionsDetected
: La cantidad total de particiones detectadas por la tarea del conector actual.NumberOfChangeStreamQueriesIssued
: La cantidad total de consultas de transmisión de cambios emitidas por la tarea actual.NumberOfActiveChangeStreamQueries
: La cantidad activa de consultas de transmisión de cambios detectadas por la tarea del conector actual.SpannerEventQueueCapacity
: La capacidad total deStreamEventQueue
, una cola que almacena elementos recibidos de las consultas de transmisión de cambios.SpannerEventQueueCapacity
: La capacidad restante deStreamEventQueue
.TaskStateChangeEventQueueCapacity
: La capacidad total deTaskStateChangeEventQueue
, una cola que almacena eventos en el conector.RemainingTaskStateChangeEventQueueCapacity
: La capacidad restante deTaskStateChangeEventQueue
.NumberOfActiveChangeStreamQueries
: La cantidad activa de consultas de transmisión de cambios detectadas por la tarea del conector actual.
Propiedades de configuración del conector de Kafka
Las siguientes son propiedades de configuración requeridas para el conector:
name
: nombre único para el conector. Si intentas volver a registrarte con el mismo nombre, se producirá un error. Todos los conectores de Kafka Connect requieren esta propiedad.connector.class
: El nombre de la clase Java para el conector. Usa siempre un valor deio.debezium.connector.spanner.SpannerConnector
para el conector de Kafka.tasks.max
: La cantidad máxima de tareas que se deben crear para este conector.gcp.spanner.project.id
: El ID del proyectogcp.spanner.instance.id
: El ID de la instancia de Spannergcp.spanner.database.id
: El ID de la base de datos de Spannergcp.spanner.change.stream
: Es el nombre del flujo de cambios de Spanner.gcp.spanner.credentials.json
: Es el objeto JSON de la clave de la cuenta de servicio.gcp.spanner.credentials.path
: La ruta del archivo al objeto JSON de la clave de cuenta de servicio. Obligatorio si no se proporciona el campo anterior.
Las siguientes propiedades de configuración avanzada tienen valores predeterminados que funcionan en la mayoría de los casos y, por lo tanto, rara vez deben especificarse en la configuración del conector:
gcp.spanner.low-watermark.enabled
: Indica si la marca de agua baja está habilitada para el conector. El valor predeterminado es falso.gcp.spanner.low-watermark.update-period.ms
: El intervalo en el que se actualiza la marca de agua baja. El valor predeterminado es 1,000 ms.heartbeat.interval.ms
: El intervalo de la señal de monitoreo de funcionamiento de Spanner. El valor predeterminado es 300,000 (cinco minutos).gcp.spanner.start.time
: Es la hora de inicio del conector. La configuración predeterminada es la hora actual.gcp.spanner.end.time
: Es la hora de finalización del conector. La configuración predeterminada es el infinito.tables.exclude.list
: Las tablas para las que se excluirán los eventos de cambio. La configuración predeterminada es "empty".tables.include.list
: Las tablas para las que se incluirán los eventos de cambio. Si no se propaga, se incluyen todas las tablas. La configuración predeterminada es "empty".gcp.spanner.stream.event.queue.capacity
: La capacidad de la cola de eventos de Spanner. La configuración predeterminada es 10,000.connector.spanner.task.state.change.event.queue.capacity
: El estado de la tarea cambia la capacidad de la cola de eventos. La configuración predeterminada es 1,000.connector.spanner.max.missed.heartbeats
: Es la cantidad máxima de señales de monitoreo de funcionamiento perdidas para una consulta de transmisión de cambios antes de que se produzca una excepción. La configuración predeterminada es 10.scaler.monitor.enabled
: Indica si el ajuste de escala automático de tareas está habilitado. La configuración predeterminada es "false".tasks.desired.partitions
: La cantidad preferida de particiones de transmisiones de cambio por tarea. Este parámetro es necesario para el ajuste de escala automático de tareas. La configuración predeterminada es 2.tasks.min
: La cantidad mínima de tareas. Este parámetro es necesario para el ajuste de escala automático de tareas. El valor predeterminado es 1.connector.spanner.sync.topic
: El nombre del tema de sincronización, un tema de conector interno que se usa para almacenar la comunicación entre tareas La configuración predeterminada es_sync_topic_spanner_connector_connectorname
si el usuario no proporcionó un nombre.connector.spanner.sync.poll.duration
: Es la duración de la encuesta para el tema de sincronización. La configuración predeterminada es 500 ms.connector.spanner.sync.request.timeout.ms
: El tiempo de espera para las solicitudes al tema de sincronización. El valor predeterminado es 5,000 ms.connector.spanner.sync.delivery.timeout.ms
: El tiempo de espera para publicar en el tema de sincronización. El valor predeterminado es 15,000 ms.connector.spanner.sync.commit.offsets.interval.ms
: El intervalo en el que se confirman las compensaciones para el tema de sincronización. El valor predeterminado es 60,000 ms.connector.spanner.sync.publisher.wait.timeout
: El intervalo en el que se publican los mensajes en el tema de sincronización. La configuración predeterminada es 5 ms.connector.spanner.rebalancing.topic
: Es el nombre del tema de rebalanceo. El tema de rebalanceo es un tema de conector interno que se usa para determinar la actividad de las tareas. La configuración predeterminada es_rebalancing_topic_spanner_connector_connectorname
si el usuario no proporcionó un nombre.connector.spanner.rebalancing.poll.duration
: Es la duración de la encuesta para el tema de rebalanceo. El valor predeterminado es 5,000 ms.connector.spanner.rebalancing.commit.offsets.timeout
: El tiempo de espera para confirmar las compensaciones para el tema de rebalanceo. El valor predeterminado es 5,000 ms.connector.spanner.rebalancing.commit.offsets.interval.ms
: El intervalo en el que se confirman las compensaciones para el tema de sincronización. El valor predeterminado es 60,000 ms.connector.spanner.rebalancing.task.waiting.timeout
: El tiempo que espera una tarea antes de procesar un evento de rebalanceo. La configuración predeterminada es 1,000 ms.
Para obtener una lista aún más detallada de las propiedades del conector configurable, consulta el repositorio de GitHub.
Limitaciones
El conector no admite la transmisión de eventos de instantánea.
Si la marca de agua está habilitada en el conector, no puedes configurar las transformaciones de enrutamiento de temas Debezium.
Actualmente, este conector no es compatible con la interfaz de PostgreSQL para Cloud Spanner.