Recevoir des messages en mode pull

Cloud Pub/Sub est compatible avec la distribution de messages en mode push et pull. Pour obtenir une présentation et une comparaison des abonnements en mode pull et push, consultez la page Présentation des abonnements. Le présent document décrit la distribution des messages en mode pull. Pour plus d'informations sur la distribution en mode push, consultez le Guide pour les abonnés – Mode push.

Mode pull asynchrone

L'utilisation du mode pull asynchrone augmente le débit de votre application, car celle-ci ne se bloque pas en cas de nouveaux messages. Les messages peuvent être reçus dans votre application à l'aide d'un programme d'écoute de messages à exécution longue et faire l'objet d'une confirmation individuelle, comme indiqué dans l'exemple ci-dessous. Les clients Java, Python, .NET, Go et Ruby utilisent l'API du service streamingPull pour implémenter efficacement l'API cliente asynchrone.

Toutes les bibliothèques clientes ne sont pas compatibles avec le mode pull asynchrone pour les messages. Pour en savoir plus sur le mode pull synchrone, consultez la section Mode pull synchrone.

Pour plus d'informations, consultez la documentation de référence sur les API pour votre langage de programmation.

C#

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C# se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour C#.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberClient subscriber = await SubscriberClient.CreateAsync(
    subscriptionName);
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
await Task.Delay(3000);
await subscriber.StopAsync(CancellationToken.None);

Go

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Go se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Go.

// Consume 10 messages.
var mu sync.Mutex
received := 0
sub := client.Subscription(subName)
cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
	msg.Ack()
	fmt.Printf("Got message: %q\n", string(msg.Data))
	mu.Lock()
	defer mu.Unlock()
	received++
	if received == 10 {
		cancel()
	}
})
if err != nil {
	return err
}

Java

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Java se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Java.

String projectId = "my-project-id";
String subscriptionId = "my-subscription-id";

ProjectSubscriptionName subscriptionName =
    ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver
MessageReceiver receiver =
    new MessageReceiver() {
      @Override
      public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        // handle incoming message, then ack/nack the received message
        System.out.println("Id : " + message.getMessageId());
        System.out.println("Data : " + message.getData().toStringUtf8());
        consumer.ack();
      }
    };

Subscriber subscriber = null;
try {
  // Create a subscriber for "my-subscription-id" bound to the message receiver
  subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
  subscriber.startAsync();
  // ...
} finally {
  // stop receiving messages
  if (subscriber != null) {
    subscriber.stopAsync();
  }
}

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Node.js.

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'my-sub';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = message => {
  console.log(`Received message ${message.id}:`);
  console.log(`\tData: ${message.data}`);
  console.log(`\tAttributes: ${message.attributes}`);
  messageCount += 1;

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Listen for new messages until timeout is hit
subscription.on(`message`, messageHandler);

setTimeout(() => {
  subscription.removeListener('message', messageHandler);
  console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);

Python

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Python se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Python.

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_name}`
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message))
    message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking. We must keep the main thread from
# exiting to allow it to process messages asynchronously in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

Traiter les attributs personnalisés

Cet exemple montre comment extraire des messages de manière asynchrone et récupérer les attributs personnalisés à partir des métadonnées :

Python

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Python se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Python.

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message.data))
    if message.attributes:
        print('Attributes:')
        for key in message.attributes:
            value = message.attributes.get(key)
            print('{}: {}'.format(key, value))
    message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

Ruby

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Ruby se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Ruby.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  unless received_message.attributes.empty?
    puts "Attributes:"
    received_message.attributes.each do |key, value|
      puts "#{key}: #{value}"
    end
  end
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

Écouter les erreurs

Cet exemple montre comment gérer les erreurs survenant lors de l'abonnement à des messages :

Go

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Go se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Go.

// If the service returns a non-retryable error, Receive returns that error after
// all of the outstanding calls to the handler have returned.
err := client.Subscription(subName).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Java

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Go se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Go.

subscriber.addListener(
    new Subscriber.Listener() {
      public void failed(Subscriber.State from, Throwable failure) {
        // Handle error.
      }
    },
    MoreExecutors.directExecutor());

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Node.js.

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'my-sub';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
const messageHandler = function(message) {
  // Do something with the message
  console.log(`Message: ${message}`);

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Create an event handler to handle errors
const errorHandler = function(error) {
  // Do something with the error
  console.error(`ERROR: ${error}`);
};

// Listen for new messages/errors until timeout is hit
subscription.on(`message`, messageHandler);
subscription.on(`error`, errorHandler);

setTimeout(() => {
  subscription.removeListener(`message`, messageHandler);
  subscription.removeListener(`error`, errorHandler);
}, timeout * 1000);

Python

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Python se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Python.

from google.cloud import pubsub_v1

# TODO project           = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pubsub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message))
    message.ack()

future = subscriber.subscribe(subscription_path, callback=callback)

# Blocks the thread while messages are coming in through the stream. Any
# exceptions that crop up on the thread will be set on the future.
try:
    # When timeout is unspecified, the result method waits indefinitely.
    future.result(timeout=30)
except Exception as e:
    print(
        'Listening for messages on {} threw an Exception: {}.'.format(
            subscription_name, e))

Ruby

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Go se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Go.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end
# Propagate expection from child threads to the main thread as soon as it is
# raised. Exceptions happened in the callback thread are collected in the
# callback thread pool and do not propagate to the main thread
Thread.abort_on_exception = true

begin
  subscriber.start
  # Let the main thread sleep for 60 seconds so the thread for listening
  # messages does not quit
  sleep 60
  subscriber.stop.wait!
rescue Exception => ex
  puts "Exception #{ex.inspect}: #{ex.message}"
  raise "Stopped listening for messages."
end

Contrôler le flux de messages

Votre client abonné peut traiter et accuser réception des messages plus lentement que Cloud Pub/Sub ne les envoie au client. Dans ce cas :

  • Il se peut qu'un client ait de nombreux messages en attente, car il n'est pas en mesure de traiter le volume de messages entrants, mais qu'un autre client du réseau ait la capacité de le faire. Le deuxième client pourrait réduire le nombre de messages en attente de l'abonnement, mais il n'a pas la possibilité de le faire, car le premier client conserve un bail pour les messages qu'il reçoit. Cela réduit le taux global de traitement, car les messages restent bloqués au niveau du premier client.

  • Comme la bibliothèque cliente prolonge de manière répétée le délai de confirmation pour les messages en attente, ces messages continuent d'utiliser de la mémoire, du processeur et de la bande passante. En tant que tel, le client abonné risque de manquer de ressources (telles que la mémoire). Cela peut avoir un impact négatif sur le débit et la latence du traitement des messages.

Pour atténuer les problèmes ci-dessus, utilisez les fonctions de contrôle de flux de l'abonné pour contrôler le débit de réception des messages. Ces fonctionnalités sont illustrées dans les exemples suivants :

C#

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C# se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour C#.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberClient subscriber = await SubscriberClient.CreateAsync(
    subscriptionName,
    settings: new SubscriberClient.Settings()
    {
        AckExtensionWindow = TimeSpan.FromSeconds(4),
        Scheduler = Google.Api.Gax.SystemScheduler.Instance,
        AckDeadline = TimeSpan.FromSeconds(10),
        FlowControlSettings = new Google.Api.Gax
            .FlowControlSettings(
            maxOutstandingElementCount: 100,
            maxOutstandingByteCount: 10240)
    });
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
await Task.Delay(3000);
await subscriber.StopAsync(CancellationToken.None);

Go

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Go se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Go.

sub := client.Subscription(subName)
sub.ReceiveSettings.MaxOutstandingMessages = 10
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Java

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Java se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Java.

FlowControlSettings flowControlSettings =
    FlowControlSettings.newBuilder()
        .setMaxOutstandingElementCount(10_000L)
        .setMaxOutstandingRequestBytes(1_000_000_000L)
        .build();
Subscriber subscriber =
    Subscriber.newBuilder(subscriptionName, receiver)
        .setFlowControlSettings(flowControlSettings)
        .build();

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Node.js.

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const subscriptionName = 'my-sub';
// const maxInProgress = 5;
// const maxBytes = 10000;

const topic = pubsub.topic(topicName);

const options = {
  flowControl: {
    maxBytes: maxBytes,
    maxMessages: maxInProgress,
  },
};

const subscription = topic.subscription(subscriptionName, options);

// Creates a new subscription
// Note that flow control configurations are not persistent
const [newSubscription] = await subscription.get({
  autoCreate: true,
});
console.log(
  `Subscription ${
    newSubscription.name
  } created with a maximum of ${maxInProgress} unprocessed messages.`
);

Python

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Python se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Python.

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message.data))
    message.ack()

# Limit the subscriber to only have ten outstanding messages at a time.
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
subscriber.subscribe(
    subscription_path, callback=callback, flow_control=flow_control)

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

Ruby

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Ruby se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Ruby.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen inventory: 10 do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

Plus généralement, la nécessité de contrôler le flux indique que les messages sont publiés plus rapidement qu'ils ne sont consultés. S'il s'agit d'un état persistant plutôt que d'une augmentation transitoire du volume de messages, envisagez d'augmenter le nombre d'instances du client abonné.

Contrôle de simultanéité

La simultanéité n'est pas disponible avec tous les langages de programmation. Pour les implémentations de langage prenant en charge les threads parallèles, tels que Java et Go, les bibliothèques clientes font un choix par défaut pour le nombre de threads. Ce choix peut ne pas être optimal pour votre application. Par exemple, si vous constatez que votre application d'abonné ne parvient pas à absorber le volume de messages entrants, mais qu'elle n'est pas liée au processeur, vous devez augmenter le nombre de threads. Pour les opérations de traitement de messages nécessitant une utilisation intensive du processeur, il peut être approprié de réduire le nombre de threads.

L'exemple suivant montre comment contrôler la simultanéité dans un abonné :

Java

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Java se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Java.

// provide a separate executor service for polling
ExecutorProvider executorProvider =
    InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();

Subscriber subscriber =
    Subscriber.newBuilder(subscriptionName, receiver)
        .setExecutorProvider(executorProvider)
        .build();

Ruby

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Ruby se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Ruby.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
subscriber   = subscription.listen streams: 2, threads: {
  callback: 4,
  push:     2
} do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

La simultanéité n'est pas disponible avec tous les langages de programmation. Reportez-vous à la documentation de référence des API pour en savoir plus.

StreamingPull

Le service Cloud Pub/Sub dispose de deux API pour récupérer les messages :

Dans la mesure du possible, les bibliothèques clientes Cloud utilisent StreamingPull, un RPC en continu bidirectionnel, afin d'assurer un débit maximal et une latence minimale. Bien que vous ne vous servirez probablement jamais directement de l'API StreamingPull, il est important de comprendre ses propriétés principales et ce qui la distingue de la méthode pull plus traditionnelle.

La méthode pull repose sur un modèle de requête/réponse :

  1. L'application envoie une requête de messages.
  2. Le serveur répond par zéro message ou plus et ferme la connexion.

L'API du service StreamingPull s'appuie sur une connexion bidirectionnelle persistante pour recevoir plusieurs messages dès qu'ils sont disponibles, envoyer les confirmations et modifier les délais de ces dernières :

  1. Le client envoie une requête au service pour établir une connexion.
  2. Le client utilise cette connexion pour échanger les données de message.
  3. Le client ou le serveur met fin à la requête (c'est pourquoi on parle de connexion bidirectionnelle).

StreamingPull a un taux d'erreur de 100 % (systématiquement)

Les flux StreamingPull se terminent toujours par un code d'erreur pouvant faire l'objet d'une nouvelle tentative. Notez que, contrairement aux RPC classiques, l'erreur est simplement une indication qu'un flux a été interrompu, et non que les requêtes échouent. Par conséquent, bien que l'API StreamingPull ait un taux d'erreur de 100 % qui peut sembler surprenant, cela tient à sa conception. Pour diagnostiquer les erreurs StreamingPull, nous vous recommandons de vous concentrer sur les métriques d'opérations de messages StreamingPull, plutôt que sur les métriques de requêtes StreamingPull.

StreamingPull : traiter un volume important de petits messages en attente

La pile gRPC de StreamingPull est optimisée pour un débit élevé et assure donc la mise en mémoire tampon des messages. Cela peut avoir des conséquences si vous essayez de traiter un volume important de petits messages en attente (plutôt qu'un flux régulier de nouveaux messages). Dans ces conditions, il est possible que les messages soient distribués plusieurs fois et que la charge ne soit pas équilibrée entre les clients.

Le tampon entre le service Cloud Pub/Sub et l'espace utilisateur de la bibliothèque cliente est d'environ 10 Mo. Pour comprendre l'impact de ce tampon sur le comportement de la bibliothèque cliente, prenons l'exemple ci-dessous :

  • 10 000 messages de 1 Ko sont en attente sur un abonnement.
  • Il faut 1 seconde à une instance de client à thread unique pour traiter un message de manière séquentielle.
  • La première instance de client à établir une connexion StreamingPull avec le service pour cet abonnement disposera d'un tampon contenant les 10 000 messages.
  • Il faut 10 000 secondes (presque 3 heures) pour traiter le tampon.
  • Pendant ce temps, certains messages dépassent le délai de confirmation et sont renvoyés au même client, ce qui entraîne des doubles.
  • Lorsque plusieurs instances de client sont en cours d'exécution, les messages bloqués dans le tampon sont disponibles uniquement pour la première instance.

Cette situation ne se produira pas si les messages arrivent à un rythme régulier au lieu d'être distribués en un seul lot volumineux. Le service ne reçoit jamais les 10 Mo de messages en même temps. Il est donc en mesure d'équilibrer la charge de messages efficacement entre plusieurs abonnés.

Pour remédier à cette situation, utilisez un abonnement en mode push ou une API pull, actuellement disponible dans certaines bibliothèques clientes Cloud (consultez la section "Mode pull synchrone") et dans toutes les bibliothèques clientes de l'API. Pour en savoir plus, consultez la documentation sur les bibliothèques clientes.

Mode pull synchrone

Il existe des cas où le mode pull asynchrone n'est pas parfaitement adapté à votre application. Par exemple, la logique de l'application peut s'appuyer sur un modèle d'interrogation pour récupérer des messages ou imposer une limitation précise du nombre de messages récupérés par le client à un moment donné. Pour être compatible avec de telles applications, le service et la plupart des bibliothèques clientes utilisent une méthode pull synchrone.

Vous trouverez ci-dessous un exemple de code permettant d'extraire et de confirmer un nombre fixe de messages :

C#

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C# se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour C#.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberServiceApiClient subscriberClient =
    SubscriberServiceApiClient.Create();
// Pull messages from server,
// allowing an immediate response if there are no messages.
PullResponse response = subscriberClient.Pull(
    subscriptionName, returnImmediately: true, maxMessages: 20);
// Print out each received message.
foreach (ReceivedMessage msg in response.ReceivedMessages)
{
    string text = Encoding.UTF8.GetString(msg.Message.Data.ToArray());
    Console.WriteLine($"Message {msg.Message.MessageId}: {text}");
}
// If acknowledgement required, send to server.
if (acknowledge)
{
    subscriberClient.Acknowledge(subscriptionName,
        response.ReceivedMessages.Select(msg => msg.AckId));
}

Java

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Java se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Java.

SubscriberStubSettings subscriberStubSettings =
    SubscriberStubSettings.newBuilder()
        .setTransportChannelProvider(
            SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                .setMaxInboundMessageSize(20 << 20) // 20MB
                .build())
        .build();

try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
  // String projectId = "my-project-id";
  // String subscriptionId = "my-subscription-id";
  // int numOfMessages = 10;   // max number of messages to be pulled
  String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
  PullRequest pullRequest =
      PullRequest.newBuilder()
          .setMaxMessages(numOfMessages)
          .setReturnImmediately(false) // return immediately if messages are not available
          .setSubscription(subscriptionName)
          .build();

  // use pullCallable().futureCall to asynchronously perform this operation
  PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
  List<String> ackIds = new ArrayList<>();
  for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
    // handle received message
    // ...
    ackIds.add(message.getAckId());
  }
  // acknowledge received messages
  AcknowledgeRequest acknowledgeRequest =
      AcknowledgeRequest.newBuilder()
          .setSubscription(subscriptionName)
          .addAllAckIds(ackIds)
          .build();
  // use acknowledgeCallable().futureCall to asynchronously perform this operation
  subscriber.acknowledgeCallable().call(acknowledgeRequest);
  return pullResponse.getReceivedMessagesList();
}

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Node.js.

// Imports the Google Cloud client library
const pubsub = require('@google-cloud/pubsub');

const client = new pubsub.v1.SubscriberClient();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const projectName = 'your-project';
// const subscriptionName = 'your-subscription';

const formattedSubscription = client.subscriptionPath(
  projectName,
  subscriptionName
);
// The maximum number of messages returned for this request.
// Pub/Sub may return fewer than the number specified.
const maxMessages = 1;
const newAckDeadlineSeconds = 30;
const request = {
  subscription: formattedSubscription,
  maxMessages: maxMessages,
};

let isProcessed = false;

// The worker function is meant to be non-blocking. It starts a long-
// running process, such as writing the message to a table, which may
// take longer than the default 10-sec acknowledge deadline.
function worker(message) {
  console.log(`Processing "${message.message.data}"...`);

  setTimeout(() => {
    console.log(`Finished procesing "${message.message.data}".`);
    isProcessed = true;
  }, 30000);
}

// The subscriber pulls a specified number of messages.
const [response] = await client.pull(request);
// Obtain the first message.
const message = response.receivedMessages[0];
// Send the message to the worker function.
worker(message);

let waiting = true;
while (waiting) {
  await new Promise(r => setTimeout(r, 10000));
  // If the message has been processed..
  if (isProcessed) {
    const ackRequest = {
      subscription: formattedSubscription,
      ackIds: [message.ackId],
    };

    //..acknowledges the message.
    await client.acknowledge(ackRequest);
    console.log(`Acknowledged: "${message.message.data}".`);
    // Exit after the message is acknowledged.
    waiting = false;
    console.log(`Done.`);
  } else {
    // If the message is not yet processed..
    const modifyAckRequest = {
      subscription: formattedSubscription,
      ackIds: [message.ackId],
      ackDeadlineSeconds: newAckDeadlineSeconds,
    };

    //..reset its ack deadline.
    await client.modifyAckDeadline(modifyAckRequest);

    console.log(
      `Reset ack deadline for "${
        message.message.data
      }" for ${newAckDeadlineSeconds}s.`
    );
  }
}

PHP

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Node.js.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Protocole

Requête :

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull
{
"returnImmediately": "false",
"maxMessages": "1"
}

Réponse :

200 OK
{
"receivedMessages": [
{
  "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
  "message": {
    "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
    "messageId": "19917247034"
  }
}
]
}
Requête :
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge
{
"ackIds": [
"dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
]
}

Python

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Python se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Python.

import logging
import multiprocessing
import random
import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

NUM_MESSAGES = 2
ACK_DEADLINE = 30
SLEEP_TIME = 10

# The subscriber pulls a specific number of messages.
response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)

multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)

def worker(msg):
    """Simulates a long-running process."""
    RUN_TIME = random.randint(1, 60)
    logger.info('{}: Running {} for {}s'.format(
        time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME))
    time.sleep(RUN_TIME)

# `processes` stores process as key and ack id and message as values.
processes = dict()
for message in response.received_messages:
    process = multiprocessing.Process(target=worker, args=(message,))
    processes[process] = (message.ack_id, message.message.data)
    process.start()

while processes:
    for process in list(processes):
        ack_id, msg_data = processes[process]
        # If the process is still running, reset the ack deadline as
        # specified by ACK_DEADLINE once every while as specified
        # by SLEEP_TIME.
        if process.is_alive():
            # `ack_deadline_seconds` must be between 10 to 600.
            subscriber.modify_ack_deadline(
                subscription_path,
                [ack_id],
                ack_deadline_seconds=ACK_DEADLINE)
            logger.info('{}: Reset ack deadline for {} for {}s'.format(
                time.strftime("%X", time.gmtime()),
                msg_data, ACK_DEADLINE))

        # If the processs is finished, acknowledges using `ack_id`.
        else:
            subscriber.acknowledge(subscription_path, [ack_id])
            logger.info("{}: Acknowledged {}".format(
                time.strftime("%X", time.gmtime()), msg_data))
            processes.pop(process)

    # If there are still processes running, sleeps the thread.
    if processes:
        time.sleep(SLEEP_TIME)

print("Received and acknowledged {} messages. Done.".format(NUM_MESSAGES))

Ruby

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Ruby se trouvant sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Ruby.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscription.pull.each do |message|
  puts "Message pulled: #{message.data}"
  message.acknowledge!
end

Notez que pour obtenir une faible latence de distribution des messages avec une méthode pull synchrone, il est important de disposer de nombreuses requêtes pull en attente simultanément. Au fur et à mesure que le débit du sujet augmente, davantage de requêtes pull sont nécessaires. En général, la méthode pull asynchrone est préférable pour les applications sensibles à la latence.

Toutes les bibliothèques clientes ne sont pas compatibles avec le mode pull synchrone pour un nombre fixe de messages. Pour plus de détails, consultez la documentation de référence sur les API.

Scaling

Vous devrez peut-être mettre en place un système de scaling pour que votre application d'abonné puisse absorber le volume de messages. La procédure dépend de votre environnement, mais repose généralement sur les métriques de messages en attente fournies par le service Stackdriver Monitoring. Si vous souhaitez en savoir plus sur la procédure à suivre pour Compute Engine, consultez la page Effectuer un scaling basé sur les métriques Stackdriver Monitoring.

Accédez à la section Cloud Pub/Sub de la page GCP Metrics List pour connaître les métriques pouvant être surveillées par programme.

Enfin, comme pour tous les services distribués, attendez-vous à réessayer occasionnellement chaque requête.

Gérer les messages en double et forcer les tentatives

Si vous ne confirmez pas la réception d'un message avant l'expiration de son délai de confirmation, Cloud Pub/Sub le renvoie. En conséquence, Cloud Pub/Sub peut envoyer des messages en double. Avec Stackdriver, surveillez les opérations de confirmation avec le code de réponse expired pour détecter cette condition. Une méthode permettant d'obtenir ces données est la métrique Acknowledge message operations (Opérations de message de confirmation), groupées par response_code (code_réponse).

Utilisation de Stackdriver pour rechercher des délais de confirmation de messages expirés

Pour réduire le taux de duplication, prolongez le délai du message :

  • Les bibliothèques clientes gèrent automatiquement la prolongation du délai, mais des limites par défaut s'appliquent au délai maximal de prolongation que vous pouvez configurer.
  • Si vous créez votre propre bibliothèque cliente, utilisez la méthode modifyAckDeadline pour prolonger le délai de confirmation.

D'un autre côté, si vous souhaitez forcer Cloud Pub/Sub à envoyer à nouveau un message, définissez modifyAckDeadline sur 0.

Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…

Documentation sur Cloud Pub/Sub