Recibe mensajes mediante extracción

Pub/Sub admite la entrega de mensajes de envío y extracción. Si quieres obtener una descripción general y una comparación de suscripciones de extracción y de envío, consulta la descripción general de los suscriptores. En este documento, se describe la entrega de extracción. Si quieres obtener más información sobre la entrega de envío, consulta la guía del suscriptor de envío.

Extracción asíncrona

El uso de la extracción asíncrona proporciona una mayor capacidad de procesamiento en tu aplicación, ya que no requiere que tu aplicación bloquee los mensajes nuevos. Tu aplicación puede recibir mensajes con agente de escucha de mensajes de larga duración, y se puede confirmar un mensaje a la vez, como se muestra en el ejemplo a continuación. Los clientes Java, Python, .NET, Go y Ruby usan la API del servicio de streamingPull para implementar la API del cliente asíncrona de manera eficiente.

No todas las bibliotecas cliente admiten la extracción asíncrona de mensajes. Si quieres obtener más información sobre cómo hacer una extracción de mensajes síncrona, consulta la página sobre extracción síncrona.

Si quieres obtener más información, consulta la documentación de referencia de la API en tu lenguaje de programación.

C#

Antes de probar esta muestra, sigue las instrucciones de configuración de C# en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C#.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
        subscriptionId);
    SubscriberClient subscriber = await SubscriberClient.CreateAsync(
        subscriptionName);
    // SubscriberClient runs your message handle function on multiple
    // threads to maximize throughput.
    Task startTask = subscriber.StartAsync(
        async (PubsubMessage message, CancellationToken cancel) =>
        {
            string text =
                Encoding.UTF8.GetString(message.Data.ToArray());
            await Console.Out.WriteLineAsync(
                $"Message {message.MessageId}: {text}");
            return acknowledge ? SubscriberClient.Reply.Ack
                : SubscriberClient.Reply.Nack;
        });
    // Run for 3 seconds.
    await Task.Delay(3000);
    await subscriber.StopAsync(CancellationToken.None);

go

Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

import (
    	"context"
    	"fmt"
    	"io"
    	"sync"

    	"cloud.google.com/go/pubsub"
    )

    func pullMsgs(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
    	// projectID := "my-project-id"
    	// subID := "my-sub"
    	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %v", err)
    	}

    	// Publish 10 messages on the topic.
    	var results []*pubsub.PublishResult
    	for i := 0; i < 10; i++ {
    		res := topic.Publish(ctx, &pubsub.Message{
    			Data: []byte(fmt.Sprintf("hello world #%d", i)),
    		})
    		results = append(results, res)
    	}

    	// Check that all messages were published.
    	for _, r := range results {
    		_, err := r.Get(ctx)
    		if err != nil {
    			return fmt.Errorf("Get: %v", err)
    		}
    	}
    	// Consume 10 messages.
    	var mu sync.Mutex
    	received := 0
    	sub := client.Subscription(subID)
    	cctx, cancel := context.WithCancel(ctx)
    	err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
    		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
    		msg.Ack()
    		mu.Lock()
    		defer mu.Unlock()
    		received++
    		if received == 10 {
    			cancel()
    		}
    	})
    	if err != nil {
    		return fmt.Errorf("Receive: %v", err)
    	}
    	return nil
    }
    

Java

Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.

String projectId = "my-project-id";
    String subscriptionId = "my-subscription-id";

    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);
    // Instantiate an asynchronous message receiver
    MessageReceiver receiver =
        new MessageReceiver() {
          @Override
          public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
            // handle incoming message, then ack/nack the received message
            System.out.println("Id : " + message.getMessageId());
            System.out.println("Data : " + message.getData().toStringUtf8());
            consumer.ack();
          }
        };

    Subscriber subscriber = null;
    try {
      // Create a subscriber for "my-subscription-id" bound to the message receiver
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      subscriber.startAsync().awaitRunning();
      // Allow the subscriber to run indefinitely unless an unrecoverable error occurs
      subscriber.awaitTerminated();
    } finally {
      // Stop receiving messages
      if (subscriber != null) {
        subscriber.stopAsync();
      }
    }

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

/**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
    // const timeout = 60;

    // Imports the Google Cloud client library
    const {PubSub} = require('@google-cloud/pubsub');

    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();

    function listenForMessages() {
      // References an existing subscription
      const subscription = pubSubClient.subscription(subscriptionName);

      // Create an event handler to handle messages
      let messageCount = 0;
      const messageHandler = message => {
        console.log(`Received message ${message.id}:`);
        console.log(`\tData: ${message.data}`);
        console.log(`\tAttributes: ${message.attributes}`);
        messageCount += 1;

        // "Ack" (acknowledge receipt of) the message
        message.ack();
      };

      // Listen for new messages until timeout is hit
      subscription.on('message', messageHandler);

      setTimeout(() => {
        subscription.removeListener('message', messageHandler);
        console.log(`${messageCount} message(s) received.`);
      }, timeout * 1000);
    }

    listenForMessages();

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO subscription_name = "Your Pub/Sub subscription name"
    # TODO timeout = 5.0  # "How long the subscriber should listen for
    # messages in seconds"

    subscriber = pubsub_v1.SubscriberClient()
    # The `subscription_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/subscriptions/{subscription_name}`
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name
    )

    def callback(message):
        print("Received message: {}".format(message))
        message.ack()

    streaming_pull_future = subscriber.subscribe(
        subscription_path, callback=callback
    )
    print("Listening for messages on {}..\n".format(subscription_path))

    # Wrap subscriber in a 'with' block to automatically call close() when done.
    with subscriber:
        try:
            # When `timeout` is not set, result() will block indefinitely,
            # unless an exception is encountered first.
            streaming_pull_future.result(timeout=timeout)
        except:  # noqa
            streaming_pull_future.cancel()

Procesa atributos personalizados

En este ejemplo, se muestra cómo extraer mensajes de forma asíncrona y recuperar los atributos personalizados de los metadatos:

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO subscription_name = "Your Pub/Sub subscription name"
    # TODO timeout = 5.0  # "How long the subscriber should listen for
    # messages in seconds"

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name
    )

    def callback(message):
        print("Received message: {}".format(message.data))
        if message.attributes:
            print("Attributes:")
            for key in message.attributes:
                value = message.attributes.get(key)
                print("{}: {}".format(key, value))
        message.ack()

    streaming_pull_future = subscriber.subscribe(
        subscription_path, callback=callback
    )
    print("Listening for messages on {}..\n".format(subscription_path))

    # Wrap subscriber in a 'with' block to automatically call close() when done.
    with subscriber:
        try:
            # When `timeout` is not set, result() will block indefinitely,
            # unless an exception is encountered first.
            streaming_pull_future.result(timeout=timeout)
        except:  # noqa
            streaming_pull_future.cancel()

Ruby

Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Ruby.

# project_id        = "Your Google Cloud Project ID"
    # subscription_name = "Your Pubsub subscription name"
    require "google/cloud/pubsub"

    pubsub = Google::Cloud::Pubsub.new project: project_id

    subscription = pubsub.subscription subscription_name
    subscriber   = subscription.listen do |received_message|
      puts "Received message: #{received_message.data}"
      unless received_message.attributes.empty?
        puts "Attributes:"
        received_message.attributes.each do |key, value|
          puts "#{key}: #{value}"
        end
      end
      received_message.acknowledge!
    end

    subscriber.start
    # Let the main thread sleep for 60 seconds so the thread for listening
    # messages does not quit
    sleep 60
    subscriber.stop.wait!

Soluciona errores

En este ejemplo, se muestra cómo resolver los errores que surgen cuando se suscriben mensajes:

Go

Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

import (
    	"context"
    	"fmt"
    	"io"

    	"cloud.google.com/go/pubsub"
    )

    func pullMsgsError(w io.Writer, projectID, subID string) error {
    	// projectID := "my-project-id"
    	// subID := "my-sub"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %v", err)
    	}

    	// If the service returns a non-retryable error, Receive returns that error after
    	// all of the outstanding calls to the handler have returned.
    	err = client.Subscription(subID).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
    		msg.Ack()
    	})
    	if err != nil {
    		return fmt.Errorf("Receive: %v", err)
    	}
    	return nil
    }
    

Java

Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

subscriber.addListener(
        new Subscriber.Listener() {
          public void failed(Subscriber.State from, Throwable failure) {
            // Handle error.
          }
        },
        MoreExecutors.directExecutor());

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

/**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
    // const timeout = 10;

    // Imports the Google Cloud client library
    const {PubSub} = require('@google-cloud/pubsub');

    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();

    function listenForErrors() {
      // References an existing subscription
      const subscription = pubSubClient.subscription(subscriptionName);

      // Create an event handler to handle messages
      const messageHandler = function(message) {
        // Do something with the message
        console.log(`Message: ${message}`);

        // "Ack" (acknowledge receipt of) the message
        message.ack();
      };

      // Create an event handler to handle errors
      const errorHandler = function(error) {
        // Do something with the error
        console.error(`ERROR: ${error}`);
        throw error;
      };

      // Listen for new messages/errors until timeout is hit
      subscription.on('message', messageHandler);
      subscription.on('error', errorHandler);

      setTimeout(() => {
        subscription.removeListener('message', messageHandler);
        subscription.removeListener('error', errorHandler);
      }, timeout * 1000);
    }

    listenForErrors();

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

from google.cloud import pubsub_v1

    # TODO project_id        = "Your Google Cloud Project ID"
    # TODO subscription_name = "Your Pubsub subscription name"
    # TODO timeout = 5.0  # "How long the subscriber should listen for
    # messages in seconds"

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name
    )

    def callback(message):
        print("Received message: {}".format(message))
        message.ack()

    streaming_pull_future = subscriber.subscribe(
        subscription_path, callback=callback
    )
    print("Listening for messages on {}..\n".format(subscription_path))

    # Wrap subscriber in a 'with' block to automatically call close() when done.
    with subscriber:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        try:
            streaming_pull_future.result(timeout=timeout)
        except Exception as e:
            streaming_pull_future.cancel()
            print(
                "Listening for messages on {} threw an exception: {}.".format(
                    subscription_name, e
                )
            )

Ruby

Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

# project_id        = "Your Google Cloud Project ID"
    # subscription_name = "Your Pubsub subscription name"
    require "google/cloud/pubsub"

    pubsub = Google::Cloud::Pubsub.new project: project_id

    subscription = pubsub.subscription subscription_name
    subscriber   = subscription.listen do |received_message|
      puts "Received message: #{received_message.data}"
      received_message.acknowledge!
    end
    # Propagate expection from child threads to the main thread as soon as it is
    # raised. Exceptions happened in the callback thread are collected in the
    # callback thread pool and do not propagate to the main thread
    Thread.abort_on_exception = true

    begin
      subscriber.start
      # Let the main thread sleep for 60 seconds so the thread for listening
      # messages does not quit
      sleep 60
      subscriber.stop.wait!
    rescue Exception => e
      puts "Exception #{e.inspect}: #{e.message}"
      raise "Stopped listening for messages."
    end

Control de flujo de mensajes

Es posible que tu cliente suscriptor procese y confirme los mensajes de forma más lenta de lo que Pub/Sub los envía al cliente. En este caso:

  • Puede ser que un cliente tenga una acumulación de mensajes porque no tiene la capacidad de procesar el volumen de mensajes entrantes, pero otro cliente en la red sí tenga esa capacidad. El segundo cliente podría reducir el trabajo acumulado de la suscripción, pero no tiene la oportunidad de hacerlo porque el primer cliente mantiene una asignación de tiempo en los mensajes que recibe. Esto reduce la tasa general de procesamiento, ya que los mensajes se atascan en el primer cliente.

  • Debido a que la biblioteca cliente extiende muchas veces la fecha límite de confirmación para los mensajes atrasados, esos mensajes continúan consumiendo recursos de memoria, CPU y ancho de banda. Por lo tanto, es posible que el cliente suscriptor se quede sin recursos (como la memoria). Esto puede afectar de forma negativa la capacidad de procesamiento y la latencia del procesamiento de mensajes.

Para mitigar los problemas anteriores, usa las funciones de control de flujo del suscriptor para controlar la velocidad a la que el suscriptor recibe los mensajes. Estas funciones de control de flujo se ilustran en los siguientes ejemplos:

C#

Antes de probar esta muestra, sigue las instrucciones de configuración de C# en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C#.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
        subscriptionId);
    SubscriberClient subscriber = await SubscriberClient.CreateAsync(
        subscriptionName,
        settings: new SubscriberClient.Settings()
        {
            AckExtensionWindow = TimeSpan.FromSeconds(4),
            Scheduler = Google.Api.Gax.SystemScheduler.Instance,
            AckDeadline = TimeSpan.FromSeconds(10),
            FlowControlSettings = new Google.Api.Gax
                .FlowControlSettings(
                maxOutstandingElementCount: 100,
                maxOutstandingByteCount: 10240)
        });
    // SubscriberClient runs your message handle function on multiple
    // threads to maximize throughput.
    Task startTask = subscriber.StartAsync(
        async (PubsubMessage message, CancellationToken cancel) =>
        {
            string text =
                Encoding.UTF8.GetString(message.Data.ToArray());
            await Console.Out.WriteLineAsync(
                $"Message {message.MessageId}: {text}");
            return acknowledge ? SubscriberClient.Reply.Ack
                : SubscriberClient.Reply.Nack;
        });
    // Run for 3 seconds.
    await Task.Delay(3000);
    await subscriber.StopAsync(CancellationToken.None);

go

Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

import (
    	"context"
    	"fmt"
    	"io"

    	"cloud.google.com/go/pubsub"
    )

    func pullMsgsSettings(w io.Writer, projectID, subID string) error {
    	// projectID := "my-project-id"
    	// subID := "my-sub"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %v", err)
    	}

    	sub := client.Subscription(subID)
    	sub.ReceiveSettings.Synchronous = true
    	// MaxOutstandingMessages is the maximum number of unprocessed messages the
    	// client will pull from the server before pausing.
    	//
    	// This is only guaranteed when ReceiveSettings.Synchronous is set to true.
    	// When Synchronous is set to false, the StreamingPull RPC is used which
    	// can pull a single large batch of messages at once that is greater than
    	// MaxOustandingMessages before pausing. For more info, see
    	// https://cloud.google.com/pubsub/docs/pull#streamingpull_dealing_with_large_backlogs_of_small_messages.
    	sub.ReceiveSettings.MaxOutstandingMessages = 10
    	// MaxOutstandingBytes is the maximum size of unprocessed messages,
    	// that the client will pull from the server before pausing. Similar
    	// to MaxOutstandingMessages, this may be exceeded with a large batch
    	// of messages since we cannot control the size of a batch of messages
    	// from the server (even with the synchronous Pull RPC).
    	sub.ReceiveSettings.MaxOutstandingBytes = 1e10
    	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
    		msg.Ack()
    	})
    	if err != nil {
    		return fmt.Errorf("Receive: %v", err)
    	}
    	return nil
    }
    

Java

Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.

FlowControlSettings flowControlSettings =
        FlowControlSettings.newBuilder()
            .setMaxOutstandingElementCount(10_000L)
            .setMaxOutstandingRequestBytes(1_000_000_000L)
            .build();
    Subscriber subscriber =
        Subscriber.newBuilder(subscriptionName, receiver)
            .setFlowControlSettings(flowControlSettings)
            .build();

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

/**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
    // const maxInProgress = 5;
    // const timeout = 10;

    // Imports the Google Cloud client library
    const {PubSub} = require('@google-cloud/pubsub');

    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();

    async function subscribeWithFlowControlSettings() {
      const subscriberOptions = {
        flowControl: {
          maxMessages: maxInProgress,
        },
      };

      // References an existing subscription.
      // Note that flow control settings are not persistent across subscribers.
      const subscription = pubSubClient.subscription(
        subscriptionName,
        subscriberOptions
      );

      console.log(
        `Subscriber to subscription ${subscription.name} is ready to receive messages at a controlled volume of ${maxInProgress} messages.`
      );

      const messageHandler = message => {
        console.log(`Received message: ${message.id}`);
        console.log(`\tData: ${message.data}`);
        console.log(`\tAttributes: ${message.attributes}`);

        // "Ack" (acknowledge receipt of) the message
        message.ack();
      };

      subscription.on(`message`, messageHandler);

      setTimeout(() => {
        subscription.close();
      }, timeout * 1000);
    }
    

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO subscription_name = "Your Pub/Sub subscription name"
    # TODO timeout = 5.0  # "How long the subscriber should listen for
    # messages in seconds"

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name
    )

    def callback(message):
        print("Received message: {}".format(message.data))
        message.ack()

    # Limit the subscriber to only have ten outstanding messages at a time.
    flow_control = pubsub_v1.types.FlowControl(max_messages=10)

    streaming_pull_future = subscriber.subscribe(
        subscription_path, callback=callback, flow_control=flow_control
    )
    print("Listening for messages on {}..\n".format(subscription_path))

    # Wrap subscriber in a 'with' block to automatically call close() when done.
    with subscriber:
        try:
            # When `timeout` is not set, result() will block indefinitely,
            # unless an exception is encountered first.
            streaming_pull_future.result(timeout=timeout)
        except:  # noqa
            streaming_pull_future.cancel()

Ruby

Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Ruby.

# project_id        = "Your Google Cloud Project ID"
    # subscription_name = "Your Pubsub subscription name"
    require "google/cloud/pubsub"

    pubsub = Google::Cloud::Pubsub.new project: project_id

    subscription = pubsub.subscription subscription_name
    subscriber   = subscription.listen inventory: 10 do |received_message|
      puts "Received message: #{received_message.data}"
      received_message.acknowledge!
    end

    subscriber.start
    # Let the main thread sleep for 60 seconds so the thread for listening
    # messages does not quit
    sleep 60
    subscriber.stop.wait!

En términos más generales, la necesidad de control de flujo indica que los mensajes se publican a una frecuencia mayor de la que se consumen. Si se trata de un estado persistente, en lugar de un pico transitorio del volumen de mensajes, considera aumentar la cantidad de instancias de cliente suscriptor.

Control de simultaneidad

La asistencia para la simultaneidad depende de tu lenguaje de programación. A fin de implementar lenguajes que sean compatibles con subprocesos paralelos, como Java y Go, las bibliotecas cliente realizan una elección predeterminada para la cantidad de subprocesos. Es posible que esta opción no sea óptima para tu aplicación. Por ejemplo, si tu aplicación de suscriptor no mantiene el ritmo del volumen de mensajes entrantes y no está vinculada a la CPU, debes aumentar el conteo de subprocesos. En el caso de las operaciones de procesamiento de mensajes con uso intensivo de CPU, podría ser conveniente reducir la cantidad de subprocesos.

En el siguiente ejemplo, se muestra cómo controlar la simultaneidad en un suscriptor:

Go

Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

import (
    	"context"
    	"fmt"
    	"io"
    	"runtime"
    	"time"

    	"cloud.google.com/go/pubsub"
    )

    func pullMsgsConcurrenyControl(w io.Writer, projectID, subID string) error {
    	// projectID := "my-project-id"
    	// subID := "my-sub"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %v", err)
    	}
    	defer client.Close()

    	sub := client.Subscription(subID)
    	// Must set ReceiveSettings.Synchronous to false (or leave as default) to enable
    	// concurrency settings. Otherwise, NumGoroutines will be set to 1.
    	sub.ReceiveSettings.Synchronous = false
    	// NumGoroutines is the number of goroutines sub.Receive will spawn to pull messages concurrently.
    	sub.ReceiveSettings.NumGoroutines = runtime.NumCPU()

    	// Receive messages for 10 seconds.
    	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
    	defer cancel()

    	// Create a channel to handle messages to as they come in.
    	cm := make(chan *pubsub.Message)
    	// Handle individual messages in a goroutine.
    	go func() {
    		for {
    			select {
    			case msg := <-cm:
    				fmt.Fprintf(w, "Got message :%q\n", string(msg.Data))
    				msg.Ack()
    			case <-ctx.Done():
    				return
    			}
    		}
    	}()

    	// Receive blocks until the context is cancelled or an error occurs.
    	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    		cm <- msg
    	})
    	if err != nil {
    		return fmt.Errorf("Receive: %v", err)
    	}
    	close(cm)

    	return nil
    }
    

Java

Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.

// Provides an executor service for processing messages. The default
    // `executorProvider` used by the subscriber has a default thread count of 5.
    ExecutorProvider executorProvider =
        InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

    // `setParallelPullCount` determines how many StreamingPull streams the
    // subscriber will open to receive message. It defaults to 1.
    // `setExecutorProvider` configures an executor for the subscriber to
    // process messages.
    // Here, the subscriber is configured to open 2 streams for receiving
    // messages, each stream creates a new executor with 4 threads to help
    // process the message callbacks. In total 2x4=8 threads are used for
    // message processing.
    Subscriber subscriber =
        Subscriber.newBuilder(subscriptionName, receiver)
            .setParallelPullCount(2)
            .setExecutorProvider(executorProvider)
            .build();

Ruby

Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Ruby.

# project_id        = "Your Google Cloud Project ID"
    # subscription_name = "Your Pubsub subscription name"
    require "google/cloud/pubsub"

    pubsub = Google::Cloud::Pubsub.new project: project_id

    subscription = pubsub.subscription subscription_name
    # Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
    # for sending acknowledgements and/or delays
    subscriber   = subscription.listen streams: 2, threads: {
      callback: 4,
      push:     2
    } do |received_message|
      puts "Received message: #{received_message.data}"
      received_message.acknowledge!
    end

    subscriber.start
    # Let the main thread sleep for 60 seconds so the thread for listening
    # messages does not quit
    sleep 60
    subscriber.stop.wait!

La asistencia para la simultaneidad depende de tu lenguaje de programación. Consulta la documentación de referencia de la API para obtener más información.

StreamingPull

En el servicio Pub/Sub existen dos API para recuperar mensajes:

Siempre que sea posible, en las bibliotecas cliente de Cloud se usa StreamingPull para obtener la mayor capacidad de procesamiento y la latencia más baja. Aunque es posible que nunca uses la API de StreamingPull directamente, es importante comprender algunas propiedades fundamentales de StreamingPull y cómo se diferencia del método de extracción más tradicional.

El método de extracción se basa en un modelo de solicitud/respuesta:

  1. El cliente envía una solicitud de mensajes al servidor.
  2. El servidor responde con cero o más mensajes y cierra la conexión.

La API de servicio de StreamingPull depende de una conexión bidireccional persistente para recibir varios mensajes a medida que estén disponibles:

  1. El cliente envía una solicitud al servidor para establecer una conexión.
  2. El servidor envía mensajes de forma continua al cliente conectado.
  3. El cliente o el servidor terminan la conexión.

StreamingPull tiene una tasa de error del 100% (es de esperar)

Las transmisiones de StreamingPull siempre finalizan con un estado non-OK. Ten en cuenta que, a diferencia de las RPC normales, el estado aquí es simplemente una indicación de que la transmisión se interrumpió, no de que las solicitudes estén fallando. Por lo tanto, aunque la API de StreamingPull puede tener una tasa de error del 100% que parece sorprendente, así se diseñó.

Diagnostica errores de StreamingPull

Debido a que las transmisiones de StreamingPull siempre finalizan con un error, no es útil examinar las métricas de terminación de la transmisión mientras diagnostican errores. En su lugar, concéntrate en la métrica de operación de mensajes de StreamingPull (subscription/streaming_pull_message_operation_count). Busca estos errores:

  • Los errores FAILED_PRECONDITION se pueden producir en estos casos:
    • Pub/Sub intenta desencriptar un mensaje con una clave de Cloud KMS inhabilitada.
    • Las suscripciones pueden suspenderse temporalmente si hay mensajes en la acumulación de suscripciones encriptados con una clave de Cloud KMS inhabilitada.
  • Errores UNAVAILABLE

StreamingPull: lidia con una gran cantidad de mensajes pendientes

La pila gRPC de StreamingPull está optimizada para lograr una gran capacidad de procesamiento y, por lo tanto, almacena los mensajes en el búfer. Esto puede tener algunas consecuencias si estás intentando procesar grandes acumulaciones de mensajes pequeños (en lugar de un flujo constante de mensajes nuevos). En estas condiciones, es posible que veas mensajes entregados varias veces y que la carga se balancee de manera efectiva entre los clientes.

El búfer entre el servicio de Pub/Sub y el espacio de usuario de la biblioteca cliente es de alrededor de 10 MB. Para comprender el impacto de este búfer en el comportamiento de la biblioteca cliente, considera el siguiente ejemplo:

  • Hay una acumulación de 10,000 mensajes de 1 KB en una suscripción.
  • Cada mensaje tarda 1 segundo en procesarse de forma secuencial, mediante una instancia de cliente de un solo subproceso.
  • La primera instancia de cliente que establezca una conexión StreamingPull con el servicio para esa suscripción llenará su búfer con los 10,000 mensajes.
  • Se necesitan 10,000 segundos (casi 3 horas) para procesar el búfer.
  • En ese tiempo, algunos de los mensajes almacenados en búfer superan el plazo de confirmación y se reenvían al mismo cliente, lo que genera duplicados.
  • Cuando se ejecutan varias instancias de cliente, los mensajes detenidos en el búfer de un cliente no estarán disponibles para ninguna instancia de cliente.

Esta situación no ocurrirá si los mensajes llegan a una velocidad constante (en lugar de como un lote grande): el servicio nunca tiene los 10 MB completos de mensajes a la vez, por lo que puede balancear las cargas de mensajes de manera eficaz entre varios suscriptores.

Para abordar esta situación, usa una suscripción de envío o la API de extracción, ahora disponible en algunas de las bibliotecas cliente de Cloud, (consulta la sección de extracción síncrona) y en todas las bibliotecas cliente de la API. Si quieres obtener más información, consulta la documentación de bibliotecas cliente .

Extracción síncrona

Hay casos en que la extracción asíncrona no es la mejor opción para tu aplicación. Por ejemplo, la lógica de la aplicación podría depender de un patrón de sondeo para recuperar mensajes o requerir un límite preciso en una cantidad de mensajes recuperados por el cliente en un momento dado. Para asistir tales aplicaciones, el servicio admite un método de extracción síncrona.

Este es un ejemplo de código para extraer y confirmar una cantidad fija de mensajes:

C#

Antes de probar esta muestra, sigue las instrucciones de configuración de C# en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C#.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
        subscriptionId);
    SubscriberServiceApiClient subscriberClient =
        SubscriberServiceApiClient.Create();
    // Pull messages from server,
    // allowing an immediate response if there are no messages.
    PullResponse response = subscriberClient.Pull(
        subscriptionName, returnImmediately: true, maxMessages: 20);
    // Print out each received message.
    foreach (ReceivedMessage msg in response.ReceivedMessages)
    {
        string text = Encoding.UTF8.GetString(msg.Message.Data.ToArray());
        Console.WriteLine($"Message {msg.Message.MessageId}: {text}");
    }
    // If acknowledgement required, send to server.
    if (acknowledge)
    {
        subscriberClient.Acknowledge(subscriptionName,
            response.ReceivedMessages.Select(msg => msg.AckId));
    }

Java

Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.

SubscriberStubSettings subscriberStubSettings =
        SubscriberStubSettings.newBuilder()
            .setTransportChannelProvider(
                SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                    .setMaxInboundMessageSize(20 << 20) // 20MB
                    .build())
            .build();

    try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
      // String projectId = "my-project-id";
      // String subscriptionId = "my-subscription-id";
      // int numOfMessages = 10;   // max number of messages to be pulled
      String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
      PullRequest pullRequest =
          PullRequest.newBuilder()
              .setMaxMessages(numOfMessages)
              .setReturnImmediately(false) // return immediately if messages are not available
              .setSubscription(subscriptionName)
              .build();

      // use pullCallable().futureCall to asynchronously perform this operation
      PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
      List<String> ackIds = new ArrayList<>();
      for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
        // handle received message
        // ...
        ackIds.add(message.getAckId());
      }
      // acknowledge received messages
      AcknowledgeRequest acknowledgeRequest =
          AcknowledgeRequest.newBuilder()
              .setSubscription(subscriptionName)
              .addAllAckIds(ackIds)
              .build();
      // use acknowledgeCallable().futureCall to asynchronously perform this operation
      subscriber.acknowledgeCallable().call(acknowledgeRequest);
      return pullResponse.getReceivedMessagesList();
    }

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

/**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const projectId = 'YOUR_PROJECT_ID';
    // const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';

    // Imports the Google Cloud client library. v1 is for the lower level
    // proto access.
    const {v1} = require('@google-cloud/pubsub');

    // Creates a client; cache this for further use.
    const subClient = new v1.SubscriberClient();

    async function synchronousPull() {
      const formattedSubscription = subClient.subscriptionPath(
        projectId,
        subscriptionName
      );

      // The maximum number of messages returned for this request.
      // Pub/Sub may return fewer than the number specified.
      const request = {
        subscription: formattedSubscription,
        maxMessages: 10,
      };

      // The subscriber pulls a specified number of messages.
      const [response] = await subClient.pull(request);

      // Process the messages.
      const ackIds = [];
      for (const message of response.receivedMessages) {
        console.log(`Received message: ${message.message.data}`);
        ackIds.push(message.ackId);
      }

      // Acknowledge all of the messages. You could also ackknowledge
      // these individually, but this is more efficient.
      const ackRequest = {
        subscription: formattedSubscription,
        ackIds: ackIds,
      };
      await subClient.acknowledge(ackRequest);

      console.log('Done.');
    }

    synchronousPull().catch(console.error);

PHP

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

use Google\Cloud\PubSub\PubSubClient;

    /**
     * Pulls all Pub/Sub messages for a subscription.
     *
     * @param string $projectId  The Google project ID.
     * @param string $subscriptionName  The Pub/Sub subscription name.
     */
    function pull_messages($projectId, $subscriptionName)
    {
        $pubsub = new PubSubClient([
            'projectId' => $projectId,
        ]);
        $subscription = $pubsub->subscription($subscriptionName);
        foreach ($subscription->pull() as $message) {
            printf('Message: %s' . PHP_EOL, $message->data());
            // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
            $subscription->acknowledge($message);
        }
    }

protocol

Solicitud:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull

{
      "returnImmediately": "false",
      "maxMessages": "1"
    }
    

Respuesta:

200 OK

{
      "receivedMessages": [{
        "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
        "message": {
          "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
          "messageId": "19917247034"
        }
      }]
    }
    

Solicitud:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge

{
      "ackIds": [
        "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
      ]
    }
    

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO subscription_name = "Your Pub/Sub subscription name"

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name
    )

    NUM_MESSAGES = 3

    # The subscriber pulls a specific number of messages.
    response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)

    ack_ids = []
    for received_message in response.received_messages:
        print("Received: {}".format(received_message.message.data))
        ack_ids.append(received_message.ack_id)

    # Acknowledges the received messages so they will not be sent again.
    subscriber.acknowledge(subscription_path, ack_ids)

    print(
        "Received and acknowledged {} messages. Done.".format(
            len(response.received_messages)
        )
    )

    subscriber.close()

Ruby

Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Ruby.

# project_id        = "Your Google Cloud Project ID"
    # subscription_name = "Your Pubsub subscription name"
    require "google/cloud/pubsub"

    pubsub = Google::Cloud::Pubsub.new project: project_id

    subscription = pubsub.subscription subscription_name
    subscription.pull.each do |message|
      puts "Message pulled: #{message.data}"
      message.acknowledge!
    end

Ten en cuenta que, para lograr una baja latencia de entrega de mensajes con extracción síncrona, es importante tener muchas solicitudes de extracción pendientes al mismo tiempo. A medida que aumenta la capacidad de procesamiento del tema, se necesitan más solicitudes de extracción. En general, se prefiere la extracción síncrona para aplicaciones sensibles a la latencia.

Extracción síncrona con administración de la asignación de tiempo

El procesamiento de un mensaje individual puede exceder el plazo de confirmación preconfigurado, también conocido como asignación de tiempo. Para evitar el reenvío de estos mensajes, las bibliotecas cliente proporcionan una manera de restablecer sus plazos de confirmación (excepto las bibliotecas cliente de Go, que modifican de forma automática los plazos de confirmación para los mensajes sondeados), como se muestra en los ejemplos a continuación:

go

Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

import (
    	"context"
    	"fmt"
    	"io"
    	"time"

    	"google.golang.org/grpc/codes"
    	"google.golang.org/grpc/status"

    	"cloud.google.com/go/pubsub"
    )

    func pullMsgsSync(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
    	// projectID := "my-project-id"
    	// subID := "my-sub"
    	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %v", err)
    	}
    	defer client.Close()

    	sub := client.Subscription(subID)

    	// Turn on synchronous mode. This makes the subscriber use the Pull RPC rather
    	// than the StreamingPull RPC, which is useful for guaranteeing MaxOutstandingMessages,
    	// the max number of messages the client will hold in memory at a time.
    	sub.ReceiveSettings.Synchronous = true
    	sub.ReceiveSettings.MaxOutstandingMessages = 10

    	// Receive messages for 5 seconds.
    	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    	defer cancel()

    	// Create a channel to handle messages to as they come in.
    	cm := make(chan *pubsub.Message)
    	// Handle individual messages in a goroutine.
    	go func() {
    		for {
    			select {
    			case msg := <-cm:
    				fmt.Fprintf(w, "Got message :%q\n", string(msg.Data))
    				msg.Ack()
    			case <-ctx.Done():
    				return
    			}
    		}
    	}()

    	// Receive blocks until the passed in context is done.
    	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    		cm <- msg
    	})
    	if err != nil && status.Code(err) != codes.Canceled {
    		return fmt.Errorf("Receive: %v", err)
    	}
    	close(cm)

    	return nil
    }
    

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

/**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const projectId = 'YOUR_PROJECT_ID';
    // const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';

    // Imports the Google Cloud client library. v1 is for the lower level
    // proto access.
    const {v1} = require('@google-cloud/pubsub');

    // Creates a client; cache this for further use.
    const subClient = new v1.SubscriberClient();

    async function synchronousPullWithLeaseManagement() {
      const formattedSubscription = subClient.subscriptionPath(
        projectId,
        subscriptionName
      );

      // The maximum number of messages returned for this request.
      // Pub/Sub may return fewer than the number specified.
      const maxMessages = 1;
      const newAckDeadlineSeconds = 30;
      const request = {
        subscription: formattedSubscription,
        maxMessages: maxMessages,
      };

      let isProcessed = false;

      // The worker function is meant to be non-blocking. It starts a long-
      // running process, such as writing the message to a table, which may
      // take longer than the default 10-sec acknowledge deadline.
      function worker(message) {
        console.log(`Processing "${message.message.data}"...`);

        setTimeout(() => {
          console.log(`Finished procesing "${message.message.data}".`);
          isProcessed = true;
        }, 30000);
      }

      // The subscriber pulls a specified number of messages.
      const [response] = await subClient.pull(request);

      // Obtain the first message.
      const message = response.receivedMessages[0];

      // Send the message to the worker function.
      worker(message);

      let waiting = true;
      while (waiting) {
        await new Promise(r => setTimeout(r, 10000));
        // If the message has been processed..
        if (isProcessed) {
          const ackRequest = {
            subscription: formattedSubscription,
            ackIds: [message.ackId],
          };

          //..acknowledges the message.
          await subClient.acknowledge(ackRequest);
          console.log(`Acknowledged: "${message.message.data}".`);
          // Exit after the message is acknowledged.
          waiting = false;
          console.log(`Done.`);
        } else {
          // If the message is not yet processed..
          const modifyAckRequest = {
            subscription: formattedSubscription,
            ackIds: [message.ackId],
            ackDeadlineSeconds: newAckDeadlineSeconds,
          };

          //..reset its ack deadline.
          await subClient.modifyAckDeadline(modifyAckRequest);

          console.log(
            `Reset ack deadline for "${message.message.data}" for ${newAckDeadlineSeconds}s.`
          );
        }
      }
    }

    synchronousPullWithLeaseManagement().catch(console.error);

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

import logging
    import multiprocessing
    import random
    import time

    from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO subscription_name = "Your Pub/Sub subscription name"

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name
    )

    NUM_MESSAGES = 2
    ACK_DEADLINE = 30
    SLEEP_TIME = 10

    # The subscriber pulls a specific number of messages.
    response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)

    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)

    def worker(msg):
        """Simulates a long-running process."""
        RUN_TIME = random.randint(1, 60)
        logger.info(
            "{}: Running {} for {}s".format(
                time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME
            )
        )
        time.sleep(RUN_TIME)

    # `processes` stores process as key and ack id and message as values.
    processes = dict()
    for message in response.received_messages:
        process = multiprocessing.Process(target=worker, args=(message,))
        processes[process] = (message.ack_id, message.message.data)
        process.start()

    while processes:
        for process in list(processes):
            ack_id, msg_data = processes[process]
            # If the process is still running, reset the ack deadline as
            # specified by ACK_DEADLINE once every while as specified
            # by SLEEP_TIME.
            if process.is_alive():
                # `ack_deadline_seconds` must be between 10 to 600.
                subscriber.modify_ack_deadline(
                    subscription_path,
                    [ack_id],
                    ack_deadline_seconds=ACK_DEADLINE,
                )
                logger.info(
                    "{}: Reset ack deadline for {} for {}s".format(
                        time.strftime("%X", time.gmtime()),
                        msg_data,
                        ACK_DEADLINE,
                    )
                )

            # If the processs is finished, acknowledges using `ack_id`.
            else:
                subscriber.acknowledge(subscription_path, [ack_id])
                logger.info(
                    "{}: Acknowledged {}".format(
                        time.strftime("%X", time.gmtime()), msg_data
                    )
                )
                processes.pop(process)

        # If there are still processes running, sleeps the thread.
        if processes:
            time.sleep(SLEEP_TIME)

    print(
        "Received and acknowledged {} messages. Done.".format(
            len(response.received_messages)
        )
    )

    subscriber.close()

Escalamiento

Es posible que debas implementar un mecanismo de escalamiento para que tu aplicación de suscriptor se mantenga al día con el volumen de mensajes. La forma de hacerlo depende de tu entorno, pero generalmente se basará en las métricas de trabajo pendiente que se ofrecen a través del servicio de supervisión del conjunto de operaciones de Google Cloud. Si quieres obtener detalles sobre cómo hacer esto para Compute Engine, consulta Ajusta la escala según las métricas de Stackdriver Monitoring.

Ve a la sección de Pub/Sub de la lista de métricas de GCP para saber qué métricas se pueden supervisar de manera programática.

Por último, como con todos los servicios distribuidos, puede que se vuelva a intentar cada solicitud de vez en cuando.

Detecta mensajes duplicados y fuerza reintentos

Cuando no confirmas un mensaje antes de que venza el plazo de confirmación, Pub/Sub vuelve a enviar el mensaje. Como resultado, Pub/Sub puede enviar mensajes duplicados. Usa el paquete de operaciones de Google Cloud para supervisar operaciones de confirmación con el código de respuesta expired para detectar esta condición. Si quieres obtener estos datos, selecciona la métrica Operaciones de confirmación de mensajes (Acknowledge message operations) y, luego, agrúpala o fíltrala por la etiqueta response_code. Ten en cuenta que response_code es una etiqueta del sistema en una métrica; no es una métrica.

Usa Stackdriver para buscar plazos límite de confirmación de mensajes vencidos.

Para reducir la tasa de duplicación, extiende el plazo del mensaje.

  • Las bibliotecas cliente manejan la extensión de plazos de forma automática, pero debes tener en cuenta que existen límites máximos predeterminados para la extensión que se pueden configurar.
  • Si creas tu propia biblioteca cliente, usa el método modifyAckDeadline para extender el plazo de confirmación.

Como alternativa, para forzar a Pub/Sub a volver a enviar un mensaje, establece modifyAckDeadline en 0.