Cómo filtrar mensajes de una suscripción

En esta página, se explica cómo crear suscripciones de Pub/Sub con filtros.

Cuando recibes mensajes de una suscripción con un filtro, solo recibes los mensajes que coinciden con el filtro. El servicio Pub/Sub confirma de forma automática los mensajes que no coinciden con el filtro. Puedes filtrar los mensajes según sus atributos, pero no según los datos que contienen.

Puedes tener varias suscripciones vinculadas a un tema, y cada una puede tener un filtro diferente.

Por ejemplo, si tienes un tema que recibe noticias de diferentes partes del mundo, puedes configurar una suscripción para filtrar las noticias que se publican solo desde una región específica. Para esta configuración, debes asegurarte de que uno de los atributos de los mensajes a temas que representa la región donde se publican las noticias.

Cuando recibes mensajes de una suscripción con un filtro, no incurres tarifas de mensajes salientes para los mensajes que Pub/Sub confirma la recepción de forma automática. Se cobran tarifas por la entrega de mensajes y el almacenamiento relacionado con las búsquedas para estos mensajes.

Crea una suscripción con un filtro

Las suscripciones de extracción y envío pueden tener filtros. Todos los suscriptores pueden recibir mensajes de suscripciones con filtros, incluidos los que usan la API de StreamingPull.

Puedes crear una suscripción con un filtro mediante la consola de Google Cloud, Google Cloud CLI, las bibliotecas cliente o la API de Pub/Sub.

Console

Para crear una suscripción de extracción con un filtro, sigue estos pasos:

  1. En la consola de Google Cloud, ve a la página Suscripciones.

    Ir a la página Suscripciones

  2. Haz clic en Crear suscripción.

  3. Ingresa el ID de suscripción.

  4. Elige o crea un tema desde el menú desplegable. La suscripción recibe mensajes del tema.

  5. En la sección Filtro de suscripción, ingresa la expresión de filtros.

  6. Haga clic en Crear.

Para crear una suscripción de envío con un filtro, sigue estos pasos:

  1. En la consola de Google Cloud, ve a la página Suscripciones.

    Ir a la página Suscripciones

  2. Haz clic en Crear suscripción.

  3. Ingresa el ID de suscripción.

  4. Elige o crea un tema desde el menú desplegable. La suscripción recibe mensajes del tema.

  5. En la sección Tipo de entrega, haz clic en Enviar.

  6. En el campo URL de extremo, ingresa la URL del extremo de envío.

  7. En la sección Filtro de suscripción, ingresa la expresión de filtros.

  8. Haga clic en Crear.

gcloud

Para crear una suscripción de extracción con un filtro, usa el comando gcloud pubsub subscriptions create con la marca --message-filter:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --message-filter='FILTER'

Reemplaza lo siguiente:

  • SUBSCRIPTION_ID: Es el ID de la suscripción que se creará.
  • TOPIC_ID: Es el ID del tema para adjuntar a la suscripción.
  • FILTER: Es una expresión en la sintaxis de filtrado.

Para crear una suscripción de envío con un filtro, usa el comando gcloud pubsub subscriptions create con las marcas --push-endpoint y --message-filter:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --push-endpoint=PUSH_ENDPOINT \
  --message-filter='FILTER'

Reemplaza lo siguiente:

  • SUBSCRIPTION_ID: Es el ID de la suscripción que se creará.
  • TOPIC_ID: Es el ID del tema para adjuntar a la suscripción.
  • PUSH_ENDPOINT: La URL del servidor en la que se ejecuta el suscriptor de envío
  • FILTER: Es una expresión en la sintaxis de filtrado.

REST

Para crear una suscripción con un filtro, usa el método projects.subscriptions.create.

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth print-access-token)

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID del proyecto para crear la suscripción.
  • SUBSCRIPTION_ID: Es el ID de la suscripción que se creará.

Para crear una suscripción de extracción con un filtro, especifica el filtro en el cuerpo de la solicitud:

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "filter": "FILTER"
}

Reemplaza lo siguiente:

  • PROJECT_ID: el ID del proyecto con el tema
  • TOPIC_ID: Es el ID del tema para adjuntar a la suscripción.
  • FILTER: Es una expresión en la sintaxis de filtrado.

Para crear una suscripción de envío con un filtro, especifica el extremo de envío y el filtro en el cuerpo de la solicitud:

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "pushConfig": {
    "pushEndpoint": "PUSH_ENDPOINT"
  },
  "filter": "FILTER"
}

Reemplaza lo siguiente:

  • PROJECT_ID: el ID del proyecto con el tema
  • TOPIC_ID: Es el ID del tema para adjuntar a la suscripción.
  • PUSH_ENDPOINT: La URL del servidor en la que se ejecuta el suscriptor de envío
  • FILTER: Es una expresión en la sintaxis de filtrado.

C++

Antes de probar esta muestra, sigue las instrucciones de configuración de C++ en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C++.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string topic_id,
   std::string subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, std::move(subscription_id))
          .FullName());
  request.set_topic(
      pubsub::Topic(project_id, std::move(topic_id)).FullName());
  request.set_filter(R"""(attributes.is-even = "false")""");
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

Antes de probar esta muestra, sigue las instrucciones de configuración de C# en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C#.


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithFilteringSample
{
    public Subscription CreateSubscriptionWithFiltering(string projectId, string topicId, string subscriptionId, string filter)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        Subscription subscription = null;

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            Filter = filter
        };

        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createWithFilter(w io.Writer, projectID, subID, filter string, topic *pubsub.Topic) error {
	// Receive messages with attribute key "author" and value "unknown".
	// projectID := "my-project-id"
	// subID := "my-sub"
	// filter := "attributes.author=\"unknown\""
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:  topic,
		Filter: filter,
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription with filter: %v\n", sub)
	return nil
}

Java

Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithFiltering {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    String filter = "attributes.author=\"unknown\"";

    createSubscriptionWithFilteringExample(projectId, topicId, subscriptionId, filter);
  }

  public static void createSubscriptionWithFilteringExample(
      String projectId, String topicId, String subscriptionId, String filter) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Receive messages with attribute key "author" and value "unknown".
                  .setFilter(filter)
                  .build());

      System.out.println(
          "Created a subscription with filtering enabled: " + subscription.getAllFields());
    }
  }
}

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const filterString = 'YOUR_FILTER_STRING';   // e.g. 'attributes.author="unknown"'

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithFilter(
  topicNameOrId,
  subscriptionNameOrId,
  filterString
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      filter: filterString,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with filter ${filterString}.`
  );
}

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const filterString = 'YOUR_FILTER_STRING';   // e.g. 'attributes.author="unknown"'

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithFilter(
  topicNameOrId: string,
  subscriptionNameOrId: string,
  filterString: string
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      filter: filterString,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with filter ${filterString}.`
  );
}

PHP

Antes de probar esta muestra, sigue las instrucciones de configuración de PHP en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para PHP.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $filter  The Pub/Sub subscription filter.
 */
function create_subscription_with_filter(
    string $projectId,
    string $topicName,
    string $subscriptionName,
    string $filter
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);

    $subscription->create(['filter' => $filter]);

    printf('Subscription created: %s' . PHP_EOL, $subscription->name());
    printf('Subscription info: %s' . PHP_EOL, json_encode($subscription->info()));
}

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# filter = "attributes.author=\"unknown\""

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={"name": subscription_path, "topic": topic_path, "filter": filter}
    )
    print(f"Created subscription with filtering enabled: {subscription}")

Ruby

Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Ruby.

require "google/cloud/pubsub"

# Shows how to create a new subscription with filter for a given topic
class PubsubCreateSubscriptionWithFilter
  def create_subscription_with_filter project_id:, topic_id:, subscription_id:, filter:
    pubsub = Google::Cloud::Pubsub.new project_id: project_id
    topic = pubsub.topic topic_id
    subscription = topic.subscribe subscription_id, filter: filter
    puts "Created subscription with filtering enabled: #{subscription_id}"
  end

  def self.run
    # TODO(developer): Replace these variables before running the sample.
    project_id = "your-project-id"
    topic_id = "your-topic-id"
    subscription_id = "id-for-new-subcription"
    filter = "attributes.author=\"unknown\""
    PubsubCreateSubscriptionWithFilter.new.create_subscription_with_filter project_id: project_id,
                                                                           topic_id: topic_id,
                                                                           subscription_id: subscription_id,
                                                                           filter: filter
  end
end

if $PROGRAM_NAME == __FILE__
  PubsubCreateSubscriptionWithFilter.run
end

La longitud máxima de un filtro es de 256 bytes. El filtro es es una propiedad inmutable de una suscripción. Después de crear una suscripción, no puedes actualizar la suscripción para modificar el filtro.

Cómo afectan los filtros a las métricas del backlog

Para supervisar la suscripción que acabas de crear, consulta Supervisa las suscripciones con filtros.

Si el filtrado está habilitado, las métricas de tareas pendientes solo incluyen los datos de los mensajes que coinciden con el filtro. La siguiente es una lista de las métricas de tareas pendientes:

  • subscription/backlog_bytes
  • subscription/unacked_bytes_by_region
  • subscription/num_undelivered_messages
  • subscription/num_unacked_messages_by_region
  • subscription/oldest_unacked_message_age
  • subscription/oldest_unacked_message_age_by_region
  • topic/unacked_bytes_by_region
  • topic/num_unacked_messages_by_region
  • topic/oldest_unacked_messages_age_by_region

Para obtener más información sobre estas métricas, consulta la lista de Métricas de Pub/Sub.

Actualiza el filtro de una suscripción

No puedes actualizar el filtro de una suscripción existente. En su lugar, sigue esta solución alternativa.

  1. Crea una instantánea de la suscripción para la que deseas cambiar el filtro.

    Para obtener más información sobre cómo tomar una instantánea con la consola, consulta Crea una instantánea.

  2. Crea una nueva suscripción con el filtro nuevo.

    Para obtener más información sobre cómo crear una suscripción con un filtro, consulta Cómo crear una suscripción con un filtro.

  3. En la consola de Google Cloud, ve a la página de suscripciones a Pub/Sub.

    Ir a las suscripciones

  4. Haz clic en la suscripción que acabas de crear.

  5. En la página de detalles de la suscripción, haz clic en Repetir mensajes.

  6. En Saltar, haz clic en A una instantánea.

  7. Selecciona la instantánea que creaste para la suscripción original. en el paso 1 y, luego, en Buscar.

    No perderás ningún mensaje durante la transición.

  8. Cambia los suscriptores para que usen la suscripción nueva.

Después de completar este procedimiento, puedes borrar la suscripción original.

Sintaxis para crear un filtro

Para filtrar mensajes, escribe una expresión que funcione en atributos. Puedes escribir una expresión que coincida con la clave o con el valor de los atributos. El identificador attributes selecciona los atributos del mensaje.

Por ejemplo, los filtros de la siguiente tabla seleccionan el atributo name:

Filtro Descripción
attributes:name Mensajes con el atributo name
NOT attributes:name Mensajes sin el atributo name
attributes.name = "com" Mensajes con el atributo name y con el valor de com
attributes.name != "com" Mensajes sin el atributo name y con el valor de com
hasPrefix(attributes.name, "co") Mensajes con el atributo name y con un valor que comienza con co
NOT hasPrefix(attributes.name, "co") Mensajes sin el atributo name y un valor que comienza con co

Operadores de comparación para la expresión de filtro

Puedes filtrar atributos con los siguientes operadores de comparación:

  • :
  • =
  • !=

El operador : coincide con una clave en una lista de atributos.

attributes:KEY

Los operadores de igualdad coinciden con las claves y los valores. El valor debe ser un literal de string.

attributes.KEY = "VALUE"

Una expresión con un operador de igualdad debe comenzar con una clave, y el operador de igualdad debe comparar una clave y un valor.

  • Válido: El filtro compara una clave y un valor.

    attributes.name = "com"
    
  • No válido: El lado izquierdo del filtro es un valor.

    "com" = attributes.name
    
  • No válido: El filtro compara dos claves.

    attributes.name = attributes.website
    

La clave y el valor distinguen entre mayúsculas y minúsculas, y deben coincidir exactamente con el atributo. Si una clave contiene caracteres distintos de guiones, guiones bajos o caracteres alfanuméricos, usa un literal de string.

attributes."iana.org/language_tag" = "en"

Para usar barras invertidas, comillas y caracteres no imprimibles en un filtro, escapa los caracteres dentro de un literal de cadena. También puedes usar secuencias de escape Unicode, hexadecimales y octales dentro de un literal de string.

  • Válido: Filtra los caracteres de escape dentro de un literal de string.

    attributes:"\u307F\u3093\u306A"
    
  • No válido: Filtra los caracteres de escape sin un literal de string.

    attributes:\u307F\u3093\u306A
    

Operadores booleanos para la expresión de filtro

Puedes usar los operadores booleanos AND, NOT y OR en un filtro. Los operadores debe estar en mayúsculas. Por ejemplo, el siguiente filtro es para mensajes con el atributo iana.org/language_tag, pero sin el atributo name y el valor com.

attributes:"iana.org/language_tag" AND NOT attributes.name = "com"

El operador NOT tiene la prioridad más alta. Para combinar los operadores AND y OR, usa paréntesis y expresiones completas.

  • Válido: operadores AND y OR con paréntesis

    attributes:"iana.org/language_tag" AND (attributes.name = "net" OR attributes.name = "org")
    
  • No válido: Los operadores AND y OR sin paréntesis

    attributes:"iana.org/language_tag" AND attributes.name = "net" OR attributes.name = "org"
    
  • No válido: Los operadores AND y OR combinan expresiones incompletas

    attributes.name = "com" AND ("net" OR "org")
    

También puedes usar el operador menos en lugar del operador NOT.

attributes.name = "com" AND -attributes:"iana.org/language_tag"

Funciones para la expresión de filtro

Puedes usar la función hasPrefix para filtrar los atributos con valores que comienzan con una substring. hasPrefix es la única función compatible en un filtro.

Si bien la concordancia de prefijos es compatible con la función hasPrefix, no se admiten las expresiones regulares generales.

hasPrefix(attributes.KEY, "SUBSTRING")

Reemplaza lo siguiente:

  • KEY: Es el nombre del atributo.
  • SUBSTRING: Es una substring del valor.