Nachrichten von Lite-Abos empfangen

Auf dieser Seite wird erläutert, wie Sie Nachrichten von Lite-Abos erhalten. Sie können Nachrichten mit der Pub/Sub Lite-Clientbibliothek für Java empfangen.

Lite-Abonnements verbinden Lite-Themen mit Abonnentenanwendungen; Abonnenten erhalten Nachrichten von Lite-Abonnements. Abonnenten erhalten alle Nachrichten, die Publisher-Anwendungen an das Lite-Thema senden, einschließlich der Nachrichten, die Publisher vor dem Erstellen des Lite-Abonnements senden.

Bevor Sie Nachrichten von einem Lite-Abo erhalten, erstellen Sie ein Lite-Thema, erstellen Sie ein Lite-Abo für das Lite-Thema und veröffentlichen Sie Nachrichten für das Lite-Thema.

Nachrichten erhalten

Sie können Nachrichten aus dem Lite-Abo anfordern, wenn Sie Nachrichten von einem Lite-Abo erhalten möchten. Die Clientbibliothek stellt automatisch eine Verbindung zu den Partitionen im Lite-Thema her, das mit dem Lite-Abo verbunden ist. Wenn mehrere Abonnentenclients instanziiert sind, werden Nachrichten auf alle Clients verteilt. Die Anzahl der Partitionen im Thema bestimmt die maximale Anzahl von Abonnentenclients, die gleichzeitig eine Verbindung mit einem Abo herstellen können.

Das Initialisieren von Abonnenten und das Empfangen von Nachrichten kann bis zu einer Minute dauern. Nach der Initialisierung werden Nachrichten mit minimaler Latenz empfangen.

Im folgenden Beispiel wird gezeigt, wie Sie Nachrichten von Lite-Abos erhalten:

gcloud

Für diesen Befehl ist Python 3.6 oder höher erforderlich und das grpcio-Python-Paket muss installiert sein. Führen Sie für MacOS-, Linux- und Cloud Shell-Nutzer Folgendes aus:

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

Verwenden Sie den Befehl gcloud pubsub lite-subscriptions subscribe, um Nachrichten zu erhalten:

gcloud pubsub lite-subscriptions subscribe SUBSCRIPTION_ID \
    --location=LITE_LOCATION \
    --auto-ack

Ersetzen Sie Folgendes:

  • SUBSCRIPTION_ID: die ID des Lite-Abos
  • LITE_LOCATION: der Standort des Lite-Abos

Einfach loslegen (Go)

Folgen Sie der Einrichtungsanleitung für Go in Pub/Sub Lite-Clientbibliotheken, bevor Sie dieses Beispiel ausführen.


package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func main() {
	// NOTE: Set these flags for an existing Pub/Sub Lite subscription containing
	// published messages when running this sample.
	projectID := flag.String("project_id", "", "Cloud Project ID")
	zone := flag.String("zone", "", "Cloud Zone where the topic resides, e.g. us-central1-a")
	subscriptionID := flag.String("subscription_id", "", "Existing Pub/Sub Lite subscription")
	timeout := flag.Duration("timeout", 90*time.Second, "The duration to receive messages")
	flag.Parse()

	ctx := context.Background()
	subscriptionPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", *projectID, *zone, *subscriptionID)

	// Configure flow control settings. These settings apply per partition.
	// The message stream is paused based on the maximum size or number of
	// messages that the subscriber has already received, whichever condition is
	// met first.
	settings := pscompat.ReceiveSettings{
		// 10 MiB. Must be greater than the allowed size of the largest message
		// (1 MiB).
		MaxOutstandingBytes: 10 * 1024 * 1024,
		// 1,000 outstanding messages. Must be > 0.
		MaxOutstandingMessages: 1000,
	}

	// Create the subscriber client.
	subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscriptionPath, settings)
	if err != nil {
		log.Fatalf("pscompat.NewSubscriberClientWithSettings error: %v", err)
	}

	// Listen for messages until the timeout expires.
	log.Printf("Listening to messages on %s for %v...\n", subscriptionPath, *timeout)
	cctx, cancel := context.WithTimeout(ctx, *timeout)
	defer cancel()
	var receiveCount int32

	// Receive blocks until the context is cancelled or an error occurs.
	if err := subscriber.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
		// NOTE: May be called concurrently; synchronize access to shared memory.
		atomic.AddInt32(&receiveCount, 1)

		// Metadata decoded from the message ID contains the partition and offset.
		metadata, err := pscompat.ParseMessageMetadata(msg.ID)
		if err != nil {
			log.Fatalf("Failed to parse %q: %v", msg.ID, err)
		}

		fmt.Printf("Received (partition=%d, offset=%d): %s\n", metadata.Partition, metadata.Offset, string(msg.Data))
		msg.Ack()
	}); err != nil {
		log.Fatalf("SubscriberClient.Receive error: %v", err)
	}

	fmt.Printf("Received %d messages\n", receiveCount)
}

Java

Folgen Sie der Einrichtungsanleitung für Java unter Pub/Sub Lite-Clientbibliotheken, bevor Sie dieses Beispiel ausführen.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscriberExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription for the subscribe example to work.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId, regional);
  }

  public static void subscriberExample(
      String cloudRegion, char zoneId, long projectNumber, String subscriptionId, boolean regional)
      throws ApiException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    // The message stream is paused based on the maximum size or number of messages that the
    // subscriber has already received, whichever condition is met first.
    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(10 * 1024 * 1024L)
            // 1,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(1000L)
            .build();

    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          System.out.println("Id : " + MessageMetadata.decode(message.getMessageId()));
          System.out.println("Data : " + message.getData().toStringUtf8());
          System.out.println("Ordering key : " + message.getOrderingKey());
          System.out.println("Attributes : ");
          message
              .getAttributesMap()
              .forEach(
                  (key, value) -> {
                    if (key == MessageTransforms.PUBSUB_LITE_EVENT_TIME_TIMESTAMP_PROTO) {
                      Timestamp ts = MessageTransforms.decodeAttributeEventTime(value);
                      System.out.println(key + " = " + ts.toString());
                    } else {
                      System.out.println(key + " = " + value);
                    }
                  });

          // Acknowledge the message.
          consumer.ack();
        };

    SubscriberSettings subscriberSettings =
        SubscriberSettings.newBuilder()
            .setSubscriptionPath(subscriptionPath)
            .setReceiver(receiver)
            // Flow control settings are set at the partition level.
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .build();

    Subscriber subscriber = Subscriber.create(subscriberSettings);

    // Start the subscriber. Upon successful starting, its state will become RUNNING.
    subscriber.startAsync().awaitRunning();

    System.out.println("Listening to messages on " + subscriptionPath.toString() + "...");

    try {
      System.out.println(subscriber.state());
      // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
      // unrecoverable errors before then, its state will change to FAILED and an
      // IllegalStateException will be thrown.
      subscriber.awaitTerminated(90, TimeUnit.SECONDS);
    } catch (TimeoutException t) {
      // Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
      subscriber.stopAsync().awaitTerminated();
      System.out.println("Subscriber is shut down: " + subscriber.state());
    }
  }
}

Python

Folgen Sie der Einrichtungsanleitung für Python unter Pub/Sub Lite-Clientbibliotheken, bevor Sie dieses Beispiel ausführen.

from concurrent.futures._base import TimeoutError
from google.pubsub_v1 import PubsubMessage
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    FlowControlSettings,
    MessageMetadata,
    SubscriptionPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# timeout = 90
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)
# Configure when to pause the message stream for more incoming messages based on the
# maximum size or number of messages that a single-partition subscriber has received,
# whichever condition is met first.
per_partition_flow_control_settings = FlowControlSettings(
    # 1,000 outstanding messages. Must be >0.
    messages_outstanding=1000,
    # 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
    bytes_outstanding=10 * 1024 * 1024,
)

def callback(message: PubsubMessage):
    message_data = message.data.decode("utf-8")
    metadata = MessageMetadata.decode(message.message_id)
    print(
        f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}."
    )
    message.ack()

# SubscriberClient() must be used in a `with` block or have __enter__() called before use.
with SubscriberClient() as subscriber_client:

    streaming_pull_future = subscriber_client.subscribe(
        subscription_path,
        callback=callback,
        per_partition_flow_control_settings=per_partition_flow_control_settings,
    )

    print(f"Listening for messages on {str(subscription_path)}...")

    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError or KeyboardInterrupt:
        streaming_pull_future.cancel()
        assert streaming_pull_future.done()

Die Clientbibliothek stellt für jede der Partitionen im Lite-Thema bidirektionale Streaming-Verbindungen her.

  1. Der Abonnent fordert Verbindungen zu den Partitionen an.

  2. Der Pub/Sub Lite-Dienst stellt die Nachrichten an den Abonnenten zu.

Nachdem der Abonnent die Nachricht verarbeitet hat, muss er die Nachricht bestätigen. Die Clientbibliothek verarbeitet und bestätigt Nachrichten in einem Rückruf asynchron. Konfigurieren Sie die Einstellungen für die Ablaufsteuerung, um die Anzahl der nicht bestätigten Nachrichten zu begrenzen, die der Abonnent im Speicher speichern kann.

Wenn mehrere Abonnenten Nachrichten aus demselben Lite-Abo empfangen, verbindet der Pub/Sub Lite-Dienst jeden Abonnenten mit einem gleich großen Teil der Partitionen. Wenn z. B. zwei Abonnenten dasselbe Lite-Abo verwenden und das Lite-Abo mit einem Lite-Thema mit zwei Partitionen verknüpft ist, empfängt jeder Abonnent Nachrichten von einer der Partitionen.

Nachrichten bestätigen

Um eine Nachricht zu bestätigen, senden Sie eine Bestätigung an das Lite-Abo.

Einfach loslegen (Go)

Verwenden Sie zum Senden einer Bestätigung die Methode Message.Ack().

Java

Verwenden Sie zum Senden einer Bestätigung die Methode AckReplyConsumer.ack().

Python

Verwenden Sie zum Senden einer Bestätigung die Methode Message.ack().

Abonnenten müssen jede Nachricht bestätigen. Abonnenten erhalten zuerst die älteste unbestätigte Nachricht, gefolgt von jeder nachfolgenden Nachricht. Wenn ein Abonnent eine Nachricht überspringt, die nachfolgenden Nachrichten bestätigt und dann die Verbindung wiederhergestellt, erhält der Abonnent die nicht bestätigte Nachricht und jede nachfolgende bestätigte Nachricht.

Bei Lite-Abos gibt es keine Bestätigungsfrist und der Pub/Sub Lite-Dienst sendet unbestätigte Nachrichten nicht über eine offene Streamingverbindung noch einmal.

Ablaufsteuerung verwenden

Nachdem der Pub/Sub Lite-Dienst Nachrichten an Abonnenten zugestellt hat, speichern die Abonnenten nicht bestätigte Nachrichten im Speicher. Sie können die Anzahl der ausstehenden Nachrichten, die Abonnenten im Speicher speichern können, über die Einstellungen für die Ablaufsteuerung beschränken. Die Einstellungen für die Ablaufsteuerung gelten für jede Partition, von der ein Abonnent Nachrichten empfängt.

Sie können die folgenden Einstellungen für die Ablaufsteuerung konfigurieren:

  • Größe der ausstehenden Nachrichten: Die maximale Größe der ausstehenden Nachrichten in Byte. Die maximale Größe muss größer als die Größe der größten Nachricht sein.
  • Zahl der Nachrichten Die maximale Anzahl der ausstehenden Nachrichten.

Die Größe einer Nachricht wird im Feld size_bytes angegeben. Sie können die Ablaufsteuerungseinstellungen mit der Clientbibliothek konfigurieren.

Einfach loslegen (Go)

Zum Konfigurieren der Einstellungen für die Ablaufsteuerung übergeben Sie beim Aufrufen von pscompat.NewSubscriberClientWithSettings ReceiveSettings. In ReceiveSettings können Sie die folgenden Parameter festlegen:

  • MaxOutstandingMessages

  • MaxOutstandingBytes

Ein entsprechendes Beispiel finden Sie in diesem Beispiel für die Ablaufsteuerung.

Java

Verwenden Sie zum Konfigurieren der Einstellungen für die Ablaufsteuerung die folgenden Methoden in der Klasse FlowControlRequest.Builder:

Python

Legen Sie die folgenden Parameter in der Klasse FlowControlSettings fest, um die Einstellungen zur Ablaufsteuerung zu konfigurieren:

  • bytes_outstanding

  • messages_outstanding

Wenn die maximale Anzahl von Nachrichten beispielsweise 100 beträgt und der Abonnent eine Verbindung zu 10 Partitionen herstellt, kann der Abonnent nicht mehr als 100 Nachrichten von jeder der 10 Partitionen empfangen. Die Gesamtzahl der ausstehenden Nachrichten kann größer als 100 sein, der Abonnent kann jedoch nicht mehr als 100 Nachrichten von jeder Partition speichern.