メッセージの順序指定

メッセージの順序指定は Pub/Sub の機能の 1 つで、サブスクライバー クライアントで、パブリッシャー クライアントによってパブリッシュされた順序でメッセージを受信できます。

たとえば、あるリージョンのパブリッシャー クライアントがメッセージ 1、2、3 を順番にパブリッシュするとします。メッセージの順序指定では、サブスクライバー クライアントはパブリッシュされたメッセージを同じ順序で受信します。順序どおりに配信するには、パブリッシャー クライアントが同じリージョンにメッセージをパブリッシュする必要があります。

メッセージの順序指定は、データベース変更キャプチャ、ユーザー セッション トラッキング、ストリーミング アプリケーションなど、イベントの時系列を保持する必要があるストリーミング アプリケーションなどで役立ちます。

このページでは、メッセージの順序指定のコンセプトと、メッセージを順序どおりに受信するようにサブスクライバー クライアントを設定する方法について説明します。メッセージの順序指定用のパブリッシャー クライアントを構成するには、順序指定キーを使用してメッセージをパブリッシュするをご覧ください。

メッセージの順序指定の概要

Pub/Sub の順序は、次の要因によって決まります。

  • 順序指定キー: Pub/Sub メッセージ メタデータで使用される文字列であり、メッセージを順序付ける必要があるエンティティを表します。順序指定キーの長さは最大 1 KB です。あるリージョン内で順序付きの一連のメッセージを受信するには、同じリージョン内で同じ順序指定キーを持つすべてのメッセージをパブリッシュする必要があります。順序指定キーの例としては、お客様 ID やデータベース内の行の主キーなどがあります。

    各順序指定キーのパブリッシュ スループットは 1 MBps に制限されています。トピックのすべての順序指定キーのスループットは、パブリッシュ リージョンで使用可能な割り当てに制限されます。この上限は、数 Gbps 単位に引き上げることができます。

    順序指定キーは、パーティションよりもはるかに高いカーディナリティを持つことが想定されるため、パーティション ベースのメッセージング システムのパーティションに相当するわけではありません。

  • メッセージの順序指定を有効にする: これはサブスクリプションの設定です。サブスクリプションでメッセージの順序指定が有効になっている場合、サブスクライバー クライアントは、同じリージョンにパブリッシュされたメッセージを、サービスで受信された順序と同じ順序指定キーを使用して受信します。サブスクリプションでこの設定を有効にする必要があります。

    たとえば、2 つのサブスクリプション A と B が同じトピック T にアタッチされているとします。サブスクリプション A でメッセージの順序指定を有効にして構成し、サブスクリプション B でメッセージの順序指定を有効にせずに構成しています。このアーキテクチャでは、サブスクリプション A と B の両方がトピック T から同じ一連のメッセージを受信します。同じリージョン内に順序指定キーを使用してメッセージをパブリッシュすると、サブスクリプション A はパブリッシュされた順序でメッセージを受信します。一方、サブスクリプション B は順序指定なしでメッセージを受信します。

一般に、ソリューションでパブリッシャー クライアントが順序付きのメッセージと順序なしメッセージの両方を送信する必要がある場合は、順序付きのメッセージ用と順序なしメッセージ用の別々のトピックを作成します。

順序指定メッセージを使用する場合の考慮事項

次のリストは、Pub/Sub の順序付きメッセージの動作に関する重要な情報を示しています。

  • キー内順序指定: 同じ順序指定キーを使用してパブリッシュされたメッセージは、順序どおりに受信されます。順序指定キー A に対して、メッセージ 1、2、3 をパブリッシュするとします。順序指定を有効にすると、1 は 2 より前に配信され、2 は 3 より前に配信されることが想定されます。

  • キー間の順序指定: 異なる順序指定キーを使用してパブリッシュされたメッセージは、順序どおりに受信されません。A と B の順序指定キーがあるとします。順序指定キー A の場合、メッセージ 1 と 2 が順番にパブリッシュされます。順序指定キー B の場合、メッセージ 3 と 4 が順番にパブリッシュされます。ただし、メッセージ 1 は、メッセージ 4 の前または後に到着する可能性があります。

  • メッセージの再配信: Pub/Sub は各メッセージを少なくとも 1 回配信するため、Pub/Sub サービスがメッセージを再配信する場合があります。メッセージが再配信されると、そのキーに対する後続のすべてのメッセージ(確認済みメッセージも含む)の再配信がトリガーされます。サブスクライバー クライアントが特定の順序指定キーのメッセージ 1、2、3 を受信するとします。確認応答期限が切れた、または Pub/Sub でベスト エフォートの確認応答が保持されなかったために、メッセージ 2 が再配信される場合、メッセージ 3 も再配信されます。サブスクリプションでメッセージの順序指定とデッドレター トピックの両方が有効になっている場合、Pub/Sub はベスト エフォート方式でデッドレター トピックにメッセージを転送するため、正確でないこともあります。

  • 確認応答の遅延とデッドレター トピック: 特定の順序指定キーに対する未確認メッセージにより、他の順序指定キーのメッセージの配信が遅れる場合があります(特にサーバーの再起動やトラフィックの変更時)。このようなイベント間で順序を維持するには、すべてのメッセージにタイムリーな確認応答を行います。タイムリーな確認応答ができない場合は、デッドレター トピックを使用して、メッセージを無期限に保持しないようにすることを検討してください。メッセージがデッドレター トピックに書き込まれる場合、順序が保持されないことがあります。

  • メッセージ アフィニティ(streamingPull クライアント): 同じキーのメッセージは通常、同じ streamingPull サブスクライバー クライアントに配信されます。特定のサブスクライバー クライアントに対する順序指定キーに対するメッセージが未処理の場合、アフィニティが想定されます。未処理のメッセージがない場合、アフィニティは負荷分散またはクライアントの切断のためにシフトすることがあります。

    アフィニティが変化する可能性があっても処理がスムーズに行われるようにするには、すべてのクライアントで特定の順序指定キーのメッセージを処理できるように streamingPull アプリケーションを設計することが重要です。

  • Dataflow との統合: Pub/Sub を使用して Dataflow を構成する場合は、サブスクリプションのメッセージの順序指定を有効にしないでください。Dataflow には、ウィンドウ処理オペレーションの一部としてすべてのメッセージの順序付けを保証する、合計メッセージの順序指定に関する独自のメカニズムがあります。この順序指定方法は、Pub/Sub の順序指定キーベースの方法とは異なります。Dataflow で順序指定キーを使用すると、パイプラインのパフォーマンスが低下する可能性があります。

  • 自動スケーリング: Pub/Sub の順序指定配信は、数十億個の順序指定キーにスケーリングされます。順序指定キーの数が多いと、同じ順序指定キーを持つすべてのメッセージに順序指定が適用されるため、サブスクライバーへの並列配信数が多くなります。

順序指定配信にはいくつかのトレードオフがあります。順序なし配信と比較すると、順序指定配信ではパブリッシュの可用性がわずかに低下し、エンドツーエンドのメッセージ配信のレイテンシが増加する可能性があります。順序付けられた配信の場合、フェイルオーバーでは、メッセージが正しい順序で書き込まれて読み取られるように調整が必要になります。

メッセージの順序指定の使用方法の詳細については、次のベスト プラクティスのトピックをご覧ください。

メッセージの順序指定に関するサブスクライバー クライアントの動作

サブスクライバー クライアントは、特定のリージョンにパブリッシュされた順序でメッセージを受信します。Pub/Sub は、pull サブスクリプションと push サブスクリプションに接続されているサブスクライバー クライアントなど、さまざまな受信方法をサポートしています。クライアント ライブラリでは streamingPull が使用されます(PHP を除く)。

これらのサブスクリプション タイプの詳細については、サブスクリプション タイプの選択をご覧ください。

次のセクションでは、サブスクライバー クライアントの種類ごとに、メッセージを順に受信することの意味について説明します。

StreamingPull サブスクライバー クライアント

streamPull でクライアント ライブラリを使用する場合は、サブスクライバー クライアントがメッセージを受信するたびに実行されるユーザー コールバックを指定する必要があります。クライアント ライブラリでは、指定された順序指定キーについて、正しい順序でメッセージを完成させるためにコールバックが実行されます。そのコールバック内でメッセージが確認応答されると、メッセージに対するすべての計算が順番に行われます。ただし、ユーザーがメッセージに対する他の非同期処理をスケジュールする場合、サブスクライバー クライアントは非同期処理が順番に行われることを確認する必要があります。1 つの方法は、順番に処理されるローカル作業キューにメッセージを追加する方法です。

サブスクライバー クライアントを pull する

pull サブスクリプションに接続されているサブスクライバー クライアントの場合、Pub/Sub メッセージの順序指定では以下がサポートされます。

  • PullResponse の順序指定キーのすべてのメッセージが、リスト内の適切な順序で配置されています。

  • 順序指定キーに対して一度に未処理であるメッセージのバッチは 1 つだけです。

一度に未処理にできるメッセージのバッチは 1 つだけという要件は、順序付けられた配信を維持するために必要ですが、これは、Pub/Subサービスがサブスクライバーの pull リクエストのために送信する応答の成功または遅延を保証できないためです。

push サブスクライバー クライアント

push の制限は pull の制限よりも厳しくなります。 push サブスクリプションの場合、Pub/Sub は、順序指定キーごとに一度に 1 つの未処理のメッセージのみをサポートします。各メッセージは、個別のリクエストとして push エンドポイントに送信されます。したがって、リクエストを並列で送信すると、サブスクライバーを同時に pull するために同じ順序指定キーに対してメッセージの複数のバッチを配信する場合と同じ問題があります。push サブスクリプションは、同じ順序指定キーを使用してメッセージが頻繁にパブリッシュされるトピックや、レイテンシが極めて重要なトピックには適していない場合があります。

サブスクライバー クライアントをエクスポートする

エクスポート サブスクリプションでは、順序付きのメッセージがサポートされます。BigQuery サブスクリプションの場合、同じ順序指定キーを持つメッセージは BigQuery テーブルに順番に書き込まれます。Cloud Storage サブスクリプションの場合、同じ順序指定キーを持つメッセージがすべて同じファイルに書き込まれるとは限りません。同じファイル内の場合、順序指定キーのメッセージが順序どおりに処理されます。複数のファイルに分散している場合、順序指定キーの後続のメッセージは、以前のメッセージがあるファイルの名前のタイムスタンプよりも前のタイムスタンプを持つファイルに表示される可能性があります。

メッセージの順序指定を有効にする

メッセージを順に受信するには、メッセージを受信するサブスクリプションのメッセージ順序指定プロパティを設定します。メッセージを順に受信すると、レイテンシが増加する可能性があります。 サブスクリプションの作成後にメッセージの順序指定プロパティを変更することはできません。

Google Cloud コンソール、Google Cloud CLI、または Pub/Sub API を使用してサブスクリプションを作成するときに、メッセージの順序指定プロパティを設定できます。

Console

メッセージの順序指定プロパティがあるサブスクリプションを作成する手順は次のとおりです。

  1. Google Cloud コンソールで、[サブスクリプション] ページに移動します。

サブスクリプションに移動

  1. [サブスクリプションを作成] をクリックします。

  2. [サブスクリプション ID] を入力します。

  3. メッセージを受信するトピックを選択します。

  4. [メッセージの順序指定] セクションで、[順序指定キーを使用してメッセージの順序を指定する] を選択します。

  5. [作成] をクリックします。

gcloud

メッセージの順序指定プロパティがあるサブスクリプションを作成するには、gcloud pubsub subscriptions create コマンドと --enable-message-ordering フラグを使用します。

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --enable-message-ordering

SUBSCRIPTION_ID をサブスクリプションの ID に置き換えます。

リクエストが成功すると、コマンドラインに確認メッセージが表示されます。

Created subscription [SUBSCRIPTION_ID].

REST

メッセージの順序指定プロパティがあるサブスクリプションを作成するには、次のような PUT リクエストを送信します。

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth application-default print-access-token)

次のように置き換えます。

  • PROJECT_ID: トピックがあるプロジェクトのプロジェクト ID
  • SUBSCRIPTION_ID: サブスクリプション ID

リクエスト本文には、次のように指定します。

{
  "topic": TOPIC_ID,
  "enableMessageOrdering": true,
}

TOPIC_ID は、サブスクリプションに接続するトピックの ID に置き換えます。

リクエストが成功した場合のレスポンスは、JSON 形式のサブスクリプションになります。

{
  "name": projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,
  "topic": projects/PROJECT_ID/topics/TOPIC_ID,
  "enableMessageOrdering": true,
}

C++

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SubscriptionAdminClient client, std::string const& project_id,
   std::string const& topic_id, std::string const& subscription_id) {
  auto sub = client.CreateSubscription(
      pubsub::Topic(project_id, topic_id),
      pubsub::Subscription(project_id, subscription_id),
      pubsub::SubscriptionBuilder{}.enable_message_ordering(true));
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C++

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SubscriptionAdminClient client, std::string const& project_id,
   std::string const& topic_id, std::string const& subscription_id) {
  auto sub = client.CreateSubscription(
      pubsub::Topic(project_id, topic_id),
      pubsub::Subscription(project_id, subscription_id),
      pubsub::SubscriptionBuilder{}.enable_message_ordering(true));
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithOrderingSample
{
    public Subscription CreateSubscriptionWithOrdering(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        var subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableMessageOrdering = true
        };

        Subscription subscription = null;
        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func createWithOrdering(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// Message ordering can only be set when creating a subscription.
	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:                 topic,
		AckDeadline:           20 * time.Second,
		EnableMessageOrdering: true,
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription: %v\n", sub)
	return nil
}

Java

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithOrdering {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithOrderingExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithOrderingExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Set message ordering to true for ordered messages in the subscription.
                  .setEnableMessageOrdering(true)
                  .build());

      System.out.println("Created a subscription with ordering: " + subscription.getAllFields());
    }
  }
}

Node.js

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId,
  subscriptionNameOrId
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.'
  );
}

Node.js

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId: string,
  subscriptionNameOrId: string
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.'
  );
}

Python

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_message_ordering": True,
        }
    )
    print(f"Created subscription with ordering: {subscription}")

Ruby

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

# topic_id        = "your-topic-id"
# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

topic        = pubsub.topic topic_id
subscription = topic.subscribe subscription_id,
                               message_ordering: true

puts "Pull subscription #{subscription_id} created with message ordering."

次のステップ