Créer des connexions de flux de modifications à Kafka

Cette page explique comment utiliser le connecteur Kafka pour consommer et transférer les données des flux de modifications Spanner.

Concepts fondamentaux

Vous trouverez ci-dessous une description des concepts de base du connecteur Kafka.

Debezium

Debezium est un projet Open Source qui fournit une plate-forme de streaming de données à faible latence pour la capture des données de modification.

Connecteur Kafka

Le connecteur Kafka fournit une abstraction sur l'API Spanner pour publier des flux de modifications Spanner dans Kafka. Avec ce connecteur, vous n'avez pas besoin de gérer le cycle de vie des partitions de flux de modifications, ce qui est nécessaire lorsque vous utilisez directement l'API Spanner.

Le connecteur Kafka génère un événement de modification pour chaque modification d'enregistrement de données et envoie des enregistrements d'événements de modification en aval dans un sujet Kafka distinct pour chaque table suivie par flux de modifications. Un mod d'enregistrement de modification de données représente une seule modification (insertion, mise à jour ou suppression) capturée. Un seul enregistrement de modification de données peut contenir plusieurs modifications.

Sortie du connecteur Kafka

Le connecteur Kafka transfère les enregistrements des flux de modifications directement dans un sujet Kafka distinct. Le nom du sujet de sortie doit être connector_name.table_name. Si le sujet n'existe pas, le connecteur Kafka crée automatiquement un sujet sous ce nom.

Vous pouvez également configurer des transformations de routage de sujets pour rediriger les enregistrements vers les sujets que vous spécifiez. Si vous souhaitez utiliser le routage par sujet, désactivez la fonctionnalité de filigrane bas.

Ordre des enregistrements

Les enregistrements sont triés par code temporel de commit par clé primaire dans les sujets Kafka. Les enregistrements appartenant à différentes clés primaires ne sont pas garantis en termes d'ordre. Les enregistrements avec la même clé primaire sont stockés dans la même partition de sujet Kafka. Si vous souhaitez traiter des transactions complètes, vous pouvez également utiliser les champs server_transaction_id et number_of_records_in_transaction de l'enregistrement de modification des données pour assembler une transaction Spanner.

Événements de modification

Le connecteur Kafka génère un événement de modification de données pour chaque opération INSERT, UPDATE et DELETE. Chaque événement contient une clé et des valeurs pour la ligne modifiée.

Vous pouvez utiliser des convertisseurs Kafka Connect pour générer des événements de modification de données dans les formats Protobuf, AVRO, JSON ou JSON Schemaless. Si vous utilisez un convertisseur Kafka Connect qui produit des schémas, l'événement contient des schémas distincts pour la clé et les valeurs. Sinon, l'événement ne contient que la clé et les valeurs.

Le schéma de la clé ne change jamais. Le schéma des valeurs est une agrégation de toutes les colonnes que le flux de modifications a suivies depuis le début du connecteur.

Si vous configurez le connecteur pour générer des événements JSON, l'événement de modification de sortie contient cinq champs:

  • Le premier champ schema spécifie un schéma Kafka Connect qui décrit le schéma de clé Spanner.

  • Le premier champ payload a la structure décrite par le champ schema précédent et contient la clé de la ligne modifiée.

  • Le deuxième champ schema spécifie le schéma Kafka Connect qui décrit le schéma de la ligne modifiée.

  • Le deuxième champ payload a la structure décrite par le champ schema précédent et contient les données réelles de la ligne modifiée.

  • Le champ source est obligatoire et décrit les métadonnées sources de l'événement.

Voici un exemple d'événement de modification de données:

{
  // The schema for the Spanner key.
  "schema": {
    "type": "struct",
    "name": "customers.Key",
    "optional": false,
    "fields": [
      {
        "type": "int64",
        "optional": "false"
        "field": "false"
      }
    ]
  },
  // The value of the Spanner key.
  "payload": {
      "id": "1"
  },
  // The schema for the payload, which contains the before and after values
  // of the changed row. The schema for the payload contains all the
  // columns that the change stream has tracked since the connector start
  // time.
  "schema": { 
    "type": "struct",
    "fields": [
      {
        // The schema for the before values of the changed row.
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "first_name"
            }
        ],
        "optional": true,
        "name": "customers.Value",
        "field": "before"
      },
      {
        // The schema for the after values of the changed row.
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          }
        ],
          "optional": true,
          "name": "customers.Value",
          "field": "after"
        },
        {
          // The schema for the source metadata for the event.
          "type": "struct",
          "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "version"
            },
            {
                "type": "string",
                "optional": false,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "string",
                "optional": false,
                "field": "sequence"
            },
            {
                "type": "string",
                "optional": false,
                "field": "project_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "instance_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "database_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "change_stream_name"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
            {
                "type": "string",
                "optional": true,
                "field": "server_transaction_id"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "low_watermark"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "read_at_timestamp"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_records_in_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "transaction_tag"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "system_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "value_capture_type"
            }
            {
                "type": "string",
                "optional": true,
                "field": "partition_token"
            }
            {
                "type": "int32",
                "optional": true,
                "field": "mod_number"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "is_last_record_in_transaction_in_partition"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_partitions_in_transaction"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.spanner.Source",
          "field": "source"
        },
      ]
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "connector_name.customers.Envelope"
  },
  "payload": {
    // The values of the row before the event.
    "before": null,
    // The values of the row after the event.
    "after": { 
        "id": 1,
        "first_name": "Anne",
    }
  },
  // The source metadata.
  "source": {
    "version": "{debezium-version}",
    "connector": "spanner",
    "name": "spanner_connector",
    "ts_ms": 1670955531785,
    "snapshot": "false",
    "db": "database",
    "sequence": "1",
    "project_id": "project",
    "instance_id": "instance",
    "database_id": "database",
    "change_stream_name": "change_stream",
    "table": "customers",
    "server_transaction_id": "transaction_id",
    "low_watermark": 1670955471635,
    "read_at_timestamp": 1670955531791,
    "number_records_in_transaction": 2,
    "transaction_tag": "",
    "system_transaction": false,
    "value_capture_type": "OLD_AND_NEW_VALUES",
    "partition_token": "partition_token",
    "mod_number": 0,
    "is_last_record_in_transaction_in_partition": true,
    "number_of_partitions_in_transaction": 1
  },
  "op": "c", 
  "ts_ms": 1559033904863 //
}

Filigrane bas

La marque de données basse décrit l'heure T à laquelle le connecteur Kafka est garanti d'avoir diffusé et publié dans un sujet Kafka tous les événements dont l'horodatage est inférieur à T.

Vous pouvez activer le repère bas dans le connecteur Kafka à l'aide du paramètre gcp.spanner.low-watermark.enabled. Ce paramètre est désactivé par défaut. Si la marque de bas niveau est activée, le champ low_watermark de l'enregistrement de modification des données du flux de modifications est renseigné avec le code temporel de la marque de bas niveau actuelle du connecteur Kafka.

Si aucun enregistrement n'est généré, le connecteur Kafka envoie des "battements de cœur" de balise temporelle périodiques aux sujets de sortie Kafka détectés par le connecteur.

Ces battements de cœur du filigrane sont des enregistrements vides, à l'exception du champ low_watermark. Vous pouvez ensuite utiliser le repère bas pour effectuer des agrégations basées sur le temps. Par exemple, vous pouvez utiliser le repère bas pour trier les événements par code temporel de validation pour les clés primaires.

Thèmes sur les métadonnées

Le connecteur Kafka, ainsi que le framework Kafka Connect, créent plusieurs sujets de métadonnées pour stocker des informations sur le connecteur. Il est déconseillé de modifier la configuration ou le contenu de ces sujets de métadonnées.

Voici les sujets de métadonnées:

  • _consumer_offsets: sujet créé automatiquement par Kafka. Stocke les décalages des consommateurs pour les consommateurs créés dans le connecteur Kafka.
  • _kafka-connect-offsets: sujet créé automatiquement par Kafka Connect. Stocke les décalages du connecteur.
  • _sync_topic_spanner_connector_connectorname: sujet créé automatiquement par le connecteur. Stocke les métadonnées concernant les partitions de flux de modifications.
  • _rebalancing_topic_spanner_connector_connectorname: sujet créé automatiquement par le connecteur. Permet de déterminer l'état de la tâche du connecteur.
  • _debezium-heartbeat.connectorname: sujet utilisé pour traiter les battements de cœur du flux de modifications Spanner.

Environnement d'exécution du connecteur Kafka

Ce qui suit décrit l'environnement d'exécution du connecteur Kafka.

Évolutivité

Le connecteur Kafka est évolutif horizontalement et s'exécute sur une ou plusieurs tâches réparties entre plusieurs nœuds de calcul Kafka Connect.

Garanties de distribution des messages

Le connecteur Kafka est compatible avec la garantie de distribution de type "au moins une fois".

Tolérance aux pannes

Le connecteur Kafka est tolérant aux échecs. Lorsque le connecteur Kafka lit les modifications et produit des événements, il enregistre l'horodatage du dernier commit traité pour chaque partition de flux de modifications. Si le connecteur Kafka s'arrête pour une raison quelconque (y compris en cas de défaillance de communication, de problème réseau ou de défaillance logicielle), le connecteur Kafka continue de diffuser les enregistrements à partir de l'endroit où il s'était arrêté.

Le connecteur Kafka lit le schéma d'informations au code temporel de début du connecteur Kafka pour récupérer les informations de schéma. Par défaut, Spanner ne peut pas lire le schéma d'informations à des horodatages de lecture antérieurs à la durée de conservation des versions, qui est par défaut d'une heure. Si vous souhaitez démarrer le connecteur avant une heure, vous devez augmenter la durée de conservation de la version de la base de données.

Configurer le connecteur Kafka

Créer un flux de modifications

Pour en savoir plus sur la création d'un flux de modifications, consultez la section Créer un flux de modifications. Pour passer aux étapes suivantes, vous devez disposer d'une instance Spanner avec un flux de modifications configuré.

Notez que si vous souhaitez que les colonnes modifiées et inchangées soient renvoyées à chaque événement de modification de données, utilisez le type de capture de valeur NEW_ROW. Pour en savoir plus, consultez la section Type de capture de valeur.

Installer le fichier JAR du connecteur Kafka

Une fois Zookeeper, Kafka et Kafka Connect installés, les tâches restantes pour déployer un connecteur Kafka consistent à télécharger l'archive du plug-in du connecteur, à extraire les fichiers JAR dans votre environnement Kafka Connect et à ajouter le répertoire contenant les fichiers JAR à plugin.path de Kafka Connect. Vous devez ensuite redémarrer votre processus Kafka Connect pour qu'il récupère les nouveaux fichiers JAR.

Si vous travaillez avec des conteneurs immuables, vous pouvez extraire des images à partir des images de conteneur de Debezium pour Zookeeper, Kafka et Kafka Connect. Le connecteur Spanner est préinstallé sur l'image Kafka Connect.

Pour savoir comment installer les fichiers JAR de connecteur Kafka basés sur Debezium, consultez la section Installer Debezium.

Configurer le connecteur Kafka

Vous trouverez ci-dessous un exemple de configuration d'un connecteur Kafka qui se connecte à un flux de modifications appelé changeStreamAll dans la base de données users de l'instance test-instance et du projet test-project.

"name": "spanner-connector",
"config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{"client_id": user@example.com}",
    "gcp.spanner.database.role": "cdc-role",
    "tasks.max": "10"
}

Cette configuration contient les éléments suivants:

  • Nom du connecteur lorsqu'il est enregistré auprès d'un service Kafka Connect.

  • Nom de cette classe de connecteur Spanner.

  • ID du projet.

  • ID de l'instance Spanner.

  • ID de la base de données Spanner.

  • Nom du flux de modifications.

  • Objet JSON de la clé de compte de service.

  • (Facultatif) Rôle de base de données Spanner à utiliser.

  • Nombre maximal de tâches.

Pour obtenir la liste complète des propriétés du connecteur, consultez la section Propriétés de configuration du connecteur Kafka.

Ajouter la configuration du connecteur à Kafka Connect

Pour commencer à exécuter un connecteur Spanner:

  1. Créez une configuration pour le connecteur Spanner.

  2. Utilisez l'API REST Kafka Connect pour ajouter cette configuration de connecteur à votre cluster Kafka Connect.

Vous pouvez envoyer cette configuration avec une commande POST à un service Kafka Connect en cours d'exécution. Par défaut, le service Kafka Connect s'exécute sur le port 8083. Le service enregistre la configuration et démarre la tâche du connecteur qui se connecte à la base de données Spanner et diffuse les enregistrements d'événements de modification dans les sujets Kafka.

Voici un exemple de commande POST:

POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
  "name": "spanner-connector"
  "config": {
      "connector.class": "io.debezium.connector.spanner.SpannerConnector",
      "gcp.spanner.project.id": "test-project",
      "gcp.spanner.instance.id": "test-instance",
      "gcp.spanner.database.id": "users",
      "gcp.spanner.change.stream": "changeStreamAll",
      "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
      "heartbeat.interval.ms": "100",
      "tasks.max": "10"
  }
}

Exemple de réponse réussie:

HTTP/1.1 201 Created
Content-Type: application/json
{
    "name": "spanner-connector",
    "config": {
        "connector.class": "io.debezium.connector.spanner.SpannerConnector",
        "gcp.spanner.project.id": "test-project",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "users",
        "gcp.spanner.change.stream": "changeStreamAll",
        "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
        "heartbeat.interval.ms": "100",
        "tasks.max": "10"
    },
    "tasks": [
        { "connector": "spanner-connector", "task": 1 },
        { "connector": "spanner-connector", "task": 2 },
        { "connector": "spanner-connector", "task": 3 }
    ]
}

Mettre à jour la configuration du connecteur Kafka

Pour mettre à jour la configuration du connecteur, envoyez une commande PUT au service Kafka Connect en cours d'exécution avec le même nom de connecteur.

Supposons qu'un connecteur s'exécute avec la configuration de la section précédente. Voici un exemple de commande PUT:

PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Exemple de réponse réussie:

HTTP/1.1 200 OK
Content-Type: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "tasks.max": "10",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Arrêter le connecteur Kafka

Pour arrêter le connecteur, envoyez une commande DELETE au service Kafka Connect en cours d'exécution avec le même nom de connecteur.

Supposons qu'un connecteur s'exécute avec la configuration de la section précédente. Voici un exemple de commande DELETE:

DELETE /connectors/spanner-connector HTTP/1.1
Host: http://localhost:8083

Exemple de réponse réussie:

HTTP/1.1 204 No Content

Surveiller le connecteur Kafka

En plus des métriques Kafka Connect et Debezium standards, le connecteur Kafka exporte ses propres métriques:

  • MilliSecondsLowWatermark: niveau de référence bas actuel de la tâche du connecteur, en millisecondes. Le filigrane bas décrit l'heure T à laquelle le connecteur est garanti d'avoir diffusé tous les événements avec un code temporel < T.

  • MilliSecondsLowWatermarkLag: décalage du repère bas par rapport à l'heure actuelle, en millisecondes. tous les événements avec un code temporel < T ont été diffusés.

  • LatencyLowWatermark<Variant>MilliSeconds: décalage du filigrane bas par rapport à l'heure actuelle en millisecondes. Les variantes P50, P95, P99, Moyenne, Min et Max sont fournies.

  • LatencySpanner<Variant>MilliSeconds: la latence Spanner-commit-timestamp-to-connector-read. Les variantes P50, P95, P99, Moyenne, Min et Max sont fournies.

  • LatencyReadToEmit<Variant>MilliSeconds: la latence Spanner-read-timestamp-to-connector-emit. Les variantes P50, P95, P99, Moyenne, Min et Max sont fournies.

  • LatencyCommitToEmit<Variant>tMilliSeconds: la latence Spanner-commit-timestamp-to-connector-emit. Les variantes P50, P95, P99, Moyenne, Min et Max sont fournies.

  • LatencyCommitToPublish<Variant>MilliSeconds: la latence Spanner-commit-timestamp-to Kafka-publish-timestamp. Les variantes P50, P95, P99, Moyenne, Min et Max sont fournies.

  • NumberOfChangeStreamPartitionsDetected: nombre total de partitions détectées par la tâche de connecteur actuelle.

  • NumberOfChangeStreamQueriesIssued: nombre total de requêtes de flux de modifications émises par la tâche en cours.

  • NumberOfActiveChangeStreamQueries: nombre actif de requêtes de flux de modifications détectées par la tâche de connecteur actuelle.

  • SpannerEventQueueCapacity: capacité totale de StreamEventQueue, une file d'attente qui stocke les éléments reçus à partir des requêtes de flux de modifications.

  • SpannerEventQueueCapacity: capacité StreamEventQueue restante.

  • TaskStateChangeEventQueueCapacity: capacité totale de TaskStateChangeEventQueue, une file d'attente qui stocke les événements qui se produisent dans le connecteur.

  • RemainingTaskStateChangeEventQueueCapacity: capacité TaskStateChangeEventQueue restante.

  • NumberOfActiveChangeStreamQueries: nombre actif de requêtes de flux de modifications détectées par la tâche de connecteur actuelle.

Propriétés de configuration du connecteur Kafka

Voici les propriétés de configuration obligatoires pour le connecteur:

  • name: nom unique du connecteur. Toute tentative d'enregistrement avec le même nom échoue. Cette propriété est requise par tous les connecteurs Kafka Connect.

  • connector.class: nom de la classe Java du connecteur. Utilisez toujours la valeur io.debezium.connector.spanner.SpannerConnector pour le connecteur Kafka.

  • tasks.max: nombre maximal de tâches à créer pour ce connecteur.

  • gcp.spanner.project.id : ID du projet

  • gcp.spanner.instance.id: ID de l'instance Spanner

  • gcp.spanner.database.id: ID de la base de données Spanner

  • gcp.spanner.change.stream: nom du flux de modifications Spanner

  • gcp.spanner.credentials.json: objet JSON de la clé de compte de service.

  • gcp.spanner.credentials.path: chemin d'accès au fichier de l'objet JSON de la clé de compte de service. Obligatoire si le champ ci-dessus n'est pas fourni.

  • gcp.spanner.database.role : rôle de base de données Spanner à utiliser. Cette étape n'est requise que lorsque le flux de modifications est sécurisé avec un contrôle d'accès précis. Le rôle de base de données doit disposer du droit SELECT sur le flux de modifications et du droit EXECUTE sur la fonction de lecture du flux de modifications. Pour en savoir plus, consultez la section Contrôle des accès précis pour les flux de modifications.

Les propriétés de configuration avancées suivantes ont des valeurs par défaut qui fonctionnent dans la plupart des situations et qui doivent donc rarement être spécifiées dans la configuration du connecteur:

  • gcp.spanner.low-watermark.enabled: indique si le seuil de faible eau est activé pour le connecteur. La valeur par défaut est "false".

  • gcp.spanner.low-watermark.update-period.ms: intervalle auquel le seuil bas est mis à jour. La valeur par défaut est 1 000 ms.

  • heartbeat.interval.ms: intervalle de battement de cœur Spanner. La valeur par défaut est 300000 (cinq minutes).

  • gcp.spanner.start.time: heure de début du connecteur. La valeur par défaut est l'heure actuelle.

  • gcp.spanner.end.time: heure de fin du connecteur. La valeur par défaut est l'infini.

  • tables.exclude.list: tables pour lesquelles les événements de modification doivent être exclus. La valeur par défaut est vide.

  • tables.include.list: tables pour lesquelles inclure des événements de modification. Si ce champ n'est pas renseigné, toutes les tables sont incluses. La valeur par défaut est vide.

  • gcp.spanner.stream.event.queue.capacity: capacité de la file d'attente d'événements Spanner. La valeur par défaut est 10 000.

  • connector.spanner.task.state.change.event.queue.capacity: capacité de la file d'attente d'événements de changement d'état de la tâche. Valeur par défaut : 1 000.

  • connector.spanner.max.missed.heartbeats: nombre maximal de pulsations manquées pour une requête de flux de modifications avant qu'une exception ne soit générée. Valeur par défaut : 10

  • scaler.monitor.enabled: indique si l'autoscaling des tâches est activé. Valeur par défaut : "false".

  • tasks.desired.partitions: nombre de partitions de flux de modifications par tâche. Ce paramètre est nécessaire pour l'autoscaling des tâches. La valeur par défaut est 2.

  • tasks.min: nombre minimal de tâches. Ce paramètre est nécessaire pour l'autoscaling des tâches. La valeur par défaut est 1.

  • connector.spanner.sync.topic: nom du sujet de synchronisation, un sujet de connecteur interne utilisé pour stocker la communication entre les tâches. La valeur par défaut est _sync_topic_spanner_connector_connectorname si l'utilisateur n'a pas fourni de nom.

  • connector.spanner.sync.poll.duration: durée de l'interrogation pour le sujet de synchronisation. Valeur par défaut : 500 ms.

  • connector.spanner.sync.request.timeout.ms: délai avant expiration des requêtes envoyées au sujet de synchronisation. La valeur par défaut est 5 000 ms.

  • connector.spanner.sync.delivery.timeout.ms: délai avant expiration de la publication sur le thème de synchronisation. La valeur par défaut est 15 000 ms.

  • connector.spanner.sync.commit.offsets.interval.ms: intervalle auquel les décalages sont validés pour le thème de synchronisation. La valeur par défaut est 60 000 ms.

  • connector.spanner.sync.publisher.wait.timeout: intervalle auquel les messages sont publiés dans le sujet de synchronisation. La valeur par défaut est 5 ms.

  • connector.spanner.rebalancing.topic: nom du sujet de rééquilibrage. Le sujet de rééquilibrage est un sujet de connecteur interne utilisé pour déterminer l'état de la tâche. La valeur par défaut est _rebalancing_topic_spanner_connector_connectorname si l'utilisateur n'a pas fourni de nom.

  • connector.spanner.rebalancing.poll.duration: durée de l'interrogation pour le sujet de rééquilibrage. La valeur par défaut est 5 000 ms.

  • connector.spanner.rebalancing.commit.offsets.timeout: délai avant expiration de l'enregistrement des décalages pour le sujet de rééquilibrage. La valeur par défaut est 5 000 ms.

  • connector.spanner.rebalancing.commit.offsets.interval.ms: intervalle auquel les décalages sont validés pour le thème de synchronisation. La valeur par défaut est 60 000 ms.

  • connector.spanner.rebalancing.task.waiting.timeout: durée d'attente d'une tâche avant de traiter un événement de rééquilibrage. La valeur par défaut est 1 000 ms.

Pour obtenir une liste encore plus détaillée des propriétés du connecteur configurables, consultez le dépôt GitHub.

Limites