Nachrichten in Lite-Themen veröffentlichen

Auf dieser Seite wird erläutert, wie Sie Nachrichten zu Lite-Themen veröffentlichen. Sie können Nachrichten mit der Pub/Sub Lite-Clientbibliothek für Java veröffentlichen.

Nach dem Veröffentlichen von Nachrichten und Erstellen eines Lite-Abos für ein Lite-Thema können Sie Nachrichten vom Lite-Abo empfangen.

Nachrichtenformat

Eine Nachricht besteht aus Feldern mit den Nachrichtendaten und Metadaten. Geben Sie in der Nachricht Folgendes an:

Die Clientbibliothek weist die Nachricht automatisch einer Partition zu. Der Pub/Sub Lite-Dienst fügt der Nachricht die folgenden Felder hinzu:

  • Eine innerhalb der Partition eindeutige Nachrichten-ID
  • Ein Zeitstempel für den Zeitpunkt, zu dem der Pub/Sub Lite-Dienst die Nachricht in der Partition speichert

Nachrichten veröffentlichen

Wenn Sie Nachrichten veröffentlichen möchten, fordern Sie eine Streamingverbindung an das Lite-Thema an und senden Sie dann Nachrichten über die Streamingverbindung.

Das folgende Beispiel zeigt, wie Sie Nachrichten in einem Lite-Thema veröffentlichen:

Java

Bevor Sie dieses Beispiel ausführen, folgen Sie den Schritten zur Einrichtung von Java in Pub/Sub Lite-Clientbibliotheken.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

public class PublisherExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int messageCount = 100;

    publisherExample(cloudRegion, zoneId, projectNumber, topicId, messageCount);
  }

  // Publish messages to a topic.
  public static void publisherExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, int messageCount)
      throws ApiException, ExecutionException, InterruptedException {

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setName(TopicName.of(topicId))
            .build();
    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      PublisherSettings publisherSettings =
          PublisherSettings.newBuilder().setTopicPath(topicPath).build();

      publisher = Publisher.create(publisherSettings);

      // Start the publisher. Upon successful starting, its state will become RUNNING.
      publisher.startAsync().awaitRunning();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // Convert the message to a byte string.
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Publish a message. Messages are automatically batched.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      ArrayList<PublishMetadata> metadata = new ArrayList<>();
      List<String> ackIds = ApiFutures.allAsList(futures).get();
      for (String id : ackIds) {
        // Decoded metadata contains partition and offset.
        metadata.add(PublishMetadata.decode(id));
      }
      System.out.println(metadata + "\nPublished " + ackIds.size() + " messages.");

      if (publisher != null) {
        // Shut down the publisher.
        publisher.stopAsync().awaitTerminated();
        System.out.println("Publisher is shut down.");
      }
    }
  }
}

Die Clientbibliothek sendet asynchron Nachrichten und verarbeitet Fehler. Wenn ein Fehler auftritt, sendet die Clientbibliothek die Nachricht erneut.

  1. Der Pub/Sub Lite-Dienst schließt den Stream.
  2. Die Clientbibliothek puffert die Nachrichten und stellt eine Verbindung zum Lite-Thema wieder her.
  3. Die Clientbibliothek sendet die Nachrichten der Reihe nach.

Nachdem Sie eine Nachricht veröffentlicht haben, speichert der Pub/Sub Lite-Dienst die Nachricht in einer Partition und gibt die Nachrichten-ID an den Publisher zurück.

Sortierschlüssel verwenden

Wenn Nachrichten denselben Reihenfolgeschlüssel haben, weist die Clientbibliothek die Nachrichten derselben Partition zu. Der Sortierschlüssel darf maximal 1.024 Byte lang sein.

Der Sortierschlüssel befindet sich im Feld key einer Nachricht. Sie können Sortierschlüssel mit der Clientbibliothek festlegen.

Java

Bevor Sie dieses Beispiel ausführen, folgen Sie den Schritten zur Einrichtung von Java in Pub/Sub Lite-Clientbibliotheken.

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;

public class PublishWithOrderingKeyExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");

    publishWithOrderingKeyExample(cloudRegion, zoneId, projectNumber, topicId);
  }

  // Publish a message to a topic with an ordering key.
  public static void publishWithOrderingKeyExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId)
      throws ApiException, ExecutionException, InterruptedException {

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setName(TopicName.of(topicId))
            .build();

    PublisherSettings publisherSettings =
        PublisherSettings.newBuilder().setTopicPath(topicPath).build();

    Publisher publisher = Publisher.create(publisherSettings);

    // Start the publisher. Upon successful starting, its state will become RUNNING.
    publisher.startAsync().awaitRunning();

    String message = "message-with-ordering-key";

    // Convert the message to a byte string.
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage =
        PubsubMessage.newBuilder()
            .setData(data)
            // Messages of the same ordering key will always get published to the
            // same partition. When OrderingKey is unset, messages can get published
            // to different partitions if more than one partition exists for the topic.
            .setOrderingKey("testing")
            .build();

    // Publish a message.
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Shut down the publisher.
    publisher.stopAsync().awaitTerminated();

    String ackId = future.get();
    PublishMetadata metadata = PublishMetadata.decode(ackId);
    System.out.println("Published a message with ordering key:\n" + metadata);
  }
}

Mit Reihenfolgeschlüsseln können Sie mehrere Nachrichten an dieselbe Partition senden, sodass Abonnenten die Nachrichten in der Reihenfolge empfangen. Die Clientbibliothek kann einer gleichen Partition mehrere Reihenfolgeschlüssel zuweisen.

Attribute verwenden

Nachrichtenattribute sind Schlüssel/Wert-Paare mit Metadaten zur Nachricht. Die Attribute können Text- oder Byte-Strings sein.

Die Attribute befinden sich im Feld attributes einer Nachricht. Mit der Clientbibliothek können Sie Attribute festlegen.

Java

Bevor Sie dieses Beispiel ausführen, folgen Sie den Schritten zur Einrichtung von Java in Pub/Sub Lite-Clientbibliotheken.

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;

public class PublishWithCustomAttributesExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");

    publishWithCustomAttributesExample(cloudRegion, zoneId, projectNumber, topicId);
  }

  // Publish messages to a topic with custom attributes.
  public static void publishWithCustomAttributesExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId)
      throws ApiException, ExecutionException, InterruptedException {

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setName(TopicName.of(topicId))
            .build();

    PublisherSettings publisherSettings =
        PublisherSettings.newBuilder().setTopicPath(topicPath).build();

    Publisher publisher = Publisher.create(publisherSettings);

    // Start the publisher. Upon successful starting, its state will become RUNNING.
    publisher.startAsync().awaitRunning();

    String message = "message-with-custom-attributes";

    // Convert the message to a byte string.
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage =
        PubsubMessage.newBuilder()
            .setData(data)
            // Add two sets of custom attributes to the message.
            .putAllAttributes(ImmutableMap.of("year", "2020", "author", "unknown"))
            .build();

    // Publish a message.
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Shut down the publisher.
    publisher.stopAsync().awaitTerminated();

    String ackId = future.get();
    PublishMetadata metadata = PublishMetadata.decode(ackId);
    System.out.println("Published a message with custom attributes:\n" + metadata);
  }
}

Attribute können angeben, wie eine Nachricht verarbeitet werden soll. Abonnenten können das Feld attributes einer Nachricht parsen und die Nachricht entsprechend ihren Attributen verarbeiten.

Nachrichten im Batch verarbeiten

Die Clientbibliothek veröffentlicht Nachrichten im Batch. Größere Batches benötigen weniger Rechenressourcen, erhöhen aber die Latenz. Sie können die Batchgröße mit den Batcheinstellungen ändern.

In der folgenden Tabelle sind die Batch-Einstellungen aufgeführt, die Sie konfigurieren können:

Einstellung Beschreibung Default
Größe der Anfrage Die maximale Größe des Batches in Byte. 3,5 MiB
Zahl der Nachrichten Die maximale Anzahl von Nachrichten in einem Batch. 1.000 Nachrichten
Verzögerung bei der Veröffentlichung Die Zeit in Millisekunden zwischen dem Hinzufügen der Nachricht zu einem Batch und dem Senden des Batches an das Lite-Thema. 50 Millisekunden

Sie können Batch-Einstellungen mit der Clientbibliothek konfigurieren.

Java

Bevor Sie dieses Beispiel ausführen, folgen Sie den Schritten zur Einrichtung von Java in Pub/Sub Lite-Clientbibliotheken.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.threeten.bp.Duration;

public class PublishWithBatchSettingsExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int messageCount = 100;

    publishWithBatchSettingsExample(cloudRegion, zoneId, projectNumber, topicId, messageCount);
  }

  // Publish messages to a topic with batch settings.
  public static void publishWithBatchSettingsExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, int messageCount)
      throws ApiException, ExecutionException, InterruptedException {

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setName(TopicName.of(topicId))
            .build();
    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      // Batch settings control how the publisher batches messages
      long requestBytesThreshold = 5000L; // default : 3_500_000 bytes
      long messageCountBatchSize = 100L; // default : 1000L message
      Duration publishDelayThreshold = Duration.ofMillis(100); // default : 50 ms

      // Publish request get triggered based on request size, messages count & time since last
      // publish, whichever condition is met first.
      BatchingSettings batchingSettings =
          BatchingSettings.newBuilder()
              .setRequestByteThreshold(requestBytesThreshold)
              .setElementCountThreshold(messageCountBatchSize)
              .setDelayThreshold(publishDelayThreshold)
              .build();

      PublisherSettings publisherSettings =
          PublisherSettings.newBuilder()
              .setTopicPath(topicPath)
              .setBatchingSettings(batchingSettings)
              .build();

      publisher = Publisher.create(publisherSettings);

      // Start the publisher. Upon successful starting, its state will become RUNNING.
      publisher.startAsync().awaitRunning();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // Convert the message to a byte string.
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Publish a message.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      ArrayList<PublishMetadata> metadata = new ArrayList<>();
      List<String> ackIds = ApiFutures.allAsList(futures).get();
      System.out.println("Published " + ackIds.size() + " messages with batch settings.");

      if (publisher != null) {
        // Shut down the publisher.
        publisher.stopAsync().awaitTerminated();
      }
    }
  }
}

Wenn eine Publisher-Anwendung gestartet wird, erstellt die Client-Bibliothek einen Batch für jede Partition in einem Lite-Thema. Wenn ein Lite-Thema beispielsweise zwei Partitionen hat, erstellen Publisher zwei Batches und senden jeden Batch an eine Partition.

Nachdem Sie eine Nachricht veröffentlicht haben, wird sie von der Clientbibliothek gepuffert, bis der Batch die maximale Anforderungsgröße, die maximale Anzahl von Nachrichten oder die Veröffentlichungsverzögerung überschreitet.

Nachrichten in eine Reihenfolge bringen

Lite-Themen sortieren Nachrichten in jeder Partition, indem Sie die Nachrichten veröffentlichen. Verwenden Sie einen Reihenfolgeschlüssel, um Nachrichten derselben Partition zuzuweisen.

Pub/Sub Lite liefert die Nachrichten aus einer Partition nacheinander und Abonnenten können die Nachrichten der Reihe nach verarbeiten. Weitere Informationen finden Sie unter Nachrichten empfangen.