Filtrar mensajes de una suscripción

En esta página se explica cómo crear suscripciones de Pub/Sub con filtros.

Cuando recibes mensajes de una suscripción con un filtro, solo recibes los mensajes que coinciden con el filtro. El servicio Pub/Sub confirma automáticamente los mensajes que no coinciden con el filtro. Puedes filtrar los mensajes por sus atributos, pero no por los datos que contengan.

Puedes tener varias suscripciones asociadas a un tema y cada suscripción puede tener un filtro diferente.

Por ejemplo, si tienes un tema que recibe noticias de diferentes partes del mundo, puedes configurar una suscripción para filtrar las noticias que se publiquen solo en una región específica. Para esta configuración, debe asegurarse de que uno de los atributos del mensaje del tema indique la región de publicación de las noticias.

Cuando recibes mensajes de una suscripción con un filtro, no se te aplican tarifas de mensajes salientes por los mensajes que Pub/Sub confirma automáticamente. Se te cobrarán tarifas por la entrega de mensajes y por el almacenamiento relacionado con la función de búsqueda (seek) de estos mensajes.

Crear una suscripción con un filtro

Las suscripciones de extracción y de inserción pueden tener filtros. Todos los suscriptores pueden recibir mensajes de suscripciones con filtros, incluidos los que usan la API StreamingPull.

Puedes crear una suscripción con un filtro mediante la Google Cloud consola, la CLI de Google Cloud, las bibliotecas de cliente o la API de Pub/Sub.

Consola

Para crear una suscripción de extracción con un filtro, sigue estos pasos:

  1. En la Google Cloud consola, ve a la página Suscripciones.

    Ir a la página Suscripciones

  2. Haz clic en Crear suscripción.

  3. Introduce el ID de suscripción.

  4. Elige o crea un tema en el menú desplegable. La suscripción recibe mensajes del tema.

  5. En la sección Filtro de suscripción, introduce la expresión de filtro.

  6. Haz clic en Crear.

Para crear una suscripción push con un filtro, sigue estos pasos:

  1. En la Google Cloud consola, ve a la página Suscripciones.

    Ir a la página Suscripciones

  2. Haz clic en Crear suscripción.

  3. Introduce el ID de suscripción.

  4. Elige o crea un tema en el menú desplegable. La suscripción recibe mensajes del tema.

  5. En la sección Tipo de entrega, haga clic en Push.

  6. En el campo Endpoint URL (URL de acceso), introduce la URL del endpoint de envío.

  7. En la sección Filtro de suscripción, introduce la expresión de filtro.

  8. Haz clic en Crear.

gcloud

Para crear una suscripción de extracción con un filtro, usa el comando gcloud pubsub subscriptions create con la marca --message-filter:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --message-filter='FILTER'

Haz los cambios siguientes:

  • SUBSCRIPTION_ID: ID de la suscripción que se va a crear.
  • TOPIC_ID: el ID del tema al que se adjuntará la suscripción
  • FILTER: una expresión en la sintaxis de filtrado

Para crear una suscripción push con un filtro, usa el comando gcloud pubsub subscriptions create con las marcas --push-endpoint y --message-filter:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --push-endpoint=PUSH_ENDPOINT \
  --message-filter='FILTER'

Haz los cambios siguientes:

  • SUBSCRIPTION_ID: ID de la suscripción que se va a crear.
  • TOPIC_ID: el ID del tema al que se adjuntará la suscripción
  • PUSH_ENDPOINT: la URL del servidor en el que se ejecuta el suscriptor de push.
  • FILTER: una expresión en la sintaxis de filtrado

REST

Para crear una suscripción con un filtro, usa el método projects.subscriptions.create.

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth print-access-token)

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto en el que se va a crear la suscripción
  • SUBSCRIPTION_ID: ID de la suscripción que se va a crear.

Para crear una suscripción de extracción con un filtro, especifica el filtro en el cuerpo de la solicitud:

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "filter": "FILTER"
}

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto que contiene el tema
  • TOPIC_ID: el ID del tema al que se adjuntará la suscripción
  • FILTER: una expresión en la sintaxis de filtrado

Para crear una suscripción push con un filtro, especifica el endpoint push y el filtro en el cuerpo de la solicitud:

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "pushConfig": {
    "pushEndpoint": "PUSH_ENDPOINT"
  },
  "filter": "FILTER"
}

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto que contiene el tema
  • TOPIC_ID: el ID del tema al que se adjuntará la suscripción
  • PUSH_ENDPOINT: la URL del servidor en el que se ejecuta el suscriptor de push.
  • FILTER: una expresión en la sintaxis de filtrado

C++

Antes de probar este ejemplo, sigue las instrucciones de configuración de C++ que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de C++ de Pub/Sub.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string topic_id,
   std::string subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, std::move(subscription_id))
          .FullName());
  request.set_topic(
      pubsub::Topic(project_id, std::move(topic_id)).FullName());
  request.set_filter(R"""(attributes.is-even = "false")""");
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

Antes de probar este ejemplo, sigue las instrucciones de configuración de C# que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de C# de Pub/Sub.


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithFilteringSample
{
    public Subscription CreateSubscriptionWithFiltering(string projectId, string topicId, string subscriptionId, string filter)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        Subscription subscription = null;

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            Filter = filter
        };

        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

En el siguiente ejemplo se usa la versión principal de la biblioteca de cliente de Pub/Sub de Go (v2). Si sigues usando la biblioteca v1, consulta la guía de migración a la versión 2. Para ver una lista de ejemplos de código de la versión 1, consulta los ejemplos de código obsoletos.

Antes de probar este ejemplo, sigue las instrucciones de configuración de Go que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Go de Pub/Sub.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

func createWithFilter(w io.Writer, projectID, topic, subscription, filter string) error {
	// Receive messages with attribute key "author" and value "unknown".
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	// subscription := "projects/my-project/subscriptions/my-sub"
	// filter := "attributes.author=\"unknown\""
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
		Name:   subscription,
		Topic:  topic,
		Filter: filter,
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription with filter: %v\n", sub)
	return nil
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración de Java que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Pub/Sub.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithFiltering {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    String filter = "attributes.author=\"unknown\"";

    createSubscriptionWithFilteringExample(projectId, topicId, subscriptionId, filter);
  }

  public static void createSubscriptionWithFilteringExample(
      String projectId, String topicId, String subscriptionId, String filter) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Receive messages with attribute key "author" and value "unknown".
                  .setFilter(filter)
                  .build());

      System.out.println(
          "Created a subscription with filtering enabled: " + subscription.getAllFields());
    }
  }
}

Node.js

Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const filterString = 'YOUR_FILTER_STRING';   // e.g. 'attributes.author="unknown"'

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithFilter(
  topicNameOrId,
  subscriptionNameOrId,
  filterString,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      filter: filterString,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with filter ${filterString}.`,
  );
}

Node.js

Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const filterString = 'YOUR_FILTER_STRING';   // e.g. 'attributes.author="unknown"'

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithFilter(
  topicNameOrId: string,
  subscriptionNameOrId: string,
  filterString: string,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      filter: filterString,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with filter ${filterString}.`,
  );
}

PHP

Antes de probar este ejemplo, sigue las instrucciones de configuración de PHP que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Pub/Sub para PHP.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $filter  The Pub/Sub subscription filter.
 */
function create_subscription_with_filter(
    string $projectId,
    string $topicName,
    string $subscriptionName,
    string $filter
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);

    $subscription->create(['filter' => $filter]);

    printf('Subscription created: %s' . PHP_EOL, $subscription->name());
    printf('Subscription info: %s' . PHP_EOL, json_encode($subscription->info()));
}

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración de Python que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Python de Pub/Sub.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# filter = "attributes.author=\"unknown\""

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={"name": subscription_path, "topic": topic_path, "filter": filter}
    )
    print(f"Created subscription with filtering enabled: {subscription}")

Ruby

En el siguiente ejemplo se usa la biblioteca de cliente de Ruby Pub/Sub v3. Si sigues usando la biblioteca v2, consulta la guía de migración a la versión 3. Para ver una lista de ejemplos de código de Ruby v2, consulta los ejemplos de código obsoletos.

Antes de probar este ejemplo, sigue las instrucciones de configuración de Ruby que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Ruby de Pub/Sub.

# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# filter = "attributes.author=\"unknown\""

pubsub = Google::Cloud::PubSub.new project_id: project_id
subscription_admin = pubsub.subscription_admin

subscription = subscription_admin.create_subscription \
  name: pubsub.subscription_path(subscription_id),
  topic: pubsub.topic_path(topic_id),
  filter: filter

puts "Created subscription with filtering enabled: #{subscription_id}"

La longitud máxima de un filtro es de 256 bytes. El filtro es una propiedad inmutable de una suscripción. Una vez que hayas creado una suscripción, no podrás actualizarla para modificar el filtro.

Cómo afectan los filtros a las métricas de la cartera de proyectos

Para monitorizar la suscripción que acabas de crear, consulta Monitorizar suscripciones con filtros.

Si el filtrado está habilitado, las métricas de la cartera de pedidos solo incluyen datos de los mensajes que coinciden con el filtro. A continuación se muestra una lista de las métricas de la cartera de pedidos:

  • subscription/backlog_bytes
  • subscription/unacked_bytes_by_region
  • subscription/num_undelivered_messages
  • subscription/num_unacked_messages_by_region
  • subscription/oldest_unacked_message_age
  • subscription/oldest_unacked_message_age_by_region
  • topic/unacked_bytes_by_region
  • topic/num_unacked_messages_by_region
  • topic/oldest_unacked_messages_age_by_region

Para obtener más información sobre estas métricas, consulta la lista de métricas de Pub/Sub.

Actualizar el filtro de una suscripción

No puedes actualizar el filtro de una suscripción que ya tengas. En su lugar, sigue esta solución alternativa.

  1. Haz una captura de pantalla de la suscripción cuyo filtro quieras cambiar.

    Para obtener más información sobre cómo hacer una captura con la consola, consulta Crear una captura.

  2. Crea una suscripción con el nuevo filtro.

    Para obtener más información sobre cómo crear una suscripción con un filtro, consulta el artículo Crear una suscripción con un filtro.

  3. En la Google Cloud consola, ve a la página Suscripciones de Pub/Sub.

    Ir a suscripciones

  4. Haz clic en la suscripción que acabas de crear.

  5. En la página de detalles de la suscripción, haz clic en Reproducir mensajes.

  6. En Buscar, haz clic en A una captura.

  7. Selecciona la captura que has creado para la suscripción original en el paso 1 y, a continuación, haz clic en Buscar.

    No perderás ningún mensaje durante la transición.

  8. Cambia los suscriptores para que usen la nueva suscripción.

Una vez que hayas completado este procedimiento, puedes eliminar la suscripción original.

Sintaxis para crear un filtro

Para filtrar mensajes, escribe una expresión que opere en atributos. Puedes escribir una expresión que coincida con la clave o el valor de los atributos. El identificador attributes selecciona los atributos del mensaje.

Por ejemplo, los filtros de la siguiente tabla seleccionan el atributo name:

Filtro Descripción
attributes:name Mensajes con el atributo name
NOT attributes:name Mensajes sin el atributo name
attributes.name = "com" Mensajes con el atributo name y el valor com
attributes.name != "com" Mensajes sin el atributo name y el valor com
hasPrefix(attributes.name, "co") Mensajes con el atributo name y un valor que empieza por co
NOT hasPrefix(attributes.name, "co") Mensajes sin el atributo name y un valor que empiece por co

Operadores de comparación de la expresión de filtro

Puede filtrar atributos con los siguientes operadores de comparación:

  • :
  • =
  • !=

El operador : coincide con una clave de una lista de atributos.

attributes:KEY

Los operadores de igualdad coinciden con las claves y los valores. El valor debe ser un literal de cadena.

attributes.KEY = "VALUE"

Una expresión con un operador de igualdad debe empezar por una clave, y el operador de igualdad debe comparar una clave y un valor.

  • Válido: el filtro compara una clave y un valor.

    attributes.name = "com"
    
  • No válido: el lado izquierdo del filtro es un valor

    "com" = attributes.name
    
  • No válido: el filtro compara dos claves

    attributes.name = attributes.website
    

La clave y el valor distinguen entre mayúsculas y minúsculas, y deben coincidir exactamente con el atributo. Si una clave contiene caracteres que no sean guiones, guiones bajos o caracteres alfanuméricos, utiliza un literal de cadena.

attributes."iana.org/language_tag" = "en"

Para usar barras invertidas, comillas y caracteres que no se pueden imprimir en un filtro, aplique caracteres de escape a los caracteres dentro de un literal de cadena. También puedes usar secuencias de escape Unicode, hexadecimales y octales en un literal de cadena.

  • Válido: el filtro escapa los caracteres de un literal de cadena.

    attributes:"\u307F\u3093\u306A"
    
  • No válido: el filtro escapa caracteres sin un literal de cadena

    attributes:\u307F\u3093\u306A
    

Operadores booleanos de la expresión de filtro

Puedes usar los operadores booleanos AND, NOT y OR en un filtro. Los operadores deben estar escritos en mayúsculas. Por ejemplo, el siguiente filtro es para mensajes con el atributo iana.org/language_tag, pero sin el atributo name y el valor com.

attributes:"iana.org/language_tag" AND NOT attributes.name = "com"

El operador NOT tiene la precedencia más alta. Para combinar los operadores AND y OR, usa paréntesis y expresiones completas.

  • Válido: operadores AND y OR con paréntesis

    attributes:"iana.org/language_tag" AND (attributes.name = "net" OR attributes.name = "org")
    
  • No válido: operadores AND y OR sin paréntesis

    attributes:"iana.org/language_tag" AND attributes.name = "net" OR attributes.name = "org"
    
  • No válido: los operadores AND y OR combinan expresiones incompletas

    attributes.name = "com" AND ("net" OR "org")
    

También puedes usar el operador menos unario en lugar del operador NOT.

attributes.name = "com" AND -attributes:"iana.org/language_tag"

Funciones de la expresión de filtro

Puede usar la función hasPrefix para filtrar atributos con valores que empiecen por una subcadena. hasPrefix es la única función admitida en un filtro.

Aunque la función hasPrefix admite la coincidencia de prefijos, no admite expresiones regulares generales.

hasPrefix(attributes.KEY, "SUBSTRING")

Haz los cambios siguientes:

  • KEY: nombre del atributo
  • SUBSTRING: una subcadena del valor