对消息排序

Pub/Sub 提供可用性高、可扩缩的消息传送服务。具备这些特性的代价是不能保证订阅者接收消息的顺序。虽然不保证顺序可能听起来让人不太容易接受,但实际上很少有使用场景需要严格排序。

本文档概括介绍了消息顺序的实际意义及其折衷方案,以及讨论了在迁移到 Pub/Sub 时处理当前工作流中与顺序相关的消息传递的使用场景和技术。

什么是顺序?

从表面上看,消息按顺序排列是一个非常简单的概念。下图显示了消息在消息传送服务中流动的直观图示:

什么顺序?

发布者将某个主题的消息发送到 Pub/Sub 中。该消息然后通过订阅传送给订阅者。在存在单一同步发布者、单一同步订阅者和单一同步消息传送服务器(所有这些都在同步传输层上运行)的情况下,顺序概念看起来很简单:消息按发布者成功发布它们的时间排序(以绝对时间或次序来衡量)。

即使在这种简单的情况下,保证消息顺序也会对吞吐量产生严重限制。真正保证消息顺序的唯一方法是:消息传送服务一次将一条消息传送至订阅者,等待服务知道订阅者已经接收并处理了当前消息(通常通过订阅者发送给服务的确认获悉),然后再传送下一条消息。一次向订阅者传送一条消息的吞吐量不可扩展。服务可以改为采用这种方式:仅保证第一次传送任何消息时是按顺序进行的,允许随时尝试重新传送。这样允许一次向订阅者发送多条消息。但是,即使以这种方式放宽了排序限制,对于单一发布者/消息传送服务/订阅者以外的使用场景,“顺序”也没有多大意义。

“顺序”真的存在吗?

定义消息顺序可能非常复杂,具体取决于您的发布者和订阅者。首先,您可能有多个订阅者在处理单个订阅的消息:

多个订阅者

在这种情况下,即使消息有序地通过订阅传递,也不能保证订阅者处理消息的顺序。如果处理顺序很重要,那么订阅者需要通过某个 ACID 存储系统(如 Cloud FirestoreCloud SQL)进行协调。

类似地,同一主题有多个发布者可能会让保证顺序变得非常困难:

多个发布者

如何为不同发布者发布的消息分配顺序呢?要么发布者们必须自行协调,要么消息传送服务自身必须对每条传入消息附加一种顺序表示法。每条消息都需要包含顺序信息。顺序信息可以是一个时间戳(但它必须是所有服务器从同一来源获得的时间戳,以避免时钟偏移问题)或序列号(从具有 ACID 保证的单个来源获取)。其他保证消息顺序的消息传送系统需要相应设置,将系统有效地限制为通过单个服务器向单个订阅者发送消息的多个发布者。

扩展的消息传送服务节点

如果上述示例中使用的抽象消息传送服务是单个同步服务器,则服务本身可以保证顺序。但是,无论从服务器角色还是服务器数量来看,消息传送服务(如 Pub/Sub)都不会是单一服务器。实际上,发布者和订阅者及 Pub/Sub 系统本身之间存在多个层次。下图更详细地介绍了 Pub/Sub 充当消息传送系统时发生的情况:

消息传送

如您所见,一条消息可以通过多条路径从发布者传送到订阅者。这种架构的优点是高可用性(单个服务器中断不会导致系统范围的延迟)和可扩缩(消息可以分发至多个服务器以便最大限度提高吞吐量)。相似分布式系统的优点对在运行 Pub/Sub 的相同系统上构建的 Google 产品(如 Google 搜索、Google Ads 和 Gmail)很有帮助。

应该如何处理顺序?

您现在应该已经理解为什么消息的顺序相当复杂,以及为什么 Pub/Sub 不再强调顺序的必要性。为了实现可用性和可扩缩性,您应务必尽量减轻对顺序的依赖。对顺序的依赖可能表现为多种形式,下面介绍了每种形式以及一些典型的使用场景和解决方案。

顺序无关紧要

典型使用场景:独立任务队列、事件统计信息收集

Pub/Sub 非常适合用于顺序无关紧要的使用场景。例如,如果您有独立任务需要由订阅者执行,每个任务都是一条消息,接收该消息的订阅者执行相应操作。又例如,如果您想收集有关客户端在您的服务器上执行的所有操作的统计信息,那么您可以为每个事件发布一条消息,然后让订阅者整理消息并在永久性存储空间中更新结果。

最终结果中的顺序很重要

典型使用场景:日志、状态更新

在此类别的使用场景中,消息的处理顺序并不重要;真正重要的是最终结果中的顺序必须正确。例如已处理并存储到磁盘上的整理日志。日志事件来自多个发布者。在这种情况下,日志事件的实际处理顺序并不重要;真正重要的是可以按照时间排序方式访问最终结果。因此,您可以为发布者中的每个事件附加一个时间戳,并使订阅者将消息存储在支持按时间戳排序进行存储或检索的某个底层数据存储区(如 Cloud Firestore)中。

同一方法适用于只需要访问最新状态的状态更新。例如,跟踪不同股票的当前价格,这种情况不关注历史价格,只关注最新股值。您可以为每次股票波动添加一个时间戳,仅存储比当前存储值更新的股值。

消息的处理顺序很重要

典型使用场景:必须强制实施阈值的事务性数据

完全依赖消息处理顺序的情况最为复杂。任何强制实施严格消息排序的解决方案都会以牺牲性能和吞吐量为代价。所以,仅当绝对必要并且确信不需要扩展至每秒大量消息时,您才应依赖顺序。为了按顺序处理消息,订阅者必须满足以下条件之一:

  • 知道未完成消息的完整列表以及处理这些消息时必须遵循的顺序,或者

  • 能够通过某种方法判断出在当前收到的所有消息中是否有消息需要优先处理,不考虑是否有消息尚未收到。

您可以为每条消息分配一个唯一标识符,并按处理消息时应遵循的顺序将其存储在某个永久位置(如 Cloud Firestore),来实现第一种方案。订阅者将检查永久性存储来获知必须处理的下一条消息,并且确保接下来仅处理该消息,然后等待处理完全按顺序到达的其他的消息。此时,有必要考虑将永久性存储本身用作消息队列,而不依赖于通过 Pub/Sub 传送消息。

第二种方案可通过使用 Cloud Monitoring 跟踪 pubsub.googleapis.com/subscription/oldest_unacked_message_age 指标(请参阅支持的指标了解相关说明)来实现。订阅者会暂时将所有消息存入某个永久性存储空间并确认消息。 它会定期检查最早的未确认消息的存在时长,并与存储空间中消息的发布时间戳进行对照。系统保证最早未确认消息之前发布的所有消息都已被接收,所以可以从永久性存储空间中删除并按顺序处理。

或者,如果您有单个同步发布者和单个订阅者,则可以使用序号来确保排序。这种方法需要使用永久性计数器。对于每条消息,发布者执行以下操作:

Node.js

如需了解如何创建 Pub/Sub 客户端,请参阅 Pub/Sub 客户端库

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicName = 'YOUR_TOPIC_NAME';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishOrderedMessage() {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  const attributes = {
    // Pub/Sub messages are unordered, so assign an order ID and manually order messages
    counterId: `${getPublishCounterValue()}`,
  };

  // Publishes the message
  const messageId = await pubSubClient
    .topic(topicName)
    .publish(dataBuffer, attributes);

  // Update the counter value
  setPublishCounterValue(parseInt(attributes.counterId, 10) + 1);
  console.log(`Message ${messageId} published.`);

  return messageId;
}

return await publishOrderedMessage();

订阅者执行以下操作:

Node.js

如需了解如何创建 Pub/Sub 客户端,请参阅 Pub/Sub 客户端库

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
// const timeout = 1000;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function listenForOrderedMessages() {
  // References an existing subscription, e.g. "my-subscription"
  const subscription = pubSubClient.subscription(subscriptionName);

  // Create an event handler to handle messages
  const messageHandler = function (message) {
    // Buffer the message in an object (for later ordering)
    outstandingMessages[message.attributes.counterId] = message;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);
  await new Promise(r => setTimeout(r, timeout * 1000));
  subscription.removeListener('message', messageHandler);

  // Pub/Sub messages are unordered, so here we manually order messages by
  // their "counterId" attribute which was set when they were published.
  const outstandingIds = Object.keys(outstandingMessages).map(counterId =>
    Number(counterId, 10)
  );
  outstandingIds.sort();

  outstandingIds.forEach(counterId => {
    const counter = getSubscribeCounterValue();
    const message = outstandingMessages[counterId];

    if (counterId < counter) {
      // The message has already been processed
      message.ack();
      delete outstandingMessages[counterId];
    } else if (counterId === counter) {
      // Process the message
      console.log(
        '* %d %j %j',
        message.id,
        message.data.toString(),
        message.attributes
      );
      setSubscribeCounterValue(counterId + 1);
      message.ack();
      delete outstandingMessages[counterId];
    } else {
      // Have not yet processed the message on which this message is dependent
      return false;
    }
  });
}

return await listenForOrderedMessages();

这两种解决方案都会在发布和处理消息时产生延迟;需要在发布者中执行一个同步步骤来建立顺序,并且订阅者会因对无序消息强制实施顺序产生延迟。

摘要

第一次使用消息传送服务时,让消息按顺序传送似乎是一项必要要求。这样可简化处理顺序非常重要的消息时必需的代码。但是,无论使用哪种消息传送系统,按顺序提供消息都会让可用性和可扩缩性大打折扣。对于在 Pub/Sub 所用的同一基础架构上构建的 Google 产品,可用性和可扩缩性是非常重要的特性,所以该服务不保证按顺序传送消息。设计应用时,请尽可能地避免依赖于消息顺序。这样,您才可以借助 Pub/Sub 的扩缩功能轻松扩缩,快速可靠地传送所有消息。

如果您决定使用 Pub/Sub 实现某种形式的消息排序,请参阅 Cloud FirestoreCloud SQL,详细了解如何实现本文档中所述的策略。