メッセージの順序指定

Google Cloud Pub/Sub では、高可用性とスケーラビリティを持ったメッセージ配信サービスを使用できます。これらの機能を使用する場合の妥協点として、サブスクライバーによって受信されるメッセージの順序は保証されません。順序指定できないのは問題のように思えますが、厳密な順序指定が実際に必要になる使用例はごくわずかです。

このドキュメントでは、メッセージの順序が実際に意味することと、それによる妥協点の概要と、Google Cloud Pub/Sub に移動するときに現在のワークフローで順序に依存しているメッセージングを処理するユースケースとテクニックについて説明します。

順序とは

表面的には、順序指定されたメッセージングの概念はシンプルです。次の図は、メッセージがメッセージ配信サービスを介して流れていく様子を大まかに示しています。

順序指定とは

パブリッシャーはトピックに関するメッセージを Google Cloud Pub/Sub に送信します。次に、メッセージはサブスクリプションを介してサブスクライバーに配信されます。単一の同期パブリッシャー、単一の同期サブスクライバー、および単一の同期メッセージ配信サーバーがすべて 1 つの同期トランスポート層で実行されている場合、順序の概念はシンプルです。メッセージは、パブリッシャーがメッセージを正常にパブリッシュしたときに、いくつかの絶対時間またはシーケンスの測定値によって順序指定されます。

この単純なケースでも、メッセージの順序指定を保証するとスループットが多大な制約を受けます。メッセージの順序を確実に保証する唯一の方法は、メッセージ配信サービスでメッセージを 1 通ずつサブスクライバーに配信し、サブスクライバーが現在のメッセージを受信して処理したことをサービスが認識するまで(通常、サブスクライバーからサービスに送信される受信確認で認識)、次のメッセージの配信を待機するしかありません。一度にサブスクライバーに 1 通ずつメッセージを送信する場合のスループットは拡大できません。代わりに、サービスが保証できるのはメッセージの最初の配信が順番どおりに行われることのみであるため、いつでも再配信を試行できます。これにより、多数の送信をサブスクライバーに同時に送信できるようになります。ただし、この方法では順序指定による拘束が緩和されているとしても、単一のパブリッシャー / メッセージ配信サービス / サブスクライバーのケース以外の場合は、「順序」に意味はほとんどありません。

本当に順序があるか

パブリッシャーやサブスクライバーによっては、メッセージの順序の定義は複雑になることがあります。第 1 に、単一のサブスクリプションで複数のサブスクライバーがメッセージを処理する可能性があります。

複数のサブスクライバー

この場合、メッセージがサブスクリプションを介して順番に受信したとしても、メッセージがサブスクライバーによって処理される順番を保証することはできません。処理の順序が重要である場合、Cloud DatastoreCloud SQL など、なんらかの ACID ストレージ システムを介してサブスクライバーを調整する必要があります。

同様に、同じトピックに関して複数のパブリッシャーが存在する場合も、順序指定が困難になることがあります。

複数のパブリッシャー

異なるパブリッシャーからのメッセージに、どのようにして順序を割り当てればいいのでしょうか。パブリッシャー自体が調整するか、メッセージ配信サービス自体がすべての受信メッセージに順序の概念を添付する必要があります。各メッセージには順序指定の情報を含める必要があります。順序指定情報はタイムスタンプ(ただしこれは、クロック ドリフトの問題を回避するために、同じソースから順番にすべてのサーバーが取得するタイムスタンプである必要がある)、またはシーケンス番号(ACID が保証されている単一のソースから取得)にすることができます。メッセージの順序を保証するその他のメッセージング システムでは、単一のサーバーから単一のサブスクライバーにメッセージを送信する複数のパブリッシャーにシステムを効率的に制限する設定が必要です。

拡張されたメッセージ配信サービスノード

上記の例で使用された抽象メッセージ配信サービスが単一の同期サーバーである場合、サービス自体で順序が保証されます。ただし、Google Cloud Pub/Sub などのメッセージ配信サービスは、サーバーの役割の観点からも、サーバー数の観点からも、単一のサーバーではありません。実際に、パブリッシャーおよびサブスクライバーと、Google Cloud Pub/Sub システム自体の間には層があります。Google Cloud Pub/Sub がメッセージ配信システムである場合に、どのように実行されるかをより詳細に示した図を以下に示します。

メッセージ配信

上記に示されているように、単一のメッセージがパブリッシャーからサブスクライバーに使用できるパスは多数あります。このようなアーキテクチャーの利点は、高可用性(1 台のサーバーが停止してもシステム全体の遅延を生じない)と、スケーラビリティ(スループットを最大化するために、多数のサーバーにわたってメッセージを分散できる)です。このような分散システムの利点は、Google Cloud Pub/Sub を実行する同じシステムの最上部にビルドされる Google 検索、Ads、Gmail などの Google プロダクトに有益なものです。

順序の処理方法

おそらく、メッセージの順序指定がかなり複雑である理由と、Google Cloud Pub/Sub では順序付けの必要性を重視しない理由はすでに理解していただけたはずです。可用性とスケーラビリティを獲得するには、順序への依存を最小限に抑えることが重要です。順序への依存にはいくつかのタイプがあります。各タイプとともに、典型的なユースケースと解決策を以下に示します。

順序がまったく重要ではないケース

典型的なユースケース: 個々のタスクの割り当て、イベントに関する統計の収集

順序がまったく問題にならないユースケースは Google Cloud Pub/Sub に最適です。たとえば、サブスクライバーが実行する必要のある個々のタスクがある場合に、各タスクがメッセージであり、そのメッセージを受け取るサブスクライバーがアクションを実行するケースなどです。別の例として、サーバー上でクライアントによって実行されたすべてのアクションに関する統計を収集する必要がある場合に、各イベント用のメッセージをパブリッシュした後、サブスクライバーにメッセージを順番に並べさせ、永続ストレージで結果を更新させるケースも挙げられます。

最終結果の順序が重要なケース

典型的なユースケース: ログ、状態の更新

このカテゴリーのユースケースでは、メッセージが処理される順序は問題になりません。問題になるのは、最終結果が適切に順序指定されているということのみです。たとえば、処理され、ディスクに保存される、順序指定されたログについて考えてみましょう。ログイベントは複数のパブリッシャーから受信します。この場合、ログイベントが処理される実際の順序は問題になりません。問題になるのは、時系列に沿って最終結果にアクセスできるということのみです。このため、パブリッシャーの各イベントにタイムスタンプを添付し、サブスクライバーに、タイムスタンプの時系列に沿って保存または取得できる何らかの基盤となるデータストア(Cloud Datastore など)にメッセージを保管させることができます。

直前の状態のみにアクセスする必要のある状態の更新についても、同じオプションが機能します。たとえば、履歴ではなく、直前の価格のみに注意が払われる、さまざまな在庫品の現在価格の追跡について考えてみましょう。各在庫品のティックにタイムスタンプを添付し、現在保存されている値よりも最近のもののみを保存することができます。

メッセージの処理順序が重要なケース

典型的なユースケース: しきい値を強制する必要のあるトランザクション データ

メッセージが処理される順序に完全に依存するのは、最も複雑なケースとなります。メッセージの厳密な順序指定を強制するソリューションは、パフォーマンスとスループットを犠牲にして実行されます。順序指定に依存するのは、それが絶対に必要であり、秒あたりに多数のメッセージを処理できるようにスケールする必要がないことがわかっている場合のみにしてください。メッセージを順番に処理するには、サブスクライバーが以下のいずれかである必要があります。

  • 未処理のメッセージのリスト全体と、それらが処理される必要のある順序を把握している。

  • 現在受け取っているすべてのメッセージから、最初に処理する必要のあるメッセージでまだ受け取っていないものがあるかどうかを判別する方法を利用できる。

最初のオプションは、各メッセージに一意の ID を割り当て、何らかの永続的な場所(Cloud Datastore など)にメッセージが処理される必要のある順序を保管することで実現できます。サブスクライバーはその永続ストレージをチェックして、次に処理する必要のあるメッセージを把握し、確実に次回はそのメッセージの処理のみを行い、受信した他のメッセージが完全な順序指定のリストを上がってくる間、その処理を待機します。その時点で、メッセージ キューとして永続ストレージ自体を使用し、Google Cloud Pub/Sub を介したメッセージ配信に依存しないことを検討する価値はあります。

後者は、Cloud Monitoring を使用して pubsub.googleapis.com/subscription/oldest_unacked_message_age 指標を追跡し続けることで実現できます(詳しくは、サポートされている指標を参照)。サブスクライバーは一時的にすべてのメッセージを何らかの永続ストレージに置き、メッセージを ACK 処理します。これは、最も古い ACK 処理されていないメッセージの経過時間を定期的にチェックし、ストレージ内のメッセージのパブリッシュ タイムスタンプと比較してチェックします。最も古い ACK 処理されていないメッセージよりも前にパブリッシュされたすべてのメッセージは受信されたことが保証されているため、それらのメッセージは永続ストレージから削除し、順番に処理できます。

あるいは、単一の同期パブリッシャーと単一のサブスクライバーがある場合、シーケンス番号を使用して順序指定を確実に行えます。この手法では、永続カウンターを使用する必要があります。メッセージごとに、パブリッシャーは次を実行します。

Node.js

Cloud Pub/Sub クライアントの作成方法については、Cloud Pub/Sub クライアント ライブラリをご覧ください。

function publishOrderedMessage (topicName, data) {
  // Instantiates a client
  const pubsub = PubSub();

  // References an existing topic, e.g. "my-topic"
  const topic = pubsub.topic(topicName);

  const message = {
    data: data,

    // Pub/Sub messages are unordered, so assign an order id to the message to
    // manually order messages
    attributes: {
      counterId: `${getPublishCounterValue()}`
    }
  };

  // Publishes the message, use raw: true to pass a message with attributes
  return topic.publish(message, { raw: true })
    .then((results) => {
      const messageIds = results[0];

      // Update the counter value
      setPublishCounterValue(parseInt(message.attributes.counterId, 10) + 1);

      console.log(`Message ${messageIds[0]} published.`);

      return messageIds;
    });
}

サブスクライバーは次を実行します。

Node.js

Cloud Pub/Sub クライアントの作成方法については、Cloud Pub/Sub クライアント ライブラリをご覧ください。

const outstandingMessages = {};

function pullOrderedMessages (subscriptionName) {
  // Instantiates a client
  const pubsub = PubSub();

  // References an existing subscription, e.g. "my-subscription"
  const subscription = pubsub.subscription(subscriptionName);

  // Pulls messages. Set returnImmediately to false to block until messages are
  // received.
  return subscription.pull()
    .then((results) => {
      const messages = results[0];

      // Pub/Sub messages are unordered, so here we manually order messages by
      // their "counterId" attribute which was set when they were published.
      messages.forEach((message) => {
        outstandingMessages[message.attributes.counterId] = message;
      });

      const outstandingIds = Object.keys(outstandingMessages).map((counterId) => parseInt(counterId, 10));
      outstandingIds.sort();

      outstandingIds.forEach((counterId) => {
        const counter = getSubscribeCounterValue();
        const message = outstandingMessages[counterId];

        if (counterId < counter) {
          // The message has already been processed
          subscription.ack(message.ackId);
          delete outstandingMessages[counterId];
        } else if (counterId === counter) {
          // Process the message
          console.log(`* %d %j %j`, message.id, message.data, message.attributes);

          setSubscribeCounterValue(counterId + 1);
          subscription.ack(message.ackId);
          delete outstandingMessages[counterId];
        } else {
          // Have not yet processed the message on which this message is dependent
          return false;
        }
      });
    });
}

これらのいずれかのソリューションを実行すると、メッセージのパブリッシュと処理にレイテンシが発生するため、パブリッシャーでは同期ステップで順序を作成し、サブスクライバーでは順番が入れ替わったメッセージに遅延処理を行って順序を守る必要があります。

まとめ

初めてメッセージ配信サービスを使用するときは、メッセージが順序どおりに配信されることは望ましい特性のように思えます。そうすれば、順序が重要な場合にメッセージ処理に必要なコードが簡略化されます。しかし、メッセージの順序を守るには、使用するメッセージ配信システムに関係なく、可用性とスケーラビリティに関して多大な犠牲を払うことになります。Google Cloud Pub/Sub の同じインフラストラクチャーにビルドされている Google サービスの場合、可用性とスケーラビリティは極めて重要な機能であるため、このサービスでは順序指定されたメッセージ配信は提供されていません。可能な限り、メッセージの順序に依存しないようにアプリケーションを設計してください。Google Cloud Pub/Sub のスケーリング機能を使用して簡単にスケールし、すべてのメッセージをすばやく、信頼性のある方法で配信できます。

Google Cloud Pub/Sub を使用してなんらかの形式のメッセージ順序指定を実施するように決定した場合は、Cloud DatastoreCloud SQL を確認して、このドキュメントで説明されている手法の実行方法を学習してください。

外出先でもリソースをモニタリング

Google Cloud Console アプリを入手して、プロジェクトの管理にお役立てください。

フィードバックを送信...

Cloud Pub/Sub のドキュメント