Como receber mensagens usando pull

O Pub/Sub é compatível com entrega de mensagens por push e pull. Para ter uma visão geral e comparar assinaturas de pull e push, consulte esta página. Este documento descreve a entrega por pull. Para ver uma discussão da entrega por push, consulte o Guia do assinante de push.

Pull assíncrono

O uso do pull assíncrono oferece uma maior capacidade em seu aplicativo, não exigindo que ele bloqueie novas mensagens. As mensagens podem ser recebidas no aplicativo usando-se um detector de mensagens de longa duração e reconhecidas uma de cada vez, conforme mostrado no exemplo abaixo. Os clientes Java, Python, .NET, Go e Ruby usam a API de serviço streamingPull para implementar a API do cliente assíncrona com eficiência.

Nem todas as bibliotecas de cliente são compatíveis com mensagens de pull assíncrono. Para saber mais sobre mensagens de pull síncrono, consulte Pull síncrono.

Para mais informações, consulte a documentação de referência da API da sua linguagem de programação.

C#

Antes de tentar esse exemplo, siga as instruções de configuração do C# em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub C#.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
        subscriptionId);
    SubscriberClient subscriber = await SubscriberClient.CreateAsync(
        subscriptionName);
    // SubscriberClient runs your message handle function on multiple
    // threads to maximize throughput.
    Task startTask = subscriber.StartAsync(
        async (PubsubMessage message, CancellationToken cancel) =>
        {
            string text =
                Encoding.UTF8.GetString(message.Data.ToArray());
            await Console.Out.WriteLineAsync(
                $"Message {message.MessageId}: {text}");
            return acknowledge ? SubscriberClient.Reply.Ack
                : SubscriberClient.Reply.Nack;
        });
    // Run for 3 seconds.
    await Task.Delay(3000);
    await subscriber.StopAsync(CancellationToken.None);

go

Antes de tentar essa amostra, siga as instruções de configuração do Go em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Go.

import (
    	"context"
    	"fmt"
    	"io"
    	"sync"

    	"cloud.google.com/go/pubsub"
    )

    func pullMsgs(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
    	// projectID := "my-project-id"
    	// subID := "my-sub"
    	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %v", err)
    	}

    	// Publish 10 messages on the topic.
    	var results []*pubsub.PublishResult
    	for i := 0; i < 10; i++ {
    		res := topic.Publish(ctx, &pubsub.Message{
    			Data: []byte(fmt.Sprintf("hello world #%d", i)),
    		})
    		results = append(results, res)
    	}

    	// Check that all messages were published.
    	for _, r := range results {
    		_, err := r.Get(ctx)
    		if err != nil {
    			return fmt.Errorf("Get: %v", err)
    		}
    	}
    	// Consume 10 messages.
    	var mu sync.Mutex
    	received := 0
    	sub := client.Subscription(subID)
    	cctx, cancel := context.WithCancel(ctx)
    	err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
    		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
    		msg.Ack()
    		mu.Lock()
    		defer mu.Unlock()
    		received++
    		if received == 10 {
    			cancel()
    		}
    	})
    	if err != nil {
    		return fmt.Errorf("Receive: %v", err)
    	}
    	return nil
    }
    

java

Antes de tentar essa amostra, siga as instruções de configuração do Java em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Java.

String projectId = "my-project-id";
    String subscriptionId = "my-subscription-id";

    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);
    // Instantiate an asynchronous message receiver
    MessageReceiver receiver =
        new MessageReceiver() {
          @Override
          public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
            // handle incoming message, then ack/nack the received message
            System.out.println("Id : " + message.getMessageId());
            System.out.println("Data : " + message.getData().toStringUtf8());
            consumer.ack();
          }
        };

    Subscriber subscriber = null;
    try {
      // Create a subscriber for "my-subscription-id" bound to the message receiver
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      subscriber.startAsync().awaitRunning();
      // Allow the subscriber to run indefinitely unless an unrecoverable error occurs
      subscriber.awaitTerminated();
    } finally {
      // Stop receiving messages
      if (subscriber != null) {
        subscriber.stopAsync();
      }
    }

node.js

Antes de tentar essa amostra, siga as instruções de configuração do Node.js em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Node.js.

/**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
    // const timeout = 60;

    // Imports the Google Cloud client library
    const {PubSub} = require('@google-cloud/pubsub');

    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();

    function listenForMessages() {
      // References an existing subscription
      const subscription = pubSubClient.subscription(subscriptionName);

      // Create an event handler to handle messages
      let messageCount = 0;
      const messageHandler = message => {
        console.log(`Received message ${message.id}:`);
        console.log(`\tData: ${message.data}`);
        console.log(`\tAttributes: ${message.attributes}`);
        messageCount += 1;

        // "Ack" (acknowledge receipt of) the message
        message.ack();
      };

      // Listen for new messages until timeout is hit
      subscription.on('message', messageHandler);

      setTimeout(() => {
        subscription.removeListener('message', messageHandler);
        console.log(`${messageCount} message(s) received.`);
      }, timeout * 1000);
    }

    listenForMessages();

python

Antes de tentar essa amostra, siga as instruções de configuração do Python em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Python.

from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO subscription_name = "Your Pub/Sub subscription name"
    # TODO timeout = 5.0  # "How long the subscriber should listen for
    # messages in seconds"

    subscriber = pubsub_v1.SubscriberClient()
    # The `subscription_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/subscriptions/{subscription_name}`
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name
    )

    def callback(message):
        print("Received message: {}".format(message))
        message.ack()

    streaming_pull_future = subscriber.subscribe(
        subscription_path, callback=callback
    )
    print("Listening for messages on {}..\n".format(subscription_path))

    # Wrap subscriber in a 'with' block to automatically call close() when done.
    with subscriber:
        try:
            # When `timeout` is not set, result() will block indefinitely,
            # unless an exception is encountered first.
            streaming_pull_future.result(timeout=timeout)
        except:  # noqa
            streaming_pull_future.cancel()

Como processar atributos personalizados

Nesta amostra, você verá como enviar mensagens pull de maneira assíncrona e recuperar os atributos personalizados dos metadados:

python

Antes de tentar essa amostra, siga as instruções de configuração do Python em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Python.

from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO subscription_name = "Your Pub/Sub subscription name"
    # TODO timeout = 5.0  # "How long the subscriber should listen for
    # messages in seconds"

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name
    )

    def callback(message):
        print("Received message: {}".format(message.data))
        if message.attributes:
            print("Attributes:")
            for key in message.attributes:
                value = message.attributes.get(key)
                print("{}: {}".format(key, value))
        message.ack()

    streaming_pull_future = subscriber.subscribe(
        subscription_path, callback=callback
    )
    print("Listening for messages on {}..\n".format(subscription_path))

    # Wrap subscriber in a 'with' block to automatically call close() when done.
    with subscriber:
        try:
            # When `timeout` is not set, result() will block indefinitely,
            # unless an exception is encountered first.
            streaming_pull_future.result(timeout=timeout)
        except:  # noqa
            streaming_pull_future.cancel()

ruby

Antes de tentar essa amostra, siga as instruções de configuração do Ruby em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Ruby.

# project_id        = "Your Google Cloud Project ID"
    # subscription_name = "Your Pubsub subscription name"
    require "google/cloud/pubsub"

    pubsub = Google::Cloud::Pubsub.new project: project_id

    subscription = pubsub.subscription subscription_name
    subscriber   = subscription.listen do |received_message|
      puts "Received message: #{received_message.data}"
      unless received_message.attributes.empty?
        puts "Attributes:"
        received_message.attributes.each do |key, value|
          puts "#{key}: #{value}"
        end
      end
      received_message.acknowledge!
    end

    subscriber.start
    # Let the main thread sleep for 60 seconds so the thread for listening
    # messages does not quit
    sleep 60
    subscriber.stop.wait!

Escuta de erros

Esta amostra exibe como lidar com erros que ocorrem durante a assinatura nas mensagens:

go

Antes de tentar essa amostra, siga as instruções de configuração do Go em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Go.

import (
    	"context"
    	"fmt"
    	"io"

    	"cloud.google.com/go/pubsub"
    )

    func pullMsgsError(w io.Writer, projectID, subID string) error {
    	// projectID := "my-project-id"
    	// subID := "my-sub"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %v", err)
    	}

    	// If the service returns a non-retryable error, Receive returns that error after
    	// all of the outstanding calls to the handler have returned.
    	err = client.Subscription(subID).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
    		msg.Ack()
    	})
    	if err != nil {
    		return fmt.Errorf("Receive: %v", err)
    	}
    	return nil
    }
    

java

Antes de tentar essa amostra, siga as instruções de configuração do Go em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Go.

subscriber.addListener(
        new Subscriber.Listener() {
          public void failed(Subscriber.State from, Throwable failure) {
            // Handle error.
          }
        },
        MoreExecutors.directExecutor());

node.js

Antes de tentar essa amostra, siga as instruções de configuração do Node.js em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Node.js.

/**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
    // const timeout = 10;

    // Imports the Google Cloud client library
    const {PubSub} = require('@google-cloud/pubsub');

    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();

    function listenForErrors() {
      // References an existing subscription
      const subscription = pubSubClient.subscription(subscriptionName);

      // Create an event handler to handle messages
      const messageHandler = function(message) {
        // Do something with the message
        console.log(`Message: ${message}`);

        // "Ack" (acknowledge receipt of) the message
        message.ack();
      };

      // Create an event handler to handle errors
      const errorHandler = function(error) {
        // Do something with the error
        console.error(`ERROR: ${error}`);
        throw error;
      };

      // Listen for new messages/errors until timeout is hit
      subscription.on('message', messageHandler);
      subscription.on('error', errorHandler);

      setTimeout(() => {
        subscription.removeListener('message', messageHandler);
        subscription.removeListener('error', errorHandler);
      }, timeout * 1000);
    }

    listenForErrors();

python

Antes de tentar essa amostra, siga as instruções de configuração do Python em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Python.

from google.cloud import pubsub_v1

    # TODO project_id        = "Your Google Cloud Project ID"
    # TODO subscription_name = "Your Pubsub subscription name"
    # TODO timeout = 5.0  # "How long the subscriber should listen for
    # messages in seconds"

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name
    )

    def callback(message):
        print("Received message: {}".format(message))
        message.ack()

    streaming_pull_future = subscriber.subscribe(
        subscription_path, callback=callback
    )
    print("Listening for messages on {}..\n".format(subscription_path))

    # Wrap subscriber in a 'with' block to automatically call close() when done.
    with subscriber:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        try:
            streaming_pull_future.result(timeout=timeout)
        except Exception as e:
            streaming_pull_future.cancel()
            print(
                "Listening for messages on {} threw an exception: {}.".format(
                    subscription_name, e
                )
            )

ruby

Antes de tentar essa amostra, siga as instruções de configuração do Go em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Go.

# project_id        = "Your Google Cloud Project ID"
    # subscription_name = "Your Pubsub subscription name"
    require "google/cloud/pubsub"

    pubsub = Google::Cloud::Pubsub.new project: project_id

    subscription = pubsub.subscription subscription_name
    subscriber   = subscription.listen do |received_message|
      puts "Received message: #{received_message.data}"
      received_message.acknowledge!
    end
    # Propagate expection from child threads to the main thread as soon as it is
    # raised. Exceptions happened in the callback thread are collected in the
    # callback thread pool and do not propagate to the main thread
    Thread.abort_on_exception = true

    begin
      subscriber.start
      # Let the main thread sleep for 60 seconds so the thread for listening
      # messages does not quit
      sleep 60
      subscriber.stop.wait!
    rescue Exception => e
      puts "Exception #{e.inspect}: #{e.message}"
      raise "Stopped listening for messages."
    end

Controle de fluxo de mensagens

O cliente assinante pode processar e reconhecer as mensagens mais lentamente do que o Pub/Sub as envia. Nesse caso:

  • É possível que um cliente tenha um backlog de mensagens por não ter capacidade para processar o volume de mensagens recebidas, mas outro cliente na rede pode ter essa capacidade. O segundo cliente pode reduzir o backlog da assinatura, mas não consegue fazê-lo porque o primeiro cliente mantém um lease nas mensagens recebidas. Isso reduz a taxa de processamento geral, porque as mensagens ficam presas no primeiro cliente.

  • Como a biblioteca de cliente estende repetidamente o prazo de confirmação para as mensagens acumuladas, elas continuam a consumir recursos de memória, CPU e largura de banda. Dessa forma, o cliente assinante pode ficar sem recursos (como memória). Isso pode afetar negativamente a capacidade e a latência das mensagens de processamento.

Para atenuar os problemas acima, controle a taxa com que o assinante recebe as mensagens com os recursos de controle de fluxo do assinante. Esses recursos de controle de fluxo são ilustrados nas amostras a seguir:

C#

Antes de tentar esse exemplo, siga as instruções de configuração do C# em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub C#.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
        subscriptionId);
    SubscriberClient subscriber = await SubscriberClient.CreateAsync(
        subscriptionName,
        settings: new SubscriberClient.Settings()
        {
            AckExtensionWindow = TimeSpan.FromSeconds(4),
            Scheduler = Google.Api.Gax.SystemScheduler.Instance,
            AckDeadline = TimeSpan.FromSeconds(10),
            FlowControlSettings = new Google.Api.Gax
                .FlowControlSettings(
                maxOutstandingElementCount: 100,
                maxOutstandingByteCount: 10240)
        });
    // SubscriberClient runs your message handle function on multiple
    // threads to maximize throughput.
    Task startTask = subscriber.StartAsync(
        async (PubsubMessage message, CancellationToken cancel) =>
        {
            string text =
                Encoding.UTF8.GetString(message.Data.ToArray());
            await Console.Out.WriteLineAsync(
                $"Message {message.MessageId}: {text}");
            return acknowledge ? SubscriberClient.Reply.Ack
                : SubscriberClient.Reply.Nack;
        });
    // Run for 3 seconds.
    await Task.Delay(3000);
    await subscriber.StopAsync(CancellationToken.None);

go

Antes de tentar essa amostra, siga as instruções de configuração do Go em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Go.

import (
    	"context"
    	"fmt"
    	"io"

    	"cloud.google.com/go/pubsub"
    )

    func pullMsgsSettings(w io.Writer, projectID, subID string) error {
    	// projectID := "my-project-id"
    	// subID := "my-sub"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %v", err)
    	}

    	sub := client.Subscription(subID)
    	sub.ReceiveSettings.Synchronous = true
    	// MaxOutstandingMessages is the maximum number of unprocessed messages the
    	// client will pull from the server before pausing.
    	//
    	// This is only guaranteed when ReceiveSettings.Synchronous is set to true.
    	// When Synchronous is set to false, the StreamingPull RPC is used which
    	// can pull a single large batch of messages at once that is greater than
    	// MaxOustandingMessages before pausing. For more info, see
    	// https://cloud.google.com/pubsub/docs/pull#streamingpull_dealing_with_large_backlogs_of_small_messages.
    	sub.ReceiveSettings.MaxOutstandingMessages = 10
    	// MaxOutstandingBytes is the maximum size of unprocessed messages,
    	// that the client will pull from the server before pausing. Similar
    	// to MaxOutstandingMessages, this may be exceeded with a large batch
    	// of messages since we cannot control the size of a batch of messages
    	// from the server (even with the synchronous Pull RPC).
    	sub.ReceiveSettings.MaxOutstandingBytes = 1e10
    	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
    		msg.Ack()
    	})
    	if err != nil {
    		return fmt.Errorf("Receive: %v", err)
    	}
    	return nil
    }
    

java

Antes de tentar essa amostra, siga as instruções de configuração do Java em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Java.

FlowControlSettings flowControlSettings =
        FlowControlSettings.newBuilder()
            .setMaxOutstandingElementCount(10_000L)
            .setMaxOutstandingRequestBytes(1_000_000_000L)
            .build();
    Subscriber subscriber =
        Subscriber.newBuilder(subscriptionName, receiver)
            .setFlowControlSettings(flowControlSettings)
            .build();

node.js

Antes de tentar essa amostra, siga as instruções de configuração do Node.js em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Node.js.

/**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
    // const maxInProgress = 5;
    // const timeout = 10;

    // Imports the Google Cloud client library
    const {PubSub} = require('@google-cloud/pubsub');

    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();

    async function subscribeWithFlowControlSettings() {
      const subscriberOptions = {
        flowControl: {
          maxMessages: maxInProgress,
        },
      };

      // References an existing subscription.
      // Note that flow control settings are not persistent across subscribers.
      const subscription = pubSubClient.subscription(
        subscriptionName,
        subscriberOptions
      );

      console.log(
        `Subscriber to subscription ${subscription.name} is ready to receive messages at a controlled volume of ${maxInProgress} messages.`
      );

      const messageHandler = message => {
        console.log(`Received message: ${message.id}`);
        console.log(`\tData: ${message.data}`);
        console.log(`\tAttributes: ${message.attributes}`);

        // "Ack" (acknowledge receipt of) the message
        message.ack();
      };

      subscription.on(`message`, messageHandler);

      setTimeout(() => {
        subscription.close();
      }, timeout * 1000);
    }
    

python

Antes de tentar essa amostra, siga as instruções de configuração do Python em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Python.

from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO subscription_name = "Your Pub/Sub subscription name"
    # TODO timeout = 5.0  # "How long the subscriber should listen for
    # messages in seconds"

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name
    )

    def callback(message):
        print("Received message: {}".format(message.data))
        message.ack()

    # Limit the subscriber to only have ten outstanding messages at a time.
    flow_control = pubsub_v1.types.FlowControl(max_messages=10)

    streaming_pull_future = subscriber.subscribe(
        subscription_path, callback=callback, flow_control=flow_control
    )
    print("Listening for messages on {}..\n".format(subscription_path))

    # Wrap subscriber in a 'with' block to automatically call close() when done.
    with subscriber:
        try:
            # When `timeout` is not set, result() will block indefinitely,
            # unless an exception is encountered first.
            streaming_pull_future.result(timeout=timeout)
        except:  # noqa
            streaming_pull_future.cancel()

ruby

Antes de tentar essa amostra, siga as instruções de configuração do Ruby em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Ruby.

# project_id        = "Your Google Cloud Project ID"
    # subscription_name = "Your Pubsub subscription name"
    require "google/cloud/pubsub"

    pubsub = Google::Cloud::Pubsub.new project: project_id

    subscription = pubsub.subscription subscription_name
    subscriber   = subscription.listen inventory: 10 do |received_message|
      puts "Received message: #{received_message.data}"
      received_message.acknowledge!
    end

    subscriber.start
    # Let the main thread sleep for 60 seconds so the thread for listening
    # messages does not quit
    sleep 60
    subscriber.stop.wait!

De modo mais geral, a necessidade do controle de fluxo indica que as mensagens são publicadas com uma taxa maior do que são consumidas. Se esse for um estado constante, em vez de um pico transitório no volume de mensagens, pense em aumentar o número de instâncias do cliente assinante.

Controle de simultaneidade

A compatibilidade com a simultaneidade depende da linguagem de programação. Para implementações de linguagem compatíveis com linhas de execução paralelas, como Java e Go, as bibliotecas de cliente são a escolha padrão para o número de linhas de execução. Essa escolha pode não ser ideal para o aplicativo. Por exemplo, caso ache que o aplicativo assinante não está acompanhando o volume de mensagens recebidas nem está ligado à CPU, aumente a contagem de linhas de execução. Para operações de processamento de mensagens intensivas da CPU, diminua o número de linhas de execução.

A amostra seguir ilustra como controlar a simultaneidade em um assinante:

go

Antes de tentar essa amostra, siga as instruções de configuração do Go em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Go.

import (
    	"context"
    	"fmt"
    	"io"
    	"runtime"
    	"time"

    	"cloud.google.com/go/pubsub"
    )

    func pullMsgsConcurrenyControl(w io.Writer, projectID, subID string) error {
    	// projectID := "my-project-id"
    	// subID := "my-sub"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %v", err)
    	}
    	defer client.Close()

    	sub := client.Subscription(subID)
    	// Must set ReceiveSettings.Synchronous to false (or leave as default) to enable
    	// concurrency settings. Otherwise, NumGoroutines will be set to 1.
    	sub.ReceiveSettings.Synchronous = false
    	// NumGoroutines is the number of goroutines sub.Receive will spawn to pull messages concurrently.
    	sub.ReceiveSettings.NumGoroutines = runtime.NumCPU()

    	// Receive messages for 10 seconds.
    	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
    	defer cancel()

    	// Create a channel to handle messages to as they come in.
    	cm := make(chan *pubsub.Message)
    	// Handle individual messages in a goroutine.
    	go func() {
    		for {
    			select {
    			case msg := <-cm:
    				fmt.Fprintf(w, "Got message :%q\n", string(msg.Data))
    				msg.Ack()
    			case <-ctx.Done():
    				return
    			}
    		}
    	}()

    	// Receive blocks until the context is cancelled or an error occurs.
    	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    		cm <- msg
    	})
    	if err != nil {
    		return fmt.Errorf("Receive: %v", err)
    	}
    	close(cm)

    	return nil
    }
    

java

Antes de tentar essa amostra, siga as instruções de configuração do Java em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Java.

// Provides an executor service for processing messages. The default
    // `executorProvider` used by the subscriber has a default thread count of 5.
    ExecutorProvider executorProvider =
        InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

    // `setParallelPullCount` determines how many StreamingPull streams the
    // subscriber will open to receive message. It defaults to 1.
    // `setExecutorProvider` configures an executor for the subscriber to
    // process messages.
    // Here, the subscriber is configured to open 2 streams for receiving
    // messages, each stream creates a new executor with 4 threads to help
    // process the message callbacks. In total 2x4=8 threads are used for
    // message processing.
    Subscriber subscriber =
        Subscriber.newBuilder(subscriptionName, receiver)
            .setParallelPullCount(2)
            .setExecutorProvider(executorProvider)
            .build();

ruby

Antes de tentar essa amostra, siga as instruções de configuração do Ruby em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Ruby.

# project_id        = "Your Google Cloud Project ID"
    # subscription_name = "Your Pubsub subscription name"
    require "google/cloud/pubsub"

    pubsub = Google::Cloud::Pubsub.new project: project_id

    subscription = pubsub.subscription subscription_name
    # Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
    # for sending acknowledgements and/or delays
    subscriber   = subscription.listen streams: 2, threads: {
      callback: 4,
      push:     2
    } do |received_message|
      puts "Received message: #{received_message.data}"
      received_message.acknowledge!
    end

    subscriber.start
    # Let the main thread sleep for 60 seconds so the thread for listening
    # messages does not quit
    sleep 60
    subscriber.stop.wait!

A compatibilidade com a simultaneidade depende da linguagem de programação. Consulte a documentação de referência de APIs para mais informações.

StreamingPull

O serviço Pub/Sub tem duas APIs para recuperar mensagens:

Sempre que possível, as bibliotecas de cliente do Cloud usam StreamingPull (em inglês) para máxima capacidade e menor latência. Embora possa ser possível que o uso da API StreamingPull diretamente nunca venha a acontecer, é importante entender algumas propriedades essenciais do StreamingPull e como ela difere do método pull mais tradicional.

O método pull depende de um modelo de solicitação/resposta:

  1. O cliente envia uma solicitação de mensagens ao servidor.
  2. O servidor responde com zero ou mais mensagens e fecha a conexão.

A API do serviço StreamingPull depende de uma conexão bidirecional persistente para receber várias mensagens à medida que elas são disponibilizadas:

  1. O cliente envia uma solicitação ao servidor para estabelecer uma conexão.
  2. O servidor envia continuamente mensagens para o cliente conectado.
  3. A conexão é eventualmente encerrada pelo cliente ou pelo servidor.

StreamingPull tem uma taxa de erro de 100%, o que é esperado

Os streams do StreamingPull são sempre encerrados com um status não OK. Observe que, ao contrário das RPCs normais, o status aqui é simplesmente uma indicação de que o stream foi interrompido, não de que as solicitações estão falhando. Portanto, ainda que a API StreamingPull tenha uma taxa de erro de 100% aparentemente surpreendente, ela foi projetada dessa maneira.

Como diagnosticar erros do StreamingPull

Como os streams do StreamingPull sempre terminam com um erro, não é útil examinar as métricas de encerramento de stream enquanto diagnostica erros. Em vez disso, concentre-se na métrica de operação de mensagem StreamingPull (subscription/streaming_pull_message_operation_count). Procure estes erros:

  • Erros FAILED_PRECONDITION podem ocorrer nos casos a seguir:
    • O Pub/Sub tenta descriptografar uma mensagem com uma chave desativada do Cloud KMS.
    • As assinaturas podem ser temporariamente suspensas se houver mensagens no backlog da assinatura criptografadas com uma chave do Cloud KMS desativada.
  • Erros UNAVAILABLE

StreamingPull: como lidar com grandes backlogs de pequenas mensagens

A pilha gRPC StreamingPull está otimizada para alta capacidade e, portanto, armazena mensagens em buffers. Isso pode ter algumas consequências se você está tentando processar grandes backlogs de pequenas mensagens, em vez de um stream estável de novas mensagens. Nessas condições, você pode ver mensagens entregues várias vezes, e o balanço de carga delas pode não ser realizado com eficácia entre os clientes.

O buffer entre o serviço Pub/Sub e o espaço de usuário da biblioteca de cliente é de aproximadamente 10 MB. Para entender o impacto desse buffer no comportamento da biblioteca de cliente, analise este exemplo:

  • Há um backlog de 10.000 mensagens de 1 KB em uma assinatura.
  • Cada mensagem leva um segundo para ser processada sequencialmente por uma instância de cliente com uma única linha de execução.
  • A primeira instância de cliente para estabelecer uma conexão de StreamingPull com o serviço para essa inscrição preencherá seu buffer com todas as 10.000 mensagens.
  • Leva 10.000 segundos (quase três horas) para processar o buffer.
  • Nesse período, algumas das mensagens armazenadas em buffer excedem o prazo de confirmação e são reenviadas para o mesmo cliente, resultando em duplicatas.
  • Quando várias instâncias de cliente estão em execução, as mensagens presas no buffer de um cliente não ficam disponíveis para nenhuma instância de cliente.

Essa situação não ocorrerá se as mensagens chegarem a uma taxa constante (e não como um único lote grande): o serviço nunca tem todos os 10 MB de mensagens por vez e, portanto, consegue balancear a carga de maneira eficaz entre vários assinantes.

Para resolver essa situação, use uma assinatura push ou a API Pull, atualmente disponível em algumas das bibliotecas de cliente do Cloud (consulte a seção Pull síncrono) e todas as bibliotecas de cliente da API. Para saber mais, consulte a documentação das bibliotecas de cliente.

Pull síncrono

Há casos em que o pull assíncrono não é adequado para seu aplicativo. Por exemplo, a lógica do aplicativo pode depender de um padrão de pesquisa para recuperar mensagens ou exigir um limite preciso em um número de mensagens recuperadas pelo cliente a qualquer momento. A compatibilidade do serviço com esses aplicativos depende do método de pull síncrono.

Veja a seguir um código de amostra para enviar por pull e confirmar um número fixo de mensagens:

C#

Antes de tentar esse exemplo, siga as instruções de configuração do C# em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub C#.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
        subscriptionId);
    SubscriberServiceApiClient subscriberClient =
        SubscriberServiceApiClient.Create();
    // Pull messages from server,
    // allowing an immediate response if there are no messages.
    PullResponse response = subscriberClient.Pull(
        subscriptionName, returnImmediately: true, maxMessages: 20);
    // Print out each received message.
    foreach (ReceivedMessage msg in response.ReceivedMessages)
    {
        string text = Encoding.UTF8.GetString(msg.Message.Data.ToArray());
        Console.WriteLine($"Message {msg.Message.MessageId}: {text}");
    }
    // If acknowledgement required, send to server.
    if (acknowledge)
    {
        subscriberClient.Acknowledge(subscriptionName,
            response.ReceivedMessages.Select(msg => msg.AckId));
    }

java

Antes de tentar essa amostra, siga as instruções de configuração do Java em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Java.

SubscriberStubSettings subscriberStubSettings =
        SubscriberStubSettings.newBuilder()
            .setTransportChannelProvider(
                SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                    .setMaxInboundMessageSize(20 << 20) // 20MB
                    .build())
            .build();

    try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
      // String projectId = "my-project-id";
      // String subscriptionId = "my-subscription-id";
      // int numOfMessages = 10;   // max number of messages to be pulled
      String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
      PullRequest pullRequest =
          PullRequest.newBuilder()
              .setMaxMessages(numOfMessages)
              .setReturnImmediately(false) // return immediately if messages are not available
              .setSubscription(subscriptionName)
              .build();

      // use pullCallable().futureCall to asynchronously perform this operation
      PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
      List<String> ackIds = new ArrayList<>();
      for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
        // handle received message
        // ...
        ackIds.add(message.getAckId());
      }
      // acknowledge received messages
      AcknowledgeRequest acknowledgeRequest =
          AcknowledgeRequest.newBuilder()
              .setSubscription(subscriptionName)
              .addAllAckIds(ackIds)
              .build();
      // use acknowledgeCallable().futureCall to asynchronously perform this operation
      subscriber.acknowledgeCallable().call(acknowledgeRequest);
      return pullResponse.getReceivedMessagesList();
    }

node.js

Antes de tentar essa amostra, siga as instruções de configuração do Node.js em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Node.js.

/**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const projectId = 'YOUR_PROJECT_ID';
    // const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';

    // Imports the Google Cloud client library. v1 is for the lower level
    // proto access.
    const {v1} = require('@google-cloud/pubsub');

    // Creates a client; cache this for further use.
    const subClient = new v1.SubscriberClient();

    async function synchronousPull() {
      const formattedSubscription = subClient.subscriptionPath(
        projectId,
        subscriptionName
      );

      // The maximum number of messages returned for this request.
      // Pub/Sub may return fewer than the number specified.
      const request = {
        subscription: formattedSubscription,
        maxMessages: 10,
      };

      // The subscriber pulls a specified number of messages.
      const [response] = await subClient.pull(request);

      // Process the messages.
      const ackIds = [];
      for (const message of response.receivedMessages) {
        console.log(`Received message: ${message.message.data}`);
        ackIds.push(message.ackId);
      }

      // Acknowledge all of the messages. You could also ackknowledge
      // these individually, but this is more efficient.
      const ackRequest = {
        subscription: formattedSubscription,
        ackIds: ackIds,
      };
      await subClient.acknowledge(ackRequest);

      console.log('Done.');
    }

    synchronousPull().catch(console.error);

php

Antes de tentar essa amostra, siga as instruções de configuração do Node.js em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Node.js.

use Google\Cloud\PubSub\PubSubClient;

    /**
     * Pulls all Pub/Sub messages for a subscription.
     *
     * @param string $projectId  The Google project ID.
     * @param string $subscriptionName  The Pub/Sub subscription name.
     */
    function pull_messages($projectId, $subscriptionName)
    {
        $pubsub = new PubSubClient([
            'projectId' => $projectId,
        ]);
        $subscription = $pubsub->subscription($subscriptionName);
        foreach ($subscription->pull() as $message) {
            printf('Message: %s' . PHP_EOL, $message->data());
            // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
            $subscription->acknowledge($message);
        }
    }

protocol

Solicitação:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull

{
      "returnImmediately": "false",
      "maxMessages": "1"
    }
    

Resposta:

200 OK

{
      "receivedMessages": [{
        "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
        "message": {
          "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
          "messageId": "19917247034"
        }
      }]
    }
    

Solicitação:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge

{
      "ackIds": [
        "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
      ]
    }
    

python

Antes de tentar essa amostra, siga as instruções de configuração do Python em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Python.

from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO subscription_name = "Your Pub/Sub subscription name"

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name
    )

    NUM_MESSAGES = 3

    # The subscriber pulls a specific number of messages.
    response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)

    ack_ids = []
    for received_message in response.received_messages:
        print("Received: {}".format(received_message.message.data))
        ack_ids.append(received_message.ack_id)

    # Acknowledges the received messages so they will not be sent again.
    subscriber.acknowledge(subscription_path, ack_ids)

    print(
        "Received and acknowledged {} messages. Done.".format(
            len(response.received_messages)
        )
    )

    subscriber.close()

ruby

Antes de tentar essa amostra, siga as instruções de configuração do Ruby em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Ruby.

# project_id        = "Your Google Cloud Project ID"
    # subscription_name = "Your Pubsub subscription name"
    require "google/cloud/pubsub"

    pubsub = Google::Cloud::Pubsub.new project: project_id

    subscription = pubsub.subscription subscription_name
    subscription.pull.each do |message|
      puts "Message pulled: #{message.data}"
      message.acknowledge!
    end

Para conseguir baixa latência de entrega de mensagens com pull síncrono, é importante ter muitas solicitações de pull pendentes ao mesmo tempo. À medida que a capacidade do tópico aumenta, mais solicitações de pull são necessárias. Em geral, o pull assíncrono é indicado para aplicativos sensíveis à latência.

Pull síncrono com gerenciamento de lease

O processamento de uma mensagem individual pode exceder o prazo de confirmação pré-configurado, também conhecido como lease. Para evitar que essas mensagens sejam enviadas novamente, as bibliotecas de cliente permitem redefinir prazos de confirmação (exceto a biblioteca de cliente Go, que modifica automaticamente os prazos de confirmação de mensagens pesquisadas), conforme mostrado nas amostras abaixo:

go

Antes de tentar essa amostra, siga as instruções de configuração do Go em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Go.

import (
    	"context"
    	"fmt"
    	"io"
    	"time"

    	"google.golang.org/grpc/codes"
    	"google.golang.org/grpc/status"

    	"cloud.google.com/go/pubsub"
    )

    func pullMsgsSync(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
    	// projectID := "my-project-id"
    	// subID := "my-sub"
    	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %v", err)
    	}
    	defer client.Close()

    	sub := client.Subscription(subID)

    	// Turn on synchronous mode. This makes the subscriber use the Pull RPC rather
    	// than the StreamingPull RPC, which is useful for guaranteeing MaxOutstandingMessages,
    	// the max number of messages the client will hold in memory at a time.
    	sub.ReceiveSettings.Synchronous = true
    	sub.ReceiveSettings.MaxOutstandingMessages = 10

    	// Receive messages for 5 seconds.
    	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    	defer cancel()

    	// Create a channel to handle messages to as they come in.
    	cm := make(chan *pubsub.Message)
    	// Handle individual messages in a goroutine.
    	go func() {
    		for {
    			select {
    			case msg := <-cm:
    				fmt.Fprintf(w, "Got message :%q\n", string(msg.Data))
    				msg.Ack()
    			case <-ctx.Done():
    				return
    			}
    		}
    	}()

    	// Receive blocks until the passed in context is done.
    	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    		cm <- msg
    	})
    	if err != nil && status.Code(err) != codes.Canceled {
    		return fmt.Errorf("Receive: %v", err)
    	}
    	close(cm)

    	return nil
    }
    

node.js

Antes de tentar essa amostra, siga as instruções de configuração do Node.js em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Node.js.

/**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const projectId = 'YOUR_PROJECT_ID';
    // const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';

    // Imports the Google Cloud client library. v1 is for the lower level
    // proto access.
    const {v1} = require('@google-cloud/pubsub');

    // Creates a client; cache this for further use.
    const subClient = new v1.SubscriberClient();

    async function synchronousPullWithLeaseManagement() {
      const formattedSubscription = subClient.subscriptionPath(
        projectId,
        subscriptionName
      );

      // The maximum number of messages returned for this request.
      // Pub/Sub may return fewer than the number specified.
      const maxMessages = 1;
      const newAckDeadlineSeconds = 30;
      const request = {
        subscription: formattedSubscription,
        maxMessages: maxMessages,
      };

      let isProcessed = false;

      // The worker function is meant to be non-blocking. It starts a long-
      // running process, such as writing the message to a table, which may
      // take longer than the default 10-sec acknowledge deadline.
      function worker(message) {
        console.log(`Processing "${message.message.data}"...`);

        setTimeout(() => {
          console.log(`Finished procesing "${message.message.data}".`);
          isProcessed = true;
        }, 30000);
      }

      // The subscriber pulls a specified number of messages.
      const [response] = await subClient.pull(request);

      // Obtain the first message.
      const message = response.receivedMessages[0];

      // Send the message to the worker function.
      worker(message);

      let waiting = true;
      while (waiting) {
        await new Promise(r => setTimeout(r, 10000));
        // If the message has been processed..
        if (isProcessed) {
          const ackRequest = {
            subscription: formattedSubscription,
            ackIds: [message.ackId],
          };

          //..acknowledges the message.
          await subClient.acknowledge(ackRequest);
          console.log(`Acknowledged: "${message.message.data}".`);
          // Exit after the message is acknowledged.
          waiting = false;
          console.log(`Done.`);
        } else {
          // If the message is not yet processed..
          const modifyAckRequest = {
            subscription: formattedSubscription,
            ackIds: [message.ackId],
            ackDeadlineSeconds: newAckDeadlineSeconds,
          };

          //..reset its ack deadline.
          await subClient.modifyAckDeadline(modifyAckRequest);

          console.log(
            `Reset ack deadline for "${message.message.data}" for ${newAckDeadlineSeconds}s.`
          );
        }
      }
    }

    synchronousPullWithLeaseManagement().catch(console.error);

python

Antes de tentar essa amostra, siga as instruções de configuração do Python em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Python.

import logging
    import multiprocessing
    import random
    import time

    from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO subscription_name = "Your Pub/Sub subscription name"

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name
    )

    NUM_MESSAGES = 2
    ACK_DEADLINE = 30
    SLEEP_TIME = 10

    # The subscriber pulls a specific number of messages.
    response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)

    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)

    def worker(msg):
        """Simulates a long-running process."""
        RUN_TIME = random.randint(1, 60)
        logger.info(
            "{}: Running {} for {}s".format(
                time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME
            )
        )
        time.sleep(RUN_TIME)

    # `processes` stores process as key and ack id and message as values.
    processes = dict()
    for message in response.received_messages:
        process = multiprocessing.Process(target=worker, args=(message,))
        processes[process] = (message.ack_id, message.message.data)
        process.start()

    while processes:
        for process in list(processes):
            ack_id, msg_data = processes[process]
            # If the process is still running, reset the ack deadline as
            # specified by ACK_DEADLINE once every while as specified
            # by SLEEP_TIME.
            if process.is_alive():
                # `ack_deadline_seconds` must be between 10 to 600.
                subscriber.modify_ack_deadline(
                    subscription_path,
                    [ack_id],
                    ack_deadline_seconds=ACK_DEADLINE,
                )
                logger.info(
                    "{}: Reset ack deadline for {} for {}s".format(
                        time.strftime("%X", time.gmtime()),
                        msg_data,
                        ACK_DEADLINE,
                    )
                )

            # If the processs is finished, acknowledges using `ack_id`.
            else:
                subscriber.acknowledge(subscription_path, [ack_id])
                logger.info(
                    "{}: Acknowledged {}".format(
                        time.strftime("%X", time.gmtime()), msg_data
                    )
                )
                processes.pop(process)

        # If there are still processes running, sleeps the thread.
        if processes:
            time.sleep(SLEEP_TIME)

    print(
        "Received and acknowledged {} messages. Done.".format(
            len(response.received_messages)
        )
    )

    subscriber.close()

Escalonamento

Pode ser necessário implementar um mecanismo de escalonamento no seu aplicativo de assinante para acompanhar o volume das mensagens. Como fazer isso depende do seu ambiente, mas ele geralmente será baseado nas métricas de backlog oferecidas por meio do Pacote de operações do Google Cloud do serviço de monitoramento. Para ver detalhes sobre como fazer isso para o Compute Engine, consulte Como escalonar com base em métricas do Cloud Monitoring.

Acesse a seção Pub/Sub da página "lista de métricas do GCP" para saber quais métricas podem ser monitoradas programaticamente.

Assim como ocorre com qualquer serviço distribuído, de vez em quando você terá de repetir cada solicitação.

Como lidar com duplicatas e forçar novas tentativas

Quando você não confirma uma mensagem antes do vencimento do prazo de confirmação, o Pub/Sub reenvia a mensagem. Como resultado, o Pub/Sub pode enviar mensagens duplicadas. Use o pacote de operações do Google Cloud para monitorar as operações de confirmação com o código de resposta expired para detectar essa condição. Para conseguir esses dados, selecione a métrica Confirmar operações de mensagens e agrupe ou filtre pelo rótulo response_code. Observe que response_code é um rótulo do sistema em uma métrica. Ele não é uma métrica.

Use o Stackdriver para pesquisar prazos de confirmação de mensagens vencidos

Para reduzir a taxa de duplicação, prolongue o prazo da mensagem.

  • As bibliotecas de cliente prolongam o prazo automaticamente, mas há limites padrão de configuração de quanto o prazo pode ser prolongado.
  • Use o método modifyAckDeadline para estender o prazo de confirmação se você estiver criando sua própria biblioteca de cliente.

Como alternativa, para forçar o Pub/Sub a repetir uma mensagem, defina modifyAckDeadline para 0.