Cette page explique comment utiliser le connecteur Kafka pour consommer et transférer des données de flux de modifications Cloud Spanner.
Concepts fondamentaux
La section suivante décrit les concepts fondamentaux du connecteur Kafka.
Debezium
Debezium est un projet Open Source qui fournit une plate-forme de streaming de données à faible latence pour la capture de données modifiées.
Connecteur Kafka
Le connecteur Kafka fournit une abstraction sur l’API Spanner pour publier des flux de modification Spanner sur Kafka. Grâce à ce connecteur, vous n'avez pas à gérer le cycle de vie des partitions des flux de modifications, ce qui est nécessaire lorsque vous utilisez directement l'API Spanner.
Le connecteur Kafka génère un événement de modification pour chaque mod d'enregistrement de modification de données et envoie les enregistrements d'événements de modification en aval vers un sujet Kafka distinct pour chaque table de modifications faisant l'objet d'un suivi des flux. Un mod d'enregistrement de modification des données représente une modification unique (insérer, mettre à jour ou supprimer) qui a été capturée. Un enregistrement de modification de données peut contenir plusieurs mods.
Sortie du connecteur Kafka
Le connecteur Kafka transfère les enregistrements de flux de modifications directement dans un sujet Kafka distinct. Le nom du sujet de sortie doit être connector_name
.table_name
. Si le sujet n'existe pas, le connecteur Kafka crée automatiquement un sujet sous ce nom.
Vous pouvez également configurer des transformations de routage des sujets pour réacheminer les enregistrements vers les sujets que vous spécifiez. Si vous souhaitez utiliser le routage du sujet, désactivez la fonctionnalité Film en filigrane.
Ordre des enregistrements
Les enregistrements sont classés par horodatage de commit par clé primaire dans les sujets Kafka. Les enregistrements appartenant à différentes clés primaires ne sont pas garantis. Les enregistrements avec la même clé primaire sont stockés dans la même partition de sujet Kafka. Si vous souhaitez traiter des transactions entières, vous pouvez également utiliser les champs server_transaction_id
et number_of_records_in_transaction
de l'enregistrement de modification des données pour assembler une transaction Spanner.
Modifier des événements
Le connecteur Kafka génère un événement de modification de données pour chaque opération INSERT
, UPDATE
et DELETE
. Chaque événement contient une clé et des valeurs pour la ligne modifiée.
Vous pouvez utiliser les convertisseurs Kafka Connect pour générer des événements de modification des données aux formats Protobuf
, AVRO
, JSON
ou JSON Schemaless
. Si vous utilisez un convertisseur Kafka Connect qui produit des schémas, l'événement contient des schémas distincts pour la clé et les valeurs. Sinon, l'événement ne contient que la clé et les valeurs.
Le schéma de la clé ne change jamais. Le schéma des valeurs est une agrégation de toutes les colonnes suivies par le flux de modifications depuis le début du connecteur.
Si vous configurez le connecteur pour générer des événements JSON, l'événement de modification de sortie contient cinq champs:
Le premier champ
schema
spécifie un schéma Kafka Connect qui décrit le schéma de clé Spanner.Le premier champ
payload
présente la structure décrite dans le champschema
précédent et contient la clé de la ligne qui a été modifiée.Le deuxième champ
schema
spécifie le schéma Kafka Connect qui décrit le schéma de la ligne modifiée.Le second champ
payload
présente la structure décrite dans le champschema
précédent et contient les données réelles de la ligne modifiée.Le champ
source
est obligatoire et décrit les métadonnées sources de l'événement.
Voici un exemple d'événement de modification de données:
{ // The schema for the Spanner key. "schema": { "type": "struct", "name": "customers.Key", "optional": false, "fields": [ { "type": "int64", "optional": "false" "field": "false" } ] }, // The value of the Spanner key. "payload": { "id": "1" }, // The schema for the payload, which contains the before and after values // of the changed row. The schema for the payload contains all the // columns that the change stream has tracked since the connector start // time. "schema": { "type": "struct", "fields": [ { // The schema for the before values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "before" }, { // The schema for the after values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "after" }, { // The schema for the source metadata for the event. "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "sequence" }, { "type": "string", "optional": false, "field": "project_id" }, { "type": "string", "optional": false, "field": "instance_id" }, { "type": "string", "optional": false, "field": "database_id" }, { "type": "string", "optional": false, "field": "change_stream_name" }, { "type": "string", "optional": true, "field": "table" } { "type": "string", "optional": true, "field": "server_transaction_id" } { "type": "int64", "optional": true, "field": "low_watermark" } { "type": "int64", "optional": true, "field": "read_at_timestamp" } { "type": "int64", "optional": true, "field": "number_of_records_in_transaction" } { "type": "string", "optional": true, "field": "transaction_tag" } { "type": "boolean", "optional": true, "field": "system_transaction" } { "type": "string", "optional": true, "field": "value_capture_type" } { "type": "string", "optional": true, "field": "partition_token" } { "type": "int32", "optional": true, "field": "mod_number" } { "type": "boolean", "optional": true, "field": "is_last_record_in_transaction_in_partition" } { "type": "int64", "optional": true, "field": "number_of_partitions_in_transaction" } ], "optional": false, "name": "io.debezium.connector.spanner.Source", "field": "source" }, ] { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "connector_name.customers.Envelope" }, "payload": { // The values of the row before the event. "before": null, // The values of the row after the event. "after": { "id": 1, "first_name": "Anne", } }, // The source metadata. "source": { "version": "{debezium-version}", "connector": "spanner", "name": "spanner_connector", "ts_ms": 1670955531785, "snapshot": "false", "db": "database", "sequence": "1", "project_id": "project", "instance_id": "instance", "database_id": "database", "change_stream_name": "change_stream", "table": "customers", "server_transaction_id": "transaction_id", "low_watermark": 1670955471635, "read_at_timestamp": 1670955531791, "number_records_in_transaction": 2, "transaction_tag": "", "system_transaction": false, "value_capture_type": "OLD_AND_NEW_VALUES", "partition_token": "partition_token", "mod_number": 0, "is_last_record_in_transaction_in_partition": true, "number_of_partitions_in_transaction": 1 }, "op": "c", "ts_ms": 1559033904863 // }
Faible filigrane
Le filigrane faible décrit l'heure T à laquelle le connecteur Kafka est assuré d'avoir diffusé et publié dans un sujet Kafka tous les événements dont l'horodatage est < T.
Vous pouvez activer le filigrane faible dans le connecteur Kafka à l'aide du paramètre gcp.spanner.low-watermark.enabled
. Ce paramètre est désactivé par défaut. Si le filigrane faible est activé, le champ low_watermark
faible de l'enregistrement de modifications du flux de modification contient l'horodatage de filigrane faible actuel du connecteur Kafka.
Si aucun enregistrement n'est produit, le connecteur Kafka envoie périodiquement des "filigranes" de filigrane aux sujets de sortie Kafka détectés par le connecteur.
Ces pulsations en filigrane sont des enregistrements vides, à l'exception du champ low_watermark
. Vous pouvez ensuite utiliser le faible filigrane pour effectuer des agrégations temporelles.
Par exemple, vous pouvez utiliser le filigrane faible pour classer les événements par horodatage de commit sur les clés primaires.
Sujets de métadonnées
Le connecteur Kafka, ainsi que le framework Kafka Connect, créent plusieurs sujets de métadonnées pour stocker des informations liées aux connecteurs. Il est déconseillé de modifier la configuration ou le contenu de ces sujets de métadonnées.
Les métadonnées sont les suivantes:
_consumer_offsets
: sujet créé automatiquement par Kafka. Enregistre les décalages des clients créés dans le connecteur Kafka._kafka-connect-offsets
: sujet créé automatiquement par Kafka Connect. Stocke les décalages du connecteur._sync_topic_spanner_connector_connectorname
: sujet créé automatiquement par le connecteur. Stocke les métadonnées concernant les partitions de flux de modifications._rebalancing_topic_spanner_connector_connectorname
: sujet créé automatiquement par le connecteur. Permet de déterminer l'activité des tâches du connecteur._debezium-heartbeat.connectorname
: sujet permettant de traiter les pulsations des flux de modification Spanner.
Environnement d'exécution du connecteur Kafka
La section suivante décrit l'environnement d'exécution du connecteur Kafka.
Évolutivité
Le connecteur Kafka est évolutif horizontalement et s'exécute sur une ou plusieurs tâches réparties sur plusieurs nœuds de calcul Kafka Connect.
Garanties de distribution des messages
Le connecteur Kafka offre une garantie de distribution de type "au moins une fois".
Tolérance aux pannes
Le connecteur Kafka tolère les défaillances. Lorsque le connecteur Kafka lit les modifications et produit des événements, il enregistre le dernier horodatage de commit traité pour chaque partition de flux de modification. Si le connecteur Kafka s'arrête pour une raison quelconque (y compris des échecs de communication, des problèmes de réseau ou des défaillances logicielles), au redémarrage, le connecteur continue à diffuser les enregistrements là où il s'était arrêté.
Le connecteur Kafka lit le schéma d'informations à l'horodatage de début du connecteur Kafka afin de récupérer les informations sur le schéma. Par défaut, Spanner ne peut pas lire le schéma d'informations à des horodatages de lecture antérieurs à la durée de conservation de la version, définie sur une heure par défaut. Si vous souhaitez démarrer le connecteur depuis plus d'une heure dans le passé, vous devez augmenter la durée de conservation de la version de la base de données.
Configurer le connecteur Kafka
Créer un flux de modifications
Pour savoir comment créer un flux de modifications, consultez Créer un flux de modifications. Pour passer aux étapes suivantes, vous devez disposer d'une instance Spanner avec un flux de modifications configuré.
Notez que si vous souhaitez que les colonnes modifiées et non modifiées soient renvoyées lors de chaque événement de modification de données, utilisez le type de capture de valeur NEW_ROW
. Pour en savoir plus, consultez Type de capture de valeur.
Installer le fichier JAR du connecteur Kafka
Une fois Zookeeper, Kafka et Kafka Connect installés, il vous suffit de télécharger les archives du plug-in du connecteur, d'extraire les fichiers JAR dans votre environnement Kafka Connect et d'ajouter le répertoire contenant les fichiers JAR sur le plugin.path
de Kafka.
Vous devrez ensuite redémarrer votre processus Kafka Connect pour récupérer les nouveaux fichiers JAR.
Si vous travaillez avec des conteneurs immuables, vous pouvez extraire des images de conteneurs de Debezium pour Zookeeper, Kafka et Kafka Connect. Le connecteur Cloud Spanner est préinstallé sur l'image Kafka Connect.
Pour en savoir plus sur l'installation des fichiers JAR du connecteur Kafka basé sur Debezium, consultez la page Installer Debezium.
Configurer le connecteur Kafka
L'exemple suivant présente la configuration d'un connecteur Kafka qui se connecte à un flux de modifications appelé changeStreamAll
dans la base de données users
de l'instance test-instance
et du projet test-project
.
"name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{"client_id": user@example.com}", "tasks.max": "10" }
Cette configuration contient les éléments suivants:
Nom du connecteur lorsqu'il est enregistré auprès d'un service Kafka Connect.
Nom de cette classe de connecteur Spanner.
ID du projet.
ID de l'instance Spanner.
ID de la base de données Spanner.
Nom du flux de modifications.
Objet JSON pour la clé du compte de service.
Nombre maximal de tâches.
Pour obtenir la liste complète des propriétés du connecteur, consultez Propriétés du connecteur Kafka.
Ajouter la configuration du connecteur à Kafka Connect
Pour commencer à exécuter un connecteur Spanner:
Créez une configuration pour le connecteur Spanner.
Utilisez l'API REST Kafka Connect pour ajouter cette configuration de connecteur à votre cluster Kafka Connect.
Vous pouvez envoyer cette configuration à l'aide d'une commande POST
à un service Kafka Connect en cours d'exécution. Par défaut, le service Kafka Connect s'exécute sur le port 8083
.
Le service enregistre la configuration et lance la tâche de connecteur qui se connecte à la base de données Spanner et diffuse les enregistrements d'événements en sujets Kafka.
Voici un exemple de commande POST
:
POST /connectors HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "name": "spanner-connector" "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" } }
Exemple de réponse réussie:
HTTP/1.1 201 Created Content-Type: application/json { "name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }, "tasks": [ { "connector": "spanner-connector", "task": 1 }, { "connector": "spanner-connector", "task": 2 }, { "connector": "spanner-connector", "task": 3 } ] }
Mettre à jour la configuration du connecteur Kafka
Pour mettre à jour la configuration du connecteur, envoyez une commande PUT
au service Kafka Connect en cours d'exécution avec le même nom de connecteur.
Supposons qu'un connecteur s'exécute avec la configuration de la section précédente. Voici un exemple de commande PUT
:
PUT /connectors/spanner-connector/config HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Exemple de réponse réussie:
HTTP/1.1 200 OK Content-Type: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "tasks.max": "10", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Arrêter le connecteur Kafka
Pour arrêter le connecteur, envoyez une commande DELETE
au service Kafka Connect en cours d'exécution avec le même nom de connecteur.
Supposons qu'un connecteur s'exécute avec la configuration de la section précédente. Voici un exemple de commande DELETE
:
DELETE /connectors/spanner-connector HTTP/1.1 Host: http://localhost:8083
Exemple de réponse réussie:
HTTP/1.1 204 No Content
Surveiller le connecteur Kafka
En plus des métriques standards de Kafka Connect et Debezium, le connecteur Kafka exporte ses propres métriques:
MilliSecondsLowWatermark
: filigrane faible en cours de la tâche du connecteur (en millisecondes). Le filigrane faible décrit l'heure T à laquelle le connecteur est assuré d'avoir diffusé tous les événements avec un horodatage < T.MilliSecondsLowWatermarkLag
: retard du filigrane faible par rapport à l'heure actuelle, en millisecondes. Diffusion de tous les événements avec l'horodatage < T.LatencyLowWatermark<Variant>MilliSeconds
: retard du filigrane faible par rapport à l'heure actuelle, en millisecondes. Les variantes P50, P95, P99, Average, Min et Max sont fournies.LatencySpanner<Variant>MilliSeconds
: latence de lecture (commit-timestamp) vers Spanner-to-connector-read. Les variantes P50, P95, P99, moyenne, minimale et maximale sont fournies.LatencyReadToEmit<Variant>MilliSeconds
: latence de lecture-horodatage-vers-le-connecteur-emit. Les variantes P50, P95, P99, Average, Min et Max sont fournies.LatencyCommitToEmit<Variant>tMilliSeconds
: latence Spanner-commit-timestamp-to-connector-emit. Les variantes P50, P95, P99, Average, Min et Max sont fournies.LatencyCommitToPublish<Variant>MilliSeconds
: latence Spanner-commit-timestamp-to Kafka-publish-timestamp. Les variantes P50, P95, P99, moyenne, minimale et maximale sont fournies.NumberOfChangeStreamPartitionsDetected
: nombre total de partitions détectées par la tâche de connecteur actuelle.NumberOfChangeStreamQueriesIssued
: nombre total de requêtes de flux de modifications émises par la tâche en cours.NumberOfActiveChangeStreamQueries
: nombre actif de requêtes de flux de modifications détectées par la tâche de connecteur actuelle.SpannerEventQueueCapacity
: capacité totale deStreamEventQueue
, une file d'attente qui stocke les éléments reçus de requêtes de flux de modifications.SpannerEventQueueCapacity
: capacitéStreamEventQueue
restante.TaskStateChangeEventQueueCapacity
: capacité totale deTaskStateChangeEventQueue
, une file d'attente qui stocke les événements liés au connecteur.RemainingTaskStateChangeEventQueueCapacity
: capacitéTaskStateChangeEventQueue
restante.NumberOfActiveChangeStreamQueries
: nombre actif de requêtes de flux de modifications détectées par la tâche de connecteur actuelle.
Propriétés de configuration du connecteur Kafka
Les propriétés de configuration suivantes sont requises pour le connecteur:
name
: nom unique du connecteur. Si vous essayez de vous enregistrer avec le même nom, cela échouera. Cette propriété est requise par tous les connecteurs Kafka Connect.connector.class
: nom de la classe Java du connecteur. Utilisez toujours une valeurio.debezium.connector.spanner.SpannerConnector
pour le connecteur Kafka.tasks.max
: nombre maximal de tâches à créer pour ce connecteur.gcp.spanner.project.id
: ID du projet.gcp.spanner.instance.id
: ID de l'instance Spannergcp.spanner.database.id
: ID de la base de données Spannergcp.spanner.change.stream
: nom du flux de modifications Spannergcp.spanner.credentials.json
: objet JSON de clé de compte de service.gcp.spanner.credentials.path
: chemin d'accès au fichier au format JSON correspondant à la clé du compte de service. Obligatoire si le champ ci-dessus n'est pas renseigné.
Les propriétés de configuration avancées suivantes ont des valeurs par défaut qui fonctionnent dans la plupart des situations et doivent donc rarement être spécifiées dans la configuration du connecteur:
gcp.spanner.low-watermark.enabled
: indique si le filigrane faible est activé pour le connecteur. La valeur par défaut est "false".gcp.spanner.low-watermark.update-period.ms
: intervalle de mise à jour du filigrane faible. La valeur par défaut est de 1 000 ms.heartbeat.interval.ms
: intervalle de pulsation Spanner. La valeur par défaut est de 300 000 (cinq minutes).gcp.spanner.start.time
: heure de début du connecteur. L'heure actuelle est définie par défaut.gcp.spanner.end.time
: heure de fin du connecteur. Valeur par défaut : infini.tables.exclude.list
: tables pour lesquelles exclure les événements de modification. Par défaut, ce champ est vide.tables.include.list
: les tables pour lesquelles inclure des événements de modification. Si ce champ n'est pas renseigné, toutes les tables sont incluses. Par défaut, ce champ est vide.gcp.spanner.stream.event.queue.capacity
: capacité de la file d'attente d'événements Spanner. La valeur par défaut est 10 000.connector.spanner.task.state.change.event.queue.capacity
: capacité de la file d'attente des événements de modification de l'état de la tâche. La valeur par défaut est 1000.connector.spanner.max.missed.heartbeats
: nombre maximal de pulsations manquées pour une requête de flux de modifications avant qu'une exception ne soit générée. Valeur par défaut : 10scaler.monitor.enabled
: indique si l'autoscaling des tâches est activé. Valeur par défaut : "false".tasks.desired.partitions
: nombre préféré de partitions de flux de modification par tâche. Ce paramètre est nécessaire pour l'autoscaling des tâches. La valeur par défaut est 2.tasks.min
: nombre minimal de tâches. Ce paramètre est nécessaire pour l'autoscaling des tâches. La valeur par défaut est 1.connector.spanner.sync.topic
: nom du sujet de synchronisation, un sujet de connecteur interne utilisé pour stocker la communication entre les tâches. La valeur par défaut est_sync_topic_spanner_connector_connectorname
si l'utilisateur n'a pas fourni de nom.connector.spanner.sync.poll.duration
: durée du sondage pour le sujet de synchronisation. La valeur par défaut est 500 ms.connector.spanner.sync.request.timeout.ms
: délai avant expiration des requêtes envoyées au sujet de synchronisation. La valeur par défaut est 5 000 ms.connector.spanner.sync.delivery.timeout.ms
: délai avant expiration de la publication dans le sujet de synchronisation. La valeur par défaut est 15 000 ms.connector.spanner.sync.commit.offsets.interval.ms
: intervalle de commit des commits pour le sujet de synchronisation. La valeur par défaut est de 60 000 ms.connector.spanner.sync.publisher.wait.timeout
: intervalle de publication des messages dans le sujet de synchronisation. La valeur par défaut est de 5 ms.connector.spanner.rebalancing.topic
: nom du sujet de rééquilibrage. Le sujet de rééquilibrage est un sujet de connecteur interne utilisé pour déterminer l'activité des tâches. La valeur par défaut est_rebalancing_topic_spanner_connector_connectorname
si l'utilisateur n'a pas fourni de nom.connector.spanner.rebalancing.poll.duration
: durée du sondage pour le sujet de rééquilibrage. La valeur par défaut est 5 000 ms.connector.spanner.rebalancing.commit.offsets.timeout
: délai avant expiration des commits pour le sujet de rééquilibrage. La valeur par défaut est 5 000 ms.connector.spanner.rebalancing.commit.offsets.interval.ms
: intervalle de commit des commits pour le sujet de synchronisation. La valeur par défaut est de 60 000 ms.connector.spanner.rebalancing.task.waiting.timeout
: durée pendant laquelle une tâche attend avant de traiter un événement de rééquilibrage. La valeur par défaut est 1 000 ms.
Pour obtenir une liste encore plus détaillée des propriétés de connecteur configurables, consultez le dépôt GitHub.
Limites
Le connecteur n'est pas compatible avec la diffusion d'instantanés.
Si le filigrane est activé dans le connecteur, vous ne pouvez pas configurer les transformations de routage des sujets Debezium.
Ce connecteur n'est actuellement pas compatible avec l'interface PostgreSQL pour Cloud Spanner.