Trier les messages

Le tri des messages est une fonctionnalité de Pub/Sub qui permet de recevoir dans vos clients abonnés, dans l'ordre de leur publication les clients éditeurs.

Par exemple, supposons qu'un client éditeur dans une région publie les messages 1, 2, et 3 dans l'ordre. Avec le tri des messages, le client abonné reçoit les messages publiés dans le même ordre. Pour être livrés dans l'ordre, l'éditeur client doit publier les messages dans le même région.

Le tri des messages est une fonctionnalité utile pour des scénarios tels que la modification d'une base de données les applications de capture, de suivi des sessions utilisateur et de streaming pour lesquelles la préservation la chronologie des événements est importante.

Cette page explique le concept d'ordre des messages et explique comment configurer votre aux clients abonnés de recevoir les messages dans l'ordre. Pour configurer votre éditeur : pour le tri des messages, reportez-vous à la section Utiliser des clés de tri pour publier message.

Présentation du tri des messages

Le tri dans Pub/Sub est déterminé par les éléments suivants:

  • Clé de classement: chaîne utilisée dans la des métadonnées de message Pub/Sub et représente l'entité pour laquelle les messages doivent être classés. La clé de tri ne doit pas dépasser 1 Ko. À recevoir un ensemble de messages ordonnés dans une région, vous devez publier tous les messages avec la même clé de tri dans la même région. Quelques exemples d'ordre les clés sont les numéros client et la clé primaire d'une ligne dans une base de données.

    Le débit en publication sur chaque clé de tri est limité à 1 Mbit/s. La le débit de toutes les clés de tri associées à un sujet est limité au quota disponibles dans une région de publication. Cette limite peut être augmentée à de nombreuses unités de Go/s.

    Une clé de tri n'est pas équivalente à une partition dans une partition basée sur système de messagerie, car les clés de commande ont généralement que les partitions.

  • Activer le tri des messages: il s'agit d'un paramètre d'abonnement. Lorsqu'un que le tri des messages est activé pour l'abonnement, les clients abonnés reçoivent de messages publiés dans la même région avec la même clé de tri leur ordre de réception par le service. Vous devez activer ce paramètre. dans l'abonnement.

    Supposons que vous ayez deux abonnements A et B associés au même sujet T. L'abonnement A est configuré avec le tri des messages activé et l'abonnement L'option B est configurée sans que le tri des messages soit activé. Dans cette architecture, abonnements A et B reçoivent le même ensemble de messages du sujet T. Si vous publiez des messages avec des clés de tri dans la même région, l'abonnement A reçoivent les messages dans l'ordre de leur publication. En revanche, l'abonnement B reçoit les messages sans ordre attendu.

En général, si votre solution exige que les clients éditeurs envoient à la fois les messages non ordonnés, créez des sujets distincts, un pour les messages ordonnés et le pour les messages non ordonnés.

Éléments à prendre en compte lors de l'utilisation de messages ordonnés

La liste suivante contient des informations importantes sur le comportement d'une messagerie ordonnée dans Pub/Sub:

  • Ordre dans la clé: les messages publiés avec la même clé de tri sont doivent être reçus dans l’ordre. Supposons que pour trier la clé A, publier les messages 1, 2 et 3. Lorsque le tri est activé, 1 devrait être livré avant le 2 et le 2 devrait être livré avant le 3.

  • Ordre inter-clés: les messages publiés avec des clés de tri différentes sont ne devraient pas être reçus dans l'ordre. Supposons que vous ayez trié les clés A et B. Pour la clé de tri A, les messages 1 et 2 sont publiés dans l'ordre. Pour commander clé B, les messages 3 et 4 sont publiés dans l'ordre. Toutefois, le message 1 pourrait arrivent avant ou après le message 4.

  • Renvoi des messages: Pub/Sub distribue chaque message. au moins une fois. Le service Pub/Sub peut redistribuer les messages. Renvois d'un déclenche la nouvelle distribution de tous les messages ultérieurs associés à cette clé, même et reconnues. Supposons qu'un client abonné reçoive les messages 1, 2, et 3 pour une clé de tri spécifique. Si le message 2 est à nouveau distribué (parce que le le délai de confirmation a expiré ou la confirmation la plus optimale possible n'a pas été le message 3 est également redistribué. Si que le tri des messages et un sujet de lettres mortes sont activés sur un abonnement, ce comportement peut ne pas être vrai, car Pub/Sub transfère les messages vers des sujets de lettres mortes de la manière la plus optimale possible.

  • Retards de confirmation et sujets de lettres mortes: messages non confirmés pour une clé de tri donnée peut potentiellement retarder la livraison des messages pour d'autres clés de tri, en particulier lors des redémarrages du serveur ou des changements de trafic. Pour maintenir l'ordre des événements de ce type, veillez à en accuser réception en temps opportun messages. Si un accusé de réception en temps opportun n'est pas possible, envisagez d'utiliser un une file d'attente de lettres mortes pour empêcher la conservation indéfinie des messages. Gardez à l'esprit que risquent de ne pas être conservés lorsque des messages sont écrits dans une file d'attente de lettres mortes.

  • Affinité de message (clients streamingPull): les messages associés à la même clé sont généralement envoyés au même client abonné streamingPull. L'affinité est est attendu lorsque des messages sont en suspens concernant une clé de tri client abonné. S'il n'y a aucun message en attente, il est possible que l'audience d'affinité pour l'équilibrage de charge ou les déconnexions du client.

    Pour garantir un traitement fluide, même en cas de changements d'affinité potentiels, il est essentiel de concevoir votre application streamingPull de sorte qu'elle messages dans n'importe quel client pour une clé de tri donnée.

  • Intégration avec Dataflow: ne pas activer le tri des messages pour des abonnements lors de la configuration de Dataflow Pub/Sub. Dataflow dispose de son propre mécanisme ordre total des messages, garantissant l'ordre chronologique des opérations de fenêtrage. Cette méthode de ordonnancement diffère des l'approche basée sur les clés de tri de Pub/Sub. Utiliser des clés de tri avec Dataflow peut potentiellement réduire les performances du pipeline.

  • Scaling automatique: la diffusion ordonnée de Pub/Sub s'adapte à des milliards de clés de tri. Plus le nombre de clés de tri est élevé, plus distribution parallèle aux abonnés, car l'ordre s'applique à tous les messages avec la même clé de tri.

La livraison commandée s'accompagne de quelques compromis. Par rapport aux produits non commandés la diffusion commandée peut réduire légèrement la disponibilité des publications et augmenter la latence de distribution des messages de bout en bout. Dans le cas de livraison commandée, le basculement nécessite une coordination pour s'assurer que les messages sont écrits et lus dans le bon ordre.

Pour en savoir plus sur l'utilisation du tri des messages, consultez les pages suivantes : sur les bonnes pratiques:

Comportement du client abonné pour le tri des messages

Les clients abonnés reçoivent les messages dans l'ordre de leur publication dans un dans une région spécifique. Pub/Sub accepte différentes manières de recevoir les messages, tels que les clients abonnés connectés aux services pull et push, abonnements. Les bibliothèques clientes utilisent streamingPull (à l'exception de PHP).

Pour en savoir plus sur ces types d'abonnements, consultez Choisir un type d'abonnement.

Les sections suivantes expliquent ce que signifie recevoir les messages dans l'ordre pour chaque type de client abonné.

Clients abonnés StreamingPull

Lorsque vous utilisez les bibliothèques clientes avec streamingPull, vous devez spécifier un nom d'utilisateur rappel qui s'exécute chaque fois qu'un client abonné reçoit un message. Avec les bibliothèques clientes, le rappel est exécuté pour n'importe quelle clé de tri la saisie semi-automatique des messages dans le bon ordre. Si les messages sont confirmé dans ce rappel, tous les calculs d'un message sont effectués dans l'ordre. Toutefois, si le rappel de l'utilisateur planifie d'autres tâches asynchrones sur les messages, le client abonné doit s'assurer que le fonctionnement asynchrone est effectuée dans l’ordre. Une option consiste à ajouter des messages à une file d'attente locale qui sont traités dans l'ordre.

Récupérer les clients abonnés

Pour les clients abonnés connectés aux abonnements pull, Pub/Sub Le tri des messages est compatible avec les éléments suivants:

  • Tous les messages associés à une clé de tri dans la réponse PullResponse sont dans le bon ordre dans la liste.

  • Un seul lot de messages peut être en attente pour une clé de tri à la fois.

L'exigence selon laquelle un seul lot de messages peut être en attente à un est nécessaire pour maintenir une livraison ordonnée, puisque le Pub/Sub ne peut pas garantir la réussite ou la latence de la réponse qu'il envoie. pour la demande d'extraction d'un abonné.

Clients abonnés push

Les restrictions sur la poussée sont encore plus strictes que celles sur la poussée. Pour un abonnement push, Pub/Sub n'accepte qu'un seul en attente pour chaque clé de tri à la fois. Chaque message est envoyé à un un point de terminaison push en tant que requête distincte. Ainsi, l'envoi des requêtes en parallèle engendrerait le même problème que la livraison de plusieurs lots des messages pour la même clé de tri afin d'extraire simultanément les abonnés. Les abonnements push peuvent ne pas être adaptés aux sujets dans lesquels les messages sont fréquemment publiés avec la même clé de tri ou pour lesquels la latence est extrêmement important.

Exporter les clients abonnés

Les abonnements d'exportation sont compatibles avec les messages ordonnés. Pour BigQuery abonnements, les messages ayant la même clé de tri sont écrits dans l'ordre de la table BigQuery. Pour les abonnements Cloud Storage, les messages ayant la même clé de tri peuvent ne pas être écrits dans le même fichier. Dans le même fichier, les messages correspondant à une clé de tri sont classés. Quand ? répartis sur plusieurs fichiers, les messages ultérieurs d'une clé de tri peuvent apparaître dans une dont le nom comporte un code temporel antérieur à l'horodatage du nom le fichier avec les messages précédents.

Activer le tri des messages

Pour recevoir les messages dans l'ordre, définissez la propriété de tri des messages sur l'abonnement à partir duquel vous recevez les messages. La réception des messages dans l'ordre peut augmenter la latence. Une fois que vous avez terminé, vous ne pouvez plus modifier la propriété de tri des messages créer un abonnement.

Vous pouvez définir la propriété de tri des messages lorsque vous créez un abonnement à l'aide de la console Google Cloud, la Google Cloud CLI ou l'API Pub/Sub.

Console

Pour créer un abonnement avec la propriété de tri des messages, procédez comme suit :

  1. Dans la console Google Cloud, accédez à la page Abonnements.

Accéder aux abonnements

  1. Cliquez sur Créer un abonnement.

  2. Saisissez un ID d'abonnement.

  3. Choisissez un sujet à partir duquel vous souhaitez recevoir des messages.

  4. Dans la section Tri des messages, sélectionnez Trier les messages avec une clé de tri.

  5. Cliquez sur Créer.

gcloud

Pour créer un abonnement avec la propriété de tri des messages, utilisez la méthode gcloud pubsub subscriptions create et Indicateur --enable-message-ordering:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --enable-message-ordering

Remplacez SUBSCRIPTION_ID par l'ID de l'abonnement.

Si la requête aboutit, la ligne de commande affiche une confirmation :

Created subscription [SUBSCRIPTION_ID].

REST

Pour créer un abonnement avec la propriété de tri des messages, envoyez un PUT. la requête suivante:

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth application-default print-access-token)

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet avec le sujet
  • SUBSCRIPTION_ID : ID de l'abonnement

Dans le corps de la requête, spécifiez les éléments suivants :

{
  "topic": TOPIC_ID,
  "enableMessageOrdering": true,
}

Remplacez TOPIC_ID par l'ID du sujet à associer à l'abonnement.

Si la requête aboutit, la réponse est l'abonnement au format JSON :

{
  "name": projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,
  "topic": projects/PROJECT_ID/topics/TOPIC_ID,
  "enableMessageOrdering": true,
}

C++

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C++ qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C++.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.set_enable_message_ordering(true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C# qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C#.


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithOrderingSample
{
    public Subscription CreateSubscriptionWithOrdering(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        var subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableMessageOrdering = true
        };

        Subscription subscription = null;
        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Go qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Go.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func createWithOrdering(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// Message ordering can only be set when creating a subscription.
	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:                 topic,
		AckDeadline:           20 * time.Second,
		EnableMessageOrdering: true,
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription: %v\n", sub)
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Java qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Java.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithOrdering {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithOrderingExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithOrderingExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Set message ordering to true for ordered messages in the subscription.
                  .setEnableMessageOrdering(true)
                  .build());

      System.out.println("Created a subscription with ordering: " + subscription.getAllFields());
    }
  }
}

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId,
  subscriptionNameOrId
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.'
  );
}

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId: string,
  subscriptionNameOrId: string
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.'
  );
}

Python

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Python qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Python.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_message_ordering": True,
        }
    )
    print(f"Created subscription with ordering: {subscription}")

Ruby

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Ruby qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Ruby.

# topic_id        = "your-topic-id"
# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

topic        = pubsub.topic topic_id
subscription = topic.subscribe subscription_id,
                               message_ordering: true

puts "Pull subscription #{subscription_id} created with message ordering."

Étape suivante