Publier des messages dans des sujets Lite

Cette page explique comment publier des messages sur des sujets Lite. Vous pouvez publier des messages avec la bibliothèque cliente Pub/Sub Lite pour Java.

Après avoir publié des messages et créé un abonnement Lite à un sujet Lite, vous pouvez recevoir des messages à partir de l'abonnement Lite.

Format du message

Un message est constitué de champs contenant des données du message et des métadonnées. Spécifiez au moins l'un des éléments suivants dans le message :

La bibliothèque cliente attribue automatiquement le message à une partition, et le service Pub/Sub Lite ajoute les champs suivants au message :

  • ID de message unique dans la partition
  • Horodatage correspondant au moment où le service Pub/Sub Lite stocke le message dans la partition

Publier des messages

Pour publier des messages, demandez une connexion en streaming au sujet Lite, puis envoyez des messages via la connexion en streaming.

L'exemple suivant montre comment publier des messages dans un sujet Lite :

Java

Avant d'exécuter cet exemple, suivez les instructions de configuration de Java dans la section Bibliothèques clientes de Pub/Sub Lite.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.TopicPaths;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.StatusException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

public class PublisherExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int messageCount = 100;

    publisherExample(cloudRegion, zoneId, projectNumber, topicId, messageCount);
  }

  // Publish messages to a topic.
  public static void publisherExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, int messageCount)
      throws StatusException, ExecutionException, InterruptedException {

    TopicPath topicPath =
        TopicPaths.newBuilder()
            .setProjectNumber(ProjectNumber.of(projectNumber))
            .setZone(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setTopicName(TopicName.of(topicId))
            .build();
    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      PublisherSettings publisherSettings =
          PublisherSettings.newBuilder().setTopicPath(topicPath).build();

      publisher = Publisher.create(publisherSettings);

      // Start the publisher. Upon successful starting, its state will become RUNNING.
      publisher.startAsync().awaitRunning();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // Convert the message to a byte string.
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Publish a message. Messages are automatically batched.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      ArrayList<PublishMetadata> metadata = new ArrayList<>();
      List<String> ackIds = ApiFutures.allAsList(futures).get();
      for (String id : ackIds) {
        // Decoded metadata contains partition and offset.
        metadata.add(PublishMetadata.decode(id));
      }
      System.out.println(metadata + "\nPublished " + ackIds.size() + " messages.");

      if (publisher != null) {
        // Shut down the publisher.
        publisher.stopAsync().awaitTerminated();
        System.out.println("Publisher is shut down.");
      }
    }
  }
}

La bibliothèque cliente envoie des messages de manière asynchrone et gère les erreurs. Si une erreur se produit, la bibliothèque cliente envoie à nouveau le message.

  1. Le service Pub/Sub Lite ferme le flux.
  2. La bibliothèque cliente met les messages en mémoire tampon et rétablit une connexion au sujet Lite.
  3. La bibliothèque cliente envoie les messages dans l'ordre.

Après la publication d'un message, le service Pub/Sub Lite stocke le message dans une partition et renvoie l'ID du message à l'éditeur.

Utiliser des clés de tri

Si les messages ont la même clé de tri, la bibliothèque cliente les attribue à la même partition. La clé de tri doit être une chaîne de 1 024 octets maximum.

La clé de tri se trouve dans le champ key d'un message. Vous pouvez définir des clés de tri avec la bibliothèque cliente.

Java

Avant d'exécuter cet exemple, suivez les instructions de configuration de Java dans la section Bibliothèques clientes de Pub/Sub Lite.

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.TopicPaths;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.StatusException;
import java.util.concurrent.ExecutionException;

public class PublishWithOrderingKeyExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");

    publishWithOrderingKeyExample(cloudRegion, zoneId, projectNumber, topicId);
  }

  // Publish a message to a topic with an ordering key.
  public static void publishWithOrderingKeyExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId)
      throws StatusException, ExecutionException, InterruptedException {

    TopicPath topicPath =
        TopicPaths.newBuilder()
            .setProjectNumber(ProjectNumber.of(projectNumber))
            .setZone(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setTopicName(TopicName.of(topicId))
            .build();

    PublisherSettings publisherSettings =
        PublisherSettings.newBuilder().setTopicPath(topicPath).build();

    Publisher publisher = Publisher.create(publisherSettings);

    // Start the publisher. Upon successful starting, its state will become RUNNING.
    publisher.startAsync().awaitRunning();

    String message = "message-with-ordering-key";

    // Convert the message to a byte string.
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage =
        PubsubMessage.newBuilder()
            .setData(data)
            // Messages of the same ordering key will always get published to the
            // same partition. When OrderingKey is unset, messages can get published
            // to different partitions if more than one partition exists for the topic.
            .setOrderingKey("testing")
            .build();

    // Publish a message.
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Shut down the publisher.
    publisher.stopAsync().awaitTerminated();

    String ackId = future.get();
    PublishMetadata metadata = PublishMetadata.decode(ackId);
    System.out.println("Published a message with ordering key:\n" + metadata);
  }
}

Vous pouvez envoyer plusieurs messages à la même partition à l'aide de clés de tri, afin que les abonnés reçoivent les messages dans l'ordre. La bibliothèque cliente peut attribuer plusieurs clés de tri à la même partition.

Utiliser des attributs

Les attributs de message sont des paires clé/valeur avec des métadonnées associées au message. Les attributs peuvent être des chaînes de texte ou d'octets.

Les attributs se trouvent dans le champ attributes d'un message. Vous pouvez définir des attributs avec la bibliothèque cliente.

Java

Avant d'exécuter cet exemple, suivez les instructions de configuration de Java dans la section Bibliothèques clientes de Pub/Sub Lite.

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.TopicPaths;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.StatusException;
import java.util.concurrent.ExecutionException;

public class PublishWithCustomAttributesExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");

    publishWithCustomAttributesExample(cloudRegion, zoneId, projectNumber, topicId);
  }

  // Publish messages to a topic with custom attributes.
  public static void publishWithCustomAttributesExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId)
      throws StatusException, ExecutionException, InterruptedException {

    TopicPath topicPath =
        TopicPaths.newBuilder()
            .setProjectNumber(ProjectNumber.of(projectNumber))
            .setZone(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setTopicName(TopicName.of(topicId))
            .build();

    PublisherSettings publisherSettings =
        PublisherSettings.newBuilder().setTopicPath(topicPath).build();

    Publisher publisher = Publisher.create(publisherSettings);

    // Start the publisher. Upon successful starting, its state will become RUNNING.
    publisher.startAsync().awaitRunning();

    String message = "message-with-custom-attributes";

    // Convert the message to a byte string.
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage =
        PubsubMessage.newBuilder()
            .setData(data)
            // Add two sets of custom attributes to the message.
            .putAllAttributes(ImmutableMap.of("year", "2020", "author", "unknown"))
            .build();

    // Publish a message.
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Shut down the publisher.
    publisher.stopAsync().awaitTerminated();

    String ackId = future.get();
    PublishMetadata metadata = PublishMetadata.decode(ackId);
    System.out.println("Published a message with custom attributes:\n" + metadata);
  }
}

Les attributs peuvent indiquer comment traiter un message. Les abonnés peuvent analyser le champ attributes d'un message et le traiter en fonction de ses attributs.

Messages par lot

La bibliothèque cliente publie des messages par lot. Les lots plus volumineux utilisent moins de ressources de calcul, mais augmentent la latence. Vous pouvez modifier la taille du lot à l'aide des paramètres de traitement par lot.

Le tableau suivant répertorie les paramètres de traitement par lot que vous pouvez configurer :

Paramètre Description Par défaut
Taille d'une requête Taille maximale du lot, exprimée en octets. 3,5 Mio
Le nombre de messages Nombre maximal de messages dans un lot. 1 000 messages
Délai de publication Durée, en millisecondes, entre l'ajout du message à un lot et l'envoi du lot au sujet Lite. 50 millisecondes

Vous pouvez configurer les paramètres de traitement par lot avec la bibliothèque cliente.

Java

Avant d'exécuter cet exemple, suivez les instructions de configuration de Java dans la section Bibliothèques clientes de Pub/Sub Lite.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.TopicPaths;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.StatusException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.threeten.bp.Duration;

public class PublishWithBatchSettingsExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int messageCount = 100;

    publishWithBatchSettingsExample(cloudRegion, zoneId, projectNumber, topicId, messageCount);
  }

  // Publish messages to a topic with batch settings.
  public static void publishWithBatchSettingsExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, int messageCount)
      throws StatusException, ExecutionException, InterruptedException {

    TopicPath topicPath =
        TopicPaths.newBuilder()
            .setProjectNumber(ProjectNumber.of(projectNumber))
            .setZone(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setTopicName(TopicName.of(topicId))
            .build();
    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      // Batch settings control how the publisher batches messages
      long requestBytesThreshold = 5000L; // default : 3_500_000 bytes
      long messageCountBatchSize = 100L; // default : 1000L message
      Duration publishDelayThreshold = Duration.ofMillis(100); // default : 50 ms

      // Publish request get triggered based on request size, messages count & time since last
      // publish, whichever condition is met first.
      BatchingSettings batchingSettings =
          BatchingSettings.newBuilder()
              .setRequestByteThreshold(requestBytesThreshold)
              .setElementCountThreshold(messageCountBatchSize)
              .setDelayThreshold(publishDelayThreshold)
              .build();

      PublisherSettings publisherSettings =
          PublisherSettings.newBuilder()
              .setTopicPath(topicPath)
              .setBatchingSettings(batchingSettings)
              .build();

      publisher = Publisher.create(publisherSettings);

      // Start the publisher. Upon successful starting, its state will become RUNNING.
      publisher.startAsync().awaitRunning();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // Convert the message to a byte string.
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Publish a message.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      ArrayList<PublishMetadata> metadata = new ArrayList<>();
      List<String> ackIds = ApiFutures.allAsList(futures).get();
      System.out.println("Published " + ackIds.size() + " messages with batch settings.");

      if (publisher != null) {
        // Shut down the publisher.
        publisher.stopAsync().awaitTerminated();
      }
    }
  }
}

Lorsqu'une application d'éditeur démarre, la bibliothèque cliente crée un lot pour chaque partition d'un sujet Lite. Par exemple, si un sujet Lite comporte deux partitions, les éditeurs créent deux lots et envoient chaque lot à une partition.

Après la publication d'un message, la bibliothèque cliente la met en mémoire tampon jusqu'à ce que le lot dépasse la taille maximale de la requête, le nombre maximal de messages ou le délai de publication.

Trier des messages

Les sujets Lite trient les messages dans chaque partition en fonction de leur date de publication. Pour attribuer des messages à la même partition, utilisez une clé de tri.

Pub/Sub Lite distribue les messages d'une partition dans l'ordre, et les abonnés peuvent les traiter dans l'ordre. Pour en savoir plus, consultez la page Recevoir des messages.