Recevoir des messages provenant d'abonnements Lite

Cette page explique comment recevoir des messages à partir d'abonnements Lite. Vous pouvez recevoir des messages avec la bibliothèque cliente Pub/Sub Lite pour Java.

Les abonnements Lite connectent des sujets Lite aux applications d'abonnés. Les abonnés reçoivent les messages provenant des abonnements Lite. Les abonnés reçoivent chaque message envoyé au sujet Lite par les applications d'éditeur, y compris les messages envoyés par les éditeurs avant la création de l'abonnement Lite.

Avant de recevoir des messages d'un abonnement Lite, créez un sujet Lite, créez un abonnement Lite au sujet Lite et publiez des messages dans le sujet Lite.

Recevoir des messages

Pour recevoir des messages d'un abonnement Lite, demandez les messages à l'abonnement Lite. La bibliothèque cliente se connecte automatiquement aux partitions du sujet Lite associé à l'abonnement Lite. Si plusieurs clients clients sont instanciés, les messages sont répartis entre tous les clients. Le nombre de partitions dans le sujet détermine le nombre maximal de clients abonnés qui peuvent se connecter simultanément à un abonnement.

L'exemple suivant montre comment recevoir des messages provenant d'abonnements Lite :

Go

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Go qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Go.


package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func main() {
	// NOTE: Set these flags for an existing Pub/Sub Lite subscription containing
	// published messages when running this sample.
	projectID := flag.String("project_id", "", "Cloud Project ID")
	zone := flag.String("zone", "", "Cloud Zone where the topic resides, e.g. us-central1-a")
	subscriptionID := flag.String("subscription_id", "", "Existing Pub/Sub Lite subscription")
	timeout := flag.Duration("timeout", 90*time.Second, "The duration to receive messages")
	flag.Parse()

	ctx := context.Background()
	subscriptionPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", *projectID, *zone, *subscriptionID)

	// Configure flow control settings. These settings apply per partition.
	// The message stream is paused based on the maximum size or number of
	// messages that the subscriber has already received, whichever condition is
	// met first.
	settings := pscompat.ReceiveSettings{
		// 10 MiB. Must be greater than the allowed size of the largest message
		// (1 MiB).
		MaxOutstandingBytes: 10 * 1024 * 1024,
		// 1,000 outstanding messages. Must be > 0.
		MaxOutstandingMessages: 1000,
	}

	// Create the subscriber client.
	subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscriptionPath, settings)
	if err != nil {
		log.Fatalf("pscompat.NewSubscriberClientWithSettings error: %v", err)
	}

	// Listen for messages until the timeout expires.
	log.Printf("Listening to messages on %s for %v...\n", subscriptionPath, *timeout)
	cctx, cancel := context.WithTimeout(ctx, *timeout)
	defer cancel()
	var receiveCount int32

	// Receive blocks until the context is cancelled or an error occurs.
	if err := subscriber.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
		// NOTE: May be called concurrently; synchronize access to shared memory.
		atomic.AddInt32(&receiveCount, 1)

		// Metadata decoded from the message ID contains the partition and offset.
		metadata, err := pscompat.ParseMessageMetadata(msg.ID)
		if err != nil {
			log.Fatalf("Failed to parse %q: %v", msg.ID, err)
		}

		fmt.Printf("Received (partition=%d, offset=%d): %s\n", metadata.Partition, metadata.Offset, string(msg.Data))
		msg.Ack()
	}); err != nil {
		log.Fatalf("SubscriberClient.Receive error: %v", err)
	}

	fmt.Printf("Received %d messages\n", receiveCount)
}

Java

Avant d'exécuter cet exemple, suivez les instructions de configuration de Java dans la section Bibliothèques clientes de Pub/Sub Lite.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscriberExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription for the subscribe example to work.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");

    subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId);
  }

  public static void subscriberExample(
      String cloudRegion, char zoneId, long projectNumber, String subscriptionId)
      throws ApiException {

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    // The message stream is paused based on the maximum size or number of messages that the
    // subscriber has already received, whichever condition is met first.
    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(10 * 1024 * 1024L)
            // 1,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(1000L)
            .build();

    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          System.out.println("Id : " + MessageMetadata.decode(message.getMessageId()));
          System.out.println("Data : " + message.getData().toStringUtf8());
          consumer.ack();
        };

    SubscriberSettings subscriberSettings =
        SubscriberSettings.newBuilder()
            .setSubscriptionPath(subscriptionPath)
            .setReceiver(receiver)
            // Flow control settings are set at the partition level.
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .build();

    Subscriber subscriber = Subscriber.create(subscriberSettings);

    // Start the subscriber. Upon successful starting, its state will become RUNNING.
    subscriber.startAsync().awaitRunning();

    System.out.println("Listening to messages on " + subscriptionPath.toString() + "...");

    try {
      System.out.println(subscriber.state());
      // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
      // unrecoverable errors before then, its state will change to FAILED and an
      // IllegalStateException will be thrown.
      subscriber.awaitTerminated(90, TimeUnit.SECONDS);
    } catch (TimeoutException t) {
      // Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
      subscriber.stopAsync().awaitTerminated();
      System.out.println("Subscriber is shut down: " + subscriber.state());
    }
  }
}

Python

Avant d'exécuter cet exemple, suivez les instructions de configuration de Python dans la section Bibliothèques clientes de Pub/Sub Lite.

from concurrent.futures._base import TimeoutError
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    FlowControlSettings,
    SubscriptionPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# timeout = 90

location = CloudZone(CloudRegion(cloud_region), zone_id)
subscription_path = SubscriptionPath(project_number, location, subscription_id)
# Configure when to pause the message stream for more incoming messages based on the
# maximum size or number of messages that a single-partition subscriber has received,
# whichever condition is met first.
per_partition_flow_control_settings = FlowControlSettings(
    # 1,000 outstanding messages. Must be >0.
    messages_outstanding=1000,
    # 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
    bytes_outstanding=10 * 1024 * 1024,
)

def callback(message: PubsubMessage):
    message_data = message.data.decode("utf-8")
    metadata = MessageMetadata.decode(message.message_id)
    print(f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}.")
    message.ack()

# SubscriberClient() must be used in a `with` block or have __enter__() called before use.
with SubscriberClient() as subscriber_client:

    streaming_pull_future = subscriber_client.subscribe(
        subscription_path,
        callback=callback,
        per_partition_flow_control_settings=per_partition_flow_control_settings,
    )

    print(f"Listening for messages on {str(subscription_path)}...")

    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError or KeyboardInterrupt:
        streaming_pull_future.cancel()
        assert streaming_pull_future.done()

La bibliothèque cliente établit des connexions bidirectionnelles en streaming avec chacune des partitions du sujet Lite.

  1. L'abonné demande des connexions aux partitions.

  2. Le service Pub/Sub Lite envoie les messages à l'abonné.

Une fois que l'abonné a traité le message, il doit confirmer le message. La bibliothèque cliente traite et confirme les messages de manière asynchrone dans un rappel. Pour limiter le nombre de messages non confirmés que l'abonné peut stocker en mémoire, configurez les paramètres de contrôle de flux.

Si plusieurs abonnés reçoivent des messages en provenance du même abonnement Lite, le service Pub/Sub Lite connecte chaque abonné à une proportion égale de partitions. Par exemple, si deux abonnés utilisent le même abonnement Lite et que celui-ci est associé à un sujet Lite ayant deux partitions, chaque abonné reçoit les messages en provenance de l'une des partitions.

Accuser réception des messages

Pour confirmer un message, envoyez un acccusé de réception à l'abonnement Lite.

Java

Pour envoyer un accusé de réception, utilisez la méthode AckReplyConsumer.ack().

Python

Pour envoyer un accusé de réception, utilisez la méthode Message.ack().

Les abonnés doivent accuser réception de chaque message. Les abonnés reçoivent d'abord le message non confirmé le plus ancien, suivi de chaque message suivant. Si un abonné ignore un message, accuse réception des messages suivants et se reconnecte, l'abonné reçoit le message non confirmé et tous les messages ultérieurs confirmés.

Les abonnements Lite n'ont pas de délai d'accusé de réception et le service Pub/Sub Lite ne renvoie pas les messages non confirmés via une connexion en streaming ouverte.

Utiliser le contrôle de flux

Une fois que le service Pub/Sub Lite a distribué des messages aux abonnés, ceux-ci stockent en mémoire les messages non confirmés. Vous pouvez limiter le nombre de messages en attente que les abonnés peuvent stocker en mémoire à l'aide des paramètres de contrôle de flux. Les paramètres de contrôle de flux s'appliquent à chaque partition à partir de laquelle un abonné reçoit des messages.

Vous pouvez configurer les paramètres de contrôle de flux suivants :

  • La taille des messages en attente. Taille maximale des messages en attente, en octets. La taille maximale doit être supérieure à la taille du message le plus volumineux.
  • Le nombre de messages. Nombre maximal de messages en attente.

La taille d'un message est visible dans le champ size_bytes. Vous pouvez configurer les paramètres de contrôle de flux à l'aide de la bibliothèque cliente.

Java

Pour configurer les paramètres de contrôle de flux, utilisez les méthodes suivantes de la classe FlowControlRequest.Builder :

Python

Pour configurer les paramètres de contrôle de flux, définissez les paramètres suivants dans la classe FlowControlSettings :

  • bytes_outstanding

  • messages_outstanding

Par exemple, si le nombre maximal de messages est de 100 et que l'abonné se connecte à 10 partitions, il ne peut pas recevoir plus de 100 messages depuis l'une des 10 partitions. Le nombre total de messages en attente peut être supérieur à 100, mais l'abonné ne peut pas stocker plus de 100 messages de chaque partition.