Pub/Sub admite la entrega de mensajes de envío y extracción. Si quieres obtener una descripción general y una comparación de suscripciones de extracción y de envío, consulta la descripción general de los suscriptores. En este documento, se describe la entrega de extracción. Si quieres obtener más información sobre la entrega de envío, consulta la guía del suscriptor de envío.
Extracción asíncrona
El uso de la extracción asíncrona proporciona una mayor capacidad de procesamiento en tu aplicación, ya que no requiere que tu aplicación bloquee los mensajes nuevos. Tu aplicación puede recibir mensajes con agente de escucha de mensajes de larga duración, y se puede confirmar un mensaje a la vez, como se muestra en el ejemplo a continuación. Los clientes Java, Python, .NET, Go y Ruby usan la API del servicio de streamingPull para implementar la API del cliente asíncrona de manera eficiente.
No todas las bibliotecas cliente admiten la extracción asíncrona de mensajes. Si quieres obtener más información sobre cómo hacer una extracción de mensajes síncrona, consulta la página sobre extracción síncrona.
Si quieres obtener más información, consulta la documentación de referencia de la API en tu lenguaje de programación.
C++
Antes de probar esta muestra, sigue las instrucciones de configuración de C++ en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C++.
C#
Antes de probar esta muestra, sigue las instrucciones de configuración de C# en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C#.
Go
Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.
Java
Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.
Node.js
Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.
Python
Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.
Ruby
Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Ruby.
Procesa atributos personalizados
En este ejemplo, se muestra cómo extraer mensajes de forma asíncrona y recuperar los atributos personalizados de los metadatos:
C++
Antes de probar esta muestra, sigue las instrucciones de configuración de C++ en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C++.
C#
Antes de probar esta muestra, sigue las instrucciones de configuración de C# en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C#.
Go
Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.
Java
Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.
Node.js
Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.
Python
Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.
Ruby
Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Ruby.
Soluciona errores
En este ejemplo, se muestra cómo resolver los errores que surgen cuando se suscriben mensajes:
C++
Antes de probar esta muestra, sigue las instrucciones de configuración de C++ en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C++.
Go
Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.
Java
Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.
Node.js
Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.
Python
Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.
Ruby
Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.
Control de flujo de mensajes
Es posible que tu cliente suscriptor procese y confirme los mensajes de forma más lenta de lo que Pub/Sub los envía al cliente. En este caso, puede ocurrir lo siguiente:
Puede ser que un cliente tenga una acumulación de mensajes porque no tiene la capacidad de procesar el volumen de mensajes entrantes, pero otro cliente en la red sí tenga esa capacidad. El segundo cliente podría reducir el trabajo acumulado de la suscripción, pero no tiene la oportunidad de hacerlo porque el primer cliente mantiene una asignación de tiempo en los mensajes que recibe. Esto reduce la tasa general de procesamiento ya que los mensajes se atascan en el primer cliente.
Debido a que la biblioteca cliente extiende muchas veces la fecha límite de confirmación para los mensajes atrasados, esos mensajes continúan consumiendo recursos de memoria, CPU y ancho de banda. Por lo tanto, es posible que el cliente suscriptor se quede sin recursos (como la memoria). Esto puede afectar de forma negativa la capacidad de procesamiento y la latencia del procesamiento de mensajes.
Para mitigar los problemas anteriores, usa las funciones de control de flujo del suscriptor para controlar la velocidad a la que el suscriptor recibe los mensajes. Estas funciones de control de flujo se ilustran en los siguientes ejemplos:
C++
Antes de probar esta muestra, sigue las instrucciones de configuración de C++ en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C++.
C#
Antes de probar esta muestra, sigue las instrucciones de configuración de C# en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C#.
Go
Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.
Java
Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.
Node.js
Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.
Python
Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.
Ruby
Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Ruby.
En términos más generales, la necesidad de control de flujo indica que los mensajes se publican a una frecuencia mayor de la que se consumen. Si se trata de un estado persistente, en lugar de un pico transitorio del volumen de mensajes, considera aumentar la cantidad de instancias de cliente suscriptor.
Control de simultaneidad
La asistencia para la simultaneidad depende de tu lenguaje de programación. A fin de implementar lenguajes que sean compatibles con subprocesos paralelos, como Java y Go, las bibliotecas cliente realizan una elección predeterminada para la cantidad de subprocesos. Es posible que esta opción no sea óptima para tu aplicación. Por ejemplo, si tu aplicación de suscriptor no mantiene el ritmo del volumen de mensajes entrantes y no está vinculada a la CPU, debes aumentar el conteo de subprocesos. En el caso de las operaciones de procesamiento de mensajes con uso intensivo de CPU, podría ser conveniente reducir la cantidad de subprocesos.
En el siguiente ejemplo, se muestra cómo controlar la simultaneidad en un suscriptor:
C++
Antes de probar esta muestra, sigue las instrucciones de configuración de C++ en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C++.
Go
Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.
Java
Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.
Ruby
Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Ruby.
La asistencia para la simultaneidad depende de tu lenguaje de programación. Consulta la documentación de referencia de la API para obtener más información.
StreamingPull
En el servicio Pub/Sub existen dos API para recuperar mensajes:
Siempre que sea posible, en las bibliotecas cliente de Cloud se usa StreamingPull para obtener la mayor capacidad de procesamiento y la latencia más baja. Aunque es posible que nunca uses directamente la API de StreamingPull, es importante comprender algunas propiedades cruciales de StreamingPull y cómo difiere del método de extracción más tradicional.
El método de extracción se basa en un modelo de solicitud/respuesta:
- El cliente envía una solicitud al servidor para enviar mensajes.
- El servidor responde con cero o más mensajes y cierra la conexión.
La API del servicio StreamingPull depende de una conexión bidireccional persistente para recibir múltiples mensajes a medida que estén disponibles:
- El cliente envía una solicitud al servidor para establecer una conexión.
- El servidor envía mensajes de forma continua al cliente conectado.
- El cliente o el servidor terminan la conexión.
Debes proporcionar una devolución de llamada al suscriptor y este ejecuta la devolución de llamada de forma asíncrona para cada mensaje. Si un suscriptor recibe mensajes con la misma clave de ordenamiento, las bibliotecas cliente ejecutan la devolución de llamada de forma secuencial. El servicio Pub/Sub entrega estos mensajes al mismo suscriptor en función del mejor esfuerzo.
StreamingPull tiene una tasa de error del 100% (es de esperar)
Las transmisiones de StreamingPull siempre se cierran con un estado que no es correcto. Ten en cuenta que, a diferencia de los RPC normales, el estado aquí es solo un indicador de que se ha interrumpido un flujo, no de que las solicitudes estén fallando. Por lo tanto, aunque la API de StreamingPull puede tener una tasa de error del 100% que parece sorprendente, así se diseñó.
Diagnostica errores de StreamingPull
Debido a que las transmisiones de StreamingPull siempre finalizan con un error, no es útil examinar las métricas de finalización de la transmisión cuando diagnosticas errores. Más bien, enfócate en las métricas de operación de mensajes de StreamingPull (subscription/streaming_pull_message_operation_count
). Busca estos errores:
- Los errores
FAILED_PRECONDITION
se pueden producir en estos casos:- Pub/Sub intenta desencriptar un mensaje con una clave de Cloud KMS inhabilitada.
- Las suscripciones pueden suspenderse de forma temporal si hay mensajes en los trabajos acumulados de suscripciones que están encriptadas por una clave de Cloud KMS inhabilitada.
- Errores
UNAVAILABLE
StreamingPull: lidia con una gran cantidad de mensajes pendientes
La pila gRPC de StreamingPull está optimizada para lograr una gran capacidad de procesamiento y, por lo tanto, almacena los mensajes en el búfer. Esto puede tener algunas consecuencias si estás intentando procesar grandes acumulaciones de mensajes pequeños (en lugar de un flujo constante de mensajes nuevos). En estas condiciones, es posible que veas mensajes entregados varias veces y que la carga se balancee de manera efectiva entre los clientes.
El búfer entre el servicio de Pub/Sub y el espacio de usuario de la biblioteca cliente es de alrededor de 10 MB. Para comprender el impacto de este búfer en el comportamiento de la biblioteca cliente, considera el siguiente ejemplo:
- Hay una acumulación de 10,000 mensajes de 1 KB en una suscripción.
- Cada mensaje tarda 1 segundo en procesarse de forma secuencial, mediante una instancia de cliente de un solo subproceso.
- La primera instancia de cliente que establezca una conexión StreamingPull al servicio para esa suscripción llenará su búfer con los 10,000 mensajes.
- Se necesitan 10,000 segundos (casi 3 horas) para procesar el búfer.
- En ese período, algunos de los mensajes almacenados en búfer exceden su fecha límite de confirmación de recepción y se vuelven a enviar al mismo cliente, lo que genera duplicados.
- Cuando se ejecutan varias instancias de clientes, los mensajes atascados en el búfer de un cliente no estarán disponibles para ninguna otra instancia del cliente.
Esta situación no ocurrirá si los mensajes llegan a una velocidad constante (en lugar de como un lote grande): el servicio nunca tiene los 10 MB completos de mensajes a la vez, por lo que puede balancear las cargas de mensajes de manera eficaz entre varios suscriptores.
Para abordar esta situación, usa una suscripción de envío o la API de extracción, ahora disponible en algunas de las bibliotecas cliente de Cloud, (consulta la sección de extracción síncrona) y en todas las bibliotecas cliente de la API. Si quieres obtener más información, consulta la documentación de bibliotecas cliente .
Extracción síncrona
Hay casos en que la extracción asíncrona no es la mejor opción para tu aplicación. Por ejemplo, la lógica de la aplicación podría depender de un patrón de sondeo para recuperar mensajes o requerir un límite preciso en una cantidad de mensajes recuperados por el cliente en un momento dado. Para asistir tales aplicaciones, el servicio admite un método de extracción síncrona.
Este es un ejemplo de código para extraer y confirmar una cantidad fija de mensajes:
C#
Antes de probar esta muestra, sigue las instrucciones de configuración de C# en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C#.
Go
Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.
Java
Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.
Node.js
Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.
PHP
Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.
Protocolo
Solicitud:
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull
{
"returnImmediately": "false",
"maxMessages": "1"
}
Respuesta:
200 OK
{
"receivedMessages": [{
"ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
"message": {
"data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
"messageId": "19917247034"
}
}]
}
Solicitud:
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge
{
"ackIds": [
"dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
]
}
Python
Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.
Ruby
Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Ruby.
Pub/Sub entrega una lista de mensajes. Si la lista tiene varios mensajes, Pub/Sub los ordena con la misma clave de ordenamiento.
Ten en cuenta que, para lograr una baja latencia de entrega de mensajes con extracción síncrona, es importante tener muchas solicitudes de extracción pendientes al mismo tiempo. A medida que aumenta la capacidad de procesamiento del tema, se necesitan más solicitudes de extracción. En general, se prefiere la extracción síncrona para aplicaciones sensibles a la latencia.
Extracción síncrona con administración de la asignación de tiempo
El procesamiento de un mensaje individual puede exceder el plazo de confirmación preconfigurado, también conocido como asignación de tiempo. Para evitar el reenvío de estos mensajes, las bibliotecas cliente proporcionan una manera de restablecer sus plazos de confirmación (excepto las bibliotecas cliente de Go, que modifican de forma automática los plazos de confirmación para los mensajes sondeados), como se muestra en los ejemplos a continuación:
C#
Antes de probar esta muestra, sigue las instrucciones de configuración de C# en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C#.
Java
Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.
Node.js
Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.
Python
Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.
Ruby
Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Ruby.
Escalamiento
Es posible que debas implementar un mecanismo de escalamiento para que tu aplicación de suscriptor se mantenga al día con el volumen de mensajes. Cómo hacerlo depende de tu entorno, pero en general se basará en las métricas de trabajo pendiente que se ofrecen a través del servicio de supervisión del conjunto de operaciones de Google Cloud. Si quieres obtener detalles sobre cómo hacer esto para Compute Engine, consulta Ajusta la escala según las métricas de Stackdriver Monitoring.
Ve a la sección de Pub/Sub de la lista de métricas de GCP para saber qué métricas se pueden supervisar de manera programática.
Por último, como con todos los servicios distribuidos, puede que se vuelva a intentar cada solicitud de vez en cuando.
Detecta mensajes duplicados y fuerza reintentos
Cuando no confirmas un mensaje antes de que venza el plazo de confirmación, Pub/Sub vuelve a enviar el mensaje. Como resultado, Pub/Sub puede enviar mensajes duplicados. Usa el paquete de operaciones de Google Cloud para supervisar operaciones de confirmación con el código de respuesta expired
para detectar esta condición. Si quieres obtener estos datos, selecciona la métrica Operaciones de confirmación de mensajes (Acknowledge message operations) y, luego, agrúpala o fíltrala por la etiqueta response_code
. Ten en cuenta que response_code
es una etiqueta del sistema en una métrica; no es una métrica.

Para reducir la tasa de duplicación, extiende el plazo del mensaje.
- Las bibliotecas cliente manejan la extensión de plazos de forma automática, pero debes tener en cuenta que existen límites máximos predeterminados para la extensión que se pueden configurar.
- Si creas tu propia biblioteca cliente, usa el método
modifyAckDeadline
para extender el plazo de confirmación.
Como alternativa, para forzar a Pub/Sub a volver a enviar un mensaje, establece modifyAckDeadline
en 0.