Nachrichten von Lite-Abos empfangen

Auf dieser Seite wird erläutert, wie Sie Nachrichten von Lite-Abos erhalten. Sie können Nachrichten mit der Pub/Sub Lite-Clientbibliothek für Java empfangen.

Lite-Abonnements verbinden Lite-Themen mit Abonnentenanwendungen; Abonnenten erhalten Nachrichten von Lite-Abonnements. Abonnenten erhalten alle Nachrichten, die Publisher-Anwendungen an das Lite-Thema senden, einschließlich der Nachrichten, die Publisher vor dem Erstellen des Lite-Abonnements senden.

Bevor Sie Nachrichten von einem Lite-Abo erhalten, erstellen Sie ein Lite-Thema, erstellen Sie ein Lite-Abo für das Lite-Thema und veröffentlichen Sie Nachrichten für das Lite-Thema.

Nachrichten erhalten

Sie können Nachrichten aus dem Lite-Abo anfordern, wenn Sie Nachrichten von einem Lite-Abo erhalten möchten. Die Clientbibliothek stellt automatisch eine Verbindung zu den Partitionen im Lite-Thema her, das mit dem Lite-Abo verbunden ist.

Im folgenden Beispiel wird gezeigt, wie Sie Nachrichten von Lite-Abos erhalten:

Java

Bevor Sie dieses Beispiel ausführen, folgen Sie den Schritten zur Einrichtung von Java in Pub/Sub Lite-Clientbibliotheken.

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.StatusException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscriberExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription for the subscribe example to work.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");

    subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId);
  }

  public static void subscriberExample(
      String cloudRegion, char zoneId, long projectNumber, String subscriptionId)
      throws StatusException {

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    // The message stream is paused based on the maximum size or number of messages that the
    // subscriber has already received, whichever condition is met first.
    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(10 * 1024 * 1024L)
            // 1,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(1000L)
            .build();

    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          System.out.println("Id : " + message.getMessageId());
          System.out.println("Data : " + message.getData().toStringUtf8());
          consumer.ack();
        };

    SubscriberSettings subscriberSettings =
        SubscriberSettings.newBuilder()
            .setSubscriptionPath(subscriptionPath)
            .setReceiver(receiver)
            // Flow control settings are set at the partition level.
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .build();

    Subscriber subscriber = Subscriber.create(subscriberSettings);

    // Start the subscriber. Upon successful starting, its state will become RUNNING.
    subscriber.startAsync().awaitRunning();

    System.out.println("Listening to messages on " + subscriptionPath.toString() + "...");

    try {
      System.out.println(subscriber.state());
      // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
      // unrecoverable errors before then, its state will change to FAILED and an
      // IllegalStateException will be thrown.
      subscriber.awaitTerminated(90, TimeUnit.SECONDS);
    } catch (TimeoutException t) {
      // Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
      subscriber.stopAsync().awaitTerminated();
      System.out.println("Subscriber is shut down: " + subscriber.state());
    }
  }
}

Die Clientbibliothek stellt für jede der Partitionen im Lite-Thema bidirektionale Streaming-Verbindungen her.

  1. Der Abonnent fordert Verbindungen zu den Partitionen an.

  2. Der Pub/Sub Lite-Dienst stellt die Nachrichten an den Abonnenten zu.

Nachdem der Abonnent die Nachricht verarbeitet hat, muss er die Nachricht bestätigen. Die Clientbibliothek verarbeitet und bestätigt Nachrichten in einem Rückruf asynchron. Konfigurieren Sie die Einstellungen für die Ablaufsteuerung, um die Anzahl der nicht bestätigten Nachrichten zu begrenzen, die der Abonnent im Speicher speichern kann.

Wenn mehrere Abonnenten Nachrichten aus demselben Lite-Abo empfangen, verbindet der Pub/Sub Lite-Dienst jeden Abonnenten mit einem gleich großen Teil der Partitionen. Wenn z. B. zwei Abonnenten dasselbe Lite-Abo verwenden und das Lite-Abo mit einem Lite-Thema mit zwei Partitionen verknüpft ist, empfängt jeder Abonnent Nachrichten von einer der Partitionen.

Nachrichten bestätigen

Um eine Nachricht zu bestätigen, senden Sie eine Bestätigung an das Lite-Abo.

Java

Verwenden Sie zum Senden einer Bestätigung die Methode AckReplyConsumer.ack().

Jede Nachricht bestätigen Abonnenten erhalten zuerst die älteste unbestätigte Nachricht, gefolgt von jeder nachfolgenden Nachricht. Wenn ein Abonnent eine Nachricht überspringt, die nachfolgenden Nachrichten bestätigt und dann die Verbindung wiederhergestellt, erhält der Abonnent die nicht bestätigte Nachricht und jede nachfolgende bestätigte Nachricht.

Bei Lite-Abos gibt es keine Bestätigungsfrist und der Pub/Sub Lite-Dienst sendet unbestätigte Nachrichten nicht über eine offene Streamingverbindung noch einmal.

Ablaufsteuerung verwenden

Nachdem der Pub/Sub Lite-Dienst Nachrichten an Abonnenten zugestellt hat, speichern die Abonnenten nicht bestätigte Nachrichten im Speicher. Sie können die Anzahl der ausstehenden Nachrichten, die Abonnenten im Speicher speichern können, über die Einstellungen für die Ablaufsteuerung beschränken. Die Einstellungen für die Ablaufsteuerung gelten für jede Partition, von der ein Abonnent Nachrichten empfängt.

Sie können die folgenden Einstellungen für die Ablaufsteuerung konfigurieren:

  • Größe der ausstehenden Nachrichten: Die maximale Größe der ausstehenden Nachrichten in Byte. Die maximale Größe muss größer als die Größe der größten Nachricht sein.
  • Zahl der Nachrichten Die maximale Anzahl der ausstehenden Nachrichten.

Die Größe einer Nachricht wird im Feld size_bytes angegeben. Sie können die Ablaufsteuerungseinstellungen mit der Clientbibliothek konfigurieren.

Java

Verwenden Sie zum Konfigurieren der Einstellungen für die Ablaufsteuerung die folgenden Methoden in der Klasse FlowControlRequest.Builder:

Wenn die maximale Anzahl von Nachrichten beispielsweise 100 beträgt und der Abonnent eine Verbindung zu 10 Partitionen herstellt, kann der Abonnent nicht mehr als 100 Nachrichten von jeder der 10 Partitionen empfangen. Die Gesamtzahl der ausstehenden Nachrichten kann größer als 100 sein, der Abonnent kann jedoch nicht mehr als 100 Nachrichten von jeder Partition speichern.