メッセージの順序指定

Cloud Pub/Sub では、高可用性とスケーラビリティを備えたメッセージ配信サービスを使用できます。これらの機能を使用する場合の妥協点として、サブスクライバーによって受信されるメッセージの順序は保証されません。順序指定できないのは問題のように思えますが、厳密な順序指定が実際に必要になる使用例はごくわずかです。

このドキュメントでは、メッセージの順序が実際に意味することと、それによる妥協点の概要を示します。また Cloud Pub/Sub に移行する際に、現在のワークフローの順序に依存しているメッセージングをどのように処理できるか、そのユースケースと手法について説明します。

順序とは

表面的には、順序指定されたメッセージングの概念はシンプルです。次の図は、メッセージがメッセージ配信サービスを介して流れていく様子を大まかに示しています。

順序指定とは

パブリッシャーはトピックに関するメッセージを Cloud Pub/Sub に送信します。次に、メッセージはサブスクリプションを介してサブスクライバーに配信されます。単一の同期パブリッシャー、単一の同期サブスクライバー、および単一の同期メッセージ配信サーバーがすべて 1 つの同期トランスポート層で実行されている場合、順序の概念はシンプルです。メッセージは、パブリッシャーがメッセージを正常にパブリッシュしたときに、いくつかの絶対時間またはシーケンスの測定値によって順序指定されます。

この単純なケースでも、メッセージの順序指定を保証するとスループットが多大な制約を受けます。メッセージの順序を確実に保証する唯一の方法は、メッセージ配信サービスでメッセージを 1 通ずつサブスクライバーに配信し、サブスクライバーが現在のメッセージを受信して処理したことをサービスが認識するまで(通常、サブスクライバーからサービスに送信される確認応答で認識)、次のメッセージの配信を待機するしかありません。一度にサブスクライバーに 1 通ずつメッセージを送信する場合のスループットはスケーリングできません。代わりに、サービスが保証できるのはメッセージの最初の配信が順番どおりに行われることのみであるため、いつでも再配信を試行できます。これにより、多数の送信をサブスクライバーに同時に送信できるようになります。ただし、この方法では順序指定による制約が緩和されているとしても、単一のパブリッシャー / メッセージ配信サービス / サブスクライバーのケース以外の場合は、「順序」に意味はほとんどありません。

本当に順序があるか

パブリッシャーやサブスクライバーによっては、メッセージの順序の定義は複雑になることがあります。まず、単一のサブスクリプションで複数のサブスクライバーがメッセージを処理する可能性があります。

複数のサブスクライバー

この場合、メッセージがサブスクリプションを介して順番に配信されたとしても、サブスクライバーによって処理される順序を保証することはできません。処理の順序が重要である場合、Cloud DatastoreCloud SQL などの ACID ストレージ システムを介してサブスクライバーを調整する必要があります。

同様に、同じトピックに関して複数のパブリッシャーが存在する場合も、順序指定が困難になることがあります。

複数のパブリッシャー

異なるパブリッシャーからのメッセージに、どのようにして順序を割り当てればいいのでしょうか。パブリッシャー自体が調整するか、メッセージ配信サービス自体がすべての受信メッセージに順序の概念を添付する必要があります。各メッセージには順序指定の情報を含める必要があります。順序指定情報はタイムスタンプ(ただしこれは、クロック ドリフトの問題を回避するために、同じソースから順番にすべてのサーバーが取得するタイムスタンプである必要がある)、またはシーケンス番号(ACID が保証されている単一のソースから取得)にすることができます。メッセージの順序を保証するその他のメッセージング システムでは、複数のパブリッシャーからの送信であっても、実質的には単一のサーバーから単一のサブスクライバーへのメッセージ送信にシステムを制限する設定が必要です。

拡張されたメッセージ配信サービスノード

上記の例で使用された抽象メッセージ配信サービスが単一の同期サーバーである場合、サービス自体で順序が保証されます。ただし、Cloud Pub/Sub などのメッセージ配信サービスは、サーバーの役割の観点からも、サーバー数の観点からも、単一のサーバーではありません。実際に、パブリッシャーおよびサブスクライバーと、Cloud Pub/Sub システム自体の間には層があります。Cloud Pub/Sub がメッセージ配信システムである場合に、どのように実行されるかをより詳細に示した図を以下に示します。

メッセージ配信

上図に示されているように、単一のメッセージがパブリッシャーからサブスクライバーへ到達する際に使用できるパスは多数あります。このようなアーキテクチャの利点は、高可用性(1 台のサーバーが停止してもシステム全体の遅延は生じない)と、スケーラビリティ(スループットを最大化するために、多数のサーバーにわたってメッセージを分散できる)です。このような分散システムの利点は、Cloud Pub/Sub を実行する同じシステム上に構築される Google 検索、Google 広告、Gmail などの Google サービスに有益なものです。

順序の処理方法

おそらく、メッセージの順序指定がかなり複雑である理由と、Cloud Pub/Sub では順序付けの必要性を重視しない理由はすでに理解していただけたはずです。可用性とスケーラビリティを獲得するには、順序への依存を最小限に抑えることが重要です。順序への依存にはいくつかのタイプがあります。各タイプとともに、典型的なユースケースと解決策を以下に示します。

順序がまったく重要ではないケース

典型的なユースケース: 個々のタスクの割り当て、イベントに関する統計の収集

順序がまったく問題にならないユースケースは Cloud Pub/Sub に最適です。たとえば、サブスクライバーが実行する必要のある個々のタスクがある場合に、各タスクがメッセージであり、そのメッセージを受け取るサブスクライバーがアクションを実行するケースなどです。別の例として、サーバー上でクライアントによって実行されたすべてのアクションに関する統計を収集する必要がある場合に、各イベント用のメッセージをパブリッシュした後、サブスクライバーにメッセージを順番に並べさせ、永続ストレージで結果を更新させるケースも挙げられます。

最終結果の順序が重要なケース

典型的なユースケース: ログ、状態の更新

このカテゴリのユースケースでは、メッセージが処理される順序は問題になりません。問題になるのは、最終結果が適切に順序指定されているということのみです。たとえば、処理され、ディスクに保存される、順序指定されたログについて考えてみましょう。ログイベントは複数のパブリッシャーから受信します。この場合、ログイベントが処理される実際の順序は問題になりません。問題になるのは、時系列に沿って最終結果にアクセスできるということのみです。このため、パブリッシャーの各イベントにタイムスタンプを添付し、サブスクライバーに、タイムスタンプの時系列に沿って保存または取得できるなんらかの基盤となるデータストア(Cloud Datastore など)にメッセージを保管させることができます。

直前の状態のみにアクセスする必要のある状態の更新についても、同様の操作を行えます。たとえば、履歴ではなく、直前の価格のみに注意が払われる、さまざまな在庫品の現在価格の追跡について考えてみましょう。各在庫品のティックにタイムスタンプを添付し、現在保存されている値よりも最近のもののみを保存できます。

メッセージの処理順序が重要なケース

典型的なユースケース: しきい値を強制する必要のあるトランザクション データ

メッセージが処理される順序に完全に依存するのは、最も複雑なケースとなります。メッセージの厳密な順序指定を強制するソリューションでは、パフォーマンスとスループットが犠牲になります。順序指定に依存するのは、それが絶対に必要であり、秒あたりに多数のメッセージを処理できるようにスケールする必要がないことがわかっている場合のみにしてください。メッセージを順番に処理するには、サブスクライバーが以下のいずれかである必要があります。

  • 未処理のメッセージのリスト全体と、それらが処理される必要のある順序を把握している。

  • 現在受け取っているすべてのメッセージから、最初に処理する必要のあるメッセージでまだ受け取っていないものがあるかどうかを判別できる。

最初のオプションは、各メッセージに一意の ID を割り当て、なんらかの永続的な場所(Cloud Datastore など)にメッセージが処理される必要のある順序を保管することで実現できます。サブスクライバーはその永続ストレージをチェックして、次に処理する必要のあるメッセージを把握し、確実に次回はそのメッセージの処理のみを行います。また、他のメッセージを完全な順序で受け取る間、その処理を待機します。その時点で、メッセージ キューとして永続ストレージ自体を使用し、Cloud Pub/Sub を介したメッセージ配信に依存しないことを検討する価値はあります。

後者については、Cloud Monitoring を使用して pubsub.googleapis.com/subscription/oldest_unacked_message_age 指標を追跡できます(詳細については、サポートされている指標をご覧ください)。サブスクライバーは一時的にすべてのメッセージをなんらかの永続ストレージに置き、メッセージを ACK 処理します。これは、最も古い ACK 処理されていないメッセージの経過時間を定期的にチェックし、ストレージ内のメッセージのパブリッシュ タイムスタンプと比較してチェックします。最も古い ACK 処理されていないメッセージよりも前にパブリッシュされたすべてのメッセージは受信されたことが保証されているため、それらのメッセージは永続ストレージから削除し、順番に処理できます。

あるいは、単一の同期パブリッシャーと単一のサブスクライバーがある場合、シーケンス番号を使用して順序指定を確実に行えます。この手法では、永続カウンタを使用する必要があります。メッセージごとに、パブリッシャーは次のコードを実行します。

Node.js

Cloud Pub/Sub クライアントの作成方法については、Cloud Pub/Sub クライアント ライブラリをご覧ください。

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

const attributes = {
  // Pub/Sub messages are unordered, so assign an order ID and manually order messages
  counterId: `${getPublishCounterValue()}`,
};

// Publishes the message
const messageId = await pubsub
  .topic(topicName)
  .publish(dataBuffer, attributes);
// Update the counter value
setPublishCounterValue(parseInt(attributes.counterId, 10) + 1);
console.log(`Message ${messageId} published.`);
return messageId;

サブスクライバーは次のコードを実行します。

Node.js

Cloud Pub/Sub クライアントの作成方法については、Cloud Pub/Sub クライアント ライブラリをご覧ください。

const outstandingMessages = {};

async function listenForOrderedMessages(subscriptionName, timeout) {
  // Imports the Google Cloud client library
  const {PubSub} = require('@google-cloud/pubsub');

  // Creates a client
  const pubsub = new PubSub();

  // References an existing subscription, e.g. "my-subscription"
  const subscription = pubsub.subscription(subscriptionName);

  // Create an event handler to handle messages
  const messageHandler = function(message) {
    // Buffer the message in an object (for later ordering)
    outstandingMessages[message.attributes.counterId] = message;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on(`message`, messageHandler);
  await new Promise(r => setTimeout(r, timeout * 1000));
  subscription.removeListener(`message`, messageHandler);

  // Pub/Sub messages are unordered, so here we manually order messages by
  // their "counterId" attribute which was set when they were published.
  const outstandingIds = Object.keys(outstandingMessages).map(counterId =>
    Number(counterId, 10)
  );
  outstandingIds.sort();

  outstandingIds.forEach(counterId => {
    const counter = getSubscribeCounterValue();
    const message = outstandingMessages[counterId];

    if (counterId < counter) {
      // The message has already been processed
      message.ack();
      delete outstandingMessages[counterId];
    } else if (counterId === counter) {
      // Process the message
      console.log(
        `* %d %j %j`,
        message.id,
        message.data.toString(),
        message.attributes
      );
      setSubscribeCounterValue(counterId + 1);
      message.ack();
      delete outstandingMessages[counterId];
    } else {
      // Have not yet processed the message on which this message is dependent
      return false;
    }
  });
}

これらのどのソリューションでも、メッセージのパブリッシュと処理にレイテンシが発生するため、パブリッシャーでは同期ステップで順序を作成し、サブスクライバーでは順番が入れ替わったメッセージに遅延処理を行って順序を守る必要があります。

まとめ

初めてメッセージ配信サービスを使用するときは、メッセージが順序どおりに配信されることは望ましい特性のように思えます。そうすれば、順序が重要な場合にメッセージ処理に必要なコードが簡略化されます。しかし、メッセージの順序を守るには、使用するメッセージ配信システムに関係なく、可用性とスケーラビリティに関して多大な犠牲を払うことになります。Cloud Pub/Sub の同じインフラストラクチャ上に構築されている Google サービスの場合、可用性とスケーラビリティは極めて重要な機能であるため、このサービスでは順序指定されたメッセージ配信は提供されていません。可能な限り、メッセージの順序に依存しないようにアプリケーションを設計してください。Cloud Pub/Sub のスケーリング機能を使用して簡単にスケールし、すべてのメッセージをすばやく、信頼性のある方法で配信できます。

Cloud Pub/Sub を使用してメッセージの順序指定を行う場合には、Cloud DatastoreCloud SQL を確認して、このドキュメントで説明した手法の実装方法を学習してください。

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

Cloud Pub/Sub ドキュメント