Recibe mensajes mediante extracción

Pub/Sub admite la entrega de mensajes de envío y extracción. Si quieres obtener una descripción general y una comparación de suscripciones de extracción y de envío, consulta la descripción general de los suscriptores. En este documento, se describe la entrega de extracción. Si quieres obtener más información sobre la entrega de envío, consulta la guía del suscriptor de envío.

Extracción asíncrona

El uso de la extracción asíncrona proporciona una mayor capacidad de procesamiento en tu aplicación, ya que no requiere que tu aplicación bloquee los mensajes nuevos. Tu aplicación puede recibir mensajes con agente de escucha de mensajes de larga duración, y se puede confirmar un mensaje a la vez, como se muestra en el ejemplo a continuación. Los clientes Java, Python, .NET, Go y Ruby usan la API del servicio de streamingPull para implementar la API del cliente asíncrona de manera eficiente.

No todas las bibliotecas cliente admiten la extracción asíncrona de mensajes. Si quieres obtener más información sobre cómo hacer una extracción de mensajes síncrona, consulta la página sobre extracción síncrona.

Si quieres obtener más información, consulta la documentación de referencia de la API en tu lenguaje de programación.

C#

Antes de probar este ejemplo, sigue las instrucciones de configuración para C# en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C#.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberClient subscriber = await SubscriberClient.CreateAsync(
    subscriptionName);
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
await Task.Delay(3000);
await subscriber.StopAsync(CancellationToken.None);

Go

Antes de probar este ejemplo, sigue las instrucciones de configuración para Go en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

import (
	"context"
	"fmt"
	"io"
	"sync"

	"cloud.google.com/go/pubsub"
)

func pullMsgs(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	// Publish 10 messages on the topic.
	var results []*pubsub.PublishResult
	for i := 0; i < 10; i++ {
		res := topic.Publish(ctx, &pubsub.Message{
			Data: []byte(fmt.Sprintf("hello world #%d", i)),
		})
		results = append(results, res)
	}

	// Check that all messages were published.
	for _, r := range results {
		_, err := r.Get(ctx)
		if err != nil {
			return fmt.Errorf("Get: %v", err)
		}
	}
	// Consume 10 messages.
	var mu sync.Mutex
	received := 0
	sub := client.Subscription(subID)
	cctx, cancel := context.WithCancel(ctx)
	err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		msg.Ack()
		mu.Lock()
		defer mu.Unlock()
		received++
		if received == 10 {
			cancel()
		}
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	return nil
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración para Java en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.

String projectId = "my-project-id";
String subscriptionId = "my-subscription-id";

ProjectSubscriptionName subscriptionName =
    ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver
MessageReceiver receiver =
    new MessageReceiver() {
      @Override
      public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        // handle incoming message, then ack/nack the received message
        System.out.println("Id : " + message.getMessageId());
        System.out.println("Data : " + message.getData().toStringUtf8());
        consumer.ack();
      }
    };

Subscriber subscriber = null;
try {
  // Create a subscriber for "my-subscription-id" bound to the message receiver
  subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
  subscriber.startAsync().awaitRunning();
  // Allow the subscriber to run indefinitely unless an unrecoverable error occurs
  subscriber.awaitTerminated();
} finally {
  // Stop receiving messages
  if (subscriber != null) {
    subscriber.stopAsync();
  }
}

Node.js

Antes de probar este ejemplo, sigue las instrucciones de configuración para Node.js en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'my-sub';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = message => {
  console.log(`Received message ${message.id}:`);
  console.log(`\tData: ${message.data}`);
  console.log(`\tAttributes: ${message.attributes}`);
  messageCount += 1;

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Listen for new messages until timeout is hit
subscription.on(`message`, messageHandler);

setTimeout(() => {
  subscription.removeListener('message', messageHandler);
  console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración para Python en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_name}`
subscription_path = subscriber.subscription_path(
    project_id, subscription_name
)

def callback(message):
    print("Received message: {}".format(message))
    message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking. We must keep the main thread from
# exiting to allow it to process messages asynchronously in the background.
print("Listening for messages on {}".format(subscription_path))
while True:
    time.sleep(60)

Procesa atributos personalizados

En este ejemplo, se muestra cómo extraer mensajes de forma asíncrona y recuperar los atributos personalizados de los metadatos:

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración para Python en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name
)

def callback(message):
    print("Received message: {}".format(message.data))
    if message.attributes:
        print("Attributes:")
        for key in message.attributes:
            value = message.attributes.get(key)
            print("{}: {}".format(key, value))
    message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print("Listening for messages on {}".format(subscription_path))
while True:
    time.sleep(60)

Ruby

Antes de probar este ejemplo, sigue las instrucciones de configuración para Ruby en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Ruby.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  unless received_message.attributes.empty?
    puts "Attributes:"
    received_message.attributes.each do |key, value|
      puts "#{key}: #{value}"
    end
  end
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

Soluciona errores

En este ejemplo, se muestra cómo resolver los errores que surgen cuando se suscriben mensajes:

Go

Antes de probar este ejemplo, sigue las instrucciones de configuración para Go en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func pullMsgsError(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	// If the service returns a non-retryable error, Receive returns that error after
	// all of the outstanding calls to the handler have returned.
	err = client.Subscription(subID).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	return nil
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración para Go en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

subscriber.addListener(
    new Subscriber.Listener() {
      public void failed(Subscriber.State from, Throwable failure) {
        // Handle error.
      }
    },
    MoreExecutors.directExecutor());

Node.js

Antes de probar este ejemplo, sigue las instrucciones de configuración para Node.js en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'my-sub';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
const messageHandler = function(message) {
  // Do something with the message
  console.log(`Message: ${message}`);

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Create an event handler to handle errors
const errorHandler = function(error) {
  // Do something with the error
  console.error(`ERROR: ${error}`);
};

// Listen for new messages/errors until timeout is hit
subscription.on(`message`, messageHandler);
subscription.on(`error`, errorHandler);

setTimeout(() => {
  subscription.removeListener(`message`, messageHandler);
  subscription.removeListener(`error`, errorHandler);
}, timeout * 1000);

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración para Python en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

from google.cloud import pubsub_v1

# TODO project_id        = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pubsub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name
)

def callback(message):
    print("Received message: {}".format(message))
    message.ack()

future = subscriber.subscribe(subscription_path, callback=callback)

# Blocks the thread while messages are coming in through the stream. Any
# exceptions that crop up on the thread will be set on the future.
try:
    # When timeout is unspecified, the result method waits indefinitely.
    future.result(timeout=30)
except Exception as e:
    print(
        "Listening for messages on {} threw an Exception: {}.".format(
            subscription_name, e
        )
    )

Ruby

Antes de probar este ejemplo, sigue las instrucciones de configuración para Go en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de la Pub/Sub para Go.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end
# Propagate expection from child threads to the main thread as soon as it is
# raised. Exceptions happened in the callback thread are collected in the
# callback thread pool and do not propagate to the main thread
Thread.abort_on_exception = true

begin
  subscriber.start
  # Let the main thread sleep for 60 seconds so the thread for listening
  # messages does not quit
  sleep 60
  subscriber.stop.wait!
rescue Exception => e
  puts "Exception #{e.inspect}: #{e.message}"
  raise "Stopped listening for messages."
end

Control de flujo de mensajes

Es posible que tu cliente suscriptor procese y confirme los mensajes de forma más lenta de lo que Pub/Sub los envía al cliente. En este caso, puede ocurrir lo siguiente:

  • Puede ser que un cliente tenga una acumulación de mensajes porque no tiene la capacidad de procesar el volumen de mensajes entrantes, pero otro cliente en la red sí tenga esa capacidad. El segundo cliente podría reducir el trabajo acumulado de la suscripción, pero no tiene la oportunidad de hacerlo porque el primer cliente mantiene una asignación de tiempo en los mensajes que recibe. Esto reduce la tasa general de procesamiento ya que los mensajes se atascan en el primer cliente.

  • Debido a que la biblioteca cliente extiende muchas veces la fecha límite de confirmación para los mensajes atrasados, esos mensajes continúan consumiendo recursos de memoria, CPU y ancho de banda. Por lo tanto, es posible que el cliente suscriptor se quede sin recursos (como la memoria). Esto puede afectar de forma negativa la capacidad de procesamiento y la latencia del procesamiento de mensajes.

Para mitigar los problemas anteriores, usa las funciones de control de flujo del suscriptor para controlar la velocidad a la que el suscriptor recibe los mensajes. Estas funciones de control de flujo se ilustran en los siguientes ejemplos:

C#

Antes de probar este ejemplo, sigue las instrucciones de configuración para C# en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C#.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberClient subscriber = await SubscriberClient.CreateAsync(
    subscriptionName,
    settings: new SubscriberClient.Settings()
    {
        AckExtensionWindow = TimeSpan.FromSeconds(4),
        Scheduler = Google.Api.Gax.SystemScheduler.Instance,
        AckDeadline = TimeSpan.FromSeconds(10),
        FlowControlSettings = new Google.Api.Gax
            .FlowControlSettings(
            maxOutstandingElementCount: 100,
            maxOutstandingByteCount: 10240)
    });
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
await Task.Delay(3000);
await subscriber.StopAsync(CancellationToken.None);

Go

Antes de probar este ejemplo, sigue las instrucciones de configuración para Go en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func pullMsgsSettings(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	sub := client.Subscription(subID)
	sub.ReceiveSettings.Synchronous = true
	// MaxOutstandingMessages is the maximum number of unprocessed messages the
	// client will pull from the server before pausing.
	//
	// This is only guaranteed when ReceiveSettings.Synchronous is set to true.
	// When Synchronous is set to false, the StreamingPull RPC is used which
	// can pull a single large batch of messages at once that is greater than
	// MaxOustandingMessages before pausing. For more info, see
	// https://cloud.google.com/pubsub/docs/pull#streamingpull_dealing_with_large_backlogs_of_small_messages.
	sub.ReceiveSettings.MaxOutstandingMessages = 10
	// MaxOutstandingBytes is the maximum size of unprocessed messages,
	// that the client will pull from the server before pausing. Similar
	// to MaxOutstandingMessages, this may be exceeded with a large batch
	// of messages since we cannot control the size of a batch of messages
	// from the server (even with the synchronous Pull RPC).
	sub.ReceiveSettings.MaxOutstandingBytes = 1e10
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	return nil
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración para Java en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.

FlowControlSettings flowControlSettings =
    FlowControlSettings.newBuilder()
        .setMaxOutstandingElementCount(10_000L)
        .setMaxOutstandingRequestBytes(1_000_000_000L)
        .build();
Subscriber subscriber =
    Subscriber.newBuilder(subscriptionName, receiver)
        .setFlowControlSettings(flowControlSettings)
        .build();

Node.js

Antes de probar este ejemplo, sigue las instrucciones de configuración para Node.js en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'my-sub';
// const maxInProgress = 5;
// const timeout = 10;

const subscriberOptions = {
  flowControl: {
    maxMessages: maxInProgress,
  },
};

// References an existing subscription.
// Note that flow control settings are not persistent across subscribers.
const subscription = pubsub.subscription(subscriptionName, subscriberOptions);

console.log(
  `Subscriber to subscription ${subscription.name} is ready to receive messages at a controlled volume of ${maxInProgress} messages.`
);

const messageHandler = message => {
  console.log(`Received message: ${message.id}`);
  console.log(`\tData: ${message.data}`);
  console.log(`\tAttributes: ${message.attributes}`);

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

subscription.on(`message`, messageHandler);

setTimeout(() => {
  subscription.close();
}, timeout * 1000);

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración para Python en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name
)

def callback(message):
    print("Received message: {}".format(message.data))
    message.ack()

# Limit the subscriber to only have ten outstanding messages at a time.
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
subscriber.subscribe(
    subscription_path, callback=callback, flow_control=flow_control
)

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print("Listening for messages on {}".format(subscription_path))
while True:
    time.sleep(60)

Ruby

Antes de probar este ejemplo, sigue las instrucciones de configuración para Ruby en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Ruby.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen inventory: 10 do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

En términos más generales, la necesidad de control de flujo indica que los mensajes se publican a una frecuencia mayor de la que se consumen. Si se trata de un estado persistente, en lugar de un pico transitorio del volumen de mensajes, considera aumentar la cantidad de instancias de cliente suscriptor.

Control de simultaneidad

La asistencia para la simultaneidad depende de tu lenguaje de programación. A fin de implementar lenguajes que sean compatibles con subprocesos paralelos, como Java y Go, las bibliotecas cliente realizan una elección predeterminada para la cantidad de subprocesos. Es posible que esta opción no sea óptima para tu aplicación. Por ejemplo, si tu aplicación de suscriptor no mantiene el ritmo del volumen de mensajes entrantes y no está vinculada a la CPU, debes aumentar el conteo de subprocesos. En el caso de las operaciones de procesamiento de mensajes con uso intensivo de CPU, podría ser conveniente reducir la cantidad de subprocesos.

En el siguiente ejemplo, se muestra cómo controlar la simultaneidad en un suscriptor:

Go

Antes de probar este ejemplo, sigue las instrucciones de configuración para Go en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. SI quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

import (
	"context"
	"fmt"
	"io"
	"runtime"
	"time"

	"cloud.google.com/go/pubsub"
)

func pullMsgsConcurrenyControl(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)
	// Must set ReceiveSettings.Synchronous to false (or leave as default) to enable
	// concurrency settings. Otherwise, NumGoroutines will be set to 1.
	sub.ReceiveSettings.Synchronous = false
	// NumGoroutines is the number of goroutines sub.Receive will spawn to pull messages concurrently.
	sub.ReceiveSettings.NumGoroutines = runtime.NumCPU()

	// Receive messages for 10 seconds.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Create a channel to handle messages to as they come in.
	cm := make(chan *pubsub.Message)
	// Handle individual messages in a goroutine.
	go func() {
		for {
			select {
			case msg := <-cm:
				fmt.Fprintf(w, "Got message :%q\n", string(msg.Data))
				msg.Ack()
			case <-ctx.Done():
				return
			}
		}
	}()

	// Receive blocks until the context is cancelled or an error occurs.
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		cm <- msg
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	close(cm)

	return nil
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración para Java en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.

// provide a separate executor service for polling
ExecutorProvider executorProvider =
    InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();

Subscriber subscriber =
    Subscriber.newBuilder(subscriptionName, receiver)
        .setExecutorProvider(executorProvider)
        .build();

Ruby

Antes de probar este ejemplo, sigue las instrucciones de configuración para Ruby en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Ruby.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
subscriber   = subscription.listen streams: 2, threads: {
  callback: 4,
  push:     2
} do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

La asistencia para la simultaneidad depende de tu lenguaje de programación. Consulta la documentación de referencia de la API para obtener más información.

StreamingPull

En el servicio Pub/Sub existen dos API para recuperar mensajes:

Siempre que sea posible, en las bibliotecas cliente de Cloud se usa StreamingPull para obtener la mayor capacidad de procesamiento y la latencia más baja. Aunque es posible que nunca uses directamente la API de StreamingPull, es importante comprender algunas propiedades cruciales de StreamingPull y cómo difiere del método de extracción más tradicional.

El método de extracción se basa en un modelo de solicitud/respuesta:

  1. La aplicación envía una solicitud de mensajes.
  2. El servidor responde con cero o más mensajes y cierra la conexión.

La API del servicio StreamingPull se basa en una conexión bidireccional persistente para recibir múltiples mensajes a medida que estén disponibles, sean confirmados y se modifiquen los plazos de confirmación:

  1. El cliente envía una solicitud al servicio para establecer una conexión.
  2. El cliente usa esa conexión para intercambiar datos de mensajes.
  3. El cliente o el servidor finaliza la solicitud (es decir, la conexión bidireccional).

StreamingPull tiene una tasa de error del 100% (es de esperar)

Las transmisiones de StreamingPull siempre finalizan con un código de error. Ten en cuenta que, a diferencia de los RPC normales, el error aquí es solo un indicador de que se ha interrumpido un flujo, no de que las solicitudes estén fallando. Por lo tanto, aunque la API de StreamingPull puede tener una tasa de error del 100% que parece sorprendente, así se diseñó.

Diagnostica errores de StreamingPull

Debido a que las transmisiones de StreamingPull siempre finalizan con un error, no es útil examinar las métricas de solicitud de StreamingPull cuando diagnosticas errores. Más bien, concéntrate en las métricas de operación de mensajes de StreamingPull. Busca estos errores:

  • Los errores FAILED_PRECONDITION se pueden producir en estos casos:
    • Pub/Sub intenta desencriptar un mensaje con una clave de Cloud KMS inhabilitada.
    • La transmisión se cierra debido a una suscripción suspendida. Las suscripciones pueden suspenderse de forma temporal si hay mensajes en los trabajos acumulados de suscripciones que están protegidos por una clave de Cloud KMS inhabilitada.
  • Errores UNAVAILABLE

StreamingPull: lidia con una gran cantidad de mensajes pendientes

La pila gRPC de StreamingPull está optimizada para lograr una gran capacidad de procesamiento y, por lo tanto, almacena los mensajes en el búfer. Esto puede tener algunas consecuencias si estás intentando procesar grandes acumulaciones de mensajes pequeños (en lugar de un flujo constante de mensajes nuevos). En estas condiciones, es posible que veas mensajes entregados varias veces y que la carga se balancee de manera efectiva entre los clientes.

El búfer entre el servicio de Pub/Sub y el espacio de usuario de la biblioteca cliente es de alrededor de 10 MB. Para comprender el impacto de este búfer en el comportamiento de la biblioteca cliente, considera el siguiente ejemplo:

  • Hay una acumulación de 10,000 mensajes de 1 KB en una suscripción.
  • Cada mensaje tarda 1 segundo en procesarse de forma secuencial, mediante una instancia de cliente de un solo subproceso.
  • La primera instancia de cliente que establezca una conexión de StreamingPull con el servicio para esa suscripción obtendrá un búfer de los 10,000 mensajes completos.
  • Se necesitan 10,000 segundos (casi 3 horas) para procesar el búfer.
  • En ese período, algunos de los mensajes exceden su fecha límite de confirmación y se vuelven a enviar al mismo cliente, lo que genera duplicados.
  • Cuando se ejecutan varias instancias de clientes, los mensajes atascados en el búfer no estarán disponibles para ninguna otra instancia que no sea la primera.

Esta situación no ocurriría si los mensajes llegaran a una velocidad constante, en vez de en un solo lote grande. Los 10 MB del servicio nunca están completos de mensajes, por lo que puede balancear la carga de mensajes a través de varios suscriptores.

Para abordar esta situación, usa una suscripción de envío o una API de extracción, ahora disponible en algunas de las bibliotecas cliente de Cloud, (consulta la sección de extracción síncrona) y en todas las bibliotecas cliente de la API. Si quieres obtener más información, consulta la documentación de bibliotecas cliente .

Extracción síncrona

Hay casos en que la extracción asíncrona no es la mejor opción para tu aplicación. Por ejemplo, la lógica de la aplicación podría depender de un patrón de sondeo para recuperar mensajes o requerir un límite preciso en una cantidad de mensajes recuperados por el cliente en un momento dado. Para asistir tales aplicaciones, el servicio admite un método de extracción síncrona.

Este es un ejemplo de código para extraer y confirmar una cantidad fija de mensajes:

C#

Antes de probar este ejemplo, sigue las instrucciones de configuración para C# en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C#.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberServiceApiClient subscriberClient =
    SubscriberServiceApiClient.Create();
// Pull messages from server,
// allowing an immediate response if there are no messages.
PullResponse response = subscriberClient.Pull(
    subscriptionName, returnImmediately: true, maxMessages: 20);
// Print out each received message.
foreach (ReceivedMessage msg in response.ReceivedMessages)
{
    string text = Encoding.UTF8.GetString(msg.Message.Data.ToArray());
    Console.WriteLine($"Message {msg.Message.MessageId}: {text}");
}
// If acknowledgement required, send to server.
if (acknowledge)
{
    subscriberClient.Acknowledge(subscriptionName,
        response.ReceivedMessages.Select(msg => msg.AckId));
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración para Java en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.

SubscriberStubSettings subscriberStubSettings =
    SubscriberStubSettings.newBuilder()
        .setTransportChannelProvider(
            SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                .setMaxInboundMessageSize(20 << 20) // 20MB
                .build())
        .build();

try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
  // String projectId = "my-project-id";
  // String subscriptionId = "my-subscription-id";
  // int numOfMessages = 10;   // max number of messages to be pulled
  String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
  PullRequest pullRequest =
      PullRequest.newBuilder()
          .setMaxMessages(numOfMessages)
          .setReturnImmediately(false) // return immediately if messages are not available
          .setSubscription(subscriptionName)
          .build();

  // use pullCallable().futureCall to asynchronously perform this operation
  PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
  List<String> ackIds = new ArrayList<>();
  for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
    // handle received message
    // ...
    ackIds.add(message.getAckId());
  }
  // acknowledge received messages
  AcknowledgeRequest acknowledgeRequest =
      AcknowledgeRequest.newBuilder()
          .setSubscription(subscriptionName)
          .addAllAckIds(ackIds)
          .build();
  // use acknowledgeCallable().futureCall to asynchronously perform this operation
  subscriber.acknowledgeCallable().call(acknowledgeRequest);
  return pullResponse.getReceivedMessagesList();
}

PHP

Antes de probar este ejemplo, sigue las instrucciones de configuración para Node.js en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Protocol

Solicitud:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull

{
  "returnImmediately": "false",
  "maxMessages": "1"
}

Respuesta:

200 OK

{
  "receivedMessages": [{
    "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
    "message": {
      "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
      "messageId": "19917247034"
    }
  }]
}

Solicitud:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge

{
  "ackIds": [
    "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
  ]
}

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración para Python en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name
)

NUM_MESSAGES = 3

# The subscriber pulls a specific number of messages.
response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)

ack_ids = []
for received_message in response.received_messages:
    print("Received: {}".format(received_message.message.data))
    ack_ids.append(received_message.ack_id)

# Acknowledges the received messages so they will not be sent again.
subscriber.acknowledge(subscription_path, ack_ids)

print(
    "Received and acknowledged {} messages. Done.".format(
        len(response.received_messages)
    )
)

Ruby

Antes de probar este ejemplo, sigue las instrucciones de configuración para Ruby en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Ruby.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscription.pull.each do |message|
  puts "Message pulled: #{message.data}"
  message.acknowledge!
end

Ten en cuenta que, para lograr una baja latencia de entrega de mensajes con extracción síncrona, es importante tener muchas solicitudes de extracción pendientes al mismo tiempo. A medida que aumenta la capacidad de procesamiento del tema, se necesitan más solicitudes de extracción. En general, se prefiere la extracción síncrona para aplicaciones sensibles a la latencia.

Extracción síncrona con administración de la asignación de tiempo

El procesamiento de un mensaje individual puede exceder el plazo de confirmación preconfigurado, también conocido como asignación de tiempo. Para evitar el reenvío de estos mensajes, las bibliotecas cliente proporcionan una manera de restablecer sus plazos de confirmación (excepto las bibliotecas cliente de Go, que modifican de forma automática los plazos de confirmación para los mensajes sondeados), como se muestra en los ejemplos a continuación:

Go

Antes de probar este ejemplo, sigue las instrucciones de configuración para Go en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

import (
	"context"
	"fmt"
	"io"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"cloud.google.com/go/pubsub"
)

func pullMsgsSync(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)

	// Turn on synchronous mode. This makes the subscriber use the Pull RPC rather
	// than the StreamingPull RPC, which is useful for guaranteeing MaxOutstandingMessages,
	// the max number of messages the client will hold in memory at a time.
	sub.ReceiveSettings.Synchronous = true
	sub.ReceiveSettings.MaxOutstandingMessages = 10

	// Receive messages for 5 seconds.
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	// Create a channel to handle messages to as they come in.
	cm := make(chan *pubsub.Message)
	// Handle individual messages in a goroutine.
	go func() {
		for {
			select {
			case msg := <-cm:
				fmt.Fprintf(w, "Got message :%q\n", string(msg.Data))
				msg.Ack()
			case <-ctx.Done():
				return
			}
		}
	}()

	// Receive blocks until the passed in context is done.
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		cm <- msg
	})
	if err != nil && status.Code(err) != codes.Canceled {
		return fmt.Errorf("Receive: %v", err)
	}
	close(cm)

	return nil
}

Node.js

Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

// Imports the Google Cloud client library
const pubsub = require('@google-cloud/pubsub');

const client = new pubsub.v1.SubscriberClient();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const projectName = 'your-project';
// const subscriptionName = 'your-subscription';

const formattedSubscription = client.subscriptionPath(
  projectName,
  subscriptionName
);
// The maximum number of messages returned for this request.
// Pub/Sub may return fewer than the number specified.
const maxMessages = 1;
const newAckDeadlineSeconds = 30;
const request = {
  subscription: formattedSubscription,
  maxMessages: maxMessages,
};

let isProcessed = false;

// The worker function is meant to be non-blocking. It starts a long-
// running process, such as writing the message to a table, which may
// take longer than the default 10-sec acknowledge deadline.
function worker(message) {
  console.log(`Processing "${message.message.data}"...`);

  setTimeout(() => {
    console.log(`Finished procesing "${message.message.data}".`);
    isProcessed = true;
  }, 30000);
}

// The subscriber pulls a specified number of messages.
const [response] = await client.pull(request);
// Obtain the first message.
const message = response.receivedMessages[0];
// Send the message to the worker function.
worker(message);

let waiting = true;
while (waiting) {
  await new Promise(r => setTimeout(r, 10000));
  // If the message has been processed..
  if (isProcessed) {
    const ackRequest = {
      subscription: formattedSubscription,
      ackIds: [message.ackId],
    };

    //..acknowledges the message.
    await client.acknowledge(ackRequest);
    console.log(`Acknowledged: "${message.message.data}".`);
    // Exit after the message is acknowledged.
    waiting = false;
    console.log(`Done.`);
  } else {
    // If the message is not yet processed..
    const modifyAckRequest = {
      subscription: formattedSubscription,
      ackIds: [message.ackId],
      ackDeadlineSeconds: newAckDeadlineSeconds,
    };

    //..reset its ack deadline.
    await client.modifyAckDeadline(modifyAckRequest);

    console.log(
      `Reset ack deadline for "${message.message.data}" for ${newAckDeadlineSeconds}s.`
    );
  }
}

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración para Python en la guía de inicio rápido de Cloud Pub/Sub: usa bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

import logging
import multiprocessing
import random
import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name
)

NUM_MESSAGES = 2
ACK_DEADLINE = 30
SLEEP_TIME = 10

# The subscriber pulls a specific number of messages.
response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)

multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)

def worker(msg):
    """Simulates a long-running process."""
    RUN_TIME = random.randint(1, 60)
    logger.info(
        "{}: Running {} for {}s".format(
            time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME
        )
    )
    time.sleep(RUN_TIME)

# `processes` stores process as key and ack id and message as values.
processes = dict()
for message in response.received_messages:
    process = multiprocessing.Process(target=worker, args=(message,))
    processes[process] = (message.ack_id, message.message.data)
    process.start()

while processes:
    for process in list(processes):
        ack_id, msg_data = processes[process]
        # If the process is still running, reset the ack deadline as
        # specified by ACK_DEADLINE once every while as specified
        # by SLEEP_TIME.
        if process.is_alive():
            # `ack_deadline_seconds` must be between 10 to 600.
            subscriber.modify_ack_deadline(
                subscription_path,
                [ack_id],
                ack_deadline_seconds=ACK_DEADLINE,
            )
            logger.info(
                "{}: Reset ack deadline for {} for {}s".format(
                    time.strftime("%X", time.gmtime()),
                    msg_data,
                    ACK_DEADLINE,
                )
            )

        # If the processs is finished, acknowledges using `ack_id`.
        else:
            subscriber.acknowledge(subscription_path, [ack_id])
            logger.info(
                "{}: Acknowledged {}".format(
                    time.strftime("%X", time.gmtime()), msg_data
                )
            )
            processes.pop(process)

    # If there are still processes running, sleeps the thread.
    if processes:
        time.sleep(SLEEP_TIME)

print(
    "Received and acknowledged {} messages. Done.".format(
        len(response.received_messages)
    )
)

Escalamiento

Es posible que debas implementar un mecanismo de escalamiento para que tu aplicación de suscriptor se mantenga al día con el volumen de mensajes. La forma de hacerlo dependerá de tu entorno, pero, en general, se basa en las métricas de trabajos acumulados que se ofrecen a través del servicio de supervisión de Stackdriver. Si quieres obtener detalles sobre cómo hacer esto para Compute Engine, consulta Ajusta la escala según las métricas de Stackdriver Monitoring.

Ve a la sección de Pub/Sub de la lista de métricas de GCP para saber qué métricas se pueden supervisar de manera programática.

Por último, como con todos los servicios distribuidos, puede que se vuelva a intentar cada solicitud de vez en cuando.

Detecta mensajes duplicados y fuerza reintentos

Cuando no confirmas un mensaje antes de que venza el plazo de confirmación, Pub/Sub vuelve a enviar el mensaje. Como resultado, Pub/Sub puede enviar mensajes duplicados. Usa Stackdriver para supervisar operaciones de confirmación con el código de respuesta expired para detectar esta condición. Si quieres obtener estos datos, selecciona la métrica Operaciones de confirmación de mensajes (Acknowledge message operations) y, luego, agrúpala o fíltrala por la etiqueta response_code. Ten en cuenta que response_code es una etiqueta del sistema en una métrica; no es una métrica.

Usa Stackdriver para buscar plazos límite de confirmación de mensajes vencidos.

Para reducir la tasa de duplicación, extiende el plazo del mensaje.

  • Las bibliotecas cliente manejan la extensión de plazos de forma automática, pero debes tener en cuenta que existen límites máximos predeterminados para la extensión que se pueden configurar.
  • Si creas tu propia biblioteca cliente, usa el método modifyAckDeadline para extender el plazo de confirmación.

Como alternativa, para forzar a Pub/Sub a volver a enviar un mensaje, establece modifyAckDeadline en 0.