Connecter Pub/Sub à Apache Kafka

Ce document explique comment intégrer Apache Kafka et Pub/Sub à l'aide du connecteur Kafka de groupe Pub/Sub.

À propos du connecteur Kafka de groupe Pub/Sub

Apache Kafka est une plate-forme Open Source de diffusion d'événements. Il est couramment utilisé dans les architectures distribuées pour permettre la communication entre des composants faiblement couplés. Pub/Sub est un service géré permettant d'envoyer et de recevoir des messages de manière asynchrone. Comme avec Kafka, vous pouvez utiliser Pub/Sub pour communiquer entre les composants de votre architecture cloud.

Le connecteur Kafka de groupe Pub/Sub vous permet d'intégrer ces deux systèmes. Les connecteurs suivants sont inclus dans le fichier JAR du connecteur:

  • Le connecteur de récepteur lit les enregistrements d'un ou plusieurs sujets Kafka et les publie dans Pub/Sub.
  • Le connecteur source lit les messages d'un sujet Pub/Sub et les publie sur Kafka.

Voici quelques scénarios d'utilisation du connecteur Kafka de groupe Pub/Sub:

  • Vous migrez une architecture basée sur Kafka vers Google Cloud.
  • Vous disposez d'un système d'interface qui stocke les événements dans Kafka en dehors de Google Cloud, mais vous utilisez également Google Cloud pour exécuter certains de vos services de backend, qui doivent recevoir les événements Kafka.
  • Vous collectez les journaux à partir d'une solution Kafka sur site et les envoyez à Google Cloud pour l'analyse de données.
  • Vous disposez d'un système d'interface qui utilise Google Cloud, mais vous stockez également des données sur site à l'aide de Kafka.

Le connecteur nécessite Kafka Connect, un framework permettant de diffuser des données en flux continu entre Kafka et d'autres systèmes. Pour utiliser le connecteur, vous devez exécuter Kafka Connect en même temps que votre cluster Kafka.

Dans ce document, nous partons du principe que vous connaissez à la fois Kafka et Pub/Sub. Avant de lire ce document, nous vous recommandons de suivre l'un des guides de démarrage rapide de Pub/Sub.

Le connecteur Pub/Sub n'est pas compatible avec l'intégration entre les LCA Google Cloud IAM et Kafka Connect.

Premiers pas avec le connecteur

Cette section vous guide à travers les tâches suivantes:

  1. Configurez le connecteur Kafka de groupe Pub/Sub.
  2. Envoyer des événements de Kafka vers Pub/Sub
  3. Envoyez des messages de Pub/Sub à Kafka.

Prérequis

Installer Kafka

Suivez le guide de démarrage rapide d'Apache Kafka pour installer une application Kafka à nœud unique sur votre ordinateur local. Procédez comme suit dans le guide de démarrage rapide:

  1. Téléchargez la dernière version de Kafka et extrayez-la.
  2. Démarrez l'environnement Kafka.
  3. Créez un sujet Kafka.

Authentifier

Le connecteur Kafka de groupe Pub/Sub doit s'authentifier auprès de Pub/Sub pour envoyer et recevoir des messages Pub/Sub. Pour configurer l'authentification, procédez comme suit:

  1. Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
  2. Installez Google Cloud CLI.
  3. Pour initialiser gcloudCLI, exécutez la commande suivante :

    gcloud init
  4. Créez ou sélectionnez un projet Google Cloud.

    • Créez un projet Google Cloud :

      gcloud projects create PROJECT_ID

      Remplacez PROJECT_ID par le nom du projet Google Cloud que vous créez.

    • Sélectionnez le projet Google Cloud que vous avez créé :

      gcloud config set project PROJECT_ID

      Remplacez PROJECT_ID par le nom de votre projet Google Cloud.

  5. Créez des identifiants d'authentification locaux pour votre compte Google :

    gcloud auth application-default login
  6. Attribuez des rôles à votre compte Google. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants : roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • en remplaçant PROJECT_ID par l'ID de votre projet :
    • Remplacez EMAIL_ADDRESS par votre adresse e-mail.
    • Remplacez ROLE par chaque rôle individuel.
  7. Installez Google Cloud CLI.
  8. Pour initialiser gcloudCLI, exécutez la commande suivante :

    gcloud init
  9. Créez ou sélectionnez un projet Google Cloud.

    • Créez un projet Google Cloud :

      gcloud projects create PROJECT_ID

      Remplacez PROJECT_ID par le nom du projet Google Cloud que vous créez.

    • Sélectionnez le projet Google Cloud que vous avez créé :

      gcloud config set project PROJECT_ID

      Remplacez PROJECT_ID par le nom de votre projet Google Cloud.

  10. Créez des identifiants d'authentification locaux pour votre compte Google :

    gcloud auth application-default login
  11. Attribuez des rôles à votre compte Google. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants : roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • en remplaçant PROJECT_ID par l'ID de votre projet :
    • Remplacez EMAIL_ADDRESS par votre adresse e-mail.
    • Remplacez ROLE par chaque rôle individuel.

Télécharger le fichier JAR du connecteur

Téléchargez le fichier JAR du connecteur sur votre ordinateur local. Pour en savoir plus, consultez la section Obtenir le connecteur dans le fichier README de GitHub.

Copier les fichiers de configuration du connecteur

  1. Clonez ou téléchargez le dépôt GitHub du connecteur.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. Copiez le contenu du répertoire config dans le sous-répertoire config de votre installation de Kafka.

    cp config/* [path to Kafka installation]/config/
    

Ces fichiers contiennent les paramètres de configuration du connecteur.

Mettre à jour la configuration de Kafka Connect

  1. Accédez au répertoire contenant le binaire Kafka Connect que vous avez téléchargé.
  2. Dans le répertoire binaire de Kafka Connect, ouvrez le fichier nommé config/connect-standalone.properties dans un éditeur de texte.
  3. Si l'élément plugin.path property est commenté, annulez la mise en commentaire.
  4. Mettez à jour le fichier plugin.path property pour inclure le chemin d'accès au fichier JAR du connecteur.

    Exemple :

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. Définissez la propriété offset.storage.file.filename sur un nom de fichier local. En mode autonome, Kafka utilise ce fichier pour stocker les données de décalage.

    Exemple :

    offset.storage.file.filename=/tmp/connect.offsets
    

Transférer des événements de Kafka vers Pub/Sub

Cette section explique comment démarrer le connecteur récepteur, publier des événements sur Kafka, puis lire les messages transférés à partir de Pub/Sub.

  1. Utilisez la Google Cloud CLI pour créer un sujet Pub/Sub avec un abonnement.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    Remplacez les éléments suivants :

    • PUBSUB_TOPIC: nom d'un sujet Pub/Sub pour recevoir les messages de Kafka.
    • PUBSUB_SUBSCRIPTION: nom d'un abonnement Pub/Sub pour le sujet.
  2. Ouvrez le fichier /config/cps-sink-connector.properties dans un éditeur de texte. Ajoutez des valeurs aux propriétés suivantes, qui sont marquées "TODO" dans les commentaires:

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC
    

    Remplacez les éléments suivants :

    • KAFKA_TOPICS: liste de sujets Kafka à lire, séparés par une virgule.
    • PROJECT_ID: projet Google Cloud contenant votre sujet Pub/Sub.
    • PUBSUB_TOPIC: sujet Pub/Sub qui recevra les messages de Kafka.
  3. Dans le répertoire Kafka, exécutez la commande suivante:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. Suivez les étapes du guide de démarrage rapide d'Apache Kafka pour écrire des événements dans votre sujet Kafka.

  5. Utilisez la gcloud CLI pour lire les événements à partir de Pub/Sub.

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
    

Transférer des messages de Pub/Sub vers Kafka

Cette section explique comment démarrer le connecteur source, publier des messages sur Pub/Sub et lire les messages transférés à partir de Kafka.

  1. Utilisez la gcloud CLI pour créer un sujet Pub/Sub avec un abonnement.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    Remplacez les éléments suivants :

    • PUBSUB_TOPIC: nom d'un sujet Pub/Sub.
    • PUBSUB_SUBSCRIPTION: nom d'un abonnement Pub/Sub.
  2. Ouvrez le fichier nommé /config/cps-source-connector.properties dans un éditeur de texte. Ajoutez des valeurs pour les propriétés suivantes, qui sont marquées "TODO" dans les commentaires:

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION
    

    Remplacez les éléments suivants :

    • KAFKA_TOPIC: les sujets Kafka qui recevront les messages Pub/Sub.
    • PROJECT_ID: projet Google Cloud contenant votre sujet Pub/Sub.
    • PUBSUB_TOPIC: sujet Pub/Sub.
  3. Dans le répertoire Kafka, exécutez la commande suivante:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. Utilisez la gcloud CLI pour publier un message sur Pub/Sub.

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
    
  5. Lisez le message de Kafka. Suivez les étapes du guide de démarrage rapide d'Apache Kafka pour lire les messages du sujet Kafka.

Conversion par SMS

Un enregistrement Kafka contient une clé et une valeur, qui sont des tableaux d'octets de longueur variable. Un enregistrement Kafka peut éventuellement comporter des en-têtes, qui sont des paires clé/valeur. Un message Pub/Sub se compose de deux parties principales: le corps du message et zéro, un ou plusieurs attributs clé-valeur.

Kafka Connect utilise des convertisseurs pour sérialiser les clés et les valeurs vers et depuis Kafka. Pour contrôler la sérialisation, définissez les propriétés suivantes dans les fichiers de configuration du connecteur:

  • key.converter: convertisseur utilisé pour sérialiser les clés d'enregistrement.
  • value.converter: convertisseur utilisé pour sérialiser les valeurs d'enregistrement.

Le corps d'un message Pub/Sub est un objet ByteString. La conversion la plus efficace consiste donc à copier directement la charge utile. Pour cette raison, nous vous recommandons d'utiliser un convertisseur qui produit des types de données primitifs (schéma entier, flottant, chaîne ou octets) dans la mesure du possible, afin d'éviter la désérialisation et la sérialisation du même corps de message.

Conversion de Kafka vers Pub/Sub

Le connecteur de récepteur convertit les enregistrements Kafka en messages Pub/Sub comme suit:

  • La clé d'enregistrement Kafka est stockée en tant qu'attribut nommé "key" dans le message Pub/Sub.
  • Par défaut, le connecteur supprime tous les en-têtes dans l'enregistrement Kafka. Toutefois, si vous définissez l'option de configuration headers.publish sur true, le connecteur écrit les en-têtes en tant qu'attributs Pub/Sub. Le connecteur ignore les en-têtes qui dépassent les limites Pub/Sub des attributs de message.
  • Pour les schémas d'entiers, de valeurs flottantes, de chaînes et d'octets, le connecteur transmet les octets de la valeur de l'enregistrement Kafka directement dans le corps du message Pub/Sub.
  • Pour les schémas de structure, le connecteur écrit chaque champ en tant qu'attribut du message Pub/Sub. Par exemple, si le champ est { "id"=123 }, le message Pub/Sub résultant possède un attribut "id"="123". La valeur du champ est toujours convertie en chaîne. Les types map et struct ne sont pas acceptés comme types de champ dans une structure.
  • Pour les schémas de mappage, le connecteur écrit chaque paire clé/valeur en tant qu'attribut du message Pub/Sub. Par exemple, si le mappage est {"alice"=1,"bob"=2}, le message Pub/Sub résultant possède deux attributs, "alice"="1" et "bob"="2". Les clés et les valeurs sont converties en chaînes.

Les schémas de structure et de carte présentent des comportements supplémentaires:

  • Vous pouvez éventuellement spécifier un champ de structure ou une clé de mappage spécifique comme corps du message en définissant la propriété de configuration messageBodyName. La valeur du champ ou de la clé est stockée en tant que ByteString dans le corps du message. Si vous ne définissez pas messageBodyName, le corps du message est vide pour les schémas de structure et de mappage.

  • Pour les valeurs de tableau, le connecteur n'accepte que les types de tableaux primitifs. La séquence de valeurs dans le tableau est concaténée en un seul objet ByteString.

Conversion de Pub/Sub à Kafka

Le connecteur source convertit les messages Pub/Sub en enregistrements Kafka comme suit:

  • Clé d'enregistrement Kafka: par défaut, la clé est définie sur null. Vous pouvez éventuellement spécifier un attribut de message Pub/Sub à utiliser comme clé en définissant l'option de configuration kafka.key.attribute. Dans ce cas, le connecteur recherche un attribut portant ce nom et définit la clé d'enregistrement sur la valeur de l'attribut. Si l'attribut spécifié n'est pas présent, la clé d'enregistrement est définie sur null.

  • Valeur de l'enregistrement Kafka Le connecteur écrit la valeur de l'enregistrement comme suit:

    • Si le message Pub/Sub ne comporte aucun attribut personnalisé, le connecteur écrit le corps du message Pub/Sub directement dans la valeur de l'enregistrement Kafka en tant que type byte[], à l'aide du convertisseur spécifié par value.converter.

    • Si le message Pub/Sub comporte des attributs personnalisés et que kafka.record.headers est défini sur false, le connecteur écrit une structure dans la valeur d'enregistrement. La structure contient un champ pour chaque attribut et un champ nommé "message" dont la valeur est le corps du message Pub/Sub (stocké en octets):

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      Dans ce cas, vous devez utiliser un value.converter compatible avec les schémas struct, tel que org.apache.kafka.connect.json.JsonConverter.

    • Si le message Pub/Sub comporte des attributs personnalisés et que kafka.record.headers est défini sur true, le connecteur écrit les attributs sous la forme d'en-têtes d'enregistrement Kafka. Il écrit le corps du message Pub/Sub directement dans la valeur de l'enregistrement Kafka en tant que type byte[], à l'aide du convertisseur spécifié par value.converter.

  • En-têtes d'enregistrement Kafka Par défaut, les en-têtes sont vides, sauf si vous définissez kafka.record.headers sur true.

Options de configuration

En plus des configurations fournies par l'API Kafka Connect, le connecteur Kafka de groupe Pub/Sub accepte les configurations suivantes.

Options de configuration du connecteur de récepteur

Le connecteur de récepteur est compatible avec les options de configuration suivantes.

Paramètre Type de données Description
connector.class String Obligatoire. Classe Java du connecteur. La valeur du connecteur de récepteur Pub/Sub doit être com.google.pubsub.kafka.sink.CloudPubSubSinkConnector.
cps.endpoint String

Point de terminaison Pub/Sub à utiliser.

Valeur par défaut : "pubsub.googleapis.com:443"

cps.project String Obligatoire. Le compte Google Cloud qui contient le sujet Pub/Sub.
cps.topic String Obligatoire. Sujet Pub/Sub dans lequel publier les enregistrements Kafka.
gcp.credentials.file.path String Facultatif. Chemin d'accès à un fichier qui stocke les identifiants Google Cloud pour authentifier Pub/Sub Lite.
gcp.credentials.json String Facultatif. Blob JSON contenant Google Cloud pour authentifier Pub/Sub Lite.
headers.publish Boolean

Lorsque la valeur est true, incluez tous les en-têtes d'enregistrements Kafka en tant qu'attributs de message Pub/Sub.

Valeur par défaut : false

maxBufferBytes Long

Nombre maximal d'octets à recevoir sur une partition Kafka de sujet avant de les publier dans Pub/Sub.

Valeur par défaut: 10000000.

maxBufferSize Integer

Nombre maximal d'enregistrements à recevoir sur une partition de sujet Kafka avant de les publier dans Pub/Sub.

Valeur par défaut: 100.

maxDelayThresholdMs Integer

Délai d'attente maximal en millisecondes pour atteindre maxBufferSize ou maxBufferBytes avant de publier des enregistrements en attente dans Pub/Sub.

Valeur par défaut: 100.

maxOutstandingMessages Long

Nombre maximal d'enregistrements pouvant être en attente, y compris les lots incomplets et en attente, avant que l'éditeur ne bloque d'autres publications.

Valeur par défaut : Long.MAX_VALUE

maxOutstandingRequestBytes Long

Nombre maximal d'octets pouvant être en attente, y compris pour les lots incomplets et en attente, avant que l'éditeur ne bloque d'autres publications.

Valeur par défaut : Long.MAX_VALUE

maxRequestTimeoutMs Integer

Délai avant expiration des requêtes de publication individuelles sur Pub/Sub, en millisecondes.

Valeur par défaut: 10 000.

maxTotalTimeoutMs Integer

Délai total, en millisecondes, de publication d'un appel dans Pub/Sub, y compris les nouvelles tentatives.

Valeur par défaut: 60 000.

metadata.publish Boolean

Lorsque la valeur est true, incluez le sujet, la partition, le décalage et l'horodatage Kafka en tant qu'attributs de message Pub/Sub.

Valeur par défaut : false

messageBodyName String

Lorsque vous utilisez un schéma de valeur de structure ou de mappage, spécifie le nom d'un champ ou d'une clé à utiliser comme corps du message Pub/Sub. Consultez la page Conversion de Kafka vers Pub/Sub.

Valeur par défaut : "cps_message_body"

orderingKeySource String

Spécifie comment définir la clé de tri dans le message Pub/Sub. Les valeurs possibles sont les suivantes :

  • none: ne définissez pas la clé de tri.
  • key: utilisez la clé d'enregistrement Kafka comme clé de tri.
  • partition: utilise le numéro de partition, converti en chaîne, comme clé de tri. N'utilisez ce paramètre que pour les sujets à faible débit ou ceux comportant des milliers de partitions.

Valeur par défaut : none

topics String Obligatoire. Liste de sujets Kafka à lire, séparés par une virgule.

Options de configuration du connecteur source

Le connecteur source est compatible avec les options de configuration ci-dessous.

Paramètre Type de données Description
connector.class String Obligatoire. Classe Java du connecteur. Pour le connecteur source Pub/Sub, la valeur doit être com.google.pubsub.kafka.source.CloudPubSubSourceConnector.
cps.endpoint String

Point de terminaison Pub/Sub à utiliser.

Valeur par défaut : "pubsub.googleapis.com:443"

cps.makeOrderingKeyAttribute Boolean

Lorsque la valeur est true, écrivez la clé de tri dans l'enregistrement Kafka, en utilisant le même format que les attributs du message Pub/Sub. Consultez la section Conversion de Pub/Sub en enregistrements Kafka.

Valeur par défaut : false

cps.maxBatchSize Integer

Nombre maximal de messages à traiter par lot par demande d'extraction d'extraction vers Pub/Sub.

Valeur par défaut: 100

cps.project String Obligatoire. Projet Google Cloud qui contient le sujet Pub/Sub.
cps.subscription String Obligatoire. Nom de l'abonnement Pub/Sub à partir duquel extraire les messages.
gcp.credentials.file.path String Facultatif. Chemin d'accès à un fichier qui stocke les identifiants Google Cloud pour authentifier Pub/Sub Lite.
gcp.credentials.json String Facultatif. Blob JSON contenant Google Cloud pour authentifier Pub/Sub Lite.
kafka.key.attribute String

Attribut de message Pub/Sub à utiliser comme clé pour les messages publiés sur Kafka. Si elle est définie sur "orderingKey", utilisez la clé de tri du message. Si la valeur est null, les enregistrements Kafka ne possèdent pas de clé.

Valeur par défaut : null

kafka.partition.count Integer

Nombre de partitions Kafka pour le sujet Kafka dans lequel les messages sont publiés. Ce paramètre est ignoré si le schéma de partition est "kafka_partitioner".

Par défaut : 1.

kafka.partition.scheme String

Schéma d'attribution d'un message à une partition dans Kafka. Il peut s'agir de l'une des valeurs suivantes:

  • round_robin: attribuez des partitions à tour de rôle.
  • hash_key: trouver la partition en hachant la clé d'enregistrement.
  • hash_value: trouver la partition en hachant la valeur de l'enregistrement.
  • kafka_partitioner: déléguez la logique de partitionnement au producteur Kafka. Par défaut, le producteur Kafka détecte automatiquement le nombre de partitions et effectue un mappage de partition basé sur un hachage de type Murmur ou un round robin (à tour de rôle), selon qu'une clé d'enregistrement est fournie ou non.
  • ordering_key: utilise le code de hachage de la clé de tri d'un message. Si aucune clé de tri n'est présente, utilisez round_robin.

Valeur par défaut : round_robin

kafka.record.headers Boolean

Si la valeur est true, écrivez les attributs de message Pub/Sub en tant qu'en-têtes Kafka.

kafka.topic String Obligatoire. Le sujet Kafka qui reçoit les messages de Pub/Sub.

Assistance

Si vous avez besoin d'aide, créez une demande d'assistance. Pour les questions d'ordre général et les discussions, créez un problème dans le dépôt GitHub.

Étapes suivantes