Filtrar mensagens de uma assinatura

Esta página explica como criar assinaturas do Pub/Sub com filtros.

Ao receber mensagens de uma assinatura com um filtro, você recebe apenas as mensagens que correspondem ao filtro. O serviço Pub/Sub reconhece automaticamente as mensagens que não correspondem ao filtro. É possível filtrar mensagens pelos atributos, mas não pelos dados da mensagem.

É possível ter várias assinaturas anexadas a um tópico, e cada assinatura pode ter um filtro diferente.

Por exemplo, se você tiver um tópico que recebe notícias de diferentes partes do mundo, configure uma assinatura para filtrar notícias publicadas apenas em uma região específica. Para essa configuração, é necessário garantir que um dos atributos da mensagem do tópico transmita a região da publicação de notícias.

Quando você recebe mensagens de uma assinatura com um filtro, não há taxas de mensagem de saída para as mensagens que o Pub/Sub reconhece automaticamente. Sujeito a taxas de entrega de mensagens e taxas de armazenamento relacionadas à busca para essas mensagens.

Criar uma assinatura com um filtro

As assinaturas de pull e push podem ter filtros. Todos os assinantes podem receber mensagens de assinaturas com filtros, incluindo assinantes que usam a API StreamingPull.

É possível criar uma assinatura com um filtro usando o console do Google Cloud, a Google Cloud CLI, as bibliotecas de cliente ou a API Pub/Sub.

Console

Para criar uma assinatura de pull com um filtro, siga estas etapas:

  1. No console do Google Cloud, acesse a página Assinaturas.

    Acessar a página "Assinaturas"

  2. Clique em Criar assinatura.

  3. Insira o ID da assinatura.

  4. Escolha ou crie um tópico no menu suspenso. A assinatura recebe mensagens do tópico.

  5. Na seção Filtro de assinatura, insira a expressão de filtro.

  6. Clique em Criar.

Para criar uma assinatura de push com um filtro, siga estas etapas:

  1. No console do Google Cloud, acesse a página Assinaturas.

    Acessar a página "Assinaturas"

  2. Clique em Criar assinatura.

  3. Insira o ID da assinatura.

  4. Escolha ou crie um tópico no menu suspenso. A assinatura recebe mensagens do tópico.

  5. Na seção Tipo de entrega, clique em Push.

  6. No campo URL do endpoint, Insira o URL do endpoint de push.

  7. Na seção Filtro de assinatura, insira a expressão de filtro.

  8. Clique em Criar.

gcloud

Para criar uma assinatura de pull com um filtro, use o comando gcloud pubsub subscriptions create com a sinalização --message-filter:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --message-filter='FILTER'

Substitua:

  • SUBSCRIPTION_ID: o ID da assinatura a ser criada
  • TOPIC_ID: o ID do tópico a ser anexado à assinatura
  • FILTER: uma expressão na sintaxe de filtragem

Para criar uma assinatura de push com um filtro, use o comando gcloud pubsub subscriptions create com as sinalizações --push-endpoint e --message-filter:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --push-endpoint=PUSH_ENDPOINT \
  --message-filter='FILTER'

Substitua:

  • SUBSCRIPTION_ID: o ID da assinatura a ser criada
  • TOPIC_ID: o ID do tópico a ser anexado à assinatura
  • PUSH_ENDPOINT: o URL do servidor em que o assinante de push é executado
  • FILTER: uma expressão na sintaxe de filtragem

REST

Para criar uma assinatura com um filtro, use o método projects.subscriptions.create.

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth print-access-token)

Substitua:

  • PROJECT_ID: o ID do projeto para criar a assinatura
  • SUBSCRIPTION_ID: o ID da assinatura a ser criada

Para criar uma assinatura de pull com um filtro, especifique o filtro no corpo da solicitação:

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "filter": "FILTER"
}

Substitua:

  • PROJECT_ID: o ID do projeto com o tópico
  • TOPIC_ID: o ID do tópico a ser anexado à assinatura
  • FILTER: uma expressão na sintaxe de filtragem

Para criar uma assinatura de push com um filtro, especifique o endpoint de push e o filtro no corpo da solicitação:

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "pushConfig": {
    "pushEndpoint": "PUSH_ENDPOINT"
  },
  "filter": "FILTER"
}

Substitua:

  • PROJECT_ID: o ID do projeto com o tópico
  • TOPIC_ID: o ID do tópico a ser anexado à assinatura
  • PUSH_ENDPOINT: o URL do servidor em que o assinante de push é executado
  • FILTER: uma expressão na sintaxe de filtragem

C++

Antes de tentar esse exemplo, siga as instruções de configuração do C++ em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub C++.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string topic_id,
   std::string subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, std::move(subscription_id))
          .FullName());
  request.set_topic(
      pubsub::Topic(project_id, std::move(topic_id)).FullName());
  request.set_filter(R"""(attributes.is-even = "false")""");
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

Antes de tentar esse exemplo, siga as instruções de configuração do C# em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub C#.


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithFilteringSample
{
    public Subscription CreateSubscriptionWithFiltering(string projectId, string topicId, string subscriptionId, string filter)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        Subscription subscription = null;

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            Filter = filter
        };

        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

Antes de tentar esse exemplo, siga as instruções de configuração do Go em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createWithFilter(w io.Writer, projectID, subID, filter string, topic *pubsub.Topic) error {
	// Receive messages with attribute key "author" and value "unknown".
	// projectID := "my-project-id"
	// subID := "my-sub"
	// filter := "attributes.author=\"unknown\""
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:  topic,
		Filter: filter,
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription with filter: %v\n", sub)
	return nil
}

Java

Antes de tentar essa amostra, siga as instruções de configuração do Java em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Java.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithFiltering {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    String filter = "attributes.author=\"unknown\"";

    createSubscriptionWithFilteringExample(projectId, topicId, subscriptionId, filter);
  }

  public static void createSubscriptionWithFilteringExample(
      String projectId, String topicId, String subscriptionId, String filter) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Receive messages with attribute key "author" and value "unknown".
                  .setFilter(filter)
                  .build());

      System.out.println(
          "Created a subscription with filtering enabled: " + subscription.getAllFields());
    }
  }
}

Node.js

Antes de tentar essa amostra, siga as instruções de configuração do Node.js em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const filterString = 'YOUR_FILTER_STRING';   // e.g. 'attributes.author="unknown"'

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithFilter(
  topicNameOrId,
  subscriptionNameOrId,
  filterString
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      filter: filterString,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with filter ${filterString}.`
  );
}

Node.js

Antes de tentar essa amostra, siga as instruções de configuração do Node.js em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const filterString = 'YOUR_FILTER_STRING';   // e.g. 'attributes.author="unknown"'

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithFilter(
  topicNameOrId: string,
  subscriptionNameOrId: string,
  filterString: string
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      filter: filterString,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with filter ${filterString}.`
  );
}

PHP

Antes de tentar esse exemplo, siga as instruções de configuração do PHP em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub PHP.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $filter  The Pub/Sub subscription filter.
 */
function create_subscription_with_filter(
    string $projectId,
    string $topicName,
    string $subscriptionName,
    string $filter
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);

    $subscription->create(['filter' => $filter]);

    printf('Subscription created: %s' . PHP_EOL, $subscription->name());
    printf('Subscription info: %s' . PHP_EOL, json_encode($subscription->info()));
}

Python

Antes de tentar esse exemplo, siga as instruções de configuração do Python em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Python.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# filter = "attributes.author=\"unknown\""

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={"name": subscription_path, "topic": topic_path, "filter": filter}
    )
    print(f"Created subscription with filtering enabled: {subscription}")

Ruby

Antes de tentar esse exemplo, siga as instruções de configuração do Ruby em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Ruby.

require "google/cloud/pubsub"

# Shows how to create a new subscription with filter for a given topic
class PubsubCreateSubscriptionWithFilter
  def create_subscription_with_filter project_id:, topic_id:, subscription_id:, filter:
    pubsub = Google::Cloud::Pubsub.new project_id: project_id
    topic = pubsub.topic topic_id
    subscription = topic.subscribe subscription_id, filter: filter
    puts "Created subscription with filtering enabled: #{subscription_id}"
  end

  def self.run
    # TODO(developer): Replace these variables before running the sample.
    project_id = "your-project-id"
    topic_id = "your-topic-id"
    subscription_id = "id-for-new-subcription"
    filter = "attributes.author=\"unknown\""
    PubsubCreateSubscriptionWithFilter.new.create_subscription_with_filter project_id: project_id,
                                                                           topic_id: topic_id,
                                                                           subscription_id: subscription_id,
                                                                           filter: filter
  end
end

if $PROGRAM_NAME == __FILE__
  PubsubCreateSubscriptionWithFilter.run
end

O comprimento máximo de um filtro é 256 bytes. O filtro é uma propriedade imutável de uma assinatura. Depois de criar uma assinatura, não é possível atualizar a assinatura para modificar o filtro.

Como os filtros afetam as métricas de pendências

Para monitorar a assinatura que você acabou de criar, consulte Monitorar assinaturas com filtros.

Se a filtragem estiver ativada, as métricas de backlog vão incluir apenas dados de mensagens que correspondem ao filtro. Confira a seguir uma lista das métricas de pendências:

  • subscription/backlog_bytes
  • subscription/unacked_bytes_by_region
  • subscription/num_undelivered_messages
  • subscription/num_unacked_messages_by_region
  • subscription/oldest_unacked_message_age
  • subscription/oldest_unacked_message_age_by_region
  • topic/unacked_bytes_by_region
  • topic/num_unacked_messages_by_region
  • topic/oldest_unacked_messages_age_by_region

Para saber mais sobre essas métricas, consulte a lista de métricas do Pub/Sub.

Atualizar o filtro de uma assinatura

Não é possível atualizar o filtro de uma assinatura. Em vez disso, siga esta solução alternativa.

  1. Tire um snapshot da assinatura em que você quer mudar o filtro.

    Para saber mais sobre como fazer um snapshot usando o console, consulte Criar um snapshot.

  2. Crie uma nova assinatura com o novo filtro.

    Para saber mais sobre como criar uma assinatura com um filtro, consulte Criar uma assinatura com um filtro.

  3. No console do Google Cloud, acesse a página Assinaturas do Pub/Sub.

    Acessar "Assinaturas"

  4. Clique na assinatura que você acabou de criar.

  5. Na página de detalhes da assinatura, clique em Repetir mensagens.

  6. Em Procurar, clique em Para um snapshot.

  7. Selecione o snapshot que você criou para a assinatura original na etapa 1 e clique em Procurar.

    Você não perde nenhuma mensagem durante a transição.

  8. Mude os assinantes para usar a nova assinatura.

Depois de concluir esse procedimento, você pode excluir a assinatura original.

Sintaxe para criar um filtro

Para filtrar mensagens, escreva uma expressão que opere em atributos. É possível escrever uma expressão que corresponda à chave ou ao valor dos atributos. O identificador attributes seleciona os atributos na mensagem.

Por exemplo, os filtros na tabela a seguir selecionam o atributo name:

Filtro Descrição
attributes:name Mensagens com o atributo name
NOT attributes:name Mensagens sem o atributo name
attributes.name = "com" Mensagens com o atributo name e o valor de com
attributes.name != "com" Mensagens sem o atributo name e o valor de com
hasPrefix(attributes.name, "co") Mensagens com o atributo name e um valor que começa com co
NOT hasPrefix(attributes.name, "co") Mensagens sem o atributo name e um valor que começa com co

Operadores de comparação para a expressão de filtro

Filtre os atributos com os seguintes operadores de comparação:

  • :
  • =
  • !=

O operador : corresponde a uma chave em uma lista de atributos.

attributes:KEY

Os operadores de igualdade correspondem a chaves e valores. O valor precisa ser um literal de string.

attributes.KEY = "VALUE"

Uma expressão com um operador de igualdade precisa começar com uma chave, e ele precisa comparar uma chave e um valor.

  • Válido: o filtro compara uma chave e um valor.

    attributes.name = "com"
    
  • Inválido: o lado esquerdo do filtro é um valor.

    "com" = attributes.name
    
  • Inválido: o filtro compara duas chaves

    attributes.name = attributes.website
    

A chave e o valor diferenciam maiúsculas de minúsculas e precisam corresponder exatamente ao atributo. Se uma chave contiver caracteres diferentes de hifens, sublinhados ou caracteres alfanuméricos, use um literal de string.

attributes."iana.org/language_tag" = "en"

Para usar barras invertidas, aspas e caracteres não imprimíveis em um filtro, faça o escape dos caracteres em um literal de string. Também é possível usar sequências Unicode, hexadecimais e octais em um literal de string.

  • Válido: filtra os caracteres de escape dentro de um literal de string

    attributes:"\u307F\u3093\u306A"
    
  • Inválido: o filtro faz o escape de caracteres sem um literal de string

    attributes:\u307F\u3093\u306A
    

Operadores booleanos para a expressão do filtro

É possível usar os operadores booleanos AND, NOT e OR em um filtro. Os operadores precisam estar em letras maiúsculas. Por exemplo, o filtro a seguir é para mensagens com o atributo iana.org/language_tag, mas sem o atributo name e o valor com.

attributes:"iana.org/language_tag" AND NOT attributes.name = "com"

O operador NOT tem a precedência mais alta. Para combinar os operadores AND e OR, use parênteses e expressões completas.

  • Válido: operadores AND e OR com parênteses

    attributes:"iana.org/language_tag" AND (attributes.name = "net" OR attributes.name = "org")
    
  • Inválido: operadores AND e OR sem parênteses

    attributes:"iana.org/language_tag" AND attributes.name = "net" OR attributes.name = "org"
    
  • Inválido: os operadores AND e OR combinam expressões incompletas.

    attributes.name = "com" AND ("net" OR "org")
    

Você também pode usar o operador menos unário em vez do operador NOT.

attributes.name = "com" AND -attributes:"iana.org/language_tag"

Funções para a expressão de filtro

É possível usar a função hasPrefix para filtrar atributos com valores que começam com uma substring. hasPrefix é a única função com suporte em um filtro.

Embora a correspondência de prefixo seja compatível com a função hasPrefix, as expressões regulares gerais não são.

hasPrefix(attributes.KEY, "SUBSTRING")

Substitua:

  • KEY: o nome do atributo
  • SUBSTRING: uma substring do valor