Créer des connexions de flux de modifications à Kafka

Cette page explique comment utiliser le connecteur Kafka pour consommer et transférer des données de flux de modifications Spanner.

Concepts fondamentaux

Ce qui suit décrit les concepts fondamentaux du connecteur Kafka.

Debezium

Debezium est un projet Open Source qui fournit une plate-forme de flux de données à faible latence pour la capture des données modifiées.

Connecteur Kafka

Le connecteur Kafka fournit une abstraction par rapport à l'API Spanner pour publier les flux de modifications Spanner dans Kafka. Avec ce connecteur, vous n'avez pas à gérer le cycle de vie des partitions des flux de modifications, qui est nécessaire lorsque vous utilisez directement l'API Spanner.

Le connecteur Kafka génère un événement de modification pour chaque mod d'enregistrements de modifications de données et envoie les enregistrements d'événements de modification en aval dans un sujet Kafka distinct pour chaque table suivie par un flux de modifications. Un mod d'enregistrement de modifications de données représente une modification unique (insertion, mise à jour ou suppression) capturée. Un seul enregistrement de modification de données peut contenir plusieurs mod.

Sortie du connecteur Kafka

Le connecteur Kafka transfère les enregistrements de flux de modifications directement dans un sujet Kafka distinct. Le nom du sujet de sortie doit être connector_name.table_name. Si le sujet n'existe pas, le connecteur Kafka crée automatiquement un sujet sous ce nom.

Vous pouvez également configurer des transformations de routage des sujets afin de réacheminer des enregistrements vers les sujets que vous spécifiez. Si vous souhaitez utiliser le routage des sujets, désactivez la fonctionnalité de faible filigrane.

Commande d'enregistrements

Les enregistrements sont classés par code temporel de commit et par clé primaire dans les sujets Kafka. Les enregistrements appartenant à différentes clés primaires ne présentent pas de garanties de tri. Les enregistrements ayant la même clé primaire sont stockés dans la même partition de sujet Kafka. Si vous souhaitez traiter des transactions entières, vous pouvez également utiliser les champs server_transaction_id et number_of_records_in_transaction de l'enregistrement de modification des données pour assembler une transaction Spanner.

Événements de modification

Le connecteur Kafka génère un événement de modification de données pour chaque opération INSERT, UPDATE et DELETE. Chaque événement contient une clé et des valeurs pour la ligne modifiée.

Vous pouvez utiliser les convertisseurs Kafka Connect pour générer des événements de modification des données aux formats Protobuf, AVRO, JSON ou JSON Schemaless. Si vous utilisez un convertisseur Kafka Connect qui génère des schémas, l'événement contient des schémas distincts pour la clé et les valeurs. Sinon, l'événement ne contient que la clé et les valeurs.

Le schéma de la clé ne change jamais. Le schéma des valeurs est une combinaison de toutes les colonnes suivies par le flux de modifications depuis l'heure de début du connecteur.

Si vous configurez le connecteur pour générer des événements JSON, l'événement de modification de sortie contient cinq champs:

  • Le premier champ schema spécifie un schéma Kafka Connect qui décrit le schéma de clé Spanner.

  • Le premier champ payload présente la structure décrite par le champ schema précédent et contient la clé de la ligne modifiée.

  • Le deuxième champ schema spécifie le schéma Kafka Connect qui décrit le schéma de la ligne modifiée.

  • Le deuxième champ payload présente la structure décrite par le champ schema précédent et contient les données réelles de la ligne modifiée.

  • Le champ source est obligatoire. Il décrit les métadonnées sources de l'événement.

Voici un exemple d'événement de modification des données:

{
  // The schema for the Spanner key.
  "schema": {
    "type": "struct",
    "name": "customers.Key",
    "optional": false,
    "fields": [
      {
        "type": "int64",
        "optional": "false"
        "field": "false"
      }
    ]
  },
  // The value of the Spanner key.
  "payload": {
      "id": "1"
  },
  // The schema for the payload, which contains the before and after values
  // of the changed row. The schema for the payload contains all the
  // columns that the change stream has tracked since the connector start
  // time.
  "schema": {
    "type": "struct",
    "fields": [
      {
        // The schema for the before values of the changed row.
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "first_name"
            }
        ],
        "optional": true,
        "name": "customers.Value",
        "field": "before"
      },
      {
        // The schema for the after values of the changed row.
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          }
        ],
          "optional": true,
          "name": "customers.Value",
          "field": "after"
        },
        {
          // The schema for the source metadata for the event.
          "type": "struct",
          "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "version"
            },
            {
                "type": "string",
                "optional": false,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "string",
                "optional": false,
                "field": "sequence"
            },
            {
                "type": "string",
                "optional": false,
                "field": "project_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "instance_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "database_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "change_stream_name"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
            {
                "type": "string",
                "optional": true,
                "field": "server_transaction_id"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "low_watermark"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "read_at_timestamp"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_records_in_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "transaction_tag"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "system_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "value_capture_type"
            }
            {
                "type": "string",
                "optional": true,
                "field": "partition_token"
            }
            {
                "type": "int32",
                "optional": true,
                "field": "mod_number"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "is_last_record_in_transaction_in_partition"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_partitions_in_transaction"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.spanner.Source",
          "field": "source"
        },
      ]
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "connector_name.customers.Envelope"
  },
  "payload": {
    // The values of the row before the event.
    "before": null,
    // The values of the row after the event.
    "after": {
        "id": 1,
        "first_name": "Anne",
    }
  },
  // The source metadata.
  "source": {
    "version": "{debezium-version}",
    "connector": "spanner",
    "name": "spanner_connector",
    "ts_ms": 1670955531785,
    "snapshot": "false",
    "db": "database",
    "sequence": "1",
    "project_id": "project",
    "instance_id": "instance",
    "database_id": "database",
    "change_stream_name": "change_stream",
    "table": "customers",
    "server_transaction_id": "transaction_id",
    "low_watermark": 1670955471635,
    "read_at_timestamp": 1670955531791,
    "number_records_in_transaction": 2,
    "transaction_tag": "",
    "system_transaction": false,
    "value_capture_type": "OLD_AND_NEW_VALUES",
    "partition_token": "partition_token",
    "mod_number": 0,
    "is_last_record_in_transaction_in_partition": true,
    "number_of_partitions_in_transaction": 1
  },
  "op": "c",
  "ts_ms": 1559033904863 //
}

Filigrane faible

Le filigrane faible décrit l'heure T à laquelle le connecteur Kafka aura la garantie d'avoir été diffusé en streaming et publié sur un sujet Kafka pour tous les événements dont l'horodatage est inférieur à T.

Vous pouvez activer le filigrane faible dans le connecteur Kafka à l'aide du paramètre gcp.spanner.low-watermark.enabled. Ce paramètre est désactivé par défaut. Si le filigrane faible est activé, le champ low_watermark de l'enregistrement de modification des données du flux de modifications est renseigné avec l'horodatage actuel à faible filigrane du connecteur Kafka.

Si aucun enregistrement n'est généré, le connecteur Kafka envoie régulièrement des "battements de cœur" en filigrane aux sujets de sortie Kafka détectés par le connecteur.

Ces pulsations en filigrane sont des enregistrements vides, à l'exception du champ low_watermark. Vous pouvez ensuite utiliser le filigrane faible pour effectuer des agrégations temporelles. Par exemple, vous pouvez utiliser le filigrane faible pour classer les événements par horodatage de commit sur les clés primaires.

Sujets des métadonnées

Le connecteur Kafka, ainsi que le framework Kafka Connect, créent plusieurs sujets de métadonnées pour stocker les informations liées au connecteur. Il est déconseillé de modifier la configuration ou le contenu de ces sujets de métadonnées.

Les sujets des métadonnées sont les suivants:

  • _consumer_offsets: sujet créé automatiquement par Kafka. Stocke les décalages pour les consommateurs créés dans le connecteur Kafka.
  • _kafka-connect-offsets: sujet créé automatiquement par Kafka Connect. Stocke les décalages des connecteurs.
  • _sync_topic_spanner_connector_connectorname: sujet créé automatiquement par le connecteur. Stocke les métadonnées concernant les partitions de flux de modifications.
  • _rebalancing_topic_spanner_connector_connectorname: sujet créé automatiquement par le connecteur. Permet de déterminer l'activité des tâches du connecteur.
  • _debezium-heartbeat.connectorname: sujet utilisé pour traiter les pulsations des flux de modifications Spanner.

Environnement d'exécution du connecteur Kafka

Vous trouverez ci-dessous une description de l'environnement d'exécution du connecteur Kafka.

Évolutivité

Le connecteur Kafka est évolutif horizontalement et s'exécute sur une ou plusieurs tâches réparties sur plusieurs nœuds de calcul Kafka Connect.

Garanties de distribution des messages

Le connecteur Kafka est compatible avec la garantie de distribution "au moins une fois".

Tolérance aux pannes

Le connecteur Kafka résiste aux pannes. Lorsque le connecteur Kafka lit les modifications et produit des événements, il enregistre le dernier horodatage de commit traité pour chaque partition du flux de modifications. Si le connecteur Kafka s'arrête pour une raison quelconque (y compris en cas d'échec de communication, de réseau ou de logiciel), il continue de diffuser les enregistrements là où il s'était arrêté au redémarrage.

Le connecteur Kafka lit le schéma d'informations au niveau de l'horodatage de début du connecteur Kafka pour récupérer les informations de schéma. Par défaut, Spanner ne peut pas lire le schéma d'informations au niveau des horodatages de lecture antérieurs à la durée de conservation de la version, qui est définie par défaut sur une heure. Si vous souhaitez démarrer le connecteur il y a plus d'une heure, vous devez augmenter la durée de conservation de la version de la base de données.

Configurer le connecteur Kafka

Créer un flux de modifications

Pour en savoir plus sur la création d'un flux de modifications, consultez l'article Créer un flux de modifications. Pour passer aux étapes suivantes, vous devez disposer d'une instance Spanner avec un flux de modifications configuré.

Notez que si vous souhaitez que les colonnes modifiées et non modifiées soient renvoyées à chaque événement de modification des données, utilisez le type de capture de valeur NEW_ROW. Pour en savoir plus, consultez la section Type de capture de valeur.

Installer le fichier JAR du connecteur Kafka

Une fois Zookeeper, Kafka et Kafka Connect installés, les tâches restantes pour déployer un connecteur Kafka consistent à télécharger l'archive de plug-ins du connecteur, à extraire les fichiers JAR dans votre environnement Kafka Connect et à ajouter le répertoire contenant les fichiers JAR dans le plugin.path de Kafka Connect. Vous devez ensuite redémarrer votre processus Kafka Connect pour récupérer les nouveaux fichiers JAR.

Si vous travaillez avec des conteneurs immuables, vous pouvez extraire des images à partir des images de conteneurs Debezium pour Zoookeeper, Kafka et Kafka Connect. Le connecteur Spanner est préinstallé sur l'image Kafka Connect.

Pour en savoir plus sur l'installation de fichiers JAR de connecteur Kafka basés sur Debezium, consultez la page Installer Debezium.

Configurer le connecteur Kafka

Voici un exemple de configuration d'un connecteur Kafka qui se connecte à un flux de modifications appelé changeStreamAll dans la base de données users dans l'instance test-instance et dans le projet test-project.

"name": "spanner-connector",
"config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{"client_id": user@example.com}",
    "gcp.spanner.database.role": "cdc-role",
    "tasks.max": "10"
}

Cette configuration contient les éléments suivants:

  • Nom du connecteur lorsqu'il est enregistré auprès d'un service Kafka Connect.

  • Nom de cette classe de connecteur Spanner.

  • ID du projet.

  • ID de l'instance Spanner.

  • ID de la base de données Spanner.

  • Nom du flux de modifications.

  • Objet JSON de la clé du compte de service.

  • (Facultatif) Rôle de base de données Spanner à utiliser.

  • Nombre maximal de tâches.

Pour obtenir la liste complète des propriétés d'un connecteur, consultez l'article Propriétés de configuration d'un connecteur Kafka.

Ajouter la configuration du connecteur à Kafka Connect

Pour commencer à exécuter un connecteur Spanner, procédez comme suit:

  1. Créez une configuration pour le connecteur Spanner.

  2. Utilisez l'API REST de Kafka Connect pour ajouter cette configuration de connecteur à votre cluster Kafka Connect.

Vous pouvez envoyer cette configuration à un service Kafka Connect en cours d'exécution à l'aide d'une commande POST. Par défaut, le service Kafka Connect s'exécute sur le port 8083. Le service enregistre la configuration et démarre la tâche de connecteur qui se connecte à la base de données Spanner et transmet les enregistrements d'événements de modification aux sujets Kafka.

Voici un exemple de commande POST:

POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
  "name": "spanner-connector"
  "config": {
      "connector.class": "io.debezium.connector.spanner.SpannerConnector",
      "gcp.spanner.project.id": "test-project",
      "gcp.spanner.instance.id": "test-instance",
      "gcp.spanner.database.id": "users",
      "gcp.spanner.change.stream": "changeStreamAll",
      "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
      "heartbeat.interval.ms": "100",
      "tasks.max": "10"
  }
}

Exemple de réponse réussie:

HTTP/1.1 201 Created
Content-Type: application/json
{
    "name": "spanner-connector",
    "config": {
        "connector.class": "io.debezium.connector.spanner.SpannerConnector",
        "gcp.spanner.project.id": "test-project",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "users",
        "gcp.spanner.change.stream": "changeStreamAll",
        "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
        "heartbeat.interval.ms": "100",
        "tasks.max": "10"
    },
    "tasks": [
        { "connector": "spanner-connector", "task": 1 },
        { "connector": "spanner-connector", "task": 2 },
        { "connector": "spanner-connector", "task": 3 }
    ]
}

Mettre à jour la configuration du connecteur Kafka

Pour mettre à jour la configuration du connecteur, envoyez une commande PUT au service Kafka Connect en cours d'exécution avec le même nom de connecteur.

Supposons qu'un connecteur s'exécute avec la configuration de la section précédente. Voici un exemple de commande PUT:

PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Exemple de réponse réussie:

HTTP/1.1 200 OK
Content-Type: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "tasks.max": "10",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Arrêter le connecteur Kafka

Pour arrêter le connecteur, envoyez une commande DELETE au service Kafka Connect en cours d'exécution avec le même nom de connecteur.

Supposons qu'un connecteur s'exécute avec la configuration de la section précédente. Voici un exemple de commande DELETE:

DELETE /connectors/spanner-connector HTTP/1.1
Host: http://localhost:8083

Exemple de réponse réussie:

HTTP/1.1 204 No Content

Surveiller le connecteur Kafka

En plus des métriques standards de Kafka Connect et Debezium, le connecteur Kafka exporte ses propres métriques:

  • MilliSecondsLowWatermark: filigrane faible actuel de la tâche de connecteur, en millisecondes. Le filigrane faible décrit l'heure T à laquelle le connecteur a la garantie d'avoir transmis tous les événements dont l'horodatage est inférieur à T.

  • MilliSecondsLowWatermarkLag: retard du filigrane faible derrière l'heure actuelle, en millisecondes. Diffusion de tous les événements dont l'horodatage est inférieur à T

  • LatencyLowWatermark<Variant>MilliSeconds: retard du filigrane faible par rapport à l'heure actuelle, en millisecondes. Nous fournissons des variantes P50, P95, P99, Moyenne, Min et Max.

  • LatencySpanner<Variant>MilliSeconds: latence de Spanner-commit-timestamp-to-connector-read. Les variantes P50, P95, P99, Moyenne, Min et Max sont proposées.

  • LatencyReadToEmit<Variant>MilliSeconds: latence de Spanner-read-timestamp-to-connector-emit. Nous fournissons des variantes P50, P95, P99, Moyenne, Min et Max.

  • LatencyCommitToEmit<Variant>tMilliSeconds: latence de Spanner-commit-timestamp-to-connector-emit. Nous fournissons des variantes P50, P95, P99, Moyenne, Min et Max.

  • LatencyCommitToPublish<Variant>MilliSeconds: latence de Spanner-commit-timestamp-to Kafka-publish-timestamp. Les variantes P50, P95, P99, Moyenne, Min et Max sont proposées.

  • NumberOfChangeStreamPartitionsDetected: nombre total de partitions détectées par la tâche de connecteur en cours.

  • NumberOfChangeStreamQueriesIssued: nombre total de requêtes de flux de modifications émises par la tâche en cours.

  • NumberOfActiveChangeStreamQueries: nombre actif de requêtes de flux de modifications détectées par la tâche de connecteur en cours.

  • SpannerEventQueueCapacity: capacité totale de StreamEventQueue, une file d'attente qui stocke les éléments reçus des requêtes de flux de modifications.

  • SpannerEventQueueCapacity: capacité restante de StreamEventQueue.

  • TaskStateChangeEventQueueCapacity: capacité totale de TaskStateChangeEventQueue, une file d'attente qui stocke les événements qui se produisent dans le connecteur.

  • RemainingTaskStateChangeEventQueueCapacity: capacité restante de TaskStateChangeEventQueue.

  • NumberOfActiveChangeStreamQueries: nombre actif de requêtes de flux de modifications détectées par la tâche de connecteur en cours.

Propriétés de configuration du connecteur Kafka

Les propriétés de configuration suivantes sont obligatoires pour le connecteur:

  • name: nom unique du connecteur. Toute nouvelle tentative d'enregistrement portant le même nom entraînera un échec. Cette propriété est requise par tous les connecteurs Kafka Connect.

  • connector.class: nom de la classe Java du connecteur. Utilisez toujours la valeur io.debezium.connector.spanner.SpannerConnector pour le connecteur Kafka.

  • tasks.max: nombre maximal de tâches à créer pour ce connecteur

  • gcp.spanner.project.id : ID du projet

  • gcp.spanner.instance.id: ID de l'instance Spanner

  • gcp.spanner.database.id: ID de la base de données Spanner

  • gcp.spanner.change.stream: nom du flux de modifications Spanner

  • gcp.spanner.credentials.json: objet JSON de clé de compte de service.

  • gcp.spanner.credentials.path: chemin d'accès à l'objet JSON de clé de compte de service. Obligatoire si le champ ci-dessus n'est pas renseigné.

  • gcp.spanner.database.role : rôle de base de données Spanner à utiliser. Cela n'est nécessaire que lorsque le flux de modifications est sécurisé par un contrôle des accès ultraprécis. Le rôle de base de données doit disposer du droit SELECT sur le flux de modifications et du droit EXECUTE sur la fonction de lecture du flux de modifications. Pour en savoir plus, consultez la section Contrôle précis des contrôle des accès pour les flux de modifications.

Les propriétés de configuration avancée suivantes possèdent des valeurs par défaut qui fonctionnent dans la plupart des situations et doivent donc rarement être spécifiées dans la configuration du connecteur:

  • gcp.spanner.low-watermark.enabled: indique si le filigrane faible est activé pour le connecteur. La valeur par défaut est "false".

  • gcp.spanner.low-watermark.update-period.ms: intervalle auquel le filigrane faible est mis à jour. La valeur par défaut est 1 000 ms.

  • heartbeat.interval.ms: intervalle de pulsation Spanner. La valeur par défaut est 300000 (cinq minutes).

  • gcp.spanner.start.time: heure de début du connecteur. La valeur par défaut est l'heure actuelle.

  • gcp.spanner.end.time: heure de fin du connecteur. La valeur par défaut est l'infini.

  • tables.exclude.list: tables pour lesquelles exclure les événements de modification. La valeur par défaut est vide.

  • tables.include.list: tables pour lesquelles vous souhaitez inclure les événements de modification. Si ce champ n'est pas renseigné, toutes les tables sont incluses. La valeur par défaut est vide.

  • gcp.spanner.stream.event.queue.capacity: capacité de la file d'attente d'événements Spanner. La valeur par défaut est 10 000.

  • connector.spanner.task.state.change.event.queue.capacity: la capacité de la file d'attente des événements de modification de l'état des tâches La valeur par défaut est 1 000.

  • connector.spanner.max.missed.heartbeats: nombre maximal de pulsations manquées pour une requête de flux de modifications avant qu'une exception ne soit générée. Valeur par défaut : 10

  • scaler.monitor.enabled: indique si l'autoscaling des tâches est activé. Valeur par défaut : "false".

  • tasks.desired.partitions: nombre souhaité de partitions de flux de modifications par tâche. Ce paramètre est nécessaire pour l'autoscaling des tâches. La valeur par défaut est 2.

  • tasks.min: nombre minimal de tâches. Ce paramètre est nécessaire pour l'autoscaling des tâches. La valeur par défaut est 1.

  • connector.spanner.sync.topic: nom du sujet de synchronisation, un sujet de connecteur interne permettant de stocker les communications entre les tâches. La valeur par défaut est _sync_topic_spanner_connector_connectorname si l'utilisateur n'a pas fourni de nom.

  • connector.spanner.sync.poll.duration: durée du sondage pour le sujet de synchronisation. La valeur par défaut est 500 ms.

  • connector.spanner.sync.request.timeout.ms: délai avant expiration des requêtes envoyées au sujet de synchronisation. La valeur par défaut est 5 000 ms.

  • connector.spanner.sync.delivery.timeout.ms: délai avant expiration de la publication dans le sujet de synchronisation. La valeur par défaut est 15 000 ms.

  • connector.spanner.sync.commit.offsets.interval.ms: intervalle selon lequel les décalages sont validés pour le sujet de synchronisation. La valeur par défaut est 60 000 ms.

  • connector.spanner.sync.publisher.wait.timeout: intervalle auquel les messages sont publiés dans le sujet de synchronisation. La valeur par défaut est 5 ms.

  • connector.spanner.rebalancing.topic: nom du sujet de rééquilibrage. Le sujet de rééquilibrage est un sujet de connecteur interne qui permet de déterminer l'activité des tâches. La valeur par défaut est _rebalancing_topic_spanner_connector_connectorname si l'utilisateur n'a pas fourni de nom.

  • connector.spanner.rebalancing.poll.duration: durée de l'interrogation pour le sujet de rééquilibrage. La valeur par défaut est 5 000 ms.

  • connector.spanner.rebalancing.commit.offsets.timeout: délai avant expiration de la validation des décalages pour le sujet de rééquilibrage. La valeur par défaut est 5 000 ms.

  • connector.spanner.rebalancing.commit.offsets.interval.ms: intervalle selon lequel les décalages sont validés pour le sujet de synchronisation. La valeur par défaut est 60 000 ms.

  • connector.spanner.rebalancing.task.waiting.timeout: temps d'attente d'une tâche avant le traitement d'un événement de rééquilibrage. La valeur par défaut est 1 000 ms.

Pour obtenir une liste encore plus détaillée des propriétés du connecteur configurable, consultez le dépôt GitHub.

Limites