Créer des connexions de flux de modifications vers Kafka

Cette page explique comment utiliser le connecteur Kafka pour consommer et transférer les données des flux de modification Spanner.

Concepts fondamentaux

Vous trouverez ci-dessous la description des concepts fondamentaux du connecteur Kafka.

Debezium

Debezium est un projet Open Source qui fournit une plate-forme de streaming de données à faible latence pour la capture des données de modification.

Connecteur Kafka

Le connecteur Kafka fournit une abstraction sur l'API Spanner pour publier des flux de modifications Spanner sur Kafka. Avec ce connecteur, vous n'avez pas besoin de gérer le cycle de vie des partitions de flux de modifications, ce qui est nécessaire lorsque vous utilisez directement l'API Spanner.

Le connecteur Kafka génère un événement de modification pour chaque mod d'enregistrement de modification de données et envoie les enregistrements d'événements de modification en aval dans un sujet Kafka distinct pour chaque table faisant l'objet d'un suivi du flux de modifications. Un mod d'enregistrement des modifications de données représente une modification unique (insertion, mise à jour ou suppression) qui a été capturée. Un même enregistrement de modification de données peut contenir plusieurs mods.

Sortie du connecteur Kafka

Le connecteur Kafka transfère les enregistrements des flux de modifications directement dans un sujet Kafka distinct. Le nom du sujet de sortie doit être connector_name.table_name. Si le sujet n'existe pas, le connecteur Kafka crée automatiquement un sujet sous ce nom.

Vous pouvez également configurer des transformations de routage de sujets pour rediriger les enregistrements vers les sujets que vous spécifiez. Si vous souhaitez utiliser le routage des sujets, désactivez la fonctionnalité de faible filigrane.

Ordre des enregistrements

Les enregistrements sont triés par code temporel de commit par clé primaire dans les sujets Kafka. Les enregistrements appartenant à différentes clés primaires garanties de commande. Les enregistrements ayant la même clé primaire sont stockés même partition du sujet Kafka. Pour traiter l'intégralité d'une transaction, vous pouvez utilisez également l'enregistrement des modifications de données server_transaction_id et number_of_records_in_transaction sur assembler une transaction Spanner.

Événements de modification

Le connecteur Kafka génère un événement de modification des données pour chaque INSERT, UPDATE, et DELETE. Chaque événement contient une clé et des valeurs pour la ligne modifiée.

Vous pouvez utiliser des convertisseurs Kafka Connect pour générer des événements de modification de données dans Formats Protobuf, AVRO, JSON ou JSON Schemaless. Si vous utilisez un convertisseur Kafka Connect qui produit des schémas, l'événement contient des schémas distincts pour la clé et les valeurs. Sinon, l'événement ne contient que la clé et les valeurs.

Le schéma de la clé ne change jamais. Le schéma des valeurs est une agrégation de toutes les colonnes que le flux de modifications a suivies depuis le début du connecteur.

Si vous configurez le connecteur pour générer des événements JSON, l'événement de modification de sortie contient cinq champs:

  • Le premier champ schema spécifie un schéma Kafka Connect qui décrit le schéma de clé Spanner.

  • Le premier champ payload présente la structure décrite dans le champ schema précédent et contient la clé de la ligne qui a été modifiée.

  • Le deuxième champ schema spécifie le schéma Kafka Connect qui décrit le schéma de la ligne modifiée.

  • Le deuxième champ payload a la structure décrite par le champ schema précédent et contient les données réelles de la ligne modifiée.

  • Le champ source est obligatoire. Il décrit les métadonnées de la source de l'événement.

Voici un exemple d'événement de modification de données :

{
  // The schema for the Spanner key.
  "schema": {
    "type": "struct",
    "name": "customers.Key",
    "optional": false,
    "fields": [
      {
        "type": "int64",
        "optional": "false"
        "field": "false"
      }
    ]
  },
  // The value of the Spanner key.
  "payload": {
      "id": "1"
  },
  // The schema for the payload, which contains the before and after values
  // of the changed row. The schema for the payload contains all the
  // columns that the change stream has tracked since the connector start
  // time.
  "schema": { 
    "type": "struct",
    "fields": [
      {
        // The schema for the before values of the changed row.
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "first_name"
            }
        ],
        "optional": true,
        "name": "customers.Value",
        "field": "before"
      },
      {
        // The schema for the after values of the changed row.
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          }
        ],
          "optional": true,
          "name": "customers.Value",
          "field": "after"
        },
        {
          // The schema for the source metadata for the event.
          "type": "struct",
          "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "version"
            },
            {
                "type": "string",
                "optional": false,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "string",
                "optional": false,
                "field": "sequence"
            },
            {
                "type": "string",
                "optional": false,
                "field": "project_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "instance_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "database_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "change_stream_name"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
            {
                "type": "string",
                "optional": true,
                "field": "server_transaction_id"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "low_watermark"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "read_at_timestamp"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_records_in_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "transaction_tag"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "system_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "value_capture_type"
            }
            {
                "type": "string",
                "optional": true,
                "field": "partition_token"
            }
            {
                "type": "int32",
                "optional": true,
                "field": "mod_number"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "is_last_record_in_transaction_in_partition"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_partitions_in_transaction"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.spanner.Source",
          "field": "source"
        },
      ]
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "connector_name.customers.Envelope"
  },
  "payload": {
    // The values of the row before the event.
    "before": null,
    // The values of the row after the event.
    "after": { 
        "id": 1,
        "first_name": "Anne",
    }
  },
  // The source metadata.
  "source": {
    "version": "{debezium-version}",
    "connector": "spanner",
    "name": "spanner_connector",
    "ts_ms": 1670955531785,
    "snapshot": "false",
    "db": "database",
    "sequence": "1",
    "project_id": "project",
    "instance_id": "instance",
    "database_id": "database",
    "change_stream_name": "change_stream",
    "table": "customers",
    "server_transaction_id": "transaction_id",
    "low_watermark": 1670955471635,
    "read_at_timestamp": 1670955531791,
    "number_records_in_transaction": 2,
    "transaction_tag": "",
    "system_transaction": false,
    "value_capture_type": "OLD_AND_NEW_VALUES",
    "partition_token": "partition_token",
    "mod_number": 0,
    "is_last_record_in_transaction_in_partition": true,
    "number_of_partitions_in_transaction": 1
  },
  "op": "c", 
  "ts_ms": 1559033904863 //
}

Filigrane bas

La marque de données basse décrit l'heure T à laquelle le connecteur Kafka est garanti d'avoir diffusé et publié dans un sujet Kafka tous les événements dont l'horodatage est inférieur à T.

Vous pouvez activer le filigrane faible dans le connecteur Kafka à l'aide du Paramètre gcp.spanner.low-watermark.enabled. Ce paramètre est désactivé par défaut. Si le filigrane faible est activé, le champ low_watermark dans les données du flux de modifications l'enregistrement de modification est renseigné avec le filigrane faible actuel du connecteur Kafka code temporel.

Si aucun enregistrement n'est produit, le connecteur Kafka envoie périodiquement filigrane "pulsations de cœur" aux sujets de sortie Kafka détectés par le connecteur.

Ces pulsations de filigrane sont des enregistrements vides, à l'exception de low_watermark. Vous pouvez ensuite utiliser le filigrane faible pour effectuer des agrégations temporelles. Par exemple, vous pouvez utiliser le repère bas pour trier les événements par code temporel de validation pour les clés primaires.

Sujets des métadonnées

Le connecteur Kafka et le framework Kafka Connect créent plusieurs des sujets de métadonnées pour stocker les informations liées aux connecteurs. Il est déconseillé de modifier la configuration ou le contenu de ces sujets de métadonnées.

Voici les sujets de métadonnées:

  • _consumer_offsets: sujet créé automatiquement par Kafka. Stocke les décalages des consommateurs pour les consommateurs créés dans le connecteur Kafka.
  • _kafka-connect-offsets : sujet créé automatiquement par Kafka Connect. Stocke les décalages du connecteur.
  • _sync_topic_spanner_connector_connectorname: sujet créé automatiquement par le connecteur. Stocke les métadonnées concernant les partitions du flux de modifications.
  • _rebalancing_topic_spanner_connector_connectorname: sujet créé automatiquement par le connecteur. Permet de déterminer l'activité des tâches du connecteur.
  • _debezium-heartbeat.connectorname: sujet utilisé pour traiter les pulsations des flux de modifications Spanner.

Environnement d'exécution du connecteur Kafka

Le code suivant décrit l'environnement d'exécution du connecteur Kafka.

Évolutivité

Le connecteur Kafka est évolutif horizontalement et s'exécute sur une ou plusieurs tâches réparties entre plusieurs nœuds de calcul Kafka Connect.

Garanties de distribution des messages

Le connecteur Kafka est compatible avec la garantie de distribution de type "au moins une fois".

Tolérance aux pannes

Le connecteur Kafka est tolérant aux échecs. Lorsque le connecteur Kafka lit les modifications et produit des événements, il enregistre l'horodatage du dernier commit traité pour chaque partition de flux de modifications. Si le connecteur Kafka s'arrête pour une raison quelconque (y compris de communication, de réseau ou de logiciels), au redémarrage le connecteur Kafka continue la diffusion des enregistrements là où il s'était arrêté.

Le connecteur Kafka lit le schéma d'informations au code temporel de début du connecteur Kafka pour récupérer les informations de schéma. Par défaut, Spanner ne peut pas lire le schéma d'informations au moment de la lecture la durée de conservation de la version, qui est par défaut d'une heure. Si vous souhaitez démarrer le connecteur avant au bout d'une heure, vous devez augmenter la durée de conservation de la version de la base de données période.

Configurer le connecteur Kafka

Créer un flux de modifications

Pour savoir comment créer un flux de modifications, consultez Créer un flux de modifications. Pour passer aux étapes suivantes, vous devez disposer d'une instance Spanner avec un flux de modifications configuré.

Notez que si vous souhaitez que les colonnes modifiées et inchangées soient renvoyées à chaque événement de modification de données, utilisez le type de capture de valeur NEW_ROW. Pour en savoir plus, consultez la section Type de capture de valeur.

Installer le fichier JAR du connecteur Kafka

Avec Zookeeper, Kafka et Kafka Connect installés, les tâches restantes de déploiement d'un connecteur Kafka consistent à télécharger l'archive des plug-ins du connecteur, extrayez les fichiers JAR dans votre environnement Kafka Connect et ajoutez le avec les fichiers JAR dans le fichier plugin.path de Kafka Connect. Vous devez ensuite redémarrer votre processus Kafka Connect pour récupérer les nouveaux fichiers JAR.

Si vous utilisez des conteneurs immuables, vous pouvez extraire des images Images de conteneurs de Debezium pour Zookeeper, Kafka et Kafka Connect. Le connecteur Spanner est préinstallé sur l'image Kafka Connect.

Pour en savoir plus sur l'installation des fichiers JAR du connecteur Kafka basé sur Debezium, consultez Installer Debezium

Configurer le connecteur Kafka

Voici un exemple de configuration d'un connecteur Kafka. qui se connecte à un flux de modifications appelé changeStreamAll dans base de données users dans l'instance test-instance et dans le projet test-project.

"name": "spanner-connector",
"config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{"client_id": user@example.com}",
    "gcp.spanner.database.role": "cdc-role",
    "tasks.max": "10"
}

Cette configuration contient les éléments suivants:

  • Nom du connecteur lorsqu'il est enregistré auprès d'un service Kafka Connect.

  • Nom de cette classe de connecteur Spanner.

  • ID du projet.

  • ID de l'instance Spanner.

  • ID de la base de données Spanner.

  • Nom du flux de modifications.

  • Objet JSON de la clé de compte de service.

  • (Facultatif) Rôle de base de données Spanner à utiliser.

  • Nombre maximal de tâches.

Pour obtenir la liste complète des propriétés du connecteur, consultez la section Propriétés de configuration du connecteur Kafka.

Ajouter la configuration du connecteur à Kafka Connect

Pour lancer l'exécution d'un connecteur Spanner:

  1. Créez une configuration pour le connecteur Spanner.

  2. Utilisez l'API REST Kafka Connect pour ajouter cette configuration de connecteur à votre cluster Kafka Connect.

Vous pouvez envoyer cette configuration à l'aide d'une commande POST à une connexion Kafka en cours d'exécution Google Cloud. Par défaut, le service Kafka Connect s'exécute sur le port 8083. Le service enregistre la configuration et démarre la tâche du connecteur qui se connecte à la base de données Spanner et diffuse les enregistrements d'événements de modification dans les sujets Kafka.

Voici un exemple de commande POST :

POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
  "name": "spanner-connector"
  "config": {
      "connector.class": "io.debezium.connector.spanner.SpannerConnector",
      "gcp.spanner.project.id": "test-project",
      "gcp.spanner.instance.id": "test-instance",
      "gcp.spanner.database.id": "users",
      "gcp.spanner.change.stream": "changeStreamAll",
      "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
      "heartbeat.interval.ms": "100",
      "tasks.max": "10"
  }
}

Exemple de réponse réussie:

HTTP/1.1 201 Created
Content-Type: application/json
{
    "name": "spanner-connector",
    "config": {
        "connector.class": "io.debezium.connector.spanner.SpannerConnector",
        "gcp.spanner.project.id": "test-project",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "users",
        "gcp.spanner.change.stream": "changeStreamAll",
        "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
        "heartbeat.interval.ms": "100",
        "tasks.max": "10"
    },
    "tasks": [
        { "connector": "spanner-connector", "task": 1 },
        { "connector": "spanner-connector", "task": 2 },
        { "connector": "spanner-connector", "task": 3 }
    ]
}

Mettre à jour la configuration du connecteur Kafka

Pour mettre à jour la configuration du connecteur, envoyez une commande PUT au service Kafka Connect en cours d'exécution avec le même nom de connecteur.

Supposons qu'un connecteur s'exécute avec la configuration de la section précédente. Voici un exemple de commande PUT :

PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Exemple de réponse réussie :

HTTP/1.1 200 OK
Content-Type: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "tasks.max": "10",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Arrêter le connecteur Kafka

Pour arrêter le connecteur, envoyez une commande DELETE au gestionnaire Service Kafka Connect portant le même nom de connecteur.

Supposons qu'un connecteur s'exécute avec la configuration de la section précédente. Voici un exemple de commande DELETE:

DELETE /connectors/spanner-connector HTTP/1.1
Host: http://localhost:8083

Exemple de réponse réussie:

HTTP/1.1 204 No Content

Surveiller le connecteur Kafka

En plus des métriques Kafka Connect et Debezium standards, le connecteur Kafka exporte ses propres métriques :

  • MilliSecondsLowWatermark: filigrane bas actuel de la tâche du connecteur, en millisecondes. La un filigrane faible décrit le moment T auquel le connecteur est certain tous les événements dont le code temporel est inférieur à M

  • MilliSecondsLowWatermarkLag : décalage du repère bas par rapport à l'heure actuelle, en millisecondes. tous les événements avec un code temporel < T ont été diffusés.

  • LatencyLowWatermark<Variant>MilliSeconds : décalage du filigrane bas par rapport à l'heure actuelle en millisecondes. Les variantes P50, P95, P99, Moyen, Min et Max sont fournies.

  • LatencySpanner<Variant>MilliSeconds : la latence Spanner-commit-timestamp-to-connector-read. Les variantes P50, P95, P99, Moyenne, Min, Max sont fournies.

  • LatencyReadToEmit<Variant>MilliSeconds : la latence Spanner-read-timestamp-to-connector-emit. Les variantes P50, P95, P99, Moyen, Min et Max sont fournies.

  • LatencyCommitToEmit<Variant>tMilliSeconds : la latence Spanner-commit-timestamp-to-connector-emit. Les variantes P50, P95, P99, Moyenne, Min et Max sont fournies.

  • LatencyCommitToPublish<Variant>MilliSeconds : la latence Spanner-commit-timestamp-to Kafka-publish-timestamp. Les variantes P50, P95, P99, Moyenne, Min, Max sont fournies.

  • NumberOfChangeStreamPartitionsDetected : nombre total de partitions détectées par la tâche de connecteur actuelle.

  • NumberOfChangeStreamQueriesIssued : nombre total de requêtes de flux de modifications émises par la tâche en cours.

  • NumberOfActiveChangeStreamQueries : nombre actif de requêtes de flux de modifications détectées par la tâche de connecteur actuelle.

  • SpannerEventQueueCapacity : capacité totale de StreamEventQueue, une file d'attente qui stocke les éléments reçus à partir des requêtes de flux de modifications.

  • SpannerEventQueueCapacity : capacité StreamEventQueue restante.

  • TaskStateChangeEventQueueCapacity : capacité totale de TaskStateChangeEventQueue, une file d'attente qui stocke les événements qui se produisent dans le connecteur.

  • RemainingTaskStateChangeEventQueueCapacity : capacité TaskStateChangeEventQueue restante.

  • NumberOfActiveChangeStreamQueries : nombre actif de requêtes de flux de modifications détectées par la tâche de connecteur actuelle.

Propriétés de configuration du connecteur Kafka

Les propriétés de configuration suivantes sont obligatoires pour le connecteur:

  • name : nom unique du connecteur. Si vous tentez de vous enregistrer à nouveau avec le même nom, l'opération échouera. Cette propriété est requise par tous les connecteurs Kafka Connect.

  • connector.class : nom de la classe Java du connecteur. Utilisez toujours la valeur io.debezium.connector.spanner.SpannerConnector pour le connecteur Kafka.

  • tasks.max: nombre maximal de tâches à créer pour ce connecteur.

  • gcp.spanner.project.id : ID du projet

  • gcp.spanner.instance.id: ID de l'instance Spanner

  • gcp.spanner.database.id : ID de la base de données Spanner

  • gcp.spanner.change.stream : nom du flux de modifications Spanner

  • gcp.spanner.credentials.json: objet JSON de la clé de compte de service.

  • gcp.spanner.credentials.path: chemin d'accès à l'objet JSON de clé de compte de service. Obligatoire si le champ ci-dessus n'est pas rempli.

  • gcp.spanner.database.role : rôle de base de données Spanner pour utiliser. Cette étape n'est requise que lorsque le flux de modifications est sécurisé avec un contrôle d'accès précis. Le rôle de base de données doit disposer du droit SELECT sur le le flux de modifications et le droit EXECUTE sur l'accès en lecture du flux de modifications . Pour en savoir plus, consultez la page Contrôle précis des accès aux modifications flux.

Les propriétés de configuration avancées suivantes ont des valeurs par défaut qui fonctionnent dans la plupart situations et doivent donc rarement être spécifiés dans la configuration du connecteur:

  • gcp.spanner.low-watermark.enabled: indique si le filigrane faible est activé pour le connecteur. La valeur par défaut est "false".

  • gcp.spanner.low-watermark.update-period.ms: intervalle auquel le filigrane faible est mis à jour. La valeur par défaut est 1 000 ms.

  • heartbeat.interval.ms : intervalle de battement de cœur Spanner. La valeur par défaut est 300 000 (cinq minutes).

  • gcp.spanner.start.time : heure de début du connecteur. La valeur par défaut est l'heure actuelle.

  • gcp.spanner.end.time: heure de fin du connecteur La valeur par défaut est l'infini.

  • tables.exclude.list: tableaux pour lesquels exclure les événements de modification. La valeur par défaut est vide.

  • tables.include.list: tables pour lesquelles des événements de modification doivent être inclus. Si ce champ n'est pas renseigné, toutes les tables sont incluses. La valeur par défaut est vide.

  • gcp.spanner.stream.event.queue.capacity : capacité de la file d'attente d'événements Spanner. La valeur par défaut est 10 000.

  • connector.spanner.task.state.change.event.queue.capacity: capacité de la file d'attente des événements de changement d'état de la tâche. Valeur par défaut : 1 000.

  • connector.spanner.max.missed.heartbeats : nombre maximal de pulsations manquées pour une requête de flux de modifications avant qu'une exception ne soit générée. Valeur par défaut : 10

  • scaler.monitor.enabled : indique si l'autoscaling des tâches est activé. Valeur par défaut : "false".

  • tasks.desired.partitions : nombre de partitions de flux de modifications par tâche. Ce paramètre est nécessaire pour l'autoscaling des tâches. La valeur par défaut est 2.

  • tasks.min: nombre minimal de tâches Ce paramètre est nécessaire pour l'autoscaling des tâches. La valeur par défaut est 1.

  • connector.spanner.sync.topic: nom du sujet de synchronisation (sujet de connecteur interne utilisé pour stocker les communications entre les tâches). Valeur par défaut : _sync_topic_spanner_connector_connectorname si l'utilisateur n'a pas fourni de nom.

  • connector.spanner.sync.poll.duration : durée de l'interrogation pour le sujet de synchronisation. La valeur par défaut est 500 ms.

  • connector.spanner.sync.request.timeout.ms : délai avant expiration des requêtes envoyées au sujet de synchronisation. La valeur par défaut est 5 000 ms.

  • connector.spanner.sync.delivery.timeout.ms : délai avant expiration de la publication sur le thème de synchronisation. La valeur par défaut est 15 000 ms.

  • connector.spanner.sync.commit.offsets.interval.ms : intervalle auquel les décalages sont validés pour le thème de synchronisation. La valeur par défaut est 60 000 ms.

  • connector.spanner.sync.publisher.wait.timeout: intervalle auquel les messages sont publiés sur le sujet de synchronisation. La valeur par défaut est 5 ms.

  • connector.spanner.rebalancing.topic : nom du sujet de rééquilibrage. Le sujet sur le rééquilibrage est un sujet de connecteur interne utilisé pour déterminer l'activité des tâches. Valeur par défaut : _rebalancing_topic_spanner_connector_connectorname si l'utilisateur n'a pas fourni de nom.

  • connector.spanner.rebalancing.poll.duration: durée de l'interrogation du sujet de rééquilibrage. La valeur par défaut est 5 000 ms.

  • connector.spanner.rebalancing.commit.offsets.timeout: délai avant expiration pour la validation des décalages pour le sujet de rééquilibrage. La valeur par défaut est 5 000 ms.

  • connector.spanner.rebalancing.commit.offsets.interval.ms: intervalle auquel les décalages sont appliqués pour le sujet de synchronisation. La valeur par défaut est 60 000 ms.

  • connector.spanner.rebalancing.task.waiting.timeout: délai d'attente d'une tâche avant le traitement d'un événement de rééquilibrage. La valeur par défaut est 1 000 ms.

Pour obtenir une liste encore plus détaillée des propriétés du connecteur configurables, consultez le dépôt GitHub.

Limites