Utiliser Pub/Sub Lite avec l'API Apache Kafka

Cette page explique comment utiliser Pub/Sub Lite Kafka Kafka pour publier des messages et en recevoir de Pub/Sub Lite.

La bibliothèque Java Pub/Sub Lite Kafka est une bibliothèque Java qui facilite la collaboration entre les utilisateurs de la bibliothèque cliente Java Apache Kafka. Pub/Sub Lite. Pour ce faire, mettre en œuvre l'API Producer et l'API Consumer.

Cela est possible, car les sujets Pub/Sub Lite, tels que les sujets Apache Kafka, sont des journaux partitionnés qui suivent la progression du consommateur avec des décalages numériques.

Bien que les deux systèmes soient similaires, il existe quelques différences pratiques notables:

  • Un sujet Pub/Sub Lite équivaut à un sujet Kafka. Cependant, un sujet Lite dispose d'un débit configurable et d'une capacité de stockage sur chaque partition de sujets, tandis que la capacité du sujet Kafka est déterminée par les configurations de cluster Kafka.
  • Un abonnement Pub/Sub Lite équivaut à un groupe de consommateurs Kafka. Un abonnement Lite est une ressource Google Cloud Platform nommée de premier ordre qui représente les messages des partitions de sujet Lite à partir desquelles les abonnés peuvent lire. De même, un groupe de consommateurs Kafka comprend des consommateurs pouvant lire des données à partir de partitions d'un sujet Kafka.

Avant de commencer

  1. Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
  2. Dans Google Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.

    Accéder au sélecteur de projet

  3. Assurez-vous que la facturation est activée pour votre projet Cloud. Découvrez comment vérifier que la facturation est activée pour votre projet.

  4. Activez l'API Pub/Sub Lite.

    Activer l'API

  5. Créez un compte de service :

    1. Dans Cloud Console, accédez à la page Créer un compte de service.

      Accéder à la page "Créer un compte de service"
    2. Sélectionnez un projet.
    3. Dans le champ Nom du compte de service, saisissez un nom. Cloud Console remplit le champ ID du compte de service en fonction de ce nom.

      Dans le champ Description du compte de service, saisissez une description. Exemple : Service account for quickstart.

    4. Cliquez sur Create (Créer).
    5. Cliquez sur le champ Sélectionner un rôle.

      Dans la section Accès rapide, cliquez sur Basique, puis sur Propriétaire.

    6. Cliquez sur Continuer.
    7. Cliquez sur OK pour terminer la création du compte de service.

      Ne fermez pas la fenêtre de votre navigateur. Vous en aurez besoin lors de la tâche suivante.

  6. Créez une clé de compte de service :

    1. Dans Cloud Console, cliquez sur l'adresse e-mail du compte de service que vous avez créé.
    2. Cliquez sur Clés.
    3. Cliquez sur Ajouter une clé, puis sur Créer une clé.
    4. Cliquez sur Create (Créer). Un fichier de clé JSON est téléchargé sur votre ordinateur.
    5. Cliquez sur Close (Fermer).
  7. Définissez la variable d'environnement GOOGLE_APPLICATION_CREDENTIALS pour qu'elle pointe vers le chemin du fichier JSON contenant la clé de votre compte de service. Cette variable ne s'applique qu'à la session de shell actuelle. Par conséquent, si vous ouvrez une nouvelle session, vous devez de nouveau la définir.

Installer le module

Java

Si vous utilisez Maven sans BOM, ajoutez les éléments suivants à vos dépendances :

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>pubsublite-kafka</artifactId>
  <version>0.4.0</version>
</dependency>

Si vous utilisez Gradle, ajoutez les éléments suivants à vos dépendances :

compile 'com.google.cloud:pubsublite-kafka:0.4.0'

Si vous utilisez sbt, ajoutez les éléments suivants à vos dépendances :

libraryDependencies += "com.google.cloud" % "pubsublite-kafka" % "0.4.0"

Si vous utilisez IntelliJ ou Eclipse, vous pouvez ajouter des bibliothèques clientes à votre projet à l'aide des plug-ins IDE suivants :

Les plug-ins offrent des fonctionnalités supplémentaires, telles que la gestion des clés pour les comptes de service. Reportez-vous à la documentation de chaque plug-in pour plus de détails.

Créer un sujet Pub/Sub Lite

Créez un sujet Lite en procédant comme suit:

  1. Dans Cloud Console, accédez à la page Sujets Lite.

    Accéder à la page Sujets Lite

  2. Cliquez sur Créer un sujet Lite.

  3. Sélectionnez une région et une zone.

  4. Dans la section Nom, saisissez your-lite-topic comme ID du sujet Lite. Le nom du sujet Lite inclut l'ID du sujet Lite, la zone et le numéro du projet.

  5. Cliquez sur Create (Créer).

Créer un abonnement Pub/Sub Lite

Pour créer un abonnement Lite, procédez comme suit:

  1. Dans Cloud Console, accédez à la page Abonnements Lite.

    Accéder à la page "Abonnements Lite"

  2. Cliquez sur Créer un abonnement Lite.

  3. Dans le champ ID de l'abonnement Lite, saisissez your-lite-subscription.

  4. Sélectionnez un sujet Lite pour recevoir des messages de celui-ci.

  5. Dans la section Critère de distribution, sélectionnez Distribuer les messages après leur stockage.

  6. Cliquez sur Create (Créer).

L'abonnement Lite se trouve dans la même zone que le sujet Lite.

Publier des messages dans Pub/Sub Lite

Créez un producteur Pub/Sub Lite pour envoyer des données à votre sujet Lite. Cette classe met en œuvre org.apache.kafka.clients.Producer, la même interface que KafkaProducer.

import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.kafka.ProducerSettings;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Use an existing Pub/Sub Lite topic.
    String topicId = "your-topic-id";
    // Using the project number is required for constructing a Pub/Sub Lite
    // topic path that the Kafka producer can use.
    long projectNumber = Long.parseLong("123456789");

    producerExample(cloudRegion, zoneId, projectNumber, topicId);
  }

  public static void producerExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId)
      throws InterruptedException, ExecutionException {
    TopicPath topicPath =
        TopicPath.newBuilder()
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setProject(ProjectNumber.of(projectNumber))
            .setName(TopicName.of(topicId))
            .build();

    ProducerSettings producerSettings =
        ProducerSettings.newBuilder().setTopicPath(topicPath).build();

    List<Future<RecordMetadata>> futures = new ArrayList<>();
    try (Producer<byte[], byte[]> producer = producerSettings.instantiate()) {
      for (long i = 0L; i < 10L; i++) {
        String key = "demo";
        Future<RecordMetadata> future =
            producer.send(
                new ProducerRecord(
                    topicPath.toString(), key.getBytes(), ("message-" + i).getBytes()));
        futures.add(future);
      }
      for (Future<RecordMetadata> future : futures) {
        RecordMetadata meta = future.get();
        System.out.println(meta.offset());
      }
    }
    System.out.printf("Published 10 messages to %s%n", topicPath.toString());
  }
}

Recevoir des messages de Pub/Sub Lite

Créez un consommateur Pub/Sub Lite pour recevoir des données de votre abonnement Lite. Cette classe met en œuvre org.apache.kafka.clients.consumer.Consumer, la même interface que KafkaConsumer.

import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.kafka.ConsumerSettings;
import java.time.Duration;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class ConsumerExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Use an existing Pub/Sub Lite topic and subscription.
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    // Using the project number here is required for constructing a Pub/Sub Lite
    // topic path that the Kafka consumer can use.
    long projectNumber = Long.parseLong("123456789");

    consumerExample(cloudRegion, zoneId, projectNumber, topicId, subscriptionId);
  }

  public static void consumerExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, String subscriptionId) {

    CloudZone location = CloudZone.of(CloudRegion.of(cloudRegion), zoneId);

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(TopicName.of(topicId))
            .build();

    SubscriptionPath subscription =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 50 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(50 * 1024 * 1024L)
            // 10,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(10000L)
            .build();

    ConsumerSettings settings =
        ConsumerSettings.newBuilder()
            .setSubscriptionPath(subscription)
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .setAutocommit(true)
            .build();

    Set<ConsumerRecord<byte[], byte[]>> hashSet = new HashSet<>();
    try (Consumer<byte[], byte[]> consumer = settings.instantiate()) {
      // The consumer can only subscribe to the topic that it is associated to.
      // If this is the only subscriber for this subscription, it will take up
      // to 90s for the subscriber to warm up.
      consumer.subscribe(Arrays.asList(topicPath.toString()));
      while (true) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<byte[], byte[]> record : records) {
          long offset = record.offset();
          String value = Base64.getEncoder().encodeToString(record.value());
          hashSet.add(record);
          System.out.printf("Received %s: %s%n", offset, value);
        }
        // Early exit. Remove entirely to keep the consumer alive indefinitely.
        if (hashSet.size() >= 10) {
          System.out.println("Received 10 messages.");
          break;
        }
      }
    }
  }
}

Nettoyer

Pour éviter que les ressources utilisées dans ce guide de démarrage rapide soient facturées sur votre compte Google Cloud, procédez comme suit :

  1. Dans Cloud Console, accédez à la page Sujets Lite.

    Accéder à la page Sujets Lite

  2. Cliquez sur your-lite-topic.

  3. Sur la page Détails du sujet Lite, cliquez sur Supprimer.

  4. Dans le champ qui s'affiche, saisissez delete pour confirmer la suppression du sujet Lite.

  5. Cliquez sur Supprimer.

  6. Répétez ces étapes pour votre abonnement Lite.

Étape suivante