Créer des connexions de flux de modifications vers Kafka

Cette page explique comment utiliser le connecteur Kafka pour consommer et transférer les données des flux de modifications Spanner.

Concepts fondamentaux

Vous trouverez ci-dessous une description des concepts fondamentaux du connecteur Kafka.

Debezium

Debezium est un projet Open Source qui fournit une plate-forme de diffusion de données à faible latence pour la capture de données modifiées.

Connecteur Kafka

Le connecteur Kafka fournit une abstraction sur l'API Spanner afin de publier les flux de modifications Spanner dans Kafka. Avec ce connecteur, vous n'avez pas à gérer le cycle de vie de la partition des flux de modifications, ce qui est nécessaire lorsque vous utilisez directement l'API Spanner.

Le connecteur Kafka génère un événement de modification pour chaque mod d'enregistrement de modifications de données et envoie les enregistrements d'événements de modification en aval dans un sujet Kafka distinct pour chaque table faisant l'objet d'un suivi du flux de modifications. Un mod d'enregistrement des modifications de données représente une modification unique (insertion, mise à jour ou suppression) qui a été capturée. Un même enregistrement de modification de données peut contenir plusieurs mods.

Sortie du connecteur Kafka

Le connecteur Kafka transfère les enregistrements des flux de modifications directement dans un sujet Kafka distinct. Le nom du sujet de sortie doit être connector_name.table_name. Si le sujet n'existe pas, le connecteur Kafka crée automatiquement un sujet sous ce nom.

Vous pouvez également configurer des transformations de routage de sujet pour réacheminer les enregistrements vers les sujets que vous spécifiez. Si vous souhaitez utiliser le routage des sujets, désactivez la fonctionnalité de faible filigrane.

Tri des enregistrements

Les enregistrements sont classés par code temporel de commit par clé primaire dans les sujets Kafka. Les enregistrements appartenant à différentes clés primaires n'ont pas de garantie de classement. Les enregistrements ayant la même clé primaire sont stockés dans la même partition du sujet Kafka. Si vous souhaitez traiter des transactions entières, vous pouvez également utiliser les champs server_transaction_id et number_of_records_in_transaction de l'enregistrement des modifications de données pour assembler une transaction Spanner.

Événements de modification

Le connecteur Kafka génère un événement de modification de données pour chaque opération INSERT, UPDATE et DELETE. Chaque événement contient une clé et des valeurs pour la ligne modifiée.

Vous pouvez utiliser des convertisseurs Kafka Connect pour générer des événements de modification de données aux formats Protobuf, AVRO, JSON ou JSON Schemaless. Si vous utilisez un convertisseur Kafka Connect qui génère des schémas, l'événement contient des schémas distincts pour la clé et les valeurs. Sinon, l'événement ne contient que la clé et les valeurs.

Le schéma de la clé ne change jamais. Le schéma des valeurs est une combinaison de toutes les colonnes suivies par le flux de modifications depuis l'heure de début du connecteur.

Si vous configurez le connecteur pour générer des événements JSON, l'événement de modification de sortie contient cinq champs:

  • Le premier champ schema spécifie un schéma Kafka Connect qui décrit le schéma de clé Spanner.

  • Le premier champ payload présente la structure décrite dans le champ schema précédent et contient la clé de la ligne qui a été modifiée.

  • Le deuxième champ schema spécifie le schéma Kafka Connect qui décrit le schéma de la ligne modifiée.

  • Le deuxième champ payload présente la structure décrite dans le champ schema précédent, et il contient les données réelles de la ligne modifiée.

  • Le champ source est obligatoire. Il décrit les métadonnées de la source de l'événement.

Voici un exemple d'événement de modification de données:

{
  // The schema for the Spanner key.
  "schema": {
    "type": "struct",
    "name": "customers.Key",
    "optional": false,
    "fields": [
      {
        "type": "int64",
        "optional": "false"
        "field": "false"
      }
    ]
  },
  // The value of the Spanner key.
  "payload": {
      "id": "1"
  },
  // The schema for the payload, which contains the before and after values
  // of the changed row. The schema for the payload contains all the
  // columns that the change stream has tracked since the connector start
  // time.
  "schema": { 
    "type": "struct",
    "fields": [
      {
        // The schema for the before values of the changed row.
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "first_name"
            }
        ],
        "optional": true,
        "name": "customers.Value",
        "field": "before"
      },
      {
        // The schema for the after values of the changed row.
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          }
        ],
          "optional": true,
          "name": "customers.Value",
          "field": "after"
        },
        {
          // The schema for the source metadata for the event.
          "type": "struct",
          "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "version"
            },
            {
                "type": "string",
                "optional": false,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "string",
                "optional": false,
                "field": "sequence"
            },
            {
                "type": "string",
                "optional": false,
                "field": "project_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "instance_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "database_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "change_stream_name"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
            {
                "type": "string",
                "optional": true,
                "field": "server_transaction_id"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "low_watermark"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "read_at_timestamp"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_records_in_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "transaction_tag"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "system_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "value_capture_type"
            }
            {
                "type": "string",
                "optional": true,
                "field": "partition_token"
            }
            {
                "type": "int32",
                "optional": true,
                "field": "mod_number"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "is_last_record_in_transaction_in_partition"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_partitions_in_transaction"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.spanner.Source",
          "field": "source"
        },
      ]
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "connector_name.customers.Envelope"
  },
  "payload": {
    // The values of the row before the event.
    "before": null,
    // The values of the row after the event.
    "after": { 
        "id": 1,
        "first_name": "Anne",
    }
  },
  // The source metadata.
  "source": {
    "version": "{debezium-version}",
    "connector": "spanner",
    "name": "spanner_connector",
    "ts_ms": 1670955531785,
    "snapshot": "false",
    "db": "database",
    "sequence": "1",
    "project_id": "project",
    "instance_id": "instance",
    "database_id": "database",
    "change_stream_name": "change_stream",
    "table": "customers",
    "server_transaction_id": "transaction_id",
    "low_watermark": 1670955471635,
    "read_at_timestamp": 1670955531791,
    "number_records_in_transaction": 2,
    "transaction_tag": "",
    "system_transaction": false,
    "value_capture_type": "OLD_AND_NEW_VALUES",
    "partition_token": "partition_token",
    "mod_number": 0,
    "is_last_record_in_transaction_in_partition": true,
    "number_of_partitions_in_transaction": 1
  },
  "op": "c", 
  "ts_ms": 1559033904863 //
}

Filigrane faible

Le filigrane bas décrit l'heure T à laquelle le connecteur Kafka est certain d'avoir diffusé et publié sur un sujet Kafka tous les événements dont le code temporel est inférieur à T.

Vous pouvez activer le filigrane faible dans le connecteur Kafka à l'aide du paramètre gcp.spanner.low-watermark.enabled. Ce paramètre est désactivé par défaut. Si le filigrane faible est activé, le champ low_watermark de l'enregistrement de modification des données du flux de modifications est renseigné avec l'horodatage actuel du filigrane faible du connecteur Kafka.

Si aucun enregistrement n'est produit, le connecteur Kafka envoie régulièrement des "pulsations" de filigrane aux sujets de sortie Kafka détectés par le connecteur.

Ces pulsations de filigrane sont des enregistrements vides, à l'exception du champ low_watermark. Vous pouvez ensuite utiliser le filigrane faible pour effectuer des agrégations temporelles. Par exemple, vous pouvez utiliser le faible filigrane pour classer les événements par horodatage de commit sur les clés primaires.

Sujets des métadonnées

Le connecteur Kafka, ainsi que le framework Kafka Connect, créent plusieurs sujets de métadonnées pour stocker les informations liées au connecteur. Il est déconseillé de modifier la configuration ou le contenu de ces sujets de métadonnées.

Voici les sujets de métadonnées:

  • _consumer_offsets: sujet créé automatiquement par Kafka. Stocke les décalages des consommateurs créés dans le connecteur Kafka.
  • _kafka-connect-offsets: sujet créé automatiquement par Kafka Connect. Stocke les décalages du connecteur.
  • _sync_topic_spanner_connector_connectorname: sujet créé automatiquement par le connecteur. Stocke les métadonnées concernant les partitions du flux de modifications.
  • _rebalancing_topic_spanner_connector_connectorname: sujet créé automatiquement par le connecteur. Permet de déterminer l'activité des tâches du connecteur.
  • _debezium-heartbeat.connectorname: sujet utilisé pour traiter les pulsations des flux de modifications Spanner.

Environnement d'exécution du connecteur Kafka

La section suivante décrit l'environnement d'exécution du connecteur Kafka.

Évolutivité

Le connecteur Kafka est évolutif horizontalement et s'exécute sur une ou plusieurs tâches réparties sur plusieurs nœuds de calcul Kafka Connect.

Garanties de distribution des messages

Le connecteur Kafka offre une garantie de distribution de type "au moins une fois".

Tolérance aux pannes

Le connecteur Kafka tolère les défaillances. Lorsque le connecteur Kafka lit les modifications et génère des événements, il enregistre le dernier code temporel de commit traité pour chaque partition du flux de modifications. Si le connecteur Kafka s'arrête pour une raison quelconque (y compris des échecs de communication, des problèmes réseau ou des défaillances logicielles), il reprend la diffusion des enregistrements là où il s'était arrêté lors de son redémarrage.

Le connecteur Kafka lit le schéma d'informations au niveau de l'horodatage de début du connecteur Kafka pour récupérer les informations de schéma. Par défaut, Spanner ne peut pas lire le schéma d'informations lorsque les horodatages de lecture sont antérieurs à la durée de conservation de la version, qui est fixée par défaut à une heure. Si vous souhaitez démarrer le connecteur il y a plus d'une heure, vous devez augmenter la durée de conservation de la version de la base de données.

Configurer le connecteur Kafka

Créer un flux de modifications

Pour savoir comment créer un flux de modifications, consultez Créer un flux de modifications. Pour passer aux étapes suivantes, vous devez disposer d'une instance Spanner avec un flux de modifications configuré.

Notez que si vous souhaitez que les colonnes modifiées et non modifiées soient renvoyées à chaque événement de modification de données, utilisez le type de capture de valeur NEW_ROW. Pour en savoir plus, consultez la section Type de capture de valeur.

Installer le fichier JAR du connecteur Kafka

Une fois Zookeeper, Kafka et Kafka Connect installés, les tâches restantes pour déployer un connecteur Kafka consistent à télécharger l'archive de plug-ins du connecteur, à extraire les fichiers JAR dans votre environnement Kafka Connect et à ajouter le répertoire contenant ces fichiers au fichier plugin.path de Kafka Connect. Vous devez ensuite redémarrer votre processus Kafka Connect pour récupérer les nouveaux fichiers JAR.

Si vous utilisez des conteneurs immuables, vous pouvez extraire des images des images de conteneurs de Debezium pour Zoookeeper, Kafka et Kafka Connect. Le connecteur Spanner est préinstallé sur l'image Kafka Connect.

Pour en savoir plus sur l'installation des fichiers JAR du connecteur Kafka basé sur Debezium, consultez la page Installer Debezium.

Configurer le connecteur Kafka

Voici un exemple de configuration d'un connecteur Kafka qui se connecte à un flux de modifications appelé changeStreamAll dans la base de données users dans l'instance test-instance et dans le projet test-project.

"name": "spanner-connector",
"config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{"client_id": user@example.com}",
    "gcp.spanner.database.role": "cdc-role",
    "tasks.max": "10"
}

Cette configuration contient les éléments suivants:

  • Nom du connecteur lorsqu'il est enregistré auprès d'un service Kafka Connect.

  • Nom de cette classe de connecteur Spanner.

  • ID du projet.

  • ID d'instance Spanner.

  • ID de la base de données Spanner.

  • Nom du flux de modifications.

  • Objet JSON de la clé de compte de service.

  • (Facultatif) Rôle de base de données Spanner à utiliser.

  • Nombre maximal de tâches.

Pour obtenir la liste complète des propriétés du connecteur, consultez Propriétés de configuration du connecteur Kafka.

Ajouter la configuration du connecteur à Kafka Connect

Pour lancer l'exécution d'un connecteur Spanner:

  1. Créez une configuration pour le connecteur Spanner.

  2. Utilisez l'API REST Kafka Connect pour ajouter cette configuration de connecteur à votre cluster Kafka Connect.

Vous pouvez envoyer cette configuration à l'aide d'une commande POST à un service Kafka Connect en cours d'exécution. Par défaut, le service Kafka Connect s'exécute sur le port 8083. Le service enregistre la configuration et démarre la tâche de connecteur qui se connecte à la base de données Spanner et diffuse les enregistrements d'événements de modification dans les sujets Kafka.

Voici un exemple de commande POST:

POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
  "name": "spanner-connector"
  "config": {
      "connector.class": "io.debezium.connector.spanner.SpannerConnector",
      "gcp.spanner.project.id": "test-project",
      "gcp.spanner.instance.id": "test-instance",
      "gcp.spanner.database.id": "users",
      "gcp.spanner.change.stream": "changeStreamAll",
      "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
      "heartbeat.interval.ms": "100",
      "tasks.max": "10"
  }
}

Exemple de réponse réussie:

HTTP/1.1 201 Created
Content-Type: application/json
{
    "name": "spanner-connector",
    "config": {
        "connector.class": "io.debezium.connector.spanner.SpannerConnector",
        "gcp.spanner.project.id": "test-project",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "users",
        "gcp.spanner.change.stream": "changeStreamAll",
        "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
        "heartbeat.interval.ms": "100",
        "tasks.max": "10"
    },
    "tasks": [
        { "connector": "spanner-connector", "task": 1 },
        { "connector": "spanner-connector", "task": 2 },
        { "connector": "spanner-connector", "task": 3 }
    ]
}

Mettre à jour la configuration du connecteur Kafka

Pour mettre à jour la configuration du connecteur, envoyez une commande PUT au service Kafka Connect en cours d'exécution avec le même nom de connecteur.

Supposons qu'un connecteur soit en cours d'exécution avec la configuration de la section précédente. Voici un exemple de commande PUT:

PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Exemple de réponse réussie:

HTTP/1.1 200 OK
Content-Type: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "tasks.max": "10",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Arrêter le connecteur Kafka

Pour arrêter le connecteur, envoyez au service Kafka Connect en cours d'exécution une commande DELETE portant le même nom de connecteur.

Supposons qu'un connecteur soit en cours d'exécution avec la configuration de la section précédente. Voici un exemple de commande DELETE:

DELETE /connectors/spanner-connector HTTP/1.1
Host: http://localhost:8083

Exemple de réponse réussie:

HTTP/1.1 204 No Content

Surveiller le connecteur Kafka

En plus des métriques standards Kafka Connect et Debezium, le connecteur Kafka exporte ses propres métriques:

  • MilliSecondsLowWatermark: filigrane bas actuel de la tâche du connecteur, en millisecondes. Le filigrane faible décrit le moment T auquel le connecteur a la certitude d'avoir transmis tous les événements dont l'horodatage est inférieur à T.

  • MilliSecondsLowWatermarkLag: délai du filigrane faible par rapport à l'heure actuelle, en millisecondes. Diffusion de tous les événements dont l'horodatage est inférieur à T

  • LatencyLowWatermark<Variant>MilliSeconds: délai en millisecondes du filigrane faible par rapport à l'heure actuelle. Les variantes P50, P95, P99, Moyen, Min et Max sont fournies.

  • LatencySpanner<Variant>MilliSeconds: latence de Spanner-commit-timestamp-to-connector-read. Les variantes P50, P95, P99, Moyenne, Min, Max sont fournies.

  • LatencyReadToEmit<Variant>MilliSeconds: latence de Spanner-read-timestamp-to-connector-emit. Les variantes P50, P95, P99, Moyen, Min et Max sont fournies.

  • LatencyCommitToEmit<Variant>tMilliSeconds: latence du connecteur Spanner-commit-timestamp-to-connector-emit. Les variantes P50, P95, P99, Moyen, Min et Max sont fournies.

  • LatencyCommitToPublish<Variant>MilliSeconds: latence de Spanner-commit-timestamp-to Kafka-publish-timestamp. Les variantes P50, P95, P99, Moyenne, Min, Max sont fournies.

  • NumberOfChangeStreamPartitionsDetected: nombre total de partitions détectées par la tâche de connecteur en cours.

  • NumberOfChangeStreamQueriesIssued: nombre total de requêtes de flux de modifications émises par la tâche en cours.

  • NumberOfActiveChangeStreamQueries: nombre de requêtes de flux de modifications actives détectées par la tâche de connecteur en cours.

  • SpannerEventQueueCapacity: capacité totale de StreamEventQueue, une file d'attente qui stocke les éléments reçus des requêtes de flux de modifications.

  • SpannerEventQueueCapacity: capacité restante pour StreamEventQueue.

  • TaskStateChangeEventQueueCapacity: capacité totale de TaskStateChangeEventQueue, une file d'attente qui stocke les événements se produisant dans le connecteur.

  • RemainingTaskStateChangeEventQueueCapacity: capacité restante pour TaskStateChangeEventQueue.

  • NumberOfActiveChangeStreamQueries: nombre de requêtes de flux de modifications actives détectées par la tâche de connecteur en cours.

Propriétés de configuration du connecteur Kafka

Les propriétés de configuration suivantes sont obligatoires pour le connecteur:

  • name: nom unique du connecteur Toute tentative d'enregistrement portant le même nom entraînera un échec. Cette propriété est requise par tous les connecteurs Kafka Connect.

  • connector.class: nom de la classe Java du connecteur. Utilisez toujours la valeur io.debezium.connector.spanner.SpannerConnector pour le connecteur Kafka.

  • tasks.max: nombre maximal de tâches à créer pour ce connecteur.

  • gcp.spanner.project.id : ID du projet

  • gcp.spanner.instance.id: ID de l'instance Spanner

  • gcp.spanner.database.id: ID de la base de données Spanner

  • gcp.spanner.change.stream: nom du flux de modifications Spanner

  • gcp.spanner.credentials.json: objet JSON de la clé de compte de service.

  • gcp.spanner.credentials.path: chemin d'accès à l'objet JSON de clé de compte de service. Obligatoire si le champ ci-dessus n'est pas renseigné.

  • gcp.spanner.database.role : rôle de base de données Spanner à utiliser. Cette opération n'est requise que lorsque le flux de modifications est sécurisé par un contrôle des accès précis. Le rôle de base de données doit disposer du droit SELECT sur le flux de modifications et du droit EXECUTE sur la fonction de lecture du flux de modifications. Pour en savoir plus, consultez la page Contrôle précis des accès pour les flux de modifications.

Les propriétés de configuration avancées suivantes ont des valeurs par défaut qui fonctionnent dans la plupart des situations et doivent donc rarement être spécifiées dans la configuration du connecteur:

  • gcp.spanner.low-watermark.enabled: indique si le filigrane faible est activé pour le connecteur. La valeur par défaut est "false".

  • gcp.spanner.low-watermark.update-period.ms: intervalle auquel le filigrane faible est mis à jour. La valeur par défaut est 1 000 ms.

  • heartbeat.interval.ms: intervalle de pulsation de Spanner. La valeur par défaut est 300 000 (cinq minutes).

  • gcp.spanner.start.time: heure de début du connecteur La valeur par défaut est l'heure actuelle.

  • gcp.spanner.end.time: heure de fin du connecteur La valeur par défaut est l'infini.

  • tables.exclude.list: tableaux pour lesquels exclure les événements de modification. La valeur par défaut est vide.

  • tables.include.list: tables pour lesquelles des événements de modification doivent être inclus. Si elles ne sont pas renseignées, toutes les tables sont incluses. La valeur par défaut est vide.

  • gcp.spanner.stream.event.queue.capacity: capacité de la file d'attente des événements Spanner. La valeur par défaut est 10 000.

  • connector.spanner.task.state.change.event.queue.capacity: capacité de la file d'attente des événements de changement d'état de la tâche. La valeur par défaut est 1 000.

  • connector.spanner.max.missed.heartbeats: nombre maximal de pulsations manquées pour une requête de flux de modifications avant qu'une exception ne soit générée. Valeur par défaut : 10

  • scaler.monitor.enabled: indique si l'autoscaling des tâches est activé. Valeur par défaut : "false".

  • tasks.desired.partitions: nombre privilégié de partitions de flux de modifications par tâche. Ce paramètre est nécessaire pour l'autoscaling des tâches. La valeur par défaut est 2.

  • tasks.min: nombre minimal de tâches Ce paramètre est nécessaire pour l'autoscaling des tâches. La valeur par défaut est 1.

  • connector.spanner.sync.topic: nom du sujet de synchronisation (sujet de connecteur interne utilisé pour stocker les communications entre les tâches). La valeur par défaut est _sync_topic_spanner_connector_connectorname si l'utilisateur n'a pas fourni de nom.

  • connector.spanner.sync.poll.duration: durée du sondage sur le sujet de synchronisation. La valeur par défaut est 500 ms.

  • connector.spanner.sync.request.timeout.ms: délai avant expiration des requêtes adressées au sujet de synchronisation. La valeur par défaut est 5 000 ms.

  • connector.spanner.sync.delivery.timeout.ms: délai avant expiration de la publication dans le sujet de synchronisation. La valeur par défaut est 15 000 ms.

  • connector.spanner.sync.commit.offsets.interval.ms: intervalle auquel les décalages sont appliqués pour le sujet de synchronisation. La valeur par défaut est 60 000 ms.

  • connector.spanner.sync.publisher.wait.timeout: intervalle auquel les messages sont publiés sur le sujet de synchronisation. La valeur par défaut est 5 ms.

  • connector.spanner.rebalancing.topic: nom du sujet de rééquilibrage. La rubrique relative au rééquilibrage est une rubrique de connecteur interne utilisée pour déterminer l'activité des tâches. La valeur par défaut est _rebalancing_topic_spanner_connector_connectorname si l'utilisateur n'a pas fourni de nom.

  • connector.spanner.rebalancing.poll.duration: durée de l'interrogation du sujet de rééquilibrage. La valeur par défaut est 5 000 ms.

  • connector.spanner.rebalancing.commit.offsets.timeout: délai avant expiration pour la validation des décalages pour le sujet de rééquilibrage. La valeur par défaut est 5 000 ms.

  • connector.spanner.rebalancing.commit.offsets.interval.ms: intervalle auquel les décalages sont appliqués pour le sujet de synchronisation. La valeur par défaut est 60 000 ms.

  • connector.spanner.rebalancing.task.waiting.timeout: délai d'attente d'une tâche avant le traitement d'un événement de rééquilibrage. La valeur par défaut est 1 000 ms.

Pour obtenir une liste encore plus détaillée des propriétés du connecteur configurable, consultez le dépôt GitHub.

Limites