Commander des messages

Cloud Pub/Sub est un service de distribution de messages hautement disponible et évolutif. Cependant, l'ordre dans lequel les messages sont reçus par les abonnés n'est pas garanti. Cela peut être considéré comme un inconvénient, mais il existe en fait très peu de cas d'utilisation nécessitant un ordonnancement strict des messages.

Dans ce document, nous expliquons en quoi consiste réellement l'ordonnancement des messages et les concessions qu'il implique. Nous nous intéressons également aux cas d'utilisation et aux techniques permettant de traiter les messages selon un certain ordre dans votre workflow actuel lorsque vous passez à Cloud Pub/Sub.

Qu'est-ce que l'ordonnancement ?

À première vue, l'ordonnancement des messages est un concept simple. Le schéma ci-dessous donne un aperçu de ce qui se passe lorsqu'un message est acheminé par un service de distribution de messages :

Quel ordonnancement ?

L'éditeur envoie un message sur un sujet dans Cloud Pub/Sub. Le message est ensuite remis à l'abonné via un abonnement. En présence d'un seul éditeur synchrone, d'un seul abonné synchrone et d'un seul serveur de messagerie synchrone, fonctionnant tous sur une couche de transport synchrone, la notion d'ordonnancement semble simple : les messages sont triés en fonction du moment où l'éditeur parvient à les publier, selon un délai absolu ou une séquence.

Même dans ce cas simple, garantir l'ordonnancement des messages imposerait d'importantes contraintes au niveau du débit. Le seul moyen de garantir réellement l'ordonnancement des messages serait de les remettre un à un à l'abonné via le service de distribution de messages et d'attendre que l'abonné confirme la réception et le traitement du message actuel (généralement par un accusé de réception envoyé au service) pour lui remettre le message suivant. Dans ce cas de figure, toute évolutivité du débit serait exclue. Une autre solution serait que le service garantisse uniquement que la première remise d'un message s'effectue dans l'ordre, permettant ainsi les tentatives de redistribution à tout moment. Cela permettrait d'envoyer de nombreux messages à la fois à l'abonné. Toutefois, même si les contraintes sont assouplies comme nous venons de l'évoquer, le concept d'ordonnancement est moins clair lorsque vous avez plusieurs éditeurs, services de distribution des messages et abonnés.

L'ordonnancement est-il réalisé ?

Définir l'ordonnancement des messages peut être compliqué, selon les éditeurs et les abonnés. Tout d'abord, il est possible que plusieurs abonnés traitent des messages via un seul abonnement :

Plusieurs abonnés

Dans ce cas, même si l'ordonnancement des messages s'effectue au niveau de l'abonnement, il n'y a aucune garantie quant à l'ordre dans lequel les messages seront traités par les abonnés. Si l'ordre de traitement est important, les abonnés devront se coordonner via un système de stockage ACID tel que Cloud Datastore ou Cloud SQL.

De même, l'ordonnancement peut s'avérer compliqué lorsqu'il y a plusieurs éditeurs sur le même sujet :

Plusieurs éditeurs

Comment attribuer un ordre aux messages publiés par différents éditeurs ? Soit les éditeurs doivent coordonner leurs activités, soit le service de distribution de messages doit associer une information d'ordonnancement à chaque message entrant. Chacun des messages devra inclure cette information d'ordonnancement. Il pourra s'agir d'un horodatage (il faudra alors que tous les serveurs obtiennent cet horodatage de la même source afin d'éviter toute dérive d'horloge) ou d'un numéro de séquence (obtenu auprès d'une source unique avec les garanties ACID). D'autres systèmes de messagerie qui assurent l'ordonnancement des messages requièrent des paramètres qui limitent effectivement le système à plusieurs éditeurs envoyant des messages via un serveur unique à un seul abonné.

Extension du nœud de service de distribution de messages

Si le service de distribution de messages abstrait utilisé dans les exemples ci-dessus était un serveur unique et synchrone, le service lui-même pourrait alors garantir l'ordonnancement des messages. Toutefois, un service de distribution de messages tel que Cloud Pub/Sub ne repose pas sur un serveur unique, ni sur un seul rôle serveur. En fait, il y a plusieurs couches entre vos éditeurs et abonnés et le système Cloud Pub/Sub lui-même. Voici un diagramme plus détaillé qui illustre le fonctionnement en place lorsque Cloud Pub/Sub est le système de distribution des messages :

Distribution des messages

Comme vous pouvez le constater, un message peut emprunter de nombreux chemins pour passer de l'éditeur à l'abonné. L'avantage d'une telle architecture est qu'elle est hautement disponible (comme elle repose sur plusieurs serveurs, aucune panne ne provoque des retards à l'échelle du système) et évolutive (les messages peuvent être distribués sur plusieurs serveurs pour maximiser le débit). Les avantages de systèmes distribués comme celui-ci ont été déterminants pour les produits Google tels que la recherche, Google Ads et Gmail, qui s'appuient sur les mêmes systèmes que ceux exécutant Cloud Pub/Sub.

Comment dois-je gérer l'ordonnancement ?

Nous espérons que vous comprenez désormais pourquoi l'ordonnancement des messages est assez complexe et pourquoi Cloud Pub/Sub ne le considère pas nécessaire. Pour que nous puissions assurer la disponibilité et l'évolutivité du système, il est important de réduire la dépendance à l'ordonnancement. Cette dépendance peut prendre plusieurs formes, comme illustré ci-dessous avec quelques cas d'utilisation et solutions types.

L'ordonnancement n'a aucune importance

Cas d'utilisation types : file d'attente de tâches indépendantes, collecte de statistiques sur les événements

Les cas d'utilisation dans lesquels l'ordonnancement n'a aucune importance sont parfaits pour Cloud Pub/Sub. Par exemple, si des tâches indépendantes doivent être exécutées par les abonnés, chaque tâche correspond à un message reçu par l'abonné qui doit effectuer l'action requise. Autre exemple, si vous souhaitez collecter des statistiques sur toutes les actions réalisées par les clients sur votre serveur, vous pouvez publier un message pour chaque événement, puis faire en sorte que les abonnés compilent les messages et mettent à jour les résultats dans un espace de stockage persistant.

L'ordonnancement dans le résultat final est important

Cas d'utilisation types : journaux, mises à jour d'état

Dans les cas d'utilisation de cette catégorie, l'ordre dans lequel les messages sont traités n'a pas d'importance. Tout ce qui compte est que le résultat final soit trié correctement. Prenons l'exemple d'un journal compilé qui est traité et stocké sur le disque. Les événements du journal proviennent de plusieurs éditeurs. Dans ce cas, l'ordre réel dans lequel les événements du journal sont traités n'a pas d'importance. Tout ce qui compte est que le résultat final soit trié chronologiquement. Par conséquent, vous pouvez associer un horodatage à chaque événement de l'éditeur et faire en sorte que l'abonné stocke les messages dans un datastore sous-jacent (tel que Cloud Datastore) permettant le stockage ou la récupération des données triées chronologiquement.

La même option fonctionne pour les mises à jour d'état nécessitant l'accès à l'état le plus récent uniquement. Par exemple, dans le suivi des prix de différents stocks, l'historique n'a pas d'importance. Seul le prix le plus récent compte. Vous pouvez associer un horodatage à chaque prix de stock et n'enregistrer que ceux dont la valeur est plus récente que celle actuellement enregistrée.

L'ordre dans lequel les messages sont traités est important

Cas d'utilisation types : données transactionnelles pour lesquelles des seuils doivent être appliqués

La dépendance totale à l'ordre dans lequel les messages sont traités est le cas le plus compliqué. Toute solution qui impose un ordonnancement strict des messages se fera au détriment des performances et du débit. Elle ne doit être adoptée que lorsqu'elle s'avère absolument nécessaire et quand vous êtes certain que vous n'aurez pas à évoluer vers un grand nombre de messages par seconde. Pour que les messages soient traités dans l'ordre, un abonné a deux possibilités :

  • Il doit connaître la liste complète des messages en attente et l'ordre dans lequel ils doivent être traités.

  • Il doit pouvoir déterminer à partir de tous les messages reçus s'il y a des messages qu'il n'a pas encore eus et qu'il doit traiter en priorité.

Vous pouvez utiliser la première méthode en attribuant à chaque message un identifiant unique et en enregistrant dans un emplacement persistant (tel que Cloud Datastore) l'ordre de traitement des messages. L'abonné consulte l'espace de stockage persistant pour connaître le prochain message qu'il doit traiter et s'assure qu'il traite ce message avant les autres messages reçus apparaissant dans l'ordonnancement complet. À ce stade, il est utile de se servir de l'espace de stockage persistant comme file d'attente des messages et de ne pas dépendre de la distribution des messages via Cloud Pub/Sub.

La deuxième méthode est possible si vous vous servez de Cloud Monitoring pour garder une trace de la métrique pubsub.googleapis.com/subscription/oldest_unacked_message_age (consultez la page sur les métriques acceptées pour obtenir une description). L'abonné met temporairement tous les messages dans un espace de stockage persistant et confirme leur réception. Il vérifie périodiquement l'âge des messages les plus anciens qui n'ont pas fait l'objet d'un accusé de réception et le compare aux données d'horodatage de l'éditeur figurant dans les messages stockés. Tous les messages publiés avant le plus ancien des messages n'ayant pas fait l'objet d'un accusé de réception ont été reçus. Par conséquent, ils peuvent être supprimés de l'espace de stockage persistant et traités dans l'ordre.

Sinon, si vous avez un seul éditeur synchrone et un seul abonné, vous pouvez garantir l'ordonnancement à l'aide d'un numéro de séquence. Cette approche nécessite l'utilisation d'un compteur persistant. Pour chaque message, l'éditeur effectue les opérations suivantes :

Node.js

Pour savoir comment créer un client Cloud Pub/Sub, reportez-vous à la page Bibliothèques clientes Cloud Pub/Sub.

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

const attributes = {
  // Pub/Sub messages are unordered, so assign an order ID and manually order messages
  counterId: `${getPublishCounterValue()}`,
};

// Publishes the message
const messageId = await pubsub
  .topic(topicName)
  .publish(dataBuffer, attributes);
// Update the counter value
setPublishCounterValue(parseInt(attributes.counterId, 10) + 1);
console.log(`Message ${messageId} published.`);
return messageId;

L'abonné effectue les tâches suivantes :

Node.js

Pour savoir comment créer un client Cloud Pub/Sub, reportez-vous à la page Bibliothèques clientes Cloud Pub/Sub.

const outstandingMessages = {};

async function listenForOrderedMessages(subscriptionName, timeout) {
  // Imports the Google Cloud client library
  const {PubSub} = require('@google-cloud/pubsub');

  // Creates a client
  const pubsub = new PubSub();

  // References an existing subscription, e.g. "my-subscription"
  const subscription = pubsub.subscription(subscriptionName);

  // Create an event handler to handle messages
  const messageHandler = function(message) {
    // Buffer the message in an object (for later ordering)
    outstandingMessages[message.attributes.counterId] = message;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on(`message`, messageHandler);
  await new Promise(r => setTimeout(r, timeout * 1000));
  subscription.removeListener(`message`, messageHandler);

  // Pub/Sub messages are unordered, so here we manually order messages by
  // their "counterId" attribute which was set when they were published.
  const outstandingIds = Object.keys(outstandingMessages).map(counterId =>
    Number(counterId, 10)
  );
  outstandingIds.sort();

  outstandingIds.forEach(counterId => {
    const counter = getSubscribeCounterValue();
    const message = outstandingMessages[counterId];

    if (counterId < counter) {
      // The message has already been processed
      message.ack();
      delete outstandingMessages[counterId];
    } else if (counterId === counter) {
      // Process the message
      console.log(
        `* %d %j %j`,
        message.id,
        message.data.toString(),
        message.attributes
      );
      setSubscribeCounterValue(counterId + 1);
      message.ack();
      delete outstandingMessages[counterId];
    } else {
      // Have not yet processed the message on which this message is dependent
      return false;
    }
  });
}

Ces solutions introduisent une latence dans la publication et le traitement des messages. Une étape synchrone est nécessaire dans l'éditeur pour que l'ordonnancement des messages soit établi. Au niveau de l'abonné, un délai doit être défini pour les messages non triés afin que l'ordonnancement soit forcé.

Résumé

Lors de la première utilisation d'un service de distribution de messages, la transmission des messages dans l'ordre semble être une caractéristique intéressante. Cela simplifie le code permettant de traiter les messages lorsque l'ordre est important. Toutefois, le prix à payer pour assurer leur ordonnancement se révèle élevé en termes de disponibilité et d'évolutivité, quel que soit le système de distribution de messages utilisé. Pour les produits Google basés sur la même infrastructure dans Cloud Pub/Sub, la disponibilité et l'évolutivité sont d'une importance capitale. C'est pourquoi le service n'assure pas l'ordonnancement des messages. Dans la mesure du possible, concevez vos applications de sorte qu'elles ne dépendent pas de l'ordre des messages. Vous pourrez ensuite la faire évoluer facilement grâce au scaling Cloud Pub/Sub pour transmettre tous vos messages de manière rapide et fiable.

Si vous choisissez d'utiliser l'ordonnancement des messages dans Cloud Pub/Sub, reportez-vous aux pages consacrées à Cloud Datastore et à Cloud SQL pour en savoir plus sur l'application des stratégies décrites dans ce document.

Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…

Documentation sur Cloud Pub/Sub