メッセージの順序指定

コレクションでコンテンツを整理 必要に応じて、コンテンツの保存と分類を行います。

このページでは、メッセージを順に受信する方法について説明します。

メッセージの受信の詳細については、サブスクライバーの概要をご覧ください。

メッセージを順に受信

メッセージに同じ順序指定キーがあり、同じリージョンに存在している場合、メッセージの順序指定を有効にすると、Pub/Sub サービスで受信する順序でメッセージを受信できます。

Pub/Sub は各メッセージを少なくとも 1 回配信するため、Pub/Sub サービスでメッセージが再度配信される可能性があります。メッセージを順序どおりに受信し、Pub/Sub サービスで順序指定キーのあるメッセージが再配信される場合、Pub/Sub では、同じ順序指定キーのある後続のメッセージも再配信することによって順序が維持されます。Pub/Sub サービスでは、こうしたメッセージが最初に受信した順序で再配信されます。

Pub/Sub サービスが順序指定キーを持つメッセージを再配信する場合は、確認済みメッセージを含む、同じ順序キーを持つメッセージもすべて再配信します。サブスクリプションで順序指定とデッドレター トピックの両方のメッセージが有効になっている場合は、Pub/Sub はデッドレター トピックにメッセージをベスト エフォートに基づき転送するため、これが正しくないこともあります。再配信されたメッセージを受信したら、これらのメッセージを再確認する必要があります。前のメッセージが確認応答されずに、後続のメッセージが送信される保証はありません。

順序指定キーとメッセージ配信

同じ順序指定キーを持つメッセージは、順序どおりに配信されます。

順序指定キーが異なるメッセージは順序どおりに配信されるとは限らず、パブリッシュ時間にも依存しません。

メッセージを確認できないと、他の順序指定キーのメッセージが配信できなくなる場合があります。この問題は、サーバーが予期せず再起動した場合や、トラフィックの変化が原因で使用されている一連のサーバーが変更された場合に発生することがあります。このようなイベント間で順序を保持するには、新しいサーバーからのメッセージを配信する前に、古いサーバーにパブリッシュされたすべてのメッセージに対し、順序指定キーが異なる場合でも確認応答する必要があります。 すべてのメッセージには適切なタイミングで確実に確認応答できない場合、サブスクリプションにデッドレター トピックを追加することを検討してください。デッドレター トピックに書き込まれるとき、メッセージの順序が保持されないことがあります。

メッセージの順序指定の有効化

メッセージを順に受信するには、メッセージを受信するサブスクリプションのメッセージ順序指定プロパティを設定します。メッセージを順に受信すると、レイテンシが増加する可能性があります。

Google Cloud コンソール、Google Cloud CLI、または Pub/Sub API を使用してサブスクリプションを作成するときに、メッセージの順序指定プロパティを設定できます。

Console

メッセージの順序指定プロパティがあるサブスクリプションを作成する手順は次のとおりです。

  1. Google Cloud コンソールで、[サブスクリプション] ページに移動します。

    [サブスクリプション] ページに移動

  2. [サブスクリプションを作成] をクリックします。

  3. [サブスクリプション ID] を入力します。

  4. メッセージを受信するトピックを選択します。

  5. [メッセージの順序指定] セクションで、[順序指定キーを使用してメッセージの順序を指定する] を選択します。

  6. [作成] をクリックします。

gcloud

メッセージの順序指定プロパティがあるサブスクリプションを作成するには、gcloud pubsub subscriptions create コマンドと --enable-message-ordering フラグを使用します。

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --enable-message-ordering

SUBSCRIPTION_ID をサブスクリプションの ID に置き換えます。

リクエストが成功すると、コマンドラインに確認メッセージが表示されます。

Created subscription [SUBSCRIPTION_ID].

REST

メッセージの順序指定プロパティがあるサブスクリプションを作成するには、次のような PUT リクエストを送信します。

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth application-default print-access-token)

以下を置き換えます。

  • PROJECT_ID: トピックがあるプロジェクトのプロジェクト ID
  • SUBSCRIPTION_ID: サブスクリプション ID

リクエスト本文には、次のように指定します。

{
  "topic": TOPIC_ID,
  "enableMessageOrdering": true,
}

TOPIC_ID は、サブスクリプションに接続するトピックの ID に置き換えます。

リクエストが成功した場合のレスポンスは、JSON 形式のサブスクリプションになります。

{
  "name": projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,
  "topic": projects/PROJECT_ID/topics/TOPIC_ID,
  "enableMessageOrdering": true,
}

C++

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SubscriptionAdminClient client, std::string const& project_id,
   std::string const& topic_id, std::string const& subscription_id) {
  auto sub = client.CreateSubscription(
      pubsub::Topic(project_id, std::move(topic_id)),
      pubsub::Subscription(project_id, std::move(subscription_id)),
      pubsub::SubscriptionBuilder{}.enable_message_ordering(true));
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithOrderingSample
{
    public Subscription CreateSubscriptionWithOrdering(string projectId, string subscriptionId, string topicId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        var subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableMessageOrdering = true
        };

        Subscription subscription = null;
        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func createWithOrdering(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	// Message ordering can only be set when creating a subscription.
	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:                 topic,
		AckDeadline:           20 * time.Second,
		EnableMessageOrdering: true,
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %v", err)
	}
	fmt.Fprintf(w, "Created subscription: %v\n", sub)
	return nil
}

Java

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithOrdering {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithOrderingExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithOrderingExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Set message ordering to true for ordered messages in the subscription.
                  .setEnableMessageOrdering(true)
                  .build());

      System.out.println("Created a subscription with ordering: " + subscription.getAllFields());
    }
  }
}

Node.js

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId,
  subscriptionNameOrId
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`
  );

  console.log(
    'To process messages in order, remember to add an ordering key to your messages.'
  );
}

Python

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_message_ordering": True,
        }
    )
    print(f"Created subscription with ordering: {subscription}")

Ruby

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

# topic_id        = "your-topic-id"
# subscription_id = "your-subscription-id"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic        = pubsub.topic topic_id
subscription = topic.subscribe subscription_id,
                               message_ordering: true

puts "Pull subscription #{subscription_id} created with message ordering."

メッセージの順序設定プロパティを設定すると、Pub/Sub サービスでメッセージを受信する順序と同じ順序指定キーがあるメッセージが配信されます。たとえば、パブリッシャーが同じ順序指定キーがある 2 つのメッセージを送信する場合、Pub/Sub サービスでは最も古いメッセージが最初に配信されます。

次のステップ