Créer des connexions de flux de modifications vers Kafka

Cette page explique comment utiliser le connecteur Kafka pour exploiter et transférer les données des flux de modifications Spanner.

Concepts fondamentaux

Vous trouverez ci-dessous une description des concepts fondamentaux du connecteur Kafka.

Debezium

Debezium est un projet Open Source qui fournit une plate-forme de flux de données à faible latence pour la capture des données modifiées.

Connecteur Kafka

Le connecteur Kafka fournit une abstraction sur l'API Spanner pour publier des flux de modifications Spanner sur Kafka. Avec ce connecteur, vous n'avez pas à gérer le cycle de vie de la partition des flux de modifications, ce qui est nécessaire lorsque vous utilisez directement l'API Spanner.

Le connecteur Kafka génère un événement de modification pour chaque mod d'enregistrement de modification de données et envoie les enregistrements d'événements de modification en aval dans un sujet Kafka distinct pour chaque table faisant l'objet d'un suivi du flux de modifications. Un mod d'enregistrement des modifications de données représente une modification unique (insertion, mise à jour ou suppression) qui a été capturée. Un même enregistrement de modification de données peut contenir plusieurs mods.

Sortie du connecteur Kafka

Le connecteur Kafka transfère directement les enregistrements des flux de modifications dans un sujet Kafka distinct. Le nom du sujet de sortie doit être connector_name.table_name. Si le sujet n'existe pas, le connecteur Kafka crée automatiquement un sujet sous ce nom.

Vous pouvez également configurer des transformations de routage de sujet pour réacheminer les enregistrements vers les sujets que vous spécifiez. Si vous souhaitez utiliser le routage des sujets, désactivez la fonctionnalité de faible filigrane.

Tri des enregistrements

Les enregistrements sont classés par code temporel de commit pour chaque clé primaire les sujets Kafka. Les enregistrements appartenant à différentes clés primaires garanties de commande. Les enregistrements ayant la même clé primaire sont stockés même partition du sujet Kafka. Pour traiter l'intégralité d'une transaction, vous pouvez utilisez également l'enregistrement des modifications de données server_transaction_id et number_of_records_in_transaction sur assembler une transaction Spanner.

Événements de modification

Le connecteur Kafka génère un événement de modification des données pour chaque INSERT, UPDATE, et DELETE. Chaque événement contient une clé et des valeurs pour la ligne modifiée.

Vous pouvez utiliser les convertisseurs Kafka Connect pour générer des événements de modification de données dans Formats Protobuf, AVRO, JSON ou JSON Schemaless. Si vous utilisez un Convertisseur Kafka Connect qui génère des schémas, l'événement contient des schémas distincts pour la clé et les valeurs. Sinon, l'événement ne contient la clé et les valeurs.

Le schéma de la clé ne change jamais. Le schéma des valeurs est fusion de toutes les colonnes suivies par le flux de modifications depuis le l'heure de début du connecteur.

Si vous configurez le connecteur pour générer des événements JSON, l'événement de modification de sortie contient cinq champs:

  • Le premier champ schema spécifie un schéma Kafka Connect qui décrit le schéma de clé Spanner.

  • Le premier champ payload présente la structure décrite dans le champ schema précédent et contient la clé de la ligne qui a été modifiée.

  • Le deuxième champ schema spécifie le schéma Kafka Connect qui décrit le schéma de la ligne modifiée.

  • Le deuxième champ payload présente la structure décrite dans le champ schema précédent, et il contient les données réelles de la ligne modifiée.

  • Le champ source est obligatoire. Il décrit les métadonnées de la source de l'événement.

Voici un exemple d'événement de modification de données:

{
  // The schema for the Spanner key.
  "schema": {
    "type": "struct",
    "name": "customers.Key",
    "optional": false,
    "fields": [
      {
        "type": "int64",
        "optional": "false"
        "field": "false"
      }
    ]
  },
  // The value of the Spanner key.
  "payload": {
      "id": "1"
  },
  // The schema for the payload, which contains the before and after values
  // of the changed row. The schema for the payload contains all the
  // columns that the change stream has tracked since the connector start
  // time.
  "schema": { 
    "type": "struct",
    "fields": [
      {
        // The schema for the before values of the changed row.
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "first_name"
            }
        ],
        "optional": true,
        "name": "customers.Value",
        "field": "before"
      },
      {
        // The schema for the after values of the changed row.
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          }
        ],
          "optional": true,
          "name": "customers.Value",
          "field": "after"
        },
        {
          // The schema for the source metadata for the event.
          "type": "struct",
          "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "version"
            },
            {
                "type": "string",
                "optional": false,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "string",
                "optional": false,
                "field": "sequence"
            },
            {
                "type": "string",
                "optional": false,
                "field": "project_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "instance_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "database_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "change_stream_name"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
            {
                "type": "string",
                "optional": true,
                "field": "server_transaction_id"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "low_watermark"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "read_at_timestamp"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_records_in_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "transaction_tag"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "system_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "value_capture_type"
            }
            {
                "type": "string",
                "optional": true,
                "field": "partition_token"
            }
            {
                "type": "int32",
                "optional": true,
                "field": "mod_number"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "is_last_record_in_transaction_in_partition"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_partitions_in_transaction"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.spanner.Source",
          "field": "source"
        },
      ]
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "connector_name.customers.Envelope"
  },
  "payload": {
    // The values of the row before the event.
    "before": null,
    // The values of the row after the event.
    "after": { 
        "id": 1,
        "first_name": "Anne",
    }
  },
  // The source metadata.
  "source": {
    "version": "{debezium-version}",
    "connector": "spanner",
    "name": "spanner_connector",
    "ts_ms": 1670955531785,
    "snapshot": "false",
    "db": "database",
    "sequence": "1",
    "project_id": "project",
    "instance_id": "instance",
    "database_id": "database",
    "change_stream_name": "change_stream",
    "table": "customers",
    "server_transaction_id": "transaction_id",
    "low_watermark": 1670955471635,
    "read_at_timestamp": 1670955531791,
    "number_records_in_transaction": 2,
    "transaction_tag": "",
    "system_transaction": false,
    "value_capture_type": "OLD_AND_NEW_VALUES",
    "partition_token": "partition_token",
    "mod_number": 0,
    "is_last_record_in_transaction_in_partition": true,
    "number_of_partitions_in_transaction": 1
  },
  "op": "c", 
  "ts_ms": 1559033904863 //
}

Filigrane faible

Le filigrane bas décrit l'heure T à laquelle le connecteur Kafka est certain d'avoir diffusé et publié sur un sujet Kafka tous les événements avec un code temporel < T.

Vous pouvez activer le filigrane faible dans le connecteur Kafka à l'aide du Paramètre gcp.spanner.low-watermark.enabled. Ce paramètre est désactivé par défaut. Si le filigrane faible est activé, le champ low_watermark dans les données du flux de modifications l'enregistrement de modification est renseigné avec le filigrane faible actuel du connecteur Kafka du code temporel.

Si aucun enregistrement n'est produit, le connecteur Kafka envoie périodiquement filigrane "pulsations de cœur" aux sujets de sortie Kafka détectés par le connecteur.

Ces pulsations de filigrane sont des enregistrements vides, à l'exception de low_watermark. Vous pouvez ensuite utiliser le filigrane faible pour effectuer des agrégations temporelles. Par exemple, vous pouvez utiliser le filigrane faible pour classer les événements par commit dans les clés primaires.

Sujets des métadonnées

Le connecteur Kafka et le framework Kafka Connect créent plusieurs des sujets de métadonnées pour stocker les informations liées aux connecteurs. Il est déconseillé de modifier la configuration ou le contenu de ces sujets de métadonnées.

Voici les sujets de métadonnées:

  • _consumer_offsets: sujet créé automatiquement par Kafka. Stocke les décalages des consommateurs créés dans le connecteur Kafka.
  • _kafka-connect-offsets: sujet créé automatiquement par Kafka Connect. Stocke les décalages du connecteur.
  • _sync_topic_spanner_connector_connectorname: sujet créé automatiquement par le connecteur. Stocke les métadonnées concernant les partitions du flux de modifications.
  • _rebalancing_topic_spanner_connector_connectorname: sujet créé automatiquement par le connecteur. Permet de déterminer l'activité des tâches du connecteur.
  • _debezium-heartbeat.connectorname: sujet utilisé pour traiter les pulsations des flux de modifications Spanner.

Environnement d'exécution du connecteur Kafka

La section suivante décrit l'environnement d'exécution du connecteur Kafka.

Évolutivité

Le connecteur Kafka est évolutif horizontalement et s'exécute sur une ou plusieurs tâches réparties sur plusieurs nœuds de calcul Kafka Connect.

Garanties de distribution des messages

Le connecteur Kafka offre une garantie de distribution de type "au moins une fois".

Tolérance aux pannes

Le connecteur Kafka tolère les défaillances. Lorsque le connecteur Kafka lit les modifications produit des événements, il enregistre le dernier code temporel de commit traité pour chaque modification partition par flux. Si le connecteur Kafka s'arrête pour une raison quelconque (y compris de communication, de réseau ou de logiciels), au redémarrage le connecteur Kafka continue la diffusion des enregistrements là où il s'était arrêté.

Le connecteur Kafka lit le schéma d'informations au démarrage du connecteur Kafka pour récupérer les informations de schéma. Par défaut, Spanner ne peut pas lire le schéma d'informations au moment de la lecture la durée de conservation de la version, qui est par défaut d'une heure. Si vous souhaitez démarrer le connecteur avant au bout d'une heure, vous devez augmenter la durée de conservation de la version de la base de données période.

Configurer le connecteur Kafka

Créer un flux de modifications

Pour savoir comment créer un flux de modifications, consultez Créer un flux de modifications. Pour passer aux étapes suivantes, vous devez disposer d'une instance Spanner avec un flux de modifications configuré.

Notez que si vous souhaitez que les colonnes modifiées et non modifiées soient renvoyées à chaque un événement de modification des données, utilisez le type de capture de valeur NEW_ROW. Pour en savoir plus, consultez la section Type de capture de valeur.

Installer le fichier JAR du connecteur Kafka

Avec Zookeeper, Kafka et Kafka Connect installés, les tâches restantes de déploiement d'un connecteur Kafka consistent à télécharger l'archive des plug-ins du connecteur, extrayez les fichiers JAR dans votre environnement Kafka Connect et ajoutez le avec les fichiers JAR dans le fichier plugin.path de Kafka Connect. Vous devez ensuite redémarrer votre processus Kafka Connect pour récupérer les nouveaux fichiers JAR.

Si vous utilisez des conteneurs immuables, vous pouvez extraire des images Images de conteneurs de Debezium pour Zookeeper, Kafka et Kafka Connect. L'image Kafka Connect possède le Connecteur Spanner préinstallé.

Pour en savoir plus sur l'installation des fichiers JAR du connecteur Kafka basé sur Debezium, consultez Installer Debezium

Configurer le connecteur Kafka

Voici un exemple de configuration d'un connecteur Kafka. qui se connecte à un flux de modifications appelé changeStreamAll dans base de données users dans l'instance test-instance et dans le projet test-project.

"name": "spanner-connector",
"config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{"client_id": user@example.com}",
    "gcp.spanner.database.role": "cdc-role",
    "tasks.max": "10"
}

Cette configuration contient les éléments suivants:

  • Nom du connecteur lorsqu'il est enregistré auprès d'un service Kafka Connect.

  • Nom de cette classe de connecteur Spanner.

  • ID du projet.

  • ID d'instance Spanner.

  • ID de la base de données Spanner.

  • Nom du flux de modifications.

  • Objet JSON de la clé de compte de service.

  • (Facultatif) Rôle de base de données Spanner à utiliser.

  • Nombre maximal de tâches.

Pour obtenir la liste complète des propriétés du connecteur, consultez Propriétés de configuration du connecteur Kafka.

Ajouter la configuration du connecteur à Kafka Connect

Pour lancer l'exécution d'un connecteur Spanner:

  1. Créez une configuration pour le connecteur Spanner.

  2. Utilisez l'API REST Kafka Connect pour l'ajouter. la configuration du connecteur à votre cluster Kafka Connect.

Vous pouvez envoyer cette configuration à l'aide d'une commande POST à une connexion Kafka en cours d'exécution Google Cloud. Par défaut, le service Kafka Connect s'exécute sur le port 8083. Le service enregistre la configuration et lance la tâche de connecteur qui se connecte à la base de données Spanner et transmet les enregistrements d'événements de modification Sujets Kafka.

Voici un exemple de commande POST:

POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
  "name": "spanner-connector"
  "config": {
      "connector.class": "io.debezium.connector.spanner.SpannerConnector",
      "gcp.spanner.project.id": "test-project",
      "gcp.spanner.instance.id": "test-instance",
      "gcp.spanner.database.id": "users",
      "gcp.spanner.change.stream": "changeStreamAll",
      "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
      "heartbeat.interval.ms": "100",
      "tasks.max": "10"
  }
}

Exemple de réponse réussie:

HTTP/1.1 201 Created
Content-Type: application/json
{
    "name": "spanner-connector",
    "config": {
        "connector.class": "io.debezium.connector.spanner.SpannerConnector",
        "gcp.spanner.project.id": "test-project",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "users",
        "gcp.spanner.change.stream": "changeStreamAll",
        "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
        "heartbeat.interval.ms": "100",
        "tasks.max": "10"
    },
    "tasks": [
        { "connector": "spanner-connector", "task": 1 },
        { "connector": "spanner-connector", "task": 2 },
        { "connector": "spanner-connector", "task": 3 }
    ]
}

Mettre à jour la configuration du connecteur Kafka

Pour mettre à jour la configuration du connecteur, envoyez une commande PUT au gestionnaire Service Kafka Connect portant le même nom de connecteur.

Supposons qu'un connecteur s'exécute avec la configuration de la section précédente. Voici un exemple de commande PUT:

PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Exemple de réponse réussie:

HTTP/1.1 200 OK
Content-Type: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "tasks.max": "10",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Arrêter le connecteur Kafka

Pour arrêter le connecteur, envoyez une commande DELETE au gestionnaire Service Kafka Connect portant le même nom de connecteur.

Supposons qu'un connecteur s'exécute avec la configuration de la section précédente. Voici un exemple de commande DELETE:

DELETE /connectors/spanner-connector HTTP/1.1
Host: http://localhost:8083

Exemple de réponse réussie:

HTTP/1.1 204 No Content

Surveiller le connecteur Kafka

En plus des métriques standards Kafka Connect et Debezium, le connecteur Kafka exporte ses propres métriques:

  • MilliSecondsLowWatermark: filigrane bas actuel de la tâche du connecteur, en millisecondes. La un filigrane faible décrit l'instant T auquel le connecteur est certain tous les événements dont le code temporel est inférieur à T

  • MilliSecondsLowWatermarkLag: décalage du filigrane bas par rapport à l'heure actuelle, en millisecondes. tous les événements dont le code temporel est inférieur à T

  • LatencyLowWatermark<Variant>MilliSeconds: délai du filigrane faible par rapport à l'heure actuelle en millisecondes. Les variantes P50, P95, P99, Moyen, Min et Max sont fournies.

  • LatencySpanner<Variant>MilliSeconds: latence de Spanner-commit-timestamp-to-connector-read. Les variantes P50, P95, P99, Moyenne, Min, Max sont fournies.

  • LatencyReadToEmit<Variant>MilliSeconds: latence de Spanner-read-timestamp-to-connector-emit. Les variantes P50, P95, P99, Moyen, Min et Max sont fournies.

  • LatencyCommitToEmit<Variant>tMilliSeconds: latence du connecteur Spanner-commit-timestamp-to-connector-emit. Les variantes P50, P95, P99, Moyen, Min et Max sont fournies.

  • LatencyCommitToPublish<Variant>MilliSeconds: latence de Spanner-commit-timestamp-to Kafka-publish-timestamp. Les variantes P50, P95, P99, Moyenne, Min, Max sont fournies.

  • NumberOfChangeStreamPartitionsDetected: nombre total de partitions détecté par la tâche de connecteur en cours.

  • NumberOfChangeStreamQueriesIssued: nombre total de requêtes de flux de modifications émis par la tâche en cours.

  • NumberOfActiveChangeStreamQueries: nombre actif de flux de modifications détectées par la tâche de connecteur en cours.

  • SpannerEventQueueCapacity: capacité totale de StreamEventQueue, une file d'attente qui stocke les éléments reçus des requêtes de flux de modifications.

  • SpannerEventQueueCapacity: capacité restante pour StreamEventQueue.

  • TaskStateChangeEventQueueCapacity: capacité totale de TaskStateChangeEventQueue, une file d'attente qui stocke les événements se produisant dans le connecteur.

  • RemainingTaskStateChangeEventQueueCapacity: capacité restante pour TaskStateChangeEventQueue.

  • NumberOfActiveChangeStreamQueries: nombre actif de flux de modifications détectées par la tâche de connecteur en cours.

Propriétés de configuration du connecteur Kafka

Les propriétés de configuration suivantes sont obligatoires pour le connecteur:

  • name: nom unique du connecteur Toute tentative d'enregistrement portant le même nom entraînera un échec. Cette propriété est requise par tous les connecteurs Kafka Connect.

  • connector.class: nom de la classe Java du connecteur. Utilisez toujours la valeur io.debezium.connector.spanner.SpannerConnector pour le connecteur Kafka.

  • tasks.max: nombre maximal de tâches à créer pour ce connecteur.

  • gcp.spanner.project.id : ID du projet

  • gcp.spanner.instance.id: ID de l'instance Spanner

  • gcp.spanner.database.id: ID de la base de données Spanner

  • gcp.spanner.change.stream: nom du flux de modifications Spanner

  • gcp.spanner.credentials.json: objet JSON de la clé de compte de service.

  • gcp.spanner.credentials.path: chemin d'accès à l'objet JSON de clé de compte de service. Obligatoire si le champ ci-dessus n'est pas renseigné.

  • gcp.spanner.database.role : rôle de base de données Spanner pour utiliser. Cette étape n'est requise que lorsque le flux de modifications est sécurisé par un contrôle des accès précis. Le rôle de base de données doit disposer du droit SELECT sur le le flux de modifications et le droit EXECUTE sur l'accès en lecture du flux de modifications . Pour en savoir plus, consultez la page Contrôle précis des accès aux modifications flux.

Les propriétés de configuration avancées suivantes ont des valeurs par défaut qui fonctionnent dans la plupart situations et doivent donc rarement être spécifiés dans la configuration du connecteur:

  • gcp.spanner.low-watermark.enabled: indique si le filigrane faible est activé pour le connecteur. La valeur par défaut est "false".

  • gcp.spanner.low-watermark.update-period.ms: intervalle auquel le filigrane faible est mis à jour. La valeur par défaut est 1 000 ms.

  • heartbeat.interval.ms: intervalle de pulsation de Spanner. La valeur par défaut est 300 000 (cinq minutes).

  • gcp.spanner.start.time: heure de début du connecteur La valeur par défaut est l'heure actuelle.

  • gcp.spanner.end.time: heure de fin du connecteur La valeur par défaut est l'infini.

  • tables.exclude.list: tableaux pour lesquels exclure les événements de modification. La valeur par défaut est vide.

  • tables.include.list: tables pour lesquelles des événements de modification doivent être inclus. Si elles ne sont pas renseignées, toutes les tables sont incluses. La valeur par défaut est vide.

  • gcp.spanner.stream.event.queue.capacity: capacité de la file d'attente des événements Spanner. La valeur par défaut est 10 000.

  • connector.spanner.task.state.change.event.queue.capacity: capacité de la file d'attente des événements de changement d'état de la tâche. La valeur par défaut est 1 000.

  • connector.spanner.max.missed.heartbeats: nombre maximal de pulsations manquées pour une requête de flux de modifications avant qu'une exception ne soit générée. Valeur par défaut : 10

  • scaler.monitor.enabled: indique si l'autoscaling des tâches est activé. Valeur par défaut : "false".

  • tasks.desired.partitions: nombre privilégié de partitions de flux de modifications par tâche. Ce paramètre est nécessaire pour l'autoscaling des tâches. La valeur par défaut est 2.

  • tasks.min: nombre minimal de tâches Ce paramètre est nécessaire pour l'autoscaling des tâches. La valeur par défaut est 1.

  • connector.spanner.sync.topic: nom du sujet de synchronisation (sujet de connecteur interne utilisé pour stocker les communications entre les tâches). La valeur par défaut est _sync_topic_spanner_connector_connectorname si l'utilisateur n'a pas fourni de nom.

  • connector.spanner.sync.poll.duration: durée du sondage sur le sujet de synchronisation. La valeur par défaut est 500 ms.

  • connector.spanner.sync.request.timeout.ms: délai avant expiration des requêtes adressées au sujet de synchronisation. La valeur par défaut est 5 000 ms.

  • connector.spanner.sync.delivery.timeout.ms: délai avant expiration de la publication dans le sujet de synchronisation. La valeur par défaut est 15 000 ms.

  • connector.spanner.sync.commit.offsets.interval.ms: intervalle auquel les décalages sont appliqués pour le sujet de synchronisation. La valeur par défaut est 60 000 ms.

  • connector.spanner.sync.publisher.wait.timeout: intervalle auquel les messages sont publiés sur le sujet de synchronisation. La valeur par défaut est 5 ms.

  • connector.spanner.rebalancing.topic: nom du sujet de rééquilibrage. La rubrique relative au rééquilibrage est une rubrique de connecteur interne utilisée pour déterminer l'activité des tâches. La valeur par défaut est _rebalancing_topic_spanner_connector_connectorname si l'utilisateur n'a pas fourni de nom.

  • connector.spanner.rebalancing.poll.duration: durée de l'interrogation du sujet de rééquilibrage. La valeur par défaut est 5 000 ms.

  • connector.spanner.rebalancing.commit.offsets.timeout: délai avant expiration pour la validation des décalages pour le sujet de rééquilibrage. La valeur par défaut est 5 000 ms.

  • connector.spanner.rebalancing.commit.offsets.interval.ms: intervalle auquel les décalages sont appliqués pour le sujet de synchronisation. La valeur par défaut est 60 000 ms.

  • connector.spanner.rebalancing.task.waiting.timeout: délai d'attente d'une tâche avant le traitement d'un événement de rééquilibrage. La valeur par défaut est 1 000 ms.

Pour obtenir une liste encore plus détaillée des propriétés du connecteur configurable, consultez le dépôt GitHub.

Limites