Publica mensajes en temas Lite

En esta página, se explica cómo publicar mensajes en los temas de Lite. Puedes publicar mensajes con la biblioteca cliente de Pub/Sub Lite para Java.

Después de publicar mensajes y crear una suscripción Lite en un tema de Lite, puedes recibir mensajes de la suscripción de Lite.

Formato del mensaje

Un mensaje consiste en campos con los datos y los metadatos del mensaje. Especifica lo siguiente en el mensaje:

La biblioteca cliente asigna automáticamente el mensaje a una partición y el servicio de Pub/Sub Lite agrega los siguientes campos al mensaje:

  • Un ID de mensaje único dentro de la partición
  • Una marca de tiempo para cuando el servicio de Pub/Sub Lite almacena el mensaje en la partición

Publica mensajes

Para publicar mensajes, solicita una conexión de transmisión al tema Lite y, luego, envía mensajes mediante la conexión de transmisión.

En el siguiente ejemplo, se muestra cómo publicar mensajes en un tema Lite:

Java

Antes de ejecutar esta muestra, sigue las instrucciones de configuración de Java en las bibliotecas cliente de Pub/Sub Lite.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.TopicPaths;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.StatusException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

public class PublisherExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int messageCount = 100;

    publisherExample(cloudRegion, zoneId, projectNumber, topicId, messageCount);
  }

  // Publish messages to a topic.
  public static void publisherExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, int messageCount)
      throws StatusException, ExecutionException, InterruptedException {

    TopicPath topicPath =
        TopicPaths.newBuilder()
            .setProjectNumber(ProjectNumber.of(projectNumber))
            .setZone(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setTopicName(TopicName.of(topicId))
            .build();
    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      PublisherSettings publisherSettings =
          PublisherSettings.newBuilder().setTopicPath(topicPath).build();

      publisher = Publisher.create(publisherSettings);

      // Start the publisher. Upon successful starting, its state will become RUNNING.
      publisher.startAsync().awaitRunning();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // Convert the message to a byte string.
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Publish a message. Messages are automatically batched.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      ArrayList<PublishMetadata> metadata = new ArrayList<>();
      List<String> ackIds = ApiFutures.allAsList(futures).get();
      for (String id : ackIds) {
        // Decoded metadata contains partition and offset.
        metadata.add(PublishMetadata.decode(id));
      }
      System.out.println(metadata + "\nPublished " + ackIds.size() + " messages.");

      if (publisher != null) {
        // Shut down the publisher.
        publisher.stopAsync().awaitTerminated();
        System.out.println("Publisher is shut down.");
      }
    }
  }
}

La biblioteca cliente envía mensajes y maneja errores de forma asincrónica. Si se produce un error, la biblioteca cliente envía el mensaje de nuevo.

  1. El servicio de Pub/Sub Lite cierra la transmisión.
  2. La biblioteca cliente almacena en búfer los mensajes y restablece una conexión con el tema de Lite.
  3. La biblioteca cliente envía los mensajes en orden.

Después de que publicas un mensaje, el servicio de Pub/Sub Lite almacena el mensaje en una partición y le muestra el ID de mensaje al publicador.

Usa claves de ordenamiento

Si los mensajes tienen la misma clave de ordenamiento, la biblioteca cliente asigna los mensajes a la misma partición. La clave de ordenamiento debe ser una string de al menos 1,024 bytes.

La clave de ordenamiento está en el campo key de un mensaje. Puedes configurar claves de ordenamiento con la biblioteca cliente.

Java

Antes de ejecutar esta muestra, sigue las instrucciones de configuración de Java en las bibliotecas cliente de Pub/Sub Lite.

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.TopicPaths;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.StatusException;
import java.util.concurrent.ExecutionException;

public class PublishWithOrderingKeyExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");

    publishWithOrderingKeyExample(cloudRegion, zoneId, projectNumber, topicId);
  }

  // Publish a message to a topic with an ordering key.
  public static void publishWithOrderingKeyExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId)
      throws StatusException, ExecutionException, InterruptedException {

    TopicPath topicPath =
        TopicPaths.newBuilder()
            .setProjectNumber(ProjectNumber.of(projectNumber))
            .setZone(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setTopicName(TopicName.of(topicId))
            .build();

    PublisherSettings publisherSettings =
        PublisherSettings.newBuilder().setTopicPath(topicPath).build();

    Publisher publisher = Publisher.create(publisherSettings);

    // Start the publisher. Upon successful starting, its state will become RUNNING.
    publisher.startAsync().awaitRunning();

    String message = "message-with-ordering-key";

    // Convert the message to a byte string.
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage =
        PubsubMessage.newBuilder()
            .setData(data)
            // Messages of the same ordering key will always get published to the
            // same partition. When OrderingKey is unset, messages can get published
            // to different partitions if more than one partition exists for the topic.
            .setOrderingKey("testing")
            .build();

    // Publish a message.
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Shut down the publisher.
    publisher.stopAsync().awaitTerminated();

    String ackId = future.get();
    PublishMetadata metadata = PublishMetadata.decode(ackId);
    System.out.println("Published a message with ordering key:\n" + metadata);
  }
}

Puedes enviar varios mensajes a la misma partición mediante claves de ordenamiento, de modo que los suscriptores reciban los mensajes en orden. La biblioteca cliente puede asignar varias claves de orden a la misma partición.

Usa atributos

Los atributos de mensajes son pares clave-valor con metadatos sobre el mensaje. Los atributos pueden ser strings de texto o bytes.

Los atributos están en el campo attributes de un mensaje. Puedes configurar atributos con la biblioteca cliente.

Java

Antes de ejecutar esta muestra, sigue las instrucciones de configuración de Java en las bibliotecas cliente de Pub/Sub Lite.

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.TopicPaths;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.StatusException;
import java.util.concurrent.ExecutionException;

public class PublishWithCustomAttributesExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");

    publishWithCustomAttributesExample(cloudRegion, zoneId, projectNumber, topicId);
  }

  // Publish messages to a topic with custom attributes.
  public static void publishWithCustomAttributesExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId)
      throws StatusException, ExecutionException, InterruptedException {

    TopicPath topicPath =
        TopicPaths.newBuilder()
            .setProjectNumber(ProjectNumber.of(projectNumber))
            .setZone(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setTopicName(TopicName.of(topicId))
            .build();

    PublisherSettings publisherSettings =
        PublisherSettings.newBuilder().setTopicPath(topicPath).build();

    Publisher publisher = Publisher.create(publisherSettings);

    // Start the publisher. Upon successful starting, its state will become RUNNING.
    publisher.startAsync().awaitRunning();

    String message = "message-with-custom-attributes";

    // Convert the message to a byte string.
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage =
        PubsubMessage.newBuilder()
            .setData(data)
            // Add two sets of custom attributes to the message.
            .putAllAttributes(ImmutableMap.of("year", "2020", "author", "unknown"))
            .build();

    // Publish a message.
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Shut down the publisher.
    publisher.stopAsync().awaitTerminated();

    String ackId = future.get();
    PublishMetadata metadata = PublishMetadata.decode(ackId);
    System.out.println("Published a message with custom attributes:\n" + metadata);
  }
}

Los atributos pueden indicar cómo procesar un mensaje. Los suscriptores pueden analizar el campo attributes de un mensaje y procesarlo según sus atributos.

Agrupa los mensajes en lotes

La biblioteca cliente publica mensajes en lotes. Los lotes más grandes usan menos recursos de procesamiento, pero aumentan la latencia. Puedes cambiar el tamaño del lote con la configuración del lote.

En la siguiente tabla, se enumeran los parámetros de configuración de lotes que puedes configurar:

Configuración Descripción Default
Tamaño de la solicitud El tamaño máximo, en bytes, del lote. 3.5 MiB
Cantidad de mensajes La cantidad máxima de mensajes en un lote. 1,000 mensajes
Retraso de publicación La cantidad de tiempo, en milisegundos, entre agregar el mensaje a un lote y enviar el lote al tema Lite. 50 milésimas de segundo

Puedes establecer la configuración de lotes con la biblioteca cliente.

Java

Antes de ejecutar esta muestra, sigue las instrucciones de configuración de Java en las bibliotecas cliente de Pub/Sub Lite.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.TopicPaths;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.StatusException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.threeten.bp.Duration;

public class PublishWithBatchSettingsExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int messageCount = 100;

    publishWithBatchSettingsExample(cloudRegion, zoneId, projectNumber, topicId, messageCount);
  }

  // Publish messages to a topic with batch settings.
  public static void publishWithBatchSettingsExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, int messageCount)
      throws StatusException, ExecutionException, InterruptedException {

    TopicPath topicPath =
        TopicPaths.newBuilder()
            .setProjectNumber(ProjectNumber.of(projectNumber))
            .setZone(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setTopicName(TopicName.of(topicId))
            .build();
    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      // Batch settings control how the publisher batches messages
      long requestBytesThreshold = 5000L; // default : 3_500_000 bytes
      long messageCountBatchSize = 100L; // default : 1000L message
      Duration publishDelayThreshold = Duration.ofMillis(100); // default : 50 ms

      // Publish request get triggered based on request size, messages count & time since last
      // publish, whichever condition is met first.
      BatchingSettings batchingSettings =
          BatchingSettings.newBuilder()
              .setRequestByteThreshold(requestBytesThreshold)
              .setElementCountThreshold(messageCountBatchSize)
              .setDelayThreshold(publishDelayThreshold)
              .build();

      PublisherSettings publisherSettings =
          PublisherSettings.newBuilder()
              .setTopicPath(topicPath)
              .setBatchingSettings(batchingSettings)
              .build();

      publisher = Publisher.create(publisherSettings);

      // Start the publisher. Upon successful starting, its state will become RUNNING.
      publisher.startAsync().awaitRunning();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // Convert the message to a byte string.
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Publish a message.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      ArrayList<PublishMetadata> metadata = new ArrayList<>();
      List<String> ackIds = ApiFutures.allAsList(futures).get();
      System.out.println("Published " + ackIds.size() + " messages with batch settings.");

      if (publisher != null) {
        // Shut down the publisher.
        publisher.stopAsync().awaitTerminated();
      }
    }
  }
}

Cuando se inicia una aplicación de publicador, la biblioteca cliente crea un lote para cada partición en un tema de Lite. Por ejemplo, si un tema Lite tiene dos particiones, los publicadores crean dos lotes y envían cada lote a una partición.

Después de publicar un mensaje, la biblioteca cliente lo almacena en búfer hasta que el lote supera el tamaño máximo de solicitud, la cantidad máxima de mensajes o el retraso de publicación.

Ordena mensajes

Los temas Lite ordenan los mensajes de cada partición según los mensajes que publicas. Para asignar mensajes a la misma partición, usa una clave de ordenamiento.

Pub/Sub Lite entrega los mensajes de una partición en orden y los suscriptores pueden procesarlos en orden. Para obtener más información, consulta Recibe mensajes.