Connecter Pub/Sub à Apache Kafka

Ce document explique comment intégrer Apache Kafka et Pub/Sub à l'aide du connecteur Kafka du groupe Pub/Sub.

À propos du connecteur Kafka pour les groupes Pub/Sub

Apache Kafka est une plate-forme Open Source pour les événements de traitement en flux continu. Il est couramment utilisé dans les architectures distribuées pour permettre la communication entre des composants faiblement couplés. Pub/Sub est un service géré permettant d'envoyer et de recevoir des messages de manière asynchrone. Comme avec Kafka, vous pouvez utiliser Pub/Sub pour communiquer entre les composants de votre architecture cloud.

Le connecteur Kafka du groupe Pub/Sub vous permet d'intégrer ces deux systèmes. Les connecteurs suivants sont empaquetés dans le fichier JAR du connecteur:

  • Le connecteur de récepteur lit les enregistrements d'un ou de plusieurs sujets Kafka et les publie sur Pub/Sub.
  • Le connecteur source lit les messages d'un sujet Pub/Sub et les publie dans Kafka.

Voici quelques scénarios dans lesquels vous pouvez utiliser le connecteur Pub/Sub Group Kafka:

  • Vous migrez une architecture basée sur Kafka vers Google Cloud.
  • Vous disposez d'un système de frontend qui stocke des événements dans Kafka en dehors de Google Cloud, mais vous utilisez également Google Cloud pour exécuter certains de vos services backend, qui doivent recevoir les événements Kafka.
  • Vous collectez les journaux d'une solution Kafka sur site et les envoyez à Google Cloud pour l'analyse des données.
  • Vous disposez d'un système de frontend qui utilise Google Cloud, mais vous stockez également des données sur site à l'aide de Kafka.

Le connecteur nécessite Kafka Connect, un framework permettant de diffuser des données en flux continu entre Kafka et d'autres systèmes. Pour utiliser le connecteur, vous devez exécuter Kafka Connect avec votre cluster Kafka.

Dans ce document, nous partons du principe que vous connaissez Kafka et Pub/Sub. Avant de lire ce document, nous vous recommandons de suivre l'un des guides de démarrage rapide de Pub/Sub.

Le connecteur Pub/Sub n'est pas compatible avec l'intégration entre Google Cloud IAM et les LCA Kafka Connect.

Premiers pas avec le connecteur

Cette section vous explique comment effectuer les tâches suivantes:

  1. Configurez le connecteur Kafka du groupe Pub/Sub.
  2. Envoyez des événements de Kafka à Pub/Sub.
  3. Envoyer des messages de Pub/Sub vers Kafka

Prérequis

Installer Kafka

Suivez le démarrage rapide Apache Kafka pour installer Kafka à nœud unique sur votre machine locale. Suivez les étapes du guide de démarrage rapide:

  1. Téléchargez la dernière version de Kafka et extrayez-la.
  2. Démarrez l'environnement Kafka.
  3. Créez un sujet Kafka.

Authentifier

Le connecteur Kafka du groupe Pub/Sub doit s'authentifier auprès de Pub/Sub pour envoyer et recevoir des messages Pub/Sub. Pour configurer l'authentification, procédez comme suit:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Create local authentication credentials for your user account:

    gcloud auth application-default login
  6. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Create local authentication credentials for your user account:

    gcloud auth application-default login
  11. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.

Télécharger le fichier JAR du connecteur

Téléchargez le fichier JAR du connecteur sur votre ordinateur local. Pour en savoir plus, consultez la section Acquérir le connecteur dans le fichier de documentation GitHub.

Copier les fichiers de configuration du connecteur

  1. Clonez ou téléchargez le dépôt GitHub du connecteur.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. Copiez le contenu du répertoire config dans le sous-répertoire config de votre installation Kafka.

    cp config/* [path to Kafka installation]/config/
    

Ces fichiers contiennent les paramètres de configuration du connecteur.

Mettre à jour votre configuration Kafka Connect

  1. Accédez au répertoire contenant le binaire Kafka Connect que vous avez téléchargé.
  2. Dans le répertoire binaire de Kafka Connect, ouvrez le fichier nommé config/connect-standalone.properties dans un éditeur de texte.
  3. Si plugin.path property est commenté, supprimez la mise en commentaire.
  4. Mettez à jour plugin.path property pour inclure le chemin d'accès au fichier JAR du connecteur.

    Exemple :

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. Définissez la propriété offset.storage.file.filename sur un nom de fichier local. En mode autonome, Kafka utilise ce fichier pour stocker les données de décalage.

    Exemple :

    offset.storage.file.filename=/tmp/connect.offsets
    

Transférer des événements de Kafka vers Pub/Sub

Cette section explique comment démarrer le connecteur de récepteur, publier des événements dans Kafka, puis lire les messages transférés depuis Pub/Sub.

  1. Utilisez Google Cloud CLI pour créer un sujet Pub/Sub avec un abonnement.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

    Remplacez les éléments suivants :

    • PUBSUB_TOPIC: nom d'un sujet Pub/Sub pour recevoir les messages de Kafka.
    • PUBSUB_SUBSCRIPTION: nom d'un abonnement Pub/Sub pour le sujet.
  2. Ouvrez le fichier /config/cps-sink-connector.properties dans un éditeur de texte. Ajoutez des valeurs pour les propriétés suivantes, qui sont marquées "TODO" dans les commentaires:

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC

    Remplacez les éléments suivants :

    • KAFKA_TOPICS: liste de sujets Kafka à lire, séparés par une virgule.
    • PROJECT_ID: projet Google Cloud contenant votre sujet Pub/Sub.
    • PUBSUB_TOPIC: sujet Pub/Sub pour recevoir les messages de Kafka.
  3. Dans le répertoire Kafka, exécutez la commande suivante:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. Suivez les étapes du guide de démarrage rapide Apache Kafka pour écrire des événements dans votre sujet Kafka.

  5. Utilisez la gcloud CLI pour lire les événements de Pub/Sub.

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack

Transférer des messages de Pub/Sub vers Kafka

Cette section explique comment démarrer le connecteur source, publier des messages dans Pub/Sub et lire les messages transférés à partir de Kafka.

  1. Utilisez la gcloud CLI pour créer un sujet Pub/Sub avec un abonnement.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

    Remplacez les éléments suivants :

    • PUBSUB_TOPIC: nom d'un sujet Pub/Sub.
    • PUBSUB_SUBSCRIPTION: nom d'un abonnement Pub/Sub.
  2. Ouvrez le fichier nommé /config/cps-source-connector.properties dans un éditeur de texte. Ajoutez des valeurs pour les propriétés suivantes, qui sont marquées "TODO" dans les commentaires:

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION

    Remplacez les éléments suivants :

    • KAFKA_TOPIC: sujets Kafka à recevoir les messages Pub/Sub.
    • PROJECT_ID: projet Google Cloud contenant votre sujet Pub/Sub.
    • PUBSUB_TOPIC: sujet Pub/Sub.
  3. Dans le répertoire Kafka, exécutez la commande suivante:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. Utilisez la gcloud CLI pour publier un message dans Pub/Sub.

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
  5. Lire le message de Kafka. Suivez les étapes du guide de démarrage rapide d'Apache Kafka pour lire les messages du sujet Kafka.

Conversion par SMS

Un enregistrement Kafka contient une clé et une valeur, qui sont des tableaux d'octets de longueur variable. Un enregistrement Kafka peut également comporter des en-têtes, qui sont des paires clé-valeur. Un message Pub/Sub comporte deux parties principales: le corps du message et zéro ou plusieurs attributs clé-valeur.

Kafka Connect utilise des convertisseurs pour sérialiser les clés et les valeurs vers et depuis Kafka. Pour contrôler la sérialisation, définissez les propriétés suivantes dans les fichiers de configuration du connecteur:

  • key.converter: convertisseur utilisé pour sérialiser les clés d'enregistrement.
  • value.converter: convertisseur utilisé pour sérialiser les valeurs d'enregistrement.

Le corps d'un message Pub/Sub est un objet ByteString. La conversion la plus efficace consiste donc à copier directement la charge utile. Pour cette raison, nous vous recommandons d'utiliser un convertisseur qui produit des types de données primitifs (entier, flottant, chaîne ou schéma d'octets) dans la mesure du possible, afin d'éviter de désérialiser et de résérialiser le même corps de message.

Conversion de Kafka vers Pub/Sub

Le connecteur de récepteur convertit les enregistrements Kafka en messages Pub/Sub comme suit:

  • La clé d'enregistrement Kafka est stockée sous la forme d'un attribut nommé "key" dans le message Pub/Sub.
  • Par défaut, le connecteur supprime tous les en-têtes de l'enregistrement Kafka. Toutefois, si vous définissez l'option de configuration headers.publish sur true, le connecteur écrit les en-têtes en tant qu'attributs Pub/Sub. Le connecteur ignore les en-têtes qui dépassent les limites de Pub/Sub concernant les attributs de message.
  • Pour les schémas d'entiers, de nombres à virgule flottante, de chaînes et d'octets, le connecteur transmet les octets de la valeur de l'enregistrement Kafka directement dans le corps du message Pub/Sub.
  • Pour les schémas struct, le connecteur écrit chaque champ en tant qu'attribut du message Pub/Sub. Par exemple, si le champ est { "id"=123 }, le message Pub/Sub obtenu comporte un attribut "id"="123". La valeur du champ est toujours convertie en chaîne. Les types de mappage et de structure ne sont pas acceptés comme types de champ dans une structure.
  • Pour les schémas de mappage, le connecteur écrit chaque paire clé-valeur en tant qu'attribut du message Pub/Sub. Par exemple, si la carte est {"alice"=1,"bob"=2}, le message Pub/Sub obtenu comporte deux attributs, "alice"="1" et "bob"="2". Les clés et les valeurs sont converties en chaînes.

Les schémas struct et map présentent certains comportements supplémentaires:

  • Si vous le souhaitez, vous pouvez spécifier un champ de struct ou une clé de mappage particulier comme corps du message en définissant la propriété de configuration messageBodyName. La valeur du champ ou de la clé est stockée sous la forme d'un ByteString dans le corps du message. Si vous ne définissez pas messageBodyName, le corps du message est vide pour les schémas de struct et de map.

  • Pour les valeurs de tableau, le connecteur n'accepte que les types de tableaux primitifs. La séquence de valeurs du tableau est concaténée dans un seul objet ByteString.

Conversion de Pub/Sub vers Kafka

Le connecteur source convertit les messages Pub/Sub en enregistrements Kafka comme suit:

  • Clé d'enregistrement Kafka: par défaut, la clé est définie sur null. Vous pouvez éventuellement spécifier un attribut de message Pub/Sub à utiliser comme clé en définissant l'option de configuration kafka.key.attribute. Dans ce cas, le connecteur recherche un attribut portant ce nom et définit la clé d'enregistrement sur la valeur de l'attribut. Si l'attribut spécifié n'est pas présent, la clé d'enregistrement est définie sur null.

  • Valeur de l'enregistrement Kafka Le connecteur écrit la valeur de l'enregistrement comme suit:

    • Si le message Pub/Sub ne comporte pas d'attributs personnalisés, le connecteur écrit le corps du message Pub/Sub directement dans la valeur de l'enregistrement Kafka en tant que type byte[], à l'aide du convertisseur spécifié par value.converter.

    • Si le message Pub/Sub comporte des attributs personnalisés et que kafka.record.headers est false, le connecteur écrit une struct dans la valeur de l'enregistrement. La structure contient un champ pour chaque attribut et un champ nommé "message" dont la valeur est le corps du message Pub/Sub (stocké sous forme d'octets):

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      Dans ce cas, vous devez utiliser un value.converter compatible avec les schémas struct, tels que org.apache.kafka.connect.json.JsonConverter.

    • Si le message Pub/Sub comporte des attributs personnalisés et que kafka.record.headers est true, le connecteur écrit les attributs en tant qu'en-têtes d'enregistrement Kafka. Il écrit le corps du message Pub/Sub directement dans la valeur d'enregistrement Kafka en tant que type byte[], à l'aide du convertisseur spécifié par value.converter.

  • En-têtes d'enregistrement Kafka Par défaut, les en-têtes sont vides, sauf si vous définissez kafka.record.headers sur true.

Options de configuration

En plus des configurations fournies par l'API Kafka Connect, le connecteur Kafka du groupe Pub/Sub prend en charge la configuration du récepteur et de la source, comme décrit dans les configurations du connecteur Pub/Sub.

Assistance

Si vous avez besoin d'aide, créez une demande d'assistance. Pour les questions et discussions générales, créez un problème dans le dépôt GitHub.

Étape suivante