为消息排序

本页面介绍了如何按顺序接收消息。

如需详细了解如何接收消息,请参阅订阅者概览

按顺序接收消息

如果消息带有相同的排序键并且位于同一地区,则您可以启用消息排序并按 Pub/Sub 服务接收这些消息的顺序来接收消息。

Pub/Sub 会为每条消息至少传送一次,因此 Pub/Sub 服务可能会重新传送消息。如果您按顺序接收消息,并且 Pub/Sub 服务重新传送带有排序键的消息,则 Pub/Sub 也会重新传送带有相同排序键的后续消息,从而保持顺序。Pub/Sub 服务会按消息最初接收的顺序重新传送这些消息。

当 Pub/Sub 服务重新传送带有排序键的消息时,Pub/Sub 服务也会重复传送带有相同排序键的所有后续消息,包括已确认的消息。再次确认这些消息。

启用消息排序

要按顺序接收消息,请在从中接收消息的订阅上设置消息排序属性。按顺序接收消息可能会增加延迟时间。

您可以在使用 Cloud Console、gcloud 命令行工具或 Pub/Sub API 创建订阅时设置消息排序属性。

控制台

要使用消息排序属性创建订阅,请执行以下操作:

  1. 在 Cloud Console 中,转到订阅页面。

    转到“订阅”页面

  2. 点击创建订阅

  3. 输入订阅 ID

  4. 选择要接收消息的主题。

  5. 消息排序部分,选择使用排序键对消息排序

  6. 点击创建

gcloud

要使用消息排序属性创建订阅,请使用 gcloud pubsub subscriptions create 命令和 --enable-message-ordering 标志:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --enable-message-ordering

SUBSCRIPTION_ID 替换为订阅的 ID。

如果请求成功,命令行会显示一条确认消息:

Created subscription [SUBSCRIPTION_ID].

REST

要使用消息排序属性创建订阅,请发送如下所示的 PUT 请求:

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth application-default print-access-token)

替换以下内容:

  • PROJECT_ID:包含主题的项目的 ID
  • SUBSCRIPTION_ID:订阅的 ID

在请求正文中,指定以下内容:

{
  "topic": TOPIC_ID,
  "enableMessageOrdering": true,
}

TOPIC_ID 替换为要附加到订阅的主题的 ID。

如果请求成功,则响应为 JSON 格式的订阅:

{
  "name": projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,
  "topic": projects/PROJECT_ID/topics/TOPIC_ID,
  "enableMessageOrdering": true,
}

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = google::cloud::pubsub;
[](pubsub::SubscriptionAdminClient client, std::string const& project_id,
   std::string const& topic_id, std::string const& subscription_id) {
  auto sub = client.CreateSubscription(
      pubsub::Topic(project_id, std::move(topic_id)),
      pubsub::Subscription(project_id, std::move(subscription_id)),
      pubsub::SubscriptionBuilder{}.enable_message_ordering(true));
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::runtime_error(sub.status().message());

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithOrderingSample
{
    public Subscription CreateSubscriptionWithOrdering(string projectId, string subscriptionId, string topicId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        var subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableMessageOrdering = true
        };

        Subscription subscription = null;
        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func createWithOrdering(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	// Message ordering can only be set when creating a subscription.
	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:                 topic,
		AckDeadline:           20 * time.Second,
		EnableMessageOrdering: true,
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %v", err)
	}
	fmt.Fprintf(w, "Created subscription: %v\n", sub)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithOrdering {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithOrderingExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithOrderingExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Set message ordering to true for ordered messages in the subscription.
                  .setEnableMessageOrdering(true)
                  .build());

      System.out.println("Created a subscription with ordering: " + subscription.getAllFields());
    }
  }
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicName = 'YOUR_TOPIC_NAME';
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering() {
  // Creates a new subscription
  await pubSubClient.topic(topicName).createSubscription(subscriptionName, {
    enableMessageOrdering: true,
  });
  console.log(
    `Created subscription ${subscriptionName} with ordering enabled.`
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.'
  );
}

createSubscriptionWithOrdering().catch(console.error);

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_message_ordering": True,
        }
    )
    print(f"Created subscription with ordering: {subscription}")

Ruby

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

# topic_name        = "Your Pubsub topic name"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new

topic        = pubsub.topic topic_name
subscription = topic.subscribe subscription_name,
                               message_ordering: true

puts "Pull subscription #{subscription_name} created with message ordering."

设置消息排序属性后,Pub/Sub 服务会按照接收消息的顺序来传送带有相同排序键的消息。例如,如果发布者发送两条带有相同排序键的消息,则 Pub/Sub 服务会先传递最早的消息。