Filtrer les messages d'un abonnement

Cette page explique comment créer des abonnements Pub/Sub avec des filtres.

Lorsque vous recevez des messages d'un abonnement avec un filtre, vous ne recevez que ceux qui correspondent au filtre. Le service Pub/Sub reconnaît automatiquement les messages qui ne correspondent pas au filtre. Vous pouvez filtrer les messages en fonction de leurs attributs, mais pas en fonction des données qu'ils contiennent.

Vous pouvez associer plusieurs abonnements à un sujet, et chacun d'eux peut avoir un filtre différent.

Par exemple, si un sujet reçoit des actualités provenant de différentes régions du monde, vous pouvez configurer un abonnement pour filtrer les actualités publiées uniquement depuis une région spécifique. Pour cette configuration, vous devez vous assurer que l'un des attributs du message de sujet indique la région de la publication.

Lorsque vous recevez des messages d'un abonnement avec un filtre, vous ne payez pas de frais pour les messages sortants pour les messages que Pub/Sub accuse automatiquement. Des frais de distribution de messages et de stockage associé à Seek vous sont facturés pour ces messages.

Créer un abonnement avec un filtre

Les abonnements pull et push peuvent comporter des filtres. Tous les abonnés peuvent recevoir des messages des abonnements avec des filtres, y compris ceux qui utilisent l'API StreamingPull.

Vous pouvez créer un abonnement avec un filtre à l'aide de la console Google Cloud, de Google Cloud CLI, des bibliothèques clientes ou de l'API Pub/Sub.

Console

Pour créer un abonnement pull avec filtre, procédez comme suit :

  1. Dans la console Google Cloud, accédez à la page Abonnements.

    Accéder à la page Abonnements

  2. Cliquez sur Créer un abonnement.

  3. Saisissez l'ID de l'abonnement.

  4. Choisissez ou créez un sujet dans le menu déroulant. L'abonnement reçoit les messages du sujet.

  5. Dans la section Filtre d'abonnement, saisissez l'expression du filtre.

  6. Cliquez sur Créer.

Pour créer un abonnement push avec un filtre, procédez comme suit :

  1. Dans la console Google Cloud, accédez à la page Abonnements.

    Accéder à la page Abonnements

  2. Cliquez sur Créer un abonnement.

  3. Saisissez l'ID de l'abonnement.

  4. Choisissez ou créez un sujet dans le menu déroulant. L'abonnement reçoit les messages du sujet.

  5. Dans la section Type de distribution, cliquez sur Push.

  6. Dans le champ URL du point de terminaison, saisissez l'URL du point de terminaison push.

  7. Dans la section Filtre d'abonnement, saisissez l'expression du filtre.

  8. Cliquez sur Créer.

gcloud

Pour créer un abonnement pull avec un filtre, exécutez la commande gcloud pubsub subscriptions create avec l'option --message-filter :

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --message-filter='FILTER'

Remplacez les éléments suivants :

  • SUBSCRIPTION_ID : ID de l'abonnement à créer
  • TOPIC_ID : ID du sujet à associer à l'abonnement
  • FILTER : expression dans la syntaxe de filtrage

Pour créer un abonnement push avec un filtre, exécutez la commande gcloud pubsub subscriptions create avec les options --push-endpoint et --message-filter :

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --push-endpoint=PUSH_ENDPOINT \
  --message-filter='FILTER'

Remplacez les éléments suivants :

  • SUBSCRIPTION_ID : ID de l'abonnement à créer
  • TOPIC_ID : ID du sujet à associer à l'abonnement
  • PUSH_ENDPOINT : URL du serveur sur lequel s'exécute l'abonné push
  • FILTER : expression dans la syntaxe de filtrage

REST

Pour créer un abonnement avec un filtre, utilisez la méthode projects.subscriptions.create.

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth print-access-token)

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet dans lequel créer l'abonnement
  • SUBSCRIPTION_ID : ID de l'abonnement à créer

Pour créer un abonnement pull avec un filtre, spécifiez le filtre dans le corps de la requête :

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "filter": "FILTER"
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet contenant le sujet.
  • TOPIC_ID : ID du sujet à associer à l'abonnement
  • FILTER : expression dans la syntaxe de filtrage

Pour créer un abonnement push avec un filtre, spécifiez le point de terminaison push et le filtre dans le corps de la requête :

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "pushConfig": {
    "pushEndpoint": "PUSH_ENDPOINT"
  },
  "filter": "FILTER"
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet contenant le sujet.
  • TOPIC_ID : ID du sujet à associer à l'abonnement
  • PUSH_ENDPOINT : URL du serveur sur lequel s'exécute l'abonné push
  • FILTER : expression dans la syntaxe de filtrage

C++

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C++ qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C++.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string topic_id,
   std::string subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, std::move(subscription_id))
          .FullName());
  request.set_topic(
      pubsub::Topic(project_id, std::move(topic_id)).FullName());
  request.set_filter(R"""(attributes.is-even = "false")""");
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C# qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C#.


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithFilteringSample
{
    public Subscription CreateSubscriptionWithFiltering(string projectId, string topicId, string subscriptionId, string filter)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        Subscription subscription = null;

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            Filter = filter
        };

        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Go qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createWithFilter(w io.Writer, projectID, subID, filter string, topic *pubsub.Topic) error {
	// Receive messages with attribute key "author" and value "unknown".
	// projectID := "my-project-id"
	// subID := "my-sub"
	// filter := "attributes.author=\"unknown\""
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:  topic,
		Filter: filter,
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription with filter: %v\n", sub)
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Java qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Java.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithFiltering {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    String filter = "attributes.author=\"unknown\"";

    createSubscriptionWithFilteringExample(projectId, topicId, subscriptionId, filter);
  }

  public static void createSubscriptionWithFilteringExample(
      String projectId, String topicId, String subscriptionId, String filter) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Receive messages with attribute key "author" and value "unknown".
                  .setFilter(filter)
                  .build());

      System.out.println(
          "Created a subscription with filtering enabled: " + subscription.getAllFields());
    }
  }
}

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const filterString = 'YOUR_FILTER_STRING';   // e.g. 'attributes.author="unknown"'

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithFilter(
  topicNameOrId,
  subscriptionNameOrId,
  filterString
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      filter: filterString,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with filter ${filterString}.`
  );
}

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const filterString = 'YOUR_FILTER_STRING';   // e.g. 'attributes.author="unknown"'

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithFilter(
  topicNameOrId: string,
  subscriptionNameOrId: string,
  filterString: string
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      filter: filterString,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with filter ${filterString}.`
  );
}

PHP

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage PHP qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour PHP.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $filter  The Pub/Sub subscription filter.
 */
function create_subscription_with_filter(
    string $projectId,
    string $topicName,
    string $subscriptionName,
    string $filter
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);

    $subscription->create(['filter' => $filter]);

    printf('Subscription created: %s' . PHP_EOL, $subscription->name());
    printf('Subscription info: %s' . PHP_EOL, json_encode($subscription->info()));
}

Python

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Python qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Python.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# filter = "attributes.author=\"unknown\""

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={"name": subscription_path, "topic": topic_path, "filter": filter}
    )
    print(f"Created subscription with filtering enabled: {subscription}")

Ruby

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Ruby qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Ruby.

require "google/cloud/pubsub"

# Shows how to create a new subscription with filter for a given topic
class PubsubCreateSubscriptionWithFilter
  def create_subscription_with_filter project_id:, topic_id:, subscription_id:, filter:
    pubsub = Google::Cloud::Pubsub.new project_id: project_id
    topic = pubsub.topic topic_id
    subscription = topic.subscribe subscription_id, filter: filter
    puts "Created subscription with filtering enabled: #{subscription_id}"
  end

  def self.run
    # TODO(developer): Replace these variables before running the sample.
    project_id = "your-project-id"
    topic_id = "your-topic-id"
    subscription_id = "id-for-new-subcription"
    filter = "attributes.author=\"unknown\""
    PubsubCreateSubscriptionWithFilter.new.create_subscription_with_filter project_id: project_id,
                                                                           topic_id: topic_id,
                                                                           subscription_id: subscription_id,
                                                                           filter: filter
  end
end

if $PROGRAM_NAME == __FILE__
  PubsubCreateSubscriptionWithFilter.run
end

La longueur maximale d'un filtre est de 256 octets. Le filtre est une propriété immuable d'un abonnement. Après avoir créé un abonnement, vous ne pouvez pas le mettre à jour pour modifier le filtre.

Comment les filtres affectent les métriques en attente

Pour surveiller l'abonnement que vous venez de créer, consultez Surveiller les abonnements à l'aide de filtres.

Si le filtrage est activé, les métriques de tâches en attente n'incluent que les données des messages correspondant au filtre. Voici une liste des métriques du backlog:

  • subscription/backlog_bytes
  • subscription/unacked_bytes_by_region
  • subscription/num_undelivered_messages
  • subscription/num_unacked_messages_by_region
  • subscription/oldest_unacked_message_age
  • subscription/oldest_unacked_message_age_by_region
  • topic/unacked_bytes_by_region
  • topic/num_unacked_messages_by_region
  • topic/oldest_unacked_messages_age_by_region

Pour en savoir plus sur ces métriques, consultez la liste des métriques Pub/Sub.

Mettre à jour le filtre d'un abonnement

Vous ne pouvez pas modifier le filtre pour un abonnement existant. Suivez plutôt la procédure ci-dessous.

  1. Prenez un instantané de l'abonnement pour lequel vous souhaitez modifier le filtre.

    Pour en savoir plus sur la prise d'un instantané à l'aide de la console, consultez Créer un instantané.

  2. Créez un abonnement avec le nouveau filtre.

    Pour en savoir plus sur la création d'un abonnement avec un filtre, consultez Créer un abonnement avec un filtre.

  3. Dans la console Google Cloud, accédez à la page Abonnements Pub/Sub.

    Accéder aux abonnements

  4. Cliquez sur l'abonnement que vous venez de créer.

  5. Sur la page d'informations de l'abonnement, cliquez sur Relire les messages.

  6. Dans le champ Seek (Rechercher), cliquez sur To a snapshot (Accéder à un instantané).

  7. Sélectionnez l'instantané que vous avez créé pour l'abonnement d'origine à l'étape 1, puis cliquez sur Seek (Rechercher).

    Aucun message n'est perdu pendant la transition.

  8. Modifiez les abonnés pour qu'ils utilisent le nouvel abonnement.

Une fois cette procédure terminée, vous pouvez supprimer l'abonnement d'origine.

Syntaxe pour créer un filtre

Pour filtrer les messages, écrivez une expression qui utilise des attributs. Vous pouvez écrire une expression qui correspond à la clé ou à la valeur des attributs. L'identifiant attributes sélectionne les attributs du message.

Par exemple, les filtres du tableau suivant sélectionnent l'attribut name :

Filtre Description
attributes:name Messages comportant l'attribut name
NOT attributes:name Messages sans attribut name
attributes.name = "com" Messages avec l'attribut name et la valeur com
attributes.name != "com" Messages sans l'attribut name et la valeur com
hasPrefix(attributes.name, "co") Messages avec l'attribut name et une valeur commençant par co
NOT hasPrefix(attributes.name, "co") Messages sans attribut name et une valeur commençant par co

Opérateurs de comparaison pour l'expression de filtre

Vous pouvez filtrer les attributs à l'aide des opérateurs de comparaison suivants :

  • :
  • =
  • !=

L'opérateur : correspond à une clé d'une liste d'attributs.

attributes:KEY

Les opérateurs d'égalité correspondent aux clés et aux valeurs. La valeur doit être une valeur littérale de chaîne.

attributes.KEY = "VALUE"

Une expression avec un opérateur d'égalité doit commencer par une clé, et l'opérateur doit comparer une clé et une valeur.

  • Valide : le filtre compare une clé et une valeur.

    attributes.name = "com"
    
  • Non valide : le côté gauche du filtre est une valeur.

    "com" = attributes.name
    
  • Non valide : le filtre compare deux clés.

    attributes.name = attributes.website
    

La clé et la valeur sont sensibles à la casse et doivent correspondre exactement à l'attribut. Si une clé contient des caractères autres que des traits d'union, des traits de soulignement ou des caractères alphanumériques, utilisez un littéral de chaîne.

attributes."iana.org/language_tag" = "en"

Pour utiliser des barres obliques inverses, des guillemets et des caractères non imprimables dans un filtre, échappez les caractères dans un littéral de chaîne. Vous pouvez également utiliser des séquences d'échappement Unicode, hexadécimales et octales dans un littéral de chaîne.

  • Valide : le filtre échappe les caractères dans un littéral de chaîne.

    attributes:"\u307F\u3093\u306A"
    
  • Non valide : le filtre échappe les caractères sans littéral de chaîne.

    attributes:\u307F\u3093\u306A
    

Opérateurs booléens pour l'expression de filtre

Vous pouvez utiliser les opérateurs booléens AND, NOT et OR dans un filtre. Les opérateurs doivent être en majuscules. Par exemple, le filtre suivant concerne les messages avec l'attribut iana.org/language_tag, mais sans l'attribut name ni la valeur com.

attributes:"iana.org/language_tag" AND NOT attributes.name = "com"

L'opérateur NOT a la priorité la plus élevée. Pour combiner les opérateurs AND et OR, utilisez des parenthèses et des expressions complètes.

  • Valides : opérateurs AND et OR avec des parenthèses.

    attributes:"iana.org/language_tag" AND (attributes.name = "net" OR attributes.name = "org")
    
  • Non valide : opérateurs AND et OR sans parenthèses.

    attributes:"iana.org/language_tag" AND attributes.name = "net" OR attributes.name = "org"
    
  • Non valide : les opérateurs AND et OR combinent des expressions incomplètes.

    attributes.name = "com" AND ("net" OR "org")
    

Vous pouvez également utiliser l'opérateur moins unaire au lieu de l'opérateur NOT.

attributes.name = "com" AND -attributes:"iana.org/language_tag"

Fonctions pour l'expression de filtre

Vous pouvez utiliser la fonction hasPrefix pour filtrer les attributs dont les valeurs commencent par une sous-chaîne. hasPrefix est la seule fonction compatible avec un filtre.

Bien que la correspondance des préfixes soit compatible avec la fonction hasPrefix, les expressions régulières générales ne sont pas acceptées.

hasPrefix(attributes.KEY, "SUBSTRING")

Remplacez les éléments suivants :

  • KEY : nom de l'attribut.
  • SUBSTRING : sous-chaîne de la valeur