Ordena mensajes

Pub/Sub proporciona un servicio de entrega de mensajes escalable y con alta disponibilidad. Estas propiedades se compensan con una desventaja, que consiste en que no se garantiza el orden en el que los suscriptores reciben los mensajes. Si bien la falta de orden puede parecer dificultosa, hay muy pocos casos prácticos en los que en verdad se requiere un orden estricto.

En este documento, se incluye una descripción general del significado real del orden de los mensajes y de sus compensaciones, además de un análisis sobre los casos prácticos y las técnicas para tratar los mensajes dependientes del orden en tu flujo de trabajo actual cuando pasas a Pub/Sub.

¿Qué es el orden?

A simple vista, la idea de que los mensajes estén en orden parece simple. En la siguiente imagen, se representa de forma general cómo un mensaje circula a través de un servicio de entrega de mensajes:

¿Qué orden?

El publicador envía un mensaje en un tema dentro Pub/Sub. Luego, el mensaje se entrega al suscriptor a través de una suscripción. En el caso de un publicador síncrono único, un suscriptor síncrono único y un servidor de entrega de mensajes síncrono único (todos en ejecución en una capa de transporte síncrono), la noción de orden parece simple: los mensajes se ordenan según el momento en el que el publicador los publica con éxito, de acuerdo con una secuencia o medida de tiempo absoluto.

Incluso en este caso simple, garantizar el orden de los mensajes impondría limitaciones graves a la capacidad de procesamiento. La única manera de garantizar el orden de los mensajes es que el servicio entregue los mensajes uno por uno al suscriptor y que, para entregar el siguiente mensaje, espere hasta tener la certeza de que el suscriptor recibió y procesó el mensaje actual (por lo general, mediante una confirmación de recepción enviada por el suscriptor al servicio). La capacidad de procesamiento del envío de un mensaje a la vez al suscriptor no es escalable. En cambio, el servicio podría tan solo garantizar que la primera entrega de cualquier mensaje esté en orden y, así, permitir que los intentos de volver a entregar un mensaje se realicen en cualquier momento. Eso permitiría enviar muchos mensajes al suscriptor a la vez. Sin embargo, incluso si las restricciones de orden son menos estrictas de esta manera, “ordenar” pierde sentido a medida que nos alejamos del caso de un publicador, suscriptor y servicio de entrega de mensajes único.

¿En verdad tengo un orden?

Definir el orden de los mensajes puede ser complicado, según tus publicadores y suscriptores. En primer lugar, es posible que varios de tus suscriptores procesen mensajes en una sola suscripción:

Varios suscriptores

En este caso, incluso si los mensajes provienen de la suscripción en orden, no hay garantías del orden en el que tus suscriptores los procesarán. Si el orden de procesamiento es importante, los suscriptores deberán coordinarse con algún sistema de almacenamiento ACID como Cloud Firestore o Cloud SQL.

Del mismo modo, contar con varios publicadores en un mismo tema puede dificultar el orden:

Varios publicadores

¿Cómo se les asigna un orden a los mensajes publicados por diferentes publicadores? Los publicadores deben coordinarse o el servicio de entrega de mensajes debe adjuntar una noción de orden a cada mensaje entrante. Cada mensaje debe incluir la información sobre el orden. La información sobre el orden puede ser una marca de tiempo (pero debe ser una marca de tiempo que todos los servidores obtengan de la misma fuente para evitar problemas de desajuste del reloj) o un número de secuencia (adquirido de una fuente única con garantías ACID). Otros sistemas de mensajería que garantizan el orden de los mensajes requieren una configuración que limite con efectividad el sistema para varios publicadores que envían mensajes a través de un servidor único a un solo suscriptor.

El nodo del servicio de entrega de mensajes, expandido

Si el servicio de entrega de mensajes abstracto que se usó en los ejemplos anteriores fuera un servidor único y síncrono, el servicio mismo podría garantizar el orden. Sin embargo, un servicio de entrega de mensajes como Pub/Sub no es un servidor único, en cuanto a las funciones del servidor y la cantidad de servidores. De hecho, existen capas entre tus publicadores y suscriptores, y el propio sistema Pub/Sub. A continuación, se muestra un diagrama más detallado de lo que sucede cuando Pub/Sub es el sistema de entrega de mensajes en uso:

Entrega de mensajes

Como puedes ver, existen muchas rutas que un mismo mensaje puede tomar para llegar desde el publicador hasta el suscriptor. La ventaja de una arquitectura de este tipo es que cuenta con alta disponibilidad (la interrupción de un servidor único no genera demoras en todo el sistema) y es escalable (los mensajes se pueden distribuir en muchos servidores para maximizar la capacidad de procesamiento). Los beneficios que producen los sistemas distribuidos como este resultaron fundamentales para los productos de Google, como Búsqueda, Ads y Gmail, que están compilados en los mismos sistemas que ejecutan Pub/Sub.

¿Cómo debo manejar el orden?

Con suerte, ahora comprendes por qué el orden de los mensajes es complejo y por qué Pub/Sub le resta importancia a la necesidad de orden. Para lograr la disponibilidad y escalabilidad, es importante que minimices tu dependencia del orden. La dependencia del orden puede adoptar varias formas. A continuación, se describe cada una de ellas y se incluyen algunos casos prácticos típicos con sus soluciones.

El orden no tiene importancia alguna

Casos prácticos típicos: Lista de tareas independientes en cola, recopilación de estadísticas sobre eventos

Los casos prácticos en los que el orden no tiene importancia alguna son perfectos para Pub/Sub. Por ejemplo, si hay tareas independientes que tus suscriptores deben realizar, cada tarea es un mensaje y el suscriptor que lo recibe realiza la acción. Como segundo ejemplo, si deseas recopilar estadísticas sobre todas las acciones realizadas por los clientes en tu servidor, puedes publicar un mensaje por cada evento y, luego, hacer que los suscriptores recopilen los mensajes y actualicen los resultados en el almacenamiento continuo.

El orden en el resultado final es importante

Casos prácticos típicos: Registros, actualizaciones de estado

En los casos prácticos de esta categoría, el orden en el que se procesan los mensajes no importa. Lo que sí importa es que el resultado final se ordene de forma correcta. Por ejemplo, imagina un registro recopilado que se procesa y almacena en el disco. Los eventos de registro provienen de varios publicadores. En este caso, el orden real en el que se procesan los eventos de registro no importa. Lo único que importa es que el resultado final esté ordenado de forma cronológica. Por lo tanto, podrías adjuntar una marca de tiempo a cada evento del publicador y hacer que el suscriptor almacene los mensajes en un almacén de datos subyacente (como Cloud Firestore) que permita el almacenamiento o la recuperación según la marca de tiempo ordenada.

La misma opción funciona con las actualizaciones de estado que requieren acceso solo al estado más reciente. Por ejemplo, imagina que realizas un seguimiento de los precios actuales de diferentes acciones sin que te interese su historial, solo el valor más reciente. Podrías adjuntar una marca de tiempo a cada variación de la acción y solo almacenar las que sean más recientes que el valor almacenado en el momento.

El orden de los mensajes procesados es importante

Casos prácticos típicos: Datos transaccionales en los que deben aplicarse umbrales.

Los casos en los que hay una dependencia total del orden en el que se procesan los mensajes son los más complicados. Cualquier solución que imponga un orden estricto de los mensajes afectará al rendimiento y la capacidad de procesamiento. Solo debes depender del orden cuando sea en verdad necesario y cuando estés seguro de que no necesitarás escalar a una cantidad mayor de mensajes por segundo. Para procesar los mensajes en orden, un suscriptor debe cumplir con alguno de estos aspectos:

  • Conocer la lista completa de mensajes pendientes y el orden en el que deben procesarse

  • Tener una manera de determinar, a partir de todos los mensajes recibidos hasta el momento, si hay mensajes que aún no recibió y que necesita procesar primero

Puedes implementar la primera opción si le asignas a cada mensaje un identificador único y almacenas el orden en el que se deben procesar los mensajes en una ubicación continua (como Cloud Firestore). Un suscriptor verifica el almacenamiento continuo para determinar el próximo mensaje que debe procesar y se asegura de procesar solo ese mensaje a continuación, de modo que espera para procesar los otros mensajes que haya recibido cuando llegan en el orden completo. En ese momento, vale la pena considerar el uso del almacenamiento continuo en sí mismo como la lista de mensajes en cola y no depender de la entrega de mensajes a través de Pub/Sub.

Esto último se puede lograr si usas Cloud Monitoring con el fin de realizar un seguimiento de la métrica pubsub.googleapis.com/subscription/oldest_unacked_message_age (consulta la página sobre métricas admitidas para obtener una descripción). Un suscriptor coloca por un tiempo todos los mensajes en un almacenamiento continuo y los confirma. Verifica de forma periódica la antigüedad del mensaje sin confirmar más antiguo y la compara con las marcas de tiempo de publicación de los mensajes almacenados. Se garantiza que se recibieron todos los mensajes publicados antes del mensaje no confirmado más antiguo, por lo que esos mensajes se pueden quitar del almacenamiento continuo y se los puede procesar en orden.

Como alternativa, si tienes un publicador síncrono y un suscriptor únicos, puedes usar un número de secuencia para garantizar el orden. Este método requiere el uso de un contador continuo. Para cada mensaje, el publicador realiza lo siguiente:

Node.js

Para aprender a crear un cliente de Pub/Sub, consulta la documentación sobre bibliotecas cliente de Pub/Sub.

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

const attributes = {
  // Pub/Sub messages are unordered, so assign an order ID and manually order messages
  counterId: `${getPublishCounterValue()}`,
};

// Publishes the message
const messageId = await pubsub
  .topic(topicName)
  .publish(dataBuffer, attributes);
// Update the counter value
setPublishCounterValue(parseInt(attributes.counterId, 10) + 1);
console.log(`Message ${messageId} published.`);
return messageId;

El suscriptor realiza lo siguiente:

Node.js

Para aprender a crear un cliente de Pub/Sub, consulta la documentación sobre bibliotecas cliente de Pub/Sub.

const outstandingMessages = {};

async function listenForOrderedMessages(subscriptionName, timeout) {
  // Imports the Google Cloud client library
  const {PubSub} = require('@google-cloud/pubsub');

  // Creates a client
  const pubsub = new PubSub();

  // References an existing subscription, e.g. "my-subscription"
  const subscription = pubsub.subscription(subscriptionName);

  // Create an event handler to handle messages
  const messageHandler = function(message) {
    // Buffer the message in an object (for later ordering)
    outstandingMessages[message.attributes.counterId] = message;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on(`message`, messageHandler);
  await new Promise(r => setTimeout(r, timeout * 1000));
  subscription.removeListener(`message`, messageHandler);

  // Pub/Sub messages are unordered, so here we manually order messages by
  // their "counterId" attribute which was set when they were published.
  const outstandingIds = Object.keys(outstandingMessages).map(counterId =>
    Number(counterId, 10)
  );
  outstandingIds.sort();

  outstandingIds.forEach(counterId => {
    const counter = getSubscribeCounterValue();
    const message = outstandingMessages[counterId];

    if (counterId < counter) {
      // The message has already been processed
      message.ack();
      delete outstandingMessages[counterId];
    } else if (counterId === counter) {
      // Process the message
      console.log(
        `* %d %j %j`,
        message.id,
        message.data.toString(),
        message.attributes
      );
      setSubscribeCounterValue(counterId + 1);
      message.ack();
      delete outstandingMessages[counterId];
    } else {
      // Have not yet processed the message on which this message is dependent
      return false;
    }
  });
}

Cualquiera de estas soluciones genera latencia en el proceso de publicación y el procesamiento de mensajes. El publicador debe contar con un paso síncrono para crear el orden y el suscriptor debe contar con una demora en los mensajes desordenados a fin de que se cumpla el orden.

Resumen

Cuando se usa un servicio de entrega de mensajes por primera vez, la propiedad de entrega de mensajes en orden resulta conveniente. Simplifica el código que se necesita para procesar mensajes en los que el orden es importante. Sin embargo, proporcionar mensajes en orden afecta en profundidad a la disponibilidad y la escalabilidad, sin importar el sistema de entrega de mensajes que uses. En los productos de Google compilados en la misma infraestructura de Pub/Sub, la disponibilidad y la escalabilidad son características de vital importancia, por lo que el servicio no ofrece la entrega de mensajes en orden. Siempre que sea posible, diseña tus aplicaciones para que eviten la dependencia del orden de los mensajes. De ese modo, podrás escalar con facilidad mediante el escalamiento de Pub/Sub y, así, podrás entregar todos sus mensajes con rapidez y confiabilidad.

Si decides implementar algún tipo de orden de mensajes con Pub/Sub, consulta la documentación de Cloud Firestore y Cloud SQL a fin de obtener más información sobre cómo implementar las estrategias descritas en este documento.