Connecter Pub/Sub Lite à Apache Kafka

Ce document explique comment intégrer Apache Kafka et Pub/Sub Lite à l'aide du connecteur Kafka de groupe Pub/Sub.

À propos du connecteur Kafka de groupe Pub/Sub

Apache Kafka est une plate-forme Open Source de streaming d'événements. Il est couramment utilisé dans les architectures distribuées pour permettre la communication entre des composants faiblement couplés. Pub/Sub Lite est un service géré permettant d'envoyer et de recevoir des messages de manière asynchrone. Comme avec Kafka, vous pouvez utiliser Pub/Sub Lite pour communiquer entre les composants de votre architecture cloud.

Le connecteur Kafka de groupe Pub/Sub vous permet d'intégrer ces deux systèmes. Les connecteurs suivants sont inclus dans le fichier JAR du connecteur:

  • Le connecteur de récepteur lit les enregistrements d'un ou plusieurs sujets Kafka et les publie dans Pub/Sub Lite.
  • Le connecteur source lit les messages d'un sujet Pub/Sub Lite et les publie dans Kafka.

Voici quelques scénarios dans lesquels vous pouvez utiliser le connecteur Kafka de groupe Pub/Sub:

  • Vous migrez une architecture basée sur Kafka vers Google Cloud.
  • Vous disposez d'un système frontend qui stocke les événements dans Kafka en dehors de Google Cloud, mais vous utilisez également Google Cloud pour exécuter certains de vos services de backend, qui doivent recevoir les événements Kafka.
  • Vous collectez des journaux à partir d'une solution Kafka sur site, puis vous les envoyez à Google Cloud à des fins d'analyse de données.
  • Vous disposez d'un système frontend qui utilise Google Cloud, mais vous stockez également des données sur site à l'aide de Kafka.

Le connecteur nécessite Kafka Connect, un framework permettant de diffuser des données entre Kafka et d'autres systèmes. Pour utiliser le connecteur, vous devez exécuter Kafka Connect en même temps que votre cluster Kafka.

Dans ce document, nous partons du principe que vous connaissez à la fois Kafka et Pub/Sub Lite. Pour commencer à utiliser Pub/Sub Lite, consultez la page Publier et recevoir des messages dans Pub/Sub Lite à l'aide de la console Google Cloud.

Premiers pas avec le connecteur Kafka de groupe Pub/Sub

Cette section vous guide tout au long des tâches suivantes:

  1. Configurer le connecteur Kafka du groupe Pub/Sub
  2. Envoyer des événements depuis Kafka vers Pub/Sub Lite
  3. Envoyez des messages depuis Pub/Sub Lite vers Kafka.

Prérequis

Installer Kafka

Suivez le guide de démarrage rapide d'Apache Kafka pour installer une solution Kafka à nœud unique sur votre machine locale. Effectuez les étapes suivantes dans le guide de démarrage rapide:

  1. Téléchargez la dernière version de Kafka et extrayez-la.
  2. Démarrez l'environnement Kafka.
  3. Créez un sujet Kafka.

Authentifier

Le connecteur Kafka du groupe Pub/Sub doit s'authentifier auprès de Pub/Sub pour envoyer et recevoir des messages Pub/Sub. Pour configurer l'authentification, procédez comme suit:

  1. Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
  2. Installez Google Cloud CLI.
  3. Pour initialiser gcloudCLI, exécutez la commande suivante :

    gcloud init
  4. Créez ou sélectionnez un projet Google Cloud.

    • Créez un projet Google Cloud :

      gcloud projects create PROJECT_ID

      Remplacez PROJECT_ID par le nom du projet Google Cloud que vous créez.

    • Sélectionnez le projet Google Cloud que vous avez créé :

      gcloud config set project PROJECT_ID

      Remplacez PROJECT_ID par le nom de votre projet Google Cloud.

  5. Créez des identifiants d'authentification locaux pour votre compte Google :

    gcloud auth application-default login
  6. Attribuez des rôles à votre compte Google. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants : roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • en remplaçant PROJECT_ID par l'ID de votre projet :
    • Remplacez EMAIL_ADDRESS par votre adresse e-mail.
    • Remplacez ROLE par chaque rôle individuel.
  7. Installez Google Cloud CLI.
  8. Pour initialiser gcloudCLI, exécutez la commande suivante :

    gcloud init
  9. Créez ou sélectionnez un projet Google Cloud.

    • Créez un projet Google Cloud :

      gcloud projects create PROJECT_ID

      Remplacez PROJECT_ID par le nom du projet Google Cloud que vous créez.

    • Sélectionnez le projet Google Cloud que vous avez créé :

      gcloud config set project PROJECT_ID

      Remplacez PROJECT_ID par le nom de votre projet Google Cloud.

  10. Créez des identifiants d'authentification locaux pour votre compte Google :

    gcloud auth application-default login
  11. Attribuez des rôles à votre compte Google. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants : roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • en remplaçant PROJECT_ID par l'ID de votre projet :
    • Remplacez EMAIL_ADDRESS par votre adresse e-mail.
    • Remplacez ROLE par chaque rôle individuel.

Télécharger le fichier JAR du connecteur

Téléchargez le fichier JAR du connecteur sur votre ordinateur local. Pour en savoir plus, consultez la section Acquérir le connecteur dans le fichier README de GitHub.

Copier les fichiers de configuration du connecteur

  1. Clonez ou téléchargez le dépôt GitHub pour le connecteur.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. Copiez le contenu du répertoire config dans le sous-répertoire config de votre installation de Kafka.

    cp config/* [path to Kafka installation]/config/
    

Ces fichiers contiennent les paramètres de configuration du connecteur.

Mettre à jour votre configuration Kafka Connect

  1. Accédez au répertoire contenant le binaire Kafka Connect que vous avez téléchargé.
  2. Dans le répertoire binaire Kafka Connect, ouvrez le fichier nommé config/connect-standalone.properties dans un éditeur de texte.
  3. Si l'élément plugin.path property est mis en commentaire, annulez la mise en commentaire.
  4. Mettez à jour plugin.path property pour inclure le chemin d'accès au fichier JAR du connecteur.

    Exemple :

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. Définissez la propriété offset.storage.file.filename sur un nom de fichier local. En mode autonome, Kafka utilise ce fichier pour stocker les données de décalage.

    Exemple :

    offset.storage.file.filename=/tmp/connect.offsets
    

Transférer des événements de Kafka vers Pub/Sub Lite

Cette section explique comment démarrer le connecteur de récepteur, publier des événements dans Kafka, puis lire les messages transférés à partir de Pub/Sub Lite.

  1. Utilisez la Google Cloud CLI pour créer une réservation Pub/Sub Lite.

    gcloud pubsub lite-reservations create RESERVATION_NAME \
    --location=LOCATION \
    --throughput-capacity=4
    

    Remplacez les éléments suivants :

    • RESERVATION_NAME: nom de la réservation Pub/Sub Lite.
    • LOCATION: emplacement de la réservation.
  2. Utilisez la Google Cloud CLI pour créer un sujet Pub/Sub Lite avec un abonnement.

    gcloud pubsub lite-topics create LITE_TOPIC \
    --location=LOCATION \
    --partitions=2 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=RESERVATION_NAME
    
    gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
    --location=LOCATION \
    --topic=LITE_TOPIC
    

    Remplacez les éléments suivants :

    • LITE_TOPIC: nom du sujet Pub/Sub Lite destiné à recevoir des messages de Kafka.
    • LOCATION: emplacement du sujet La valeur doit correspondre à l'emplacement de la réservation.
    • RESERVATION_NAME: nom de la réservation Pub/Sub Lite.
    • LITE_SUBSCRIPTION: nom d'un abonnement Pub/Sub Lite pour le sujet.
  3. Ouvrez le fichier /config/pubsub-lite-sink-connector.properties dans un éditeur de texte. Ajoutez des valeurs pour les propriétés suivantes, qui sont marquées comme "TODO" dans les commentaires:

    topics=KAFKA_TOPICS
    pubsublite.project=PROJECT_ID
    pubsublite.location=LOCATION
    pubsublite.topic=LITE_TOPIC
    

    Remplacez les éléments suivants :

    • KAFKA_TOPICS: liste de sujets Kafka à lire, séparés par une virgule.
    • PROJECT_ID: projet Google Cloud contenant votre sujet Pub/Sub Lite.
    • LOCATION: emplacement du sujet Pub/Sub Lite.
    • LITE_TOPIC: sujet Pub/Sub Lite destiné à recevoir les messages de Kafka.
  4. À partir du répertoire Kafka, exécutez la commande suivante:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/pubsub-lite-sink-connector.properties
    
  5. Suivez les étapes du guide de démarrage rapide d'Apache Kafka pour écrire des événements dans votre sujet Kafka.

  6. Abonnez-vous à l'abonnement Pub/Sub Lite à l'aide de l'une des méthodes présentées dans la section Recevoir des messages d'abonnements Lite.

Transférer des messages de Pub/Sub Lite vers Kafka

Cette section explique comment démarrer le connecteur source, publier des messages sur Pub/Sub Lite et lire les messages transférés à partir de Kafka.

  1. Utilisez la Google Cloud CLI pour créer une réservation Pub/Sub Lite.

    gcloud pubsub lite-reservations create RESERVATION_NAME \
    --location=LOCATION \
    --throughput-capacity=4
    

    Remplacez les éléments suivants :

    • RESERVATION_NAME: nom de la réservation Pub/Sub Lite.
    • LOCATION: emplacement de la réservation.
  2. Utilisez la Google Cloud CLI pour créer un sujet Pub/Sub Lite avec un abonnement.

    gcloud pubsub lite-topics create LITE_TOPIC \
    --location=LOCATION \
    --partitions=2 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=RESERVATION_NAME
    
    gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
    --location=LOCATION \
    --topic=LITE_TOPIC
    

    Remplacez les éléments suivants :

    • LITE_TOPIC: nom du sujet Pub/Sub Lite.
    • LOCATION: emplacement du sujet La valeur doit correspondre à l'emplacement de la réservation.
    • RESERVATION_NAME: nom de la réservation Pub/Sub Lite.
    • LITE_SUBSCRIPTION: nom d'un abonnement Pub/Sub Lite pour le sujet.
  3. Ouvrez le fichier nommé /config/pubsub-lite-source-connector.properties dans un éditeur de texte. Ajoutez des valeurs pour les propriétés suivantes, qui sont marquées "TODO" dans les commentaires:

    topic=KAFKA_TOPIC
    pubsublite.project=PROJECT_ID
    pubsublite.location=LOCATION
    pubsublite.subscription=LITE_SUBSCRIPTION
    

    Remplacez les éléments suivants :

    • KAFKA_TOPIC: sujets Kafka qui doivent recevoir les messages Pub/Sub.
    • PROJECT_ID: projet Google Cloud contenant votre sujet Pub/Sub.
    • LOCATION: emplacement du sujet Pub/Sub Lite.
    • LITE_SUBSCRIPTION: sujet Pub/Sub Lite
  4. À partir du répertoire Kafka, exécutez la commande suivante:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/pubsub-lite-source-connector.properties
    
  5. Publiez des messages dans le sujet Pub/Sub Lite à l'aide de l'une des méthodes présentées dans la section Publier des messages dans des sujets Lite.

  6. Lisez le message de Kafka. Suivez les étapes du guide de démarrage rapide d'Apache Kafka pour lire les messages du sujet Kafka.

Conversion par SMS

Un enregistrement Kafka contient une clé et une valeur, qui sont des tableaux d'octets de longueur variable. Un enregistrement Kafka peut également comporter des en-têtes, qui sont des paires clé/valeur. Un message Pub/Sub Lite comporte les champs suivants:

  • key: clé Message (bytes)
  • data: données du message (bytes)
  • attributes: zéro, un ou plusieurs attributs. Chaque attribut est une carte (key,values[]). Un même attribut peut avoir plusieurs valeurs.
  • event_time: code temporel d'événement facultatif fourni par l'utilisateur.

Kafka Connect utilise des convertisseurs pour sérialiser les clés et les valeurs vers et depuis Kafka. Pour contrôler la sérialisation, définissez les propriétés suivantes dans les fichiers de configuration du connecteur:

  • key.converter: convertisseur utilisé pour sérialiser les clés d'enregistrement.
  • value.converter: convertisseur utilisé pour sérialiser les valeurs d'enregistrement.

Conversion de Kafka vers Pub/Sub Lite

Le connecteur de récepteur convertit les enregistrements Kafka en messages Pub/Sub Lite comme suit.

Enregistrement Kafka (SinkRecord) Message Pub/Sub Lite
Clé key
Valeur data
En-têtes attributes
Code temporel eventTime
Type d'horodatage attributes["x-goog-pubsublite-source-kafka-event-time-type"]
Thème attributes["x-goog-pubsublite-source-kafka-topic"]
Partition attributes["x-goog-pubsublite-source-kafka-offset"]
Décalage (offset) attributes["x-goog-pubsublite-source-kafka-partition"]

Les clés, les valeurs et les en-têtes sont encodés comme suit:

  • Les schémas Null sont traités comme des schémas de chaîne.
  • Les charges utiles d'octets sont écrites directement sans conversion.
  • Les charges utiles de chaîne, d'entiers et à virgule flottante sont encodées dans une séquence d'octets UTF-8.
  • Toutes les autres charges utiles sont encodées dans un type de tampon de protocole Value, puis converties en chaîne d'octets.
    • Les champs de chaîne imbriqués sont encodés dans un tampon de protocole Value.
    • Les champs d'octets imbriqués sont encodés dans un tampon de protocole Value contenant les octets encodés en base64.
    • Les champs numériques imbriqués sont encodés en double dans un tampon de protocole Value.
    • Les cartes comportant des clés de tableau, de carte ou de structure ne sont pas acceptées.

Conversion de Pub/Sub Lite vers Kafka

Le connecteur source convertit les messages Pub/Sub Lite en enregistrements Kafka comme suit:

Message Pub/Sub Lite Enregistrement Kafka (SourceRecord)
key Clé
data Valeur
attributes En-têtes
event_time Timestamp. Si event_time n'est pas présent, l'heure de publication est utilisée.

Options de configuration

En plus des configurations fournies par l'API Kafka Connect, le connecteur accepte les configurations Pub/Sub Lite suivantes.

Options de configuration du connecteur de récepteur

Le connecteur de récepteur accepte les options de configuration suivantes.

Paramètre Type de données Description
connector.class String Obligatoire. Classe Java du connecteur. Pour le connecteur de récepteur Pub/Sub Lite, la valeur doit être com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector.
gcp.credentials.file.path String Facultatif. Chemin d'accès à un fichier qui stocke les identifiants Google Cloud pour authentifier Pub/Sub Lite.
gcp.credentials.json String Facultatif. Blob JSON contenant Google Cloud pour l'authentification de Pub/Sub Lite.
pubsublite.location String Obligatoire. L'emplacement du sujet Pub/Sub Lite.
pubsublite.project String Obligatoire. Le compte Google Cloud contenant le sujet Pub/Sub Lite.
pubsublite.topic String Obligatoire. Sujet Pub/Sub Lite dans lequel publier les enregistrements Kafka.
topics String Obligatoire. Liste de sujets Kafka à lire, séparés par une virgule.

Options de configuration du connecteur source

Le connecteur source accepte les options de configuration suivantes.

Paramètre Type de données Description
connector.class String Obligatoire. Classe Java du connecteur. Pour le connecteur source Pub/Sub Lite, la valeur doit être com.google.pubsublite.kafka.source.PubSubLiteSourceConnector.
gcp.credentials.file.path String Facultatif. Chemin d'accès à un fichier qui stocke les identifiants Google Cloud pour authentifier Pub/Sub Lite.
gcp.credentials.json String Facultatif. Blob JSON contenant Google Cloud pour l'authentification de Pub/Sub Lite.
kafka.topic String Obligatoire. Sujet Kafka qui reçoit les messages de Pub/Sub Lite.
pubsublite.location String Obligatoire. L'emplacement du sujet Pub/Sub Lite.
pubsublite.partition_flow_control.bytes Long

Nombre maximal d'octets en attente par partition Pub/Sub Lite.

Valeur par défaut: 20 000 000

pubsublite.partition_flow_control.messages Long

Nombre maximal de messages en attente par partition Pub/Sub Lite.

Valeur par défaut : Long.MAX_VALUE

pubsublite.project String Obligatoire. Le projet Google Cloud contenant le sujet Pub/Sub Lite.
pubsublite.subscription String Obligatoire. Nom de l'abonnement Pub/Sub Lite à partir duquel extraire les messages.

Étapes suivantes