过滤订阅中的消息

本页介绍了如何使用过滤条件创建 Pub/Sub 订阅。

当您通过过滤条件接收订阅消息时,您只会收到与过滤条件匹配的消息。Pub/Sub 服务会自动确认与过滤条件不匹配的消息。您可以按消息的属性过滤消息,但不能按消息中的数据过滤消息。

您可以为某个主题附加多个订阅,并且每个订阅可以使用不同的过滤条件。

例如,如果您有一个主题用于接收来自世界各地的新闻,则可以配置订阅,以过滤出仅在特定区域发布的新闻。对于此配置,您必须确保某个主题消息属性传达了新闻发布区域。

当您收到包含过滤条件的订阅的消息时,您无需为 Pub/Sub 自动确认的消息支付出站消息费用。您需要支付消息传送费用以及跳转相关存储费用。

使用过滤条件创建订阅

拉取和推送订阅可以包含过滤条件。所有订阅者都可以接收具有过滤条件的订阅的消息,包括使用 StreamingPull API 的订阅者。

您可以使用 Google Cloud 控制台、Google Cloud CLI、客户端库或 Pub/Sub API 创建具有过滤条件的订阅。

控制台

要创建具有过滤条件的拉取订阅,请按以下步骤操作:

  1. 在 Google Cloud 控制台中,前往订阅页面。

    转到“订阅”页面

  2. 点击创建订阅

  3. 输入订阅 ID

  4. 从下拉菜单中选择或创建一个主题。订阅将接收来自该主题的消息。

  5. 订阅过滤条件部分,输入过滤条件表达式

  6. 点击创建

要创建具有过滤条件的推送订阅,请按以下步骤操作:

  1. 在 Google Cloud 控制台中,前往订阅页面。

    转到“订阅”页面

  2. 点击创建订阅

  3. 输入订阅 ID

  4. 从下拉菜单中选择或创建一个主题。订阅将接收来自该主题的消息。

  5. 传送类型部分中,点击推送

  6. 端点网址字段中,输入推送端点的网址。

  7. 订阅过滤条件部分,输入过滤条件表达式

  8. 点击创建

gcloud

要创建具有过滤条件的拉取订阅,请使用带有 --message-filter 标志的 gcloud pubsub subscriptions create 命令:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --message-filter='FILTER'

请替换以下内容:

  • SUBSCRIPTION_ID:要创建的订阅的 ID
  • TOPIC_ID:要附加到订阅的主题的 ID
  • FILTER过滤语法中的表达式

要创建具有过滤条件的推送订阅,请使用带有 --push-endpoint--message-filter 标志的 gcloud pubsub subscriptions create 命令:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --push-endpoint=PUSH_ENDPOINT \
  --message-filter='FILTER'

请替换以下内容:

  • SUBSCRIPTION_ID:要创建的订阅的 ID
  • TOPIC_ID:要附加到订阅的主题的 ID
  • PUSH_ENDPOINT:推送订阅者在其上运行的服务器的网址
  • FILTER过滤语法中的表达式

REST

要创建具有过滤条件的订阅,请使用 projects.subscriptions.create 方法。

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth print-access-token)

请替换以下内容:

  • PROJECT_ID:要在其中创建订阅的项目的 ID
  • SUBSCRIPTION_ID:要创建的订阅的 ID

要创建具有过滤条件的拉取订阅,请在请求正文中指定过滤条件:

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "filter": "FILTER"
}

请替换以下内容:

  • PROJECT_ID:包含主题的项目的 ID
  • TOPIC_ID:要附加到订阅的主题的 ID
  • FILTER过滤语法中的表达式

要创建具有过滤条件的推送订阅,请在请求正文中指定推送端点和过滤条件:

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "pushConfig": {
    "pushEndpoint": "PUSH_ENDPOINT"
  },
  "filter": "FILTER"
}

请替换以下内容:

  • PROJECT_ID:包含主题的项目的 ID
  • TOPIC_ID:要附加到订阅的主题的 ID
  • PUSH_ENDPOINT:推送订阅者在其上运行的服务器的网址
  • FILTER过滤语法中的表达式

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string topic_id,
   std::string subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, std::move(subscription_id))
          .FullName());
  request.set_topic(
      pubsub::Topic(project_id, std::move(topic_id)).FullName());
  request.set_filter(R"""(attributes.is-even = "false")""");
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithFilteringSample
{
    public Subscription CreateSubscriptionWithFiltering(string projectId, string topicId, string subscriptionId, string filter)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        Subscription subscription = null;

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            Filter = filter
        };

        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createWithFilter(w io.Writer, projectID, subID, filter string, topic *pubsub.Topic) error {
	// Receive messages with attribute key "author" and value "unknown".
	// projectID := "my-project-id"
	// subID := "my-sub"
	// filter := "attributes.author=\"unknown\""
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:  topic,
		Filter: filter,
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription with filter: %v\n", sub)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithFiltering {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    String filter = "attributes.author=\"unknown\"";

    createSubscriptionWithFilteringExample(projectId, topicId, subscriptionId, filter);
  }

  public static void createSubscriptionWithFilteringExample(
      String projectId, String topicId, String subscriptionId, String filter) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Receive messages with attribute key "author" and value "unknown".
                  .setFilter(filter)
                  .build());

      System.out.println(
          "Created a subscription with filtering enabled: " + subscription.getAllFields());
    }
  }
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const filterString = 'YOUR_FILTER_STRING';   // e.g. 'attributes.author="unknown"'

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithFilter(
  topicNameOrId,
  subscriptionNameOrId,
  filterString
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      filter: filterString,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with filter ${filterString}.`
  );
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const filterString = 'YOUR_FILTER_STRING';   // e.g. 'attributes.author="unknown"'

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithFilter(
  topicNameOrId: string,
  subscriptionNameOrId: string,
  filterString: string
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      filter: filterString,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with filter ${filterString}.`
  );
}

PHP

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $filter  The Pub/Sub subscription filter.
 */
function create_subscription_with_filter(
    string $projectId,
    string $topicName,
    string $subscriptionName,
    string $filter
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);

    $subscription->create(['filter' => $filter]);

    printf('Subscription created: %s' . PHP_EOL, $subscription->name());
    printf('Subscription info: %s' . PHP_EOL, json_encode($subscription->info()));
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# filter = "attributes.author=\"unknown\""

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={"name": subscription_path, "topic": topic_path, "filter": filter}
    )
    print(f"Created subscription with filtering enabled: {subscription}")

Ruby

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

require "google/cloud/pubsub"

# Shows how to create a new subscription with filter for a given topic
class PubsubCreateSubscriptionWithFilter
  def create_subscription_with_filter project_id:, topic_id:, subscription_id:, filter:
    pubsub = Google::Cloud::Pubsub.new project_id: project_id
    topic = pubsub.topic topic_id
    subscription = topic.subscribe subscription_id, filter: filter
    puts "Created subscription with filtering enabled: #{subscription_id}"
  end

  def self.run
    # TODO(developer): Replace these variables before running the sample.
    project_id = "your-project-id"
    topic_id = "your-topic-id"
    subscription_id = "id-for-new-subcription"
    filter = "attributes.author=\"unknown\""
    PubsubCreateSubscriptionWithFilter.new.create_subscription_with_filter project_id: project_id,
                                                                           topic_id: topic_id,
                                                                           subscription_id: subscription_id,
                                                                           filter: filter
  end
end

if $PROGRAM_NAME == __FILE__
  PubsubCreateSubscriptionWithFilter.run
end

过滤条件的最大长度为 256 个字节。过滤条件是订阅的不可变属性。创建订阅后,您无法更新订阅来修改过滤条件。

过滤器对积压工作量指标的影响

如需监控您刚刚创建的订阅,请参阅使用过滤条件监控订阅

如果您启用了过滤功能,积压指标将仅包含与过滤条件匹配的消息中的数据。以下是积压指标列表:

  • subscription/backlog_bytes
  • subscription/unacked_bytes_by_region
  • subscription/num_undelivered_messages
  • subscription/num_unacked_messages_by_region
  • subscription/oldest_unacked_message_age
  • subscription/oldest_unacked_message_age_by_region
  • topic/unacked_bytes_by_region
  • topic/num_unacked_messages_by_region
  • topic/oldest_unacked_messages_age_by_region

如需详细了解这些指标,请参阅 Pub/Sub 指标列表。

更新订阅的过滤条件

您无法更新现有订阅的过滤条件。请改为按照此替代方案操作。

  1. 为要更改过滤条件的订阅截取快照。

    如需详细了解如何使用控制台创建快照,请参阅创建快照

  2. 使用新过滤条件创建新订阅。

    如需详细了解如何创建包含过滤条件的订阅,请参阅创建包含过滤条件的订阅

  3. 在 Google Cloud 控制台中,前往 Pub/Sub 订阅页面。

    前往“订阅”页面

  4. 点击您刚刚创建的订阅。

  5. 在订阅详情页面中,点击重放消息

  6. 对于跳转,点击到快照

  7. 选择您在第 1 步中为原始订阅创建的快照,然后点击还原

    在转换过程中,您不会丢失任何消息。

  8. 将所有订阅者更改为使用新订阅。

完成此过程后,您可以继续删除原始订阅。

用于创建过滤条件的语法

要过滤消息,请编写对属性运行的表达式。您可以编写与属性的键或值匹配的表达式。attributes 标识符用于选择消息中的属性。

例如,下表中的过滤条件选择了 name 属性:

过滤 说明
attributes:name 具有 name 属性的消息
NOT attributes:name 不含 name 属性的消息
attributes.name = "com" 具有 name 属性和 com 值的消息
attributes.name != "com" 不含 name 属性和 com 值的消息
hasPrefix(attributes.name, "co") 具有 name 属性且值以 co 开头的消息
NOT hasPrefix(attributes.name, "co") 不含 name 属性且以 co 开头的值的消息

过滤条件表达式的比较运算符

您可以使用以下比较运算符过滤属性:

  • :
  • =
  • !=

: 运算符匹配属性列表中的键。

attributes:KEY

等式运算符匹配键和值。该值必须是字符串字面量。

attributes.KEY = "VALUE"

具有等式运算符的表达式必须以键开头,等式运算符必须比较键和值。

  • 有效:过滤条件比较键和值

    attributes.name = "com"
    
  • 无效:过滤条件的左侧是一个值

    "com" = attributes.name
    
  • 无效:过滤条件比较两个键

    attributes.name = attributes.website
    

键和值区分大小写,并且必须与属性完全匹配。如果键包含的字符不是连字符、下划线或字母数字字符,请使用字符串字面量。

attributes."iana.org/language_tag" = "en"

如需在过滤条件中使用反斜杠、引号和非打印字符,请转义字符串字面量中的字符。您还可以在字符串字面量中使用 Unicode、十六进制和八进制转义序列。

  • 有效:过滤条件对字符串字面量中的字符进行转义

    attributes:"\u307F\u3093\u306A"
    
  • 无效:过滤条件转义不含字符串字面量的字符

    attributes:\u307F\u3093\u306A
    

过滤条件表达式的布尔运算符

您可以在过滤器中使用布尔运算符 ANDNOTOR。运算符必须使用大写字母。例如,以下过滤条件适用于具有 iana.org/language_tag 属性但没有 name 属性和 com 值的消息。

attributes:"iana.org/language_tag" AND NOT attributes.name = "com"

NOT 运算符具有最高优先级。要组合 ANDOR 运算符,请使用括号和完整表达式。

  • 有效AND 和带括号的 OR 运算符

    attributes:"iana.org/language_tag" AND (attributes.name = "net" OR attributes.name = "org")
    
  • 无效:不带括号的 ANDOR 运算符

    attributes:"iana.org/language_tag" AND attributes.name = "net" OR attributes.name = "org"
    
  • 无效ANDOR 运算符结合使用不完整的表达式

    attributes.name = "com" AND ("net" OR "org")
    

您也可以使用一元取反运算符代替 NOT 运算符。

attributes.name = "com" AND -attributes:"iana.org/language_tag"

过滤条件表达式的函数

您可以使用 hasPrefix 函数过滤出值以子字符串开头的属性。hasPrefix 是过滤器中唯一受支持的函数。

虽然 hasPrefix 函数支持前缀匹配,但不支持常规正则表达式。

hasPrefix(attributes.KEY, "SUBSTRING")

替换以下内容:

  • KEY:属性的名称。
  • SUBSTRING:值的子字符串