サブスクリプションからのメッセージをフィルタする

このページでは、フィルタを使用して Pub/Sub サブスクリプションを作成する方法について説明します。

フィルタを含むサブスクリプションからメッセージを受信すると、フィルタに一致するメッセージのみを受信します。Pub/Sub サービスは、フィルタと一致しないメッセージに、自動的に確認応答します。メッセージは属性でフィルタできますが、メッセージ内のデータではフィルタできません。

1 つのトピックに複数のサブスクリプションをアタッチできます。また、各サブスクリプションには異なるフィルタを設定できます。

たとえば、トピックが世界の異なるリージョンのニュースを受け取る場合は、特定のリージョンからのみ公開されるニュースでフィルタリングするようにサブスクリプションを構成できます。この構成では、トピックのメッセージ属性の一つがニュースをパブリッシュするリージョンに対応している状態にする必要があります。

フィルタを含むサブスクリプションからメッセージを受信した場合、Pub/Sub が自動的に確認応答するメッセージに対して送信メッセージ料金は発生しません。これらのメッセージに対しては、メッセージ配信料金とシーク関連のストレージ料金が発生します。

フィルタを含むサブスクリプションを作成する

pull サブスクリプションと push サブスクリプションにはフィルタを含めることができます。すべてのサブスクライバーは、StreamingPull API を使用するサブスクライバーを含む、フィルタを含むサブスクリプションからメッセージを受信できます。

Google Cloud コンソール、Google Cloud CLI、クライアント ライブラリ、または Pub/Sub API を使用して、フィルタを含むサブスクリプションを作成できます。

Console

フィルタを含む pull サブスクリプションを作成するには、次の手順に従います。

  1. Google Cloud コンソールで、[サブスクリプション] ページに移動します。

    [サブスクリプション] ページに移動

  2. [サブスクリプションを作成] をクリックします。

  3. [サブスクリプション ID] を入力します。

  4. プルダウン メニューからトピックを選択するか、作成します。サブスクリプションがトピックからメッセージを受信します。

  5. [サブスクリプション フィルタ] セクションで、フィルタ式を入力します。

  6. [作成] をクリックします。

フィルタを含む push サブスクリプションを作成するには、次の手順に従います。

  1. Google Cloud コンソールで、[サブスクリプション] ページに移動します。

    [サブスクリプション] ページに移動

  2. [サブスクリプションを作成] をクリックします。

  3. [サブスクリプション ID] を入力します。

  4. プルダウン メニューからトピックを選択するか、作成します。サブスクリプションがトピックからメッセージを受信します。

  5. [配信タイプ] で [Push] をクリックします。

  6. [エンドポイント URL] 欄に、push エンドポイントの URL を入力します。

  7. [サブスクリプション フィルタ] セクションで、フィルタ式を入力します。

  8. [作成] をクリックします。

gcloud

フィルタを含む pull サブスクリプションを作成するには、--message-filter フラグを指定して gcloud pubsub subscriptions create コマンドを使用します。

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --message-filter='FILTER'

以下を置き換えます。

  • SUBSCRIPTION_ID: 作成するサブスクリプションの ID
  • TOPIC_ID: サブスクリプションに関連付けるトピックの ID
  • FILTER: フィルタリング構文の式

フィルタを含む push サブスクリプションを作成するには、--push-endpoint フラグと --message-filter フラグを指定して、gcloud pubsub subscriptions create コマンドを使用します。

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --push-endpoint=PUSH_ENDPOINT \
  --message-filter='FILTER'

以下を置き換えます。

  • SUBSCRIPTION_ID: 作成するサブスクリプションの ID
  • TOPIC_ID: サブスクリプションに関連付けるトピックの ID
  • PUSH_ENDPOINT: push サブスクライバーが実行されるサーバーの URL
  • FILTER: フィルタリング構文の式

REST

フィルタを含むサブスクリプションを作成するには、projects.subscriptions.create メソッドを使用します。

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth print-access-token)

以下を置き換えます。

  • PROJECT_ID: サブスクリプションを作成するプロジェクトのプロジェクト ID
  • SUBSCRIPTION_ID: 作成するサブスクリプションの ID

フィルタを含む pull サブスクリプションを作成するには、リクエスト本文にフィルタを指定します。

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "filter": "FILTER"
}

以下を置き換えます。

  • PROJECT_ID: トピックが含まれるプロジェクトのプロジェクト ID
  • TOPIC_ID: サブスクリプションに関連付けるトピックの ID
  • FILTER: フィルタリング構文の式

フィルタを含む push サブスクリプションを作成するには、リクエスト本文に push エンドポイントとフィルタを指定します。

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "pushConfig": {
    "pushEndpoint": "PUSH_ENDPOINT"
  },
  "filter": "FILTER"
}

以下を置き換えます。

  • PROJECT_ID: トピックが含まれるプロジェクトのプロジェクト ID
  • TOPIC_ID: サブスクリプションに関連付けるトピックの ID
  • PUSH_ENDPOINT: push サブスクライバーが実行されるサーバーの URL
  • FILTER: フィルタリング構文の式

C++

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API リファレンス ドキュメントをご覧ください。

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string topic_id,
   std::string subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, std::move(subscription_id))
          .FullName());
  request.set_topic(
      pubsub::Topic(project_id, std::move(topic_id)).FullName());
  request.set_filter(R"""(attributes.is-even = "false")""");
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API リファレンス ドキュメントをご覧ください。


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithFilteringSample
{
    public Subscription CreateSubscriptionWithFiltering(string projectId, string topicId, string subscriptionId, string filter)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        Subscription subscription = null;

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            Filter = filter
        };

        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createWithFilter(w io.Writer, projectID, subID, filter string, topic *pubsub.Topic) error {
	// Receive messages with attribute key "author" and value "unknown".
	// projectID := "my-project-id"
	// subID := "my-sub"
	// filter := "attributes.author=\"unknown\""
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:  topic,
		Filter: filter,
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription with filter: %v\n", sub)
	return nil
}

Java

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithFiltering {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    String filter = "attributes.author=\"unknown\"";

    createSubscriptionWithFilteringExample(projectId, topicId, subscriptionId, filter);
  }

  public static void createSubscriptionWithFilteringExample(
      String projectId, String topicId, String subscriptionId, String filter) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Receive messages with attribute key "author" and value "unknown".
                  .setFilter(filter)
                  .build());

      System.out.println(
          "Created a subscription with filtering enabled: " + subscription.getAllFields());
    }
  }
}

Node.js

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API リファレンス ドキュメントをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const filterString = 'YOUR_FILTER_STRING';   // e.g. 'attributes.author="unknown"'

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithFilter(
  topicNameOrId,
  subscriptionNameOrId,
  filterString
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      filter: filterString,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with filter ${filterString}.`
  );
}

Node.js

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API リファレンス ドキュメントをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const filterString = 'YOUR_FILTER_STRING';   // e.g. 'attributes.author="unknown"'

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithFilter(
  topicNameOrId: string,
  subscriptionNameOrId: string,
  filterString: string
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      filter: filterString,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with filter ${filterString}.`
  );
}

PHP

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の PHP の設定手順を実施してください。詳細については、Pub/Sub PHP API リファレンス ドキュメントをご覧ください。

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $filter  The Pub/Sub subscription filter.
 */
function create_subscription_with_filter(
    string $projectId,
    string $topicName,
    string $subscriptionName,
    string $filter
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);

    $subscription->create(['filter' => $filter]);

    printf('Subscription created: %s' . PHP_EOL, $subscription->name());
    printf('Subscription info: %s' . PHP_EOL, json_encode($subscription->info()));
}

Python

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"
# filter = "attributes.author=\"unknown\""

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={"name": subscription_path, "topic": topic_path, "filter": filter}
    )
    print(f"Created subscription with filtering enabled: {subscription}")

Ruby

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API リファレンス ドキュメントをご覧ください。

require "google/cloud/pubsub"

# Shows how to create a new subscription with filter for a given topic
class PubsubCreateSubscriptionWithFilter
  def create_subscription_with_filter project_id:, topic_id:, subscription_id:, filter:
    pubsub = Google::Cloud::Pubsub.new project_id: project_id
    topic = pubsub.topic topic_id
    subscription = topic.subscribe subscription_id, filter: filter
    puts "Created subscription with filtering enabled: #{subscription_id}"
  end

  def self.run
    # TODO(developer): Replace these variables before running the sample.
    project_id = "your-project-id"
    topic_id = "your-topic-id"
    subscription_id = "id-for-new-subcription"
    filter = "attributes.author=\"unknown\""
    PubsubCreateSubscriptionWithFilter.new.create_subscription_with_filter project_id: project_id,
                                                                           topic_id: topic_id,
                                                                           subscription_id: subscription_id,
                                                                           filter: filter
  end
end

if $PROGRAM_NAME == __FILE__
  PubsubCreateSubscriptionWithFilter.run
end

フィルタの最大長は 256 バイトです。フィルタはサブスクリプションの不変プロパティです。サブスクリプションを作成した後、サブスクリプションを更新してフィルタを変更することはできません。

フィルタがバックログ指標に与える影響

作成したサブスクリプションをモニタリングするには、フィルタを使用してサブスクリプションをモニタリングするをご覧ください。

フィルタが有効になっている場合、バックログ指標にはフィルタに一致するメッセージのデータのみが含まれます。バックログ指標のリストを次に示します。

  • subscription/backlog_bytes
  • subscription/unacked_bytes_by_region
  • subscription/num_undelivered_messages
  • subscription/num_unacked_messages_by_region
  • subscription/oldest_unacked_message_age
  • subscription/oldest_unacked_message_age_by_region
  • topic/unacked_bytes_by_region
  • topic/num_unacked_messages_by_region
  • topic/oldest_unacked_messages_age_by_region

これらの指標の詳細については、Pub/Sub 指標のリストをご覧ください。

サブスクリプションのフィルタを更新する

既存のサブスクリプションのフィルタを更新することはできません。代わりに、この回避策に沿ってご対応ください。

  1. フィルタを変更する定期購入のスナップショットを作成します。

    コンソールを使用してスナップショットを取得する方法の詳細については、スナップショットの作成を作成するをご覧ください。

  2. 新しいフィルタを使用して新しいサブスクリプションを作成します。

    フィルタを含むサブスクリプションの作成の詳細については、フィルタを含むサブスクリプションを作成するをご覧ください。

  3. Google Cloud コンソールで、[Pub/Sub サブスクリプション] ページに移動します。

    サブスクリプションに移動

  4. 作成したサブスクリプションをクリックします。

  5. [サブスクリプションの詳細] ページで、[メッセージの再生] をクリックします。

  6. シークするには、[スナップショットまで] をクリックします。

  7. 手順 1 で元のサブスクリプション用に作成したスナップショットを選択し、[シーク] をクリックします。

    移行中にメッセージが失われることはありません。

  8. 新しい定期購入を使用するように定期購入を変更します。

この手順が完了したら、元のサブスクリプションを削除できます。

フィルタを作成する構文

メッセージをフィルタするには、属性を操作するを記述します。属性のキーまたは値に一致する式を記述できます。attributes の ID により、メッセージ内の属性が選択されます。

たとえば、次の表のフィルタは name 属性を選択します。

フィルタ 説明
attributes:name name 属性を含むメッセージ
NOT attributes:name name 属性を含まないメッセージ
attributes.name = "com" name 属性と com の値を含むメッセージ
attributes.name != "com" name 属性と com の値を含まないメッセージ
hasPrefix(attributes.name, "co") name 属性と co で始まる値を含むメッセージ
NOT hasPrefix(attributes.name, "co") name 属性と、co で始まる値を含まないメッセージ

フィルタ式の比較演算子

次の比較演算子を使用して属性をフィルタできます。

  • :
  • =
  • !=

: 演算子は、属性のリスト内のキーを一致させます。

attributes:KEY

等価演算子はキーと値に一致させます。値は文字列リテラルにする必要があります。

attributes.KEY = "VALUE"

等価演算子を含む式はキーで始まり、等価演算子はキーと値を比較する必要があります。

  • 有効: フィルタでキーと値を比較している

    attributes.name = "com"
    
  • 無効: フィルタの左側が値である

    "com" = attributes.name
    
  • 無効: フィルタで 2 つのキーを比較している

    attributes.name = attributes.website
    

キーと値は大文字と小文字が区別され、属性と正確に一致している必要があります。キーにハイフン、アンダースコア、英数字以外の文字が含まれている場合は、文字列リテラルを使用します。

attributes."iana.org/language_tag" = "en"

フィルタでバックスラッシュ、引用符、非出力文字を使用するには、文字列リテラル内の文字をエスケープします。文字列リテラル内で Unicode、16 進数、8 進数のエスケープ シーケンスを使用することもできます。

  • 有効: フィルタで文字列リテラル内の文字をエスケープする

    attributes:"\u307F\u3093\u306A"
    
  • 無効: フィルタで文字列リテラルを使用せず文字をエスケープする

    attributes:\u307F\u3093\u306A
    

フィルタ式のブール演算子

フィルタでブール演算子 ANDNOTOR を使用できます。演算子は大文字にする必要があります。たとえば、次のフィルタは iana.org/language_tag 属性を含むが、name 属性と com 値を含まないメッセージ用のものです。

attributes:"iana.org/language_tag" AND NOT attributes.name = "com"

NOT 演算子が最も優先されます。AND 演算子と OR 演算子を組み合わせるには、かっこを使用して式を完成させます。

  • 有効: かっこを使用した AND 演算子と OR 演算子

    attributes:"iana.org/language_tag" AND (attributes.name = "net" OR attributes.name = "org")
    
  • 無効: かっこを使用しない AND 演算子と OR 演算子

    attributes:"iana.org/language_tag" AND attributes.name = "net" OR attributes.name = "org"
    
  • 無効: AND 演算子と OR 演算子による不完全な式の組み合わせ

    attributes.name = "com" AND ("net" OR "org")
    

NOT 演算子の代わりに単項マイナス演算子を使用できます。

attributes.name = "com" AND -attributes:"iana.org/language_tag"

フィルタ式の関数

hasPrefix 関数を使用すると、部分文字列で始まる値を持つ属性をフィルタできます。フィルタでサポートされている関数は hasPrefix のみです。

接頭辞の一致は hasPrefix 関数でサポートされていますが、一般的な正規表現はサポートされていません。

hasPrefix(attributes.KEY, "SUBSTRING")

次のように置き換えます。

  • KEY: 属性名
  • SUBSTRING: 値の部分文字列