Trier les messages

Le tri des messages est une fonctionnalité de Pub/Sub qui vous permet de recevoir les messages de vos clients abonnés dans l'ordre dans lequel ils ont été publiés par les clients de l'éditeur.

Par exemple, supposons qu'un client éditeur d'une région publie les messages 1, 2 et 3 dans l'ordre. Avec le tri des messages, le client abonné reçoit les messages publiés dans le même ordre. Pour être distribués dans l'ordre, le client éditeur doit publier les messages dans la même région.

Le tri des messages est une fonctionnalité utile pour des scénarios tels que la capture de modifications de base de données, le suivi de session utilisateur et les applications de streaming, dans lesquels il est important de préserver la chronologie des événements.

Cette page explique le concept d'ordonnancement des messages et explique comment configurer vos clients abonnés pour les recevoir dans l'ordre. Pour configurer vos clients éditeur pour l'ordre des messages, consultez Publier un message à l'aide de clés de tri.

Présentation du tri des messages

L'ordre dans Pub/Sub est déterminé par les éléments suivants:

  • Clé de tri: chaîne utilisée dans les métadonnées des messages Pub/Sub et représente l'entité pour laquelle les messages doivent être commandés. La clé de tri ne doit pas dépasser 1 Ko. Pour recevoir un ensemble de messages triés dans une région, vous devez publier tous les messages ayant la même clé de tri dans la même région. Les numéros client et la clé primaire d'une ligne dans une base de données sont des exemples de clés de tri.

    Le débit en publication pour chaque clé de tri est limité à 1 Mbit/s. Le débit de toutes les clés de tri sur un sujet est limité au quota disponible dans une région de publication. Cette limite peut être augmentée jusqu'à plusieurs unités de Go/s.

    Une clé de tri n'est pas équivalente à une partition dans un système de messagerie basé sur des partitions, car les clés de tri sont censées avoir une cardinalité beaucoup plus élevée que les partitions.

  • Activer le tri des messages: il s'agit d'un paramètre d'abonnement. Lorsque le tri des messages est activé pour un abonnement, les clients abonnés reçoivent les messages publiés dans la même région avec la même clé de tri, dans l'ordre dans lequel ils ont été reçus par le service. Vous devez activer ce paramètre dans l'abonnement.

    Supposons que vous ayez deux abonnements A et B associés au même sujet T. L'abonnement A est configuré avec l'option de tri des messages activée, et l'abonnement B est configuré sans cette option. Dans cette architecture, les abonnements A et B reçoivent le même ensemble de messages du sujet T. Si vous publiez des messages avec des clés de tri dans la même région, l'abonnement A les reçoit dans l'ordre de leur publication. En revanche, l'abonnement B reçoit les messages sans ordre attendu.

En général, si votre solution nécessite que les clients éditeurs envoient des messages à la fois ordonnés et non ordonnés, créez des sujets distincts, un pour les messages ordonnés et l'autre pour les messages non ordonnés.

Éléments à prendre en compte lors de l'utilisation de la messagerie ordonnée

La liste suivante contient des informations importantes sur le comportement des messages ordonnés dans Pub/Sub:

  • Classement par clé: les messages publiés avec la même clé de tri doivent être reçus dans l'ordre. Supposons que, pour trier la clé A, vous publiez les messages 1, 2 et 3. Lorsque la commande est activée, l'article 1 doit être livré avant le 2 et le deuxième doit être livré avant le 3.

  • Ordre interclé: les messages publiés avec des clés de tri différentes ne sont pas censés être reçus dans l'ordre. Supposons que vous ayez les clés de tri A et B. Pour le tri de la clé A, les messages 1 et 2 sont publiés dans l'ordre. Pour la clé de tri B, les messages 3 et 4 sont publiés dans l'ordre. Toutefois, le message 1 peut arriver avant ou après le message 4.

  • Nouvelle distribution des messages: Pub/Sub distribue chaque message au moins une fois. Par conséquent, le service Pub/Sub peut redistribuer les messages. Les renvois d'un message déclenchent la redistribution de tous les messages suivants pour cette clé, même ceux confirmés. Supposons qu'un client abonné reçoive les messages 1, 2 et 3 pour une clé de tri spécifique. Si le message 2 est redistribué (parce que le délai de confirmation a expiré ou que l'accusé de réception au mieux ne persiste pas dans Pub/Sub), le message 3 est également redistribué. Si le tri des messages et un sujet de lettres mortes sont activés sur un abonnement, ce comportement peut ne pas être vrai, car Pub/Sub transfère au mieux les messages vers des sujets de lettres mortes.

  • Retards d'accusé de réception et sujets de lettres mortes: les messages non confirmés pour une clé de tri donnée peuvent potentiellement retarder la distribution des messages pour d'autres clés de tri, en particulier lors des redémarrages de serveur ou des modifications du trafic. Pour maintenir l'ordre dans ces événements, assurez la confirmation de tous les messages en temps opportun. S'il n'est pas possible d'obtenir une confirmation en temps opportun, envisagez d'utiliser un file d'attente de lettres mortes pour éviter que les messages restent indéfiniment en attente. Sachez que l'ordre peut ne pas être préservé lorsque les messages sont écrits dans un file d'attente de lettres mortes.

  • Affinité des messages (clients streamingPull): les messages associés à la même clé sont généralement distribués au même client abonné streamingPull. L'affinité est attendue lorsque des messages sont en attente pour une clé de tri pour un client abonné spécifique. S'il n'y a pas de messages en attente, l'affinité peut changer pour l'équilibrage de charge ou les déconnexions du client.

    Pour assurer un traitement fluide même en cas de modifications d'affinité potentielles, il est essentiel de concevoir votre application streamingPull de sorte qu'elle puisse traiter les messages dans n'importe quel client pour une clé de tri donnée.

  • Intégration à Dataflow: n'activez pas le tri des messages pour les abonnements lorsque vous configurez Dataflow avec Pub/Sub. Dataflow dispose de son propre mécanisme d'ordonnancement des messages, qui assure l'ordre chronologique de tous les messages dans le cadre des opérations de fenêtrage. Cette méthode de tri diffère de l'approche de Pub/Sub de tri basée sur les clés. L'utilisation de clés de tri avec Dataflow peut potentiellement réduire les performances du pipeline.

  • Scaling automatique: la diffusion ordonnée de Pub/Sub peut s'adapter à des milliards de clés de tri. Un plus grand nombre de clés de tri permet une distribution plus parallèle aux abonnés, car le tri s'applique à tous les messages ayant la même clé de tri.

La livraison de la commande s'accompagne de quelques compromis. Par rapport à la distribution non ordonnée, la distribution ordonnée peut diminuer légèrement la disponibilité de la publication et augmenter la latence de distribution des messages de bout en bout. Dans le cas d'une distribution ordonnée, le basculement nécessite une coordination pour s'assurer que les messages sont écrits et lus dans le bon ordre.

Pour en savoir plus sur l'utilisation de l'ordre des messages, consultez les articles suivants concernant les bonnes pratiques:

Comportement des clients abonnés pour l'ordre des messages

Les clients abonnés reçoivent les messages dans l'ordre de leur publication dans une région spécifique. Pub/Sub est compatible avec différentes façons de recevoir des messages, telles que les clients abonnés connectés aux abonnements pull et push. Les bibliothèques clientes utilisent streamingPull (à l'exception de PHP).

Pour en savoir plus sur ces types d'abonnements, consultez Choisir un type d'abonnement.

Les sections suivantes décrivent la signification de la réception de messages dans l'ordre pour chaque type de client abonné.

Clients abonnés StreamingPull

Lorsque vous utilisez les bibliothèques clientes avec streamingPull, vous devez spécifier un rappel utilisateur qui s'exécute chaque fois qu'un message est reçu par un client abonné. Avec les bibliothèques clientes, pour n'importe quelle clé de tri donnée, le rappel est exécuté pour finaliser les messages dans le bon ordre. Si les messages sont confirmés dans ce rappel, tous les calculs d'un message sont effectués dans l'ordre. Toutefois, si le rappel de l'utilisateur planifie d'autres tâches asynchrones sur les messages, le client abonné doit s'assurer que les tâches asynchrones sont effectuées dans l'ordre. Une option consiste à ajouter les messages à une file d'attente de travail locale qui sont traités dans l'ordre.

Clients abonnés en mode récupération

Pour les clients abonnés connectés à des abonnements pull, l'ordre des messages Pub/Sub est compatible avec les éléments suivants:

  • Tous les messages d'une clé de tri dans PullResponse sont dans le bon ordre dans la liste.

  • Un seul lot de messages peut être en attente pour une clé de tri à la fois.

L'exigence selon laquelle un seul lot de messages peut être en attente à la fois est nécessaire pour maintenir la distribution ordonnée, car le service Pub/Sub ne peut pas garantir le succès ni la latence de la réponse qu'il envoie à la demande d'extraction d'un abonné.

Clients d'abonnés push

Les restrictions de poussée sont encore plus strictes que celles en mode pull. Pour un abonnement push, Pub/Sub n'accepte qu'un seul message en attente à la fois pour chaque clé de commande. Chaque message est envoyé à un point de terminaison push en tant que requête distincte. Ainsi, l'envoi des requêtes en parallèle aurait le même problème que la distribution de plusieurs lots de messages pour la même clé de tri pour extraire simultanément les abonnés. Les abonnements push peuvent ne pas être un bon choix pour les sujets dans lesquels des messages sont fréquemment publiés avec la même clé de tri ou pour lesquels la latence est extrêmement importante.

Exporter les clients abonnés

Les abonnements exportés prennent en charge les messages ordonnés. Pour les abonnements BigQuery, les messages ayant la même clé de tri sont écrits dans leur table BigQuery dans l'ordre. Pour les abonnements Cloud Storage, il est possible que les messages ayant la même clé de tri ne soient pas tous écrits dans le même fichier. Dans le même fichier, les messages pour une clé de tri sont dans l'ordre. En cas de répartition sur plusieurs fichiers, les messages ultérieurs d'une clé de tri peuvent apparaître dans un fichier dont le nom est antérieur à l'horodatage figurant dans le nom du fichier contenant les messages précédents.

Activer le tri des messages

Pour recevoir les messages dans l'ordre, définissez la propriété de tri des messages sur l'abonnement à partir duquel vous recevez les messages. La réception des messages dans l'ordre peut augmenter la latence. Vous ne pouvez pas modifier la propriété de tri des messages après avoir créé un abonnement.

Vous pouvez définir la propriété de tri des messages lorsque vous créez un abonnement à l'aide de la console Google Cloud, de Google Cloud CLI ou de l'API Pub/Sub.

Console

Pour créer un abonnement avec la propriété de tri des messages, procédez comme suit :

  1. Dans la console Google Cloud, accédez à la page Abonnements.

Accéder aux abonnements

  1. Cliquez sur Créer un abonnement.

  2. Saisissez un ID d'abonnement.

  3. Choisissez un sujet à partir duquel vous souhaitez recevoir des messages.

  4. Dans la section Tri des messages, sélectionnez Trier les messages avec une clé de tri.

  5. Cliquez sur Créer.

gcloud

Pour créer un abonnement avec la propriété de tri des messages, utilisez la commande gcloud pubsub subscriptions create et l'option --enable-message-ordering:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --enable-message-ordering

Remplacez SUBSCRIPTION_ID par l'ID de l'abonnement.

Si la requête aboutit, la ligne de commande affiche une confirmation :

Created subscription [SUBSCRIPTION_ID].

REST

Pour créer un abonnement avec la propriété de tri des messages, envoyez une requête PUT comme suit:

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth application-default print-access-token)

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet avec le sujet
  • SUBSCRIPTION_ID : ID de l'abonnement

Dans le corps de la requête, spécifiez les éléments suivants :

{
  "topic": TOPIC_ID,
  "enableMessageOrdering": true,
}

Remplacez TOPIC_ID par l'ID du sujet à associer à l'abonnement.

Si la requête aboutit, la réponse est l'abonnement au format JSON :

{
  "name": projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,
  "topic": projects/PROJECT_ID/topics/TOPIC_ID,
  "enableMessageOrdering": true,
}

C++

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C++ qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C++.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.set_enable_message_ordering(true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C# qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C#.


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithOrderingSample
{
    public Subscription CreateSubscriptionWithOrdering(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        var subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableMessageOrdering = true
        };

        Subscription subscription = null;
        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Go qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Go.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func createWithOrdering(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// Message ordering can only be set when creating a subscription.
	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:                 topic,
		AckDeadline:           20 * time.Second,
		EnableMessageOrdering: true,
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription: %v\n", sub)
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Java qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Java.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithOrdering {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithOrderingExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithOrderingExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Set message ordering to true for ordered messages in the subscription.
                  .setEnableMessageOrdering(true)
                  .build());

      System.out.println("Created a subscription with ordering: " + subscription.getAllFields());
    }
  }
}

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId,
  subscriptionNameOrId
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.'
  );
}

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId: string,
  subscriptionNameOrId: string
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.'
  );
}

Python

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Python qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Python.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_message_ordering": True,
        }
    )
    print(f"Created subscription with ordering: {subscription}")

Ruby

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Ruby qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Ruby.

# topic_id        = "your-topic-id"
# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

topic        = pubsub.topic topic_id
subscription = topic.subscribe subscription_id,
                               message_ordering: true

puts "Pull subscription #{subscription_id} created with message ordering."

Étapes suivantes