Guide de démarrage rapide: publier et recevoir des messages dans Pub/Sub Lite à l'aide de l'API Apache Kafka

Publier et recevoir des messages dans Pub/Sub Lite à l'aide de l'API Apache Kafka

Cette page explique comment utiliser Pub/Sub Lite Kafka Shim pour publier des messages sur Pub/Sub Lite et en recevoir.

Pub/Sub Lite Kafka Shim est une bibliothèque Java qui facilite l'utilisation de la bibliothèque cliente Java Apache Kafka avec Pub/Sub Lite. Pour ce faire, vous devez mettre en œuvre l'API Producer et l'API Consumer.

Cela est possible car les sujets Pub/Sub Lite, tels que les sujets Apache Kafka, sont des journaux partitionnés qui suivent la progression de la clientèle avec des décalages numériques.

Bien que les deux systèmes soient similaires, il existe quelques différences pratiques notables :

  • Un sujet Pub/Sub Lite est équivalent à un sujet Kafka. Cependant, un sujet Lite dispose d'un débit et d'une capacité de stockage configurables sur chaque partition de sujet, tandis que la capacité de sujet Kafka est déterminée par des configurations de cluster Kafka.
  • Un abonnement Pub/Sub Lite équivaut à un groupe de consommateurs Kafka. Un abonnement Lite est une ressource nommée Google Cloud Platform qui est un premier ordre, représentant les messages issus de partitions de sujet Lite que les abonnés peuvent lire. De même, un groupe de consommateurs Kafka est constitué de clients pouvant lire les données des partitions d'un sujet Kafka.

Avant de commencer

  1. Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
  2. Installez Google Cloud CLI.
  3. Pour initialiser gcloudCLI, exécutez la commande suivante :

    gcloud init
  4. Créez ou sélectionnez un projet Google Cloud.

    • Créez un projet Google Cloud :

      gcloud projects create PROJECT_ID

      Remplacez PROJECT_ID par le nom du projet Google Cloud que vous créez.

    • Sélectionnez le projet Google Cloud que vous avez créé :

      gcloud config set project PROJECT_ID

      Remplacez PROJECT_ID par le nom de votre projet Google Cloud.

  5. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  6. Activer l'API Pub/Sub Lite :

    gcloud services enable pubsublite.googleapis.com
  7. Créez des identifiants d'authentification locaux pour votre compte Google :

    gcloud auth application-default login
  8. Attribuez des rôles à votre compte Google. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants : roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • en remplaçant PROJECT_ID par l'ID de votre projet :
    • Remplacez EMAIL_ADDRESS par votre adresse e-mail.
    • Remplacez ROLE par chaque rôle individuel.
  9. Installez Google Cloud CLI.
  10. Pour initialiser gcloudCLI, exécutez la commande suivante :

    gcloud init
  11. Créez ou sélectionnez un projet Google Cloud.

    • Créez un projet Google Cloud :

      gcloud projects create PROJECT_ID

      Remplacez PROJECT_ID par le nom du projet Google Cloud que vous créez.

    • Sélectionnez le projet Google Cloud que vous avez créé :

      gcloud config set project PROJECT_ID

      Remplacez PROJECT_ID par le nom de votre projet Google Cloud.

  12. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  13. Activer l'API Pub/Sub Lite :

    gcloud services enable pubsublite.googleapis.com
  14. Créez des identifiants d'authentification locaux pour votre compte Google :

    gcloud auth application-default login
  15. Attribuez des rôles à votre compte Google. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants : roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • en remplaçant PROJECT_ID par l'ID de votre projet :
    • Remplacez EMAIL_ADDRESS par votre adresse e-mail.
    • Remplacez ROLE par chaque rôle individuel.

Installer la bibliothèque cliente

Les exemples suivants vous montrent comment installer les bibliothèques clientes pubsublite-kafka.

Java

Si vous utilisez Maven sans BOM, ajoutez les éléments suivants à vos dépendances :

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>pubsublite-kafka</artifactId>
  <version>1.1.1</version>
</dependency>

Si vous utilisez Gradle, ajoutez les éléments suivants à vos dépendances :

implementation 'com.google.cloud:pubsublite-kafka:1.1.2'

Si vous utilisez sbt, ajoutez les éléments suivants à vos dépendances :

libraryDependencies += "com.google.cloud" % "pubsublite-kafka" % "1.1.2"

Si vous utilisez Visual Studio Code, IntelliJ ou Eclipse, vous pouvez ajouter des bibliothèques clientes à votre projet à l'aide des plug-ins IDE suivants :

Les plug-ins offrent des fonctionnalités supplémentaires, telles que la gestion des clés pour les comptes de service. Reportez-vous à la documentation de chaque plug-in pour plus de détails.

Créer un sujet et un abonnement Lite

Créez un sujet et un abonnement Lite à l'aide des commandes suivantes.

gcloud pubsub lite-topics create LITE_TOPIC_NAME \
    --location=LITE_LOCATION \
    --partitions=1 \
    --per-partition-bytes=30GiB

gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION_NAME \
    --location=LITE_LOCATION \
    --topic=LITE_TOPIC_NAME \
    --starting-offset=end \
    --delivery-requirement=deliver-after-stored

Remplacez les éléments suivants :

  • LITE_TOPIC_NAME: nom de votre nouveau sujet Lite.
  • LITE_SUBSCRIPTION_NAME: nom de votre nouvel abonnement Lite.
  • LITE_LOCATION: l'emplacement dans lequel vous créez votre sujet Lite et votre abonnement Lite. Choisissez un emplacement Pub/Sub Lite compatible. Spécifiez également une zone pour la région. Exemple : us-central1-a.

Publier des messages dans Pub/Sub Lite

Créez un producteur Pub/Sub Lite pour envoyer des données à votre sujet Lite Cette classe met en œuvre org.apache.kafka.clients.Producer, la même interface que KafkaProducer.

import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.kafka.ProducerSettings;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Use an existing Pub/Sub Lite topic.
    String topicId = "your-topic-id";
    // Using the project number is required for constructing a Pub/Sub Lite
    // topic path that the Kafka producer can use.
    long projectNumber = Long.parseLong("123456789");

    producerExample(cloudRegion, zoneId, projectNumber, topicId);
  }

  public static void producerExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId)
      throws InterruptedException, ExecutionException {
    TopicPath topicPath =
        TopicPath.newBuilder()
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setProject(ProjectNumber.of(projectNumber))
            .setName(TopicName.of(topicId))
            .build();

    ProducerSettings producerSettings =
        ProducerSettings.newBuilder().setTopicPath(topicPath).build();

    List<Future<RecordMetadata>> futures = new ArrayList<>();
    try (Producer<byte[], byte[]> producer = producerSettings.instantiate()) {
      for (long i = 0L; i < 10L; i++) {
        String key = "demo";
        Future<RecordMetadata> future =
            producer.send(
                new ProducerRecord(
                    topicPath.toString(), key.getBytes(), ("message-" + i).getBytes()));
        futures.add(future);
      }
      for (Future<RecordMetadata> future : futures) {
        RecordMetadata meta = future.get();
        System.out.println(meta.offset());
      }
    }
    System.out.printf("Published 10 messages to %s%n", topicPath.toString());
  }
}

Recevoir des messages de Pub/Sub Lite

Créez un consommateur Pub/Sub Lite pour recevoir des données de votre abonnement Lite. Cette classe met en œuvre org.apache.kafka.clients.consumer.Consumer, la même interface que KafkaConsumer.

import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.kafka.ConsumerSettings;
import java.time.Duration;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class ConsumerExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Use an existing Pub/Sub Lite topic and subscription.
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    // Using the project number here is required for constructing a Pub/Sub Lite
    // topic path that the Kafka consumer can use.
    long projectNumber = Long.parseLong("123456789");

    consumerExample(cloudRegion, zoneId, projectNumber, topicId, subscriptionId);
  }

  public static void consumerExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, String subscriptionId) {

    CloudZone location = CloudZone.of(CloudRegion.of(cloudRegion), zoneId);

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(TopicName.of(topicId))
            .build();

    SubscriptionPath subscription =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 50 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(50 * 1024 * 1024L)
            // 10,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(10000L)
            .build();

    ConsumerSettings settings =
        ConsumerSettings.newBuilder()
            .setSubscriptionPath(subscription)
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .setAutocommit(true)
            .build();

    Set<ConsumerRecord<byte[], byte[]>> hashSet = new HashSet<>();
    try (Consumer<byte[], byte[]> consumer = settings.instantiate()) {
      // The consumer can only subscribe to the topic that it is associated to.
      // If this is the only subscriber for this subscription, it will take up
      // to 90s for the subscriber to warm up.
      consumer.subscribe(Arrays.asList(topicPath.toString()));
      while (true) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<byte[], byte[]> record : records) {
          long offset = record.offset();
          String value = Base64.getEncoder().encodeToString(record.value());
          hashSet.add(record);
          System.out.printf("Received %s: %s%n", offset, value);
        }
        // Early exit. Remove entirely to keep the consumer alive indefinitely.
        if (hashSet.size() >= 10) {
          System.out.println("Received 10 messages.");
          break;
        }
      }
    }
  }
}

Effectuer un nettoyage

Pour éviter que les ressources utilisées sur cette page ne soient facturées sur votre compte Google Cloud, supprimez le projet Cloud contenant les ressources.

  1. Supprimez le sujet et l'abonnement Lite.

    gcloud pubsub lite-topics delete LITE_TOPIC_NAME
    gcloud pubsub lite-subscriptions delete LITE_SUBSCRIPTION_NAME
    

  2. Supprimez le compte de service :
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  3. Facultatif : Révoquez les identifiants d'authentification que vous avez créés et supprimez le fichier d'identifiants local.

    gcloud auth application-default revoke
  4. Facultatif : Révoquez les identifiants de la CLI gcloud.

    gcloud auth revoke

Étapes suivantes