Como receber mensagens usando pull

O Pub/Sub é compatível com entrega de mensagens por push e pull. Para ter uma visão geral e comparar assinaturas de pull e push, consulte esta página. Este documento descreve a entrega por pull. Para ver uma discussão da entrega por push, consulte o Guia do assinante de push.

Pull assíncrono

O uso do pull assíncrono oferece uma maior capacidade em seu aplicativo, não exigindo que ele bloqueie novas mensagens. As mensagens podem ser recebidas no aplicativo usando-se um detector de mensagens de longa duração e reconhecidas uma de cada vez, conforme mostrado no exemplo abaixo. Os clientes Java, Python, .NET, Go e Ruby usam a API de serviço streamingPull para implementar a API do cliente assíncrona com eficiência.

Nem todas as bibliotecas de cliente são compatíveis com mensagens de pull assíncrono. Para saber mais sobre mensagens de pull síncrono, consulte Pull síncrono.

Para ver mais informações, consulte a documentação de referência da API da sua linguagem de programação.

C#

Antes de testar esta amostra, siga as instruções de configuração do C# no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para C# (em inglês).

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberClient subscriber = await SubscriberClient.CreateAsync(
    subscriptionName);
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
await Task.Delay(3000);
await subscriber.StopAsync(CancellationToken.None);

go

Antes de testar esta amostra, siga as instruções de configuração do Go no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Go (em inglês).

import (
	"context"
	"fmt"
	"io"
	"sync"

	"cloud.google.com/go/pubsub"
)

func pullMsgs(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	// Publish 10 messages on the topic.
	var results []*pubsub.PublishResult
	for i := 0; i < 10; i++ {
		res := topic.Publish(ctx, &pubsub.Message{
			Data: []byte(fmt.Sprintf("hello world #%d", i)),
		})
		results = append(results, res)
	}

	// Check that all messages were published.
	for _, r := range results {
		_, err := r.Get(ctx)
		if err != nil {
			return fmt.Errorf("Get: %v", err)
		}
	}
	// Consume 10 messages.
	var mu sync.Mutex
	received := 0
	sub := client.Subscription(subID)
	cctx, cancel := context.WithCancel(ctx)
	err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		msg.Ack()
		mu.Lock()
		defer mu.Unlock()
		received++
		if received == 10 {
			cancel()
		}
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	return nil
}

java

Antes de testar esta amostra, siga as instruções de configuração do Java no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Java (em inglês).

String projectId = "my-project-id";
String subscriptionId = "my-subscription-id";

ProjectSubscriptionName subscriptionName =
    ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver
MessageReceiver receiver =
    new MessageReceiver() {
      @Override
      public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        // handle incoming message, then ack/nack the received message
        System.out.println("Id : " + message.getMessageId());
        System.out.println("Data : " + message.getData().toStringUtf8());
        consumer.ack();
      }
    };

Subscriber subscriber = null;
try {
  // Create a subscriber for "my-subscription-id" bound to the message receiver
  subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
  subscriber.startAsync().awaitRunning();
  // Allow the subscriber to run indefinitely unless an unrecoverable error occurs
  subscriber.awaitTerminated();
} finally {
  // Stop receiving messages
  if (subscriber != null) {
    subscriber.stopAsync();
  }
}

node.js

Antes de testar esta amostra, siga as instruções de configuração do Node.js no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Node.js (em inglês).

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'my-sub';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = message => {
  console.log(`Received message ${message.id}:`);
  console.log(`\tData: ${message.data}`);
  console.log(`\tAttributes: ${message.attributes}`);
  messageCount += 1;

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Listen for new messages until timeout is hit
subscription.on(`message`, messageHandler);

setTimeout(() => {
  subscription.removeListener('message', messageHandler);
  console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);

python

Antes de testar esta amostra, siga as instruções de configuração do Python no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Python (em inglês).

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_name}`
subscription_path = subscriber.subscription_path(
    project_id, subscription_name
)

def callback(message):
    print("Received message: {}".format(message))
    message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking. We must keep the main thread from
# exiting to allow it to process messages asynchronously in the background.
print("Listening for messages on {}".format(subscription_path))
while True:
    time.sleep(60)

Como processar atributos personalizados

Nesta amostra, você verá como enviar mensagens pull de maneira assíncrona e recuperar os atributos personalizados dos metadados:

python

Antes de testar esta amostra, siga as instruções de configuração do Python no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Python (em inglês).

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name
)

def callback(message):
    print("Received message: {}".format(message.data))
    if message.attributes:
        print("Attributes:")
        for key in message.attributes:
            value = message.attributes.get(key)
            print("{}: {}".format(key, value))
    message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print("Listening for messages on {}".format(subscription_path))
while True:
    time.sleep(60)

ruby

Antes de testar esta amostra, siga as instruções de configuração do Ruby no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Ruby (em inglês).

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  unless received_message.attributes.empty?
    puts "Attributes:"
    received_message.attributes.each do |key, value|
      puts "#{key}: #{value}"
    end
  end
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

Como detectar erros

Esta amostra exibe como lidar com erros que ocorrem durante a assinatura nas mensagens:

go

Antes de testar esta amostra, siga as instruções de configuração do Go no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Go (em inglês).

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func pullMsgsError(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	// If the service returns a non-retryable error, Receive returns that error after
	// all of the outstanding calls to the handler have returned.
	err = client.Subscription(subID).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	return nil
}

java

Antes de testar esta amostra, siga as instruções de configuração do Go no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Go (em inglês).

subscriber.addListener(
    new Subscriber.Listener() {
      public void failed(Subscriber.State from, Throwable failure) {
        // Handle error.
      }
    },
    MoreExecutors.directExecutor());

node.js

Antes de testar esta amostra, siga as instruções de configuração do Node.js no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Node.js (em inglês).

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'my-sub';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
const messageHandler = function(message) {
  // Do something with the message
  console.log(`Message: ${message}`);

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Create an event handler to handle errors
const errorHandler = function(error) {
  // Do something with the error
  console.error(`ERROR: ${error}`);
};

// Listen for new messages/errors until timeout is hit
subscription.on(`message`, messageHandler);
subscription.on(`error`, errorHandler);

setTimeout(() => {
  subscription.removeListener(`message`, messageHandler);
  subscription.removeListener(`error`, errorHandler);
}, timeout * 1000);

python

Antes de testar esta amostra, siga as instruções de configuração do Python no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Python (em inglês).

from google.cloud import pubsub_v1

# TODO project_id        = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pubsub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name
)

def callback(message):
    print("Received message: {}".format(message))
    message.ack()

future = subscriber.subscribe(subscription_path, callback=callback)

# Blocks the thread while messages are coming in through the stream. Any
# exceptions that crop up on the thread will be set on the future.
try:
    # When timeout is unspecified, the result method waits indefinitely.
    future.result(timeout=30)
except Exception as e:
    print(
        "Listening for messages on {} threw an Exception: {}.".format(
            subscription_name, e
        )
    )

ruby

Antes de testar esta amostra, siga as instruções de configuração do Go no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Go (em inglês).

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end
# Propagate expection from child threads to the main thread as soon as it is
# raised. Exceptions happened in the callback thread are collected in the
# callback thread pool and do not propagate to the main thread
Thread.abort_on_exception = true

begin
  subscriber.start
  # Let the main thread sleep for 60 seconds so the thread for listening
  # messages does not quit
  sleep 60
  subscriber.stop.wait!
rescue Exception => e
  puts "Exception #{e.inspect}: #{e.message}"
  raise "Stopped listening for messages."
end

Controle de fluxo de mensagens

O cliente assinante pode processar e reconhecer as mensagens mais lentamente do que o Pub/Sub as envia. Nesse caso:

  • É possível que um cliente tenha um backlog de mensagens por não ter capacidade para processar o volume de mensagens recebidas, mas outro cliente na rede tem. Este pode reduzir o backlog da assinatura, mas não consegue fazê-lo porque o primeiro cliente mantém um lease nas mensagens recebidas. Isso reduz a taxa de processamento geral, porque as mensagens ficam presas no primeiro cliente.

  • Como a biblioteca de cliente estende repetidamente o prazo de confirmação para as mensagens acumuladas, elas continuam a consumir recursos de memória, CPU e largura de banda. Dessa forma, o cliente assinante pode ficar sem recursos (como memória). Isso pode afetar negativamente a capacidade e a latência das mensagens de processamento.

Para atenuar os problemas acima, controle a taxa com que o assinante recebe as mensagens com os recursos de controle de fluxo do assinante. Esses recursos de controle de fluxo são ilustrados nas amostras a seguir:

C#

Antes de testar esta amostra, siga as instruções de configuração do C# no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para C# (em inglês).

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberClient subscriber = await SubscriberClient.CreateAsync(
    subscriptionName,
    settings: new SubscriberClient.Settings()
    {
        AckExtensionWindow = TimeSpan.FromSeconds(4),
        Scheduler = Google.Api.Gax.SystemScheduler.Instance,
        AckDeadline = TimeSpan.FromSeconds(10),
        FlowControlSettings = new Google.Api.Gax
            .FlowControlSettings(
            maxOutstandingElementCount: 100,
            maxOutstandingByteCount: 10240)
    });
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
await Task.Delay(3000);
await subscriber.StopAsync(CancellationToken.None);

go

Antes de testar esta amostra, siga as instruções de configuração do Go no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Go (em inglês).

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func pullMsgsSettings(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	sub := client.Subscription(subID)
	sub.ReceiveSettings.Synchronous = true
	// MaxOutstandingMessages is the maximum number of unprocessed messages the
	// client will pull from the server before pausing.
	//
	// This is only guaranteed when ReceiveSettings.Synchronous is set to true.
	// When Synchronous is set to false, the StreamingPull RPC is used which
	// can pull a single large batch of messages at once that is greater than
	// MaxOustandingMessages before pausing. For more info, see
	// https://cloud.google.com/pubsub/docs/pull#streamingpull_dealing_with_large_backlogs_of_small_messages.
	sub.ReceiveSettings.MaxOutstandingMessages = 10
	// MaxOutstandingBytes is the maximum size of unprocessed messages,
	// that the client will pull from the server before pausing. Similar
	// to MaxOutstandingMessages, this may be exceeded with a large batch
	// of messages since we cannot control the size of a batch of messages
	// from the server (even with the synchronous Pull RPC).
	sub.ReceiveSettings.MaxOutstandingBytes = 1e10
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	return nil
}

java

Antes de testar esta amostra, siga as instruções de configuração do Java no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Java (em inglês).

FlowControlSettings flowControlSettings =
    FlowControlSettings.newBuilder()
        .setMaxOutstandingElementCount(10_000L)
        .setMaxOutstandingRequestBytes(1_000_000_000L)
        .build();
Subscriber subscriber =
    Subscriber.newBuilder(subscriptionName, receiver)
        .setFlowControlSettings(flowControlSettings)
        .build();

node.js

Antes de testar esta amostra, siga as instruções de configuração do Node.js no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Node.js (em inglês).

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'my-sub';
// const maxInProgress = 5;
// const timeout = 10;

const subscriberOptions = {
  flowControl: {
    maxMessages: maxInProgress,
  },
};

// References an existing subscription.
// Note that flow control settings are not persistent across subscribers.
const subscription = pubsub.subscription(subscriptionName, subscriberOptions);

console.log(
  `Subscriber to subscription ${subscription.name} is ready to receive messages at a controlled volume of ${maxInProgress} messages.`
);

const messageHandler = message => {
  console.log(`Received message: ${message.id}`);
  console.log(`\tData: ${message.data}`);
  console.log(`\tAttributes: ${message.attributes}`);

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

subscription.on(`message`, messageHandler);

setTimeout(() => {
  subscription.close();
}, timeout * 1000);

python

Antes de testar esta amostra, siga as instruções de configuração do Python no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Python (em inglês).

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name
)

def callback(message):
    print("Received message: {}".format(message.data))
    message.ack()

# Limit the subscriber to only have ten outstanding messages at a time.
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
subscriber.subscribe(
    subscription_path, callback=callback, flow_control=flow_control
)

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print("Listening for messages on {}".format(subscription_path))
while True:
    time.sleep(60)

ruby

Antes de testar esta amostra, siga as instruções de configuração do Ruby no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Ruby (em inglês).

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen inventory: 10 do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

De modo mais geral, a necessidade do controle de fluxo indica que as mensagens são publicadas com uma taxa maior do que são consumidas. Se esse for um estado constante, em vez de um pico transitório no volume de mensagens, pense em aumentar o número de instâncias do cliente assinante.

Controle de simultaneidade

A compatibilidade com a simultaneidade depende da linguagem de programação. Para implementações de linguagem compatíveis com linhas de execução paralelas, como Java e Go, as bibliotecas de cliente são a escolha padrão para o número de linhas de execução. Essa escolha pode não ser ideal para o aplicativo. Por exemplo, caso ache que o aplicativo assinante não está acompanhando o volume de mensagens recebidas nem está ligado à CPU, aumente a contagem de linhas de execução. Para operações de processamento de mensagens intensivas da CPU, diminua o número de linhas de execução.

A amostra seguir ilustra como controlar a simultaneidade em um assinante:

go

Antes de testar esta amostra, siga as instruções de configuração do Go no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Go (em inglês).

import (
	"context"
	"fmt"
	"io"
	"runtime"
	"time"

	"cloud.google.com/go/pubsub"
)

func pullMsgsConcurrenyControl(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)
	// Must set ReceiveSettings.Synchronous to false (or leave as default) to enable
	// concurrency settings. Otherwise, NumGoroutines will be set to 1.
	sub.ReceiveSettings.Synchronous = false
	// NumGoroutines is the number of goroutines sub.Receive will spawn to pull messages concurrently.
	sub.ReceiveSettings.NumGoroutines = runtime.NumCPU()

	// Receive messages for 10 seconds.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Create a channel to handle messages to as they come in.
	cm := make(chan *pubsub.Message)
	// Handle individual messages in a goroutine.
	go func() {
		for {
			select {
			case msg := <-cm:
				fmt.Fprintf(w, "Got message :%q\n", string(msg.Data))
				msg.Ack()
			case <-ctx.Done():
				return
			}
		}
	}()

	// Receive blocks until the context is cancelled or an error occurs.
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		cm <- msg
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	close(cm)

	return nil
}

java

Antes de testar esta amostra, siga as instruções de configuração do Java no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Java (em inglês).

// provide a separate executor service for polling
ExecutorProvider executorProvider =
    InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();

Subscriber subscriber =
    Subscriber.newBuilder(subscriptionName, receiver)
        .setExecutorProvider(executorProvider)
        .build();

ruby

Antes de testar esta amostra, siga as instruções de configuração do Ruby no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Ruby (em inglês).

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
subscriber   = subscription.listen streams: 2, threads: {
  callback: 4,
  push:     2
} do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

A compatibilidade com a simultaneidade depende da linguagem de programação. Consulte a documentação de referência de APIs para mais informações.

StreamingPull

O serviço Pub/Sub tem duas APIs para recuperar mensagens:

Sempre que possível, as bibliotecas de cliente do Cloud usam StreamingPull (em inglês) para máxima capacidade e menor latência. Talvez você nunca use a API StreamingPull diretamente, mas é importante entender algumas propriedades cruciais dela e como ela é diferente do método pull tradicional.

O método pull depende de um modelo de solicitação/resposta:

  1. O aplicativo envia uma solicitação por mensagens.
  2. O servidor responde com zero ou mais mensagens e fecha a conexão.

A API do serviço StreamingPull depende de uma conexão bidirecional permanente para receber várias mensagens à medida que elas se tornam disponíveis, enviar confirmações e modificar prazos de confirmação:

  1. O cliente envia uma solicitação para o serviço a fim de estabelecer uma conexão.
  2. O cliente usa essa conexão para trocar dados de mensagens.
  3. A solicitação, ou seja, a conexão bidirecional, é encerrada pelo cliente ou pelo servidor.

StreamingPull tem uma taxa de erro de 100%, o que é esperado

Os streams do StreamingPull sempre são encerrados com um código de erro. Diferentemente do que ocorre em RPCs regulares, esse erro é apenas um indicativo de que o stream foi interrompido, não de que há falha nas solicitações. Portanto, ainda que a API StreamingPull tenha uma taxa de erro de 100% que pareça surpreendente, ela foi projetada dessa maneira.

Como diagnosticar erros do StreamingPull

Como os fluxos do StreamingPull sempre terminam com um erro, não adianta examinar as métricas de solicitação do StreamingPull para diagnosticar erros. Em vez disso, concentre-se nas métricas de operação de mensagens do StreamingPull. Procure por estes erros:

  • Erros FAILED_PRECONDITION podem ocorrer nos casos a seguir:
    • Pub/Sub tenta descriptografar uma mensagem com uma chave desativada do Cloud KMS.
    • O stream é encerrado devido a uma assinatura suspensa. As assinaturas podem estar temporariamente suspensas porque há mensagens no backlog de assinaturas que são protegidas por uma chave desativada do Cloud KMS.
  • Erros UNAVAILABLE

StreamingPull: como lidar com grandes backlogs de pequenas mensagens

A pilha gRPC StreamingPull está otimizada para alta capacidade e, portanto, armazena mensagens em buffers. Isso pode ter algumas consequências se você está tentando processar grandes backlogs de pequenas mensagens, em vez de um stream estável de novas mensagens. Nessas condições, você pode ver mensagens entregues várias vezes, e o balanço de carga delas pode não ser realizado com eficácia entre os clientes.

O buffer entre o serviço Pub/Sub e o espaço de usuário da biblioteca de cliente é de aproximadamente 10 MB. Para entender o impacto desse buffer no comportamento da biblioteca de cliente, analise este exemplo:

  • Há um backlog de 10.000 mensagens de 1 KB em uma assinatura.
  • Cada mensagem leva um segundo para ser processada sequencialmente por uma instância de cliente com um único thread.
  • Para estabelecer uma conexão do StreamingPull com o serviço para essa assinatura, a primeira instância de cliente receberá um buffer com todas as 10.000 mensagens.
  • Leva 10.000 segundos (quase três horas) para processar o buffer.
  • Nesse período, algumas das mensagens excedem o prazo de confirmação e são reenviadas para o mesmo cliente, resultando em duplicatas.
  • Quando várias instâncias de cliente estão em execução, as mensagens presas no buffer não estarão disponíveis em nenhuma instância além da primeira.

Essa situação não ocorrerá se as mensagens chegarem a uma taxa estável, e não como um único lote grande. O serviço nunca tem todos os 10 MB de mensagens de uma só vez, por isso o balanceamento de carga das mensagens pode ser realizado com eficácia em vários assinantes.

Para lidar com a situação, use uma assinatura de push ou uma API de pull, atualmente disponível em algumas das bibliotecas de cliente do Cloud (consulte a seção Pull síncrono) e em todas as bibliotecas de cliente da API. Para saber mais, consulte a documentação das bibliotecas de cliente.

Pull síncrono

Há casos em que o pull assíncrono não é adequado para seu aplicativo. Por exemplo, a lógica do aplicativo pode depender de um padrão de pesquisa para recuperar mensagens ou exigir um limite preciso em um número de mensagens recuperadas pelo cliente a qualquer momento. A compatibilidade do serviço com esses aplicativos depende do método de pull síncrono.

Veja a seguir um código de amostra para enviar por pull e confirmar um número fixo de mensagens:

C#

Antes de testar esta amostra, siga as instruções de configuração do C# no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para C# (em inglês).

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberServiceApiClient subscriberClient =
    SubscriberServiceApiClient.Create();
// Pull messages from server,
// allowing an immediate response if there are no messages.
PullResponse response = subscriberClient.Pull(
    subscriptionName, returnImmediately: true, maxMessages: 20);
// Print out each received message.
foreach (ReceivedMessage msg in response.ReceivedMessages)
{
    string text = Encoding.UTF8.GetString(msg.Message.Data.ToArray());
    Console.WriteLine($"Message {msg.Message.MessageId}: {text}");
}
// If acknowledgement required, send to server.
if (acknowledge)
{
    subscriberClient.Acknowledge(subscriptionName,
        response.ReceivedMessages.Select(msg => msg.AckId));
}

java

Antes de testar esta amostra, siga as instruções de configuração do Java no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Java (em inglês).

SubscriberStubSettings subscriberStubSettings =
    SubscriberStubSettings.newBuilder()
        .setTransportChannelProvider(
            SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                .setMaxInboundMessageSize(20 << 20) // 20MB
                .build())
        .build();

try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
  // String projectId = "my-project-id";
  // String subscriptionId = "my-subscription-id";
  // int numOfMessages = 10;   // max number of messages to be pulled
  String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
  PullRequest pullRequest =
      PullRequest.newBuilder()
          .setMaxMessages(numOfMessages)
          .setReturnImmediately(false) // return immediately if messages are not available
          .setSubscription(subscriptionName)
          .build();

  // use pullCallable().futureCall to asynchronously perform this operation
  PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
  List<String> ackIds = new ArrayList<>();
  for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
    // handle received message
    // ...
    ackIds.add(message.getAckId());
  }
  // acknowledge received messages
  AcknowledgeRequest acknowledgeRequest =
      AcknowledgeRequest.newBuilder()
          .setSubscription(subscriptionName)
          .addAllAckIds(ackIds)
          .build();
  // use acknowledgeCallable().futureCall to asynchronously perform this operation
  subscriber.acknowledgeCallable().call(acknowledgeRequest);
  return pullResponse.getReceivedMessagesList();
}

php

Antes de testar esta amostra, siga as instruções de configuração do Node.js no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Node.js (em inglês).

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

protocolo

Solicitação:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull

{
  "returnImmediately": "false",
  "maxMessages": "1"
}

Resposta:

200 OK

{
  "receivedMessages": [{
    "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
    "message": {
      "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
      "messageId": "19917247034"
    }
  }]
}

Solicitação:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge

{
  "ackIds": [
    "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
  ]
}

python

Antes de testar esta amostra, siga as instruções de configuração do Python no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Python (em inglês).

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name
)

NUM_MESSAGES = 3

# The subscriber pulls a specific number of messages.
response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)

ack_ids = []
for received_message in response.received_messages:
    print("Received: {}".format(received_message.message.data))
    ack_ids.append(received_message.ack_id)

# Acknowledges the received messages so they will not be sent again.
subscriber.acknowledge(subscription_path, ack_ids)

print(
    "Received and acknowledged {} messages. Done.".format(
        len(response.received_messages)
    )
)

ruby

Antes de testar esta amostra, siga as instruções de configuração do Ruby no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Ruby (em inglês).

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscription.pull.each do |message|
  puts "Message pulled: #{message.data}"
  message.acknowledge!
end

Para conseguir baixa latência de entrega de mensagens com pull síncrono, é importante ter muitas solicitações de pull pendentes ao mesmo tempo. À medida que a taxa de transferência do tópico aumenta, mais solicitações de pull são necessárias. Em geral, o pull assíncrono é indicado para aplicativos sensíveis à latência.

Pull síncrono com gerenciamento de lease

O processamento de uma mensagem individual pode exceder o prazo de confirmação pré-configurado, também conhecido como lease. Para evitar que essas mensagens sejam enviadas novamente, as bibliotecas de cliente permitem redefinir prazos de confirmação (exceto a biblioteca de cliente Go, que modifica automaticamente os prazos de confirmação de mensagens pesquisadas), conforme mostrado nas amostras abaixo:

go

Antes de testar esta amostra, siga as instruções de configuração do Go no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Go (em inglês).

import (
	"context"
	"fmt"
	"io"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"cloud.google.com/go/pubsub"
)

func pullMsgsSync(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)

	// Turn on synchronous mode. This makes the subscriber use the Pull RPC rather
	// than the StreamingPull RPC, which is useful for guaranteeing MaxOutstandingMessages,
	// the max number of messages the client will hold in memory at a time.
	sub.ReceiveSettings.Synchronous = true
	sub.ReceiveSettings.MaxOutstandingMessages = 10

	// Receive messages for 5 seconds.
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	// Create a channel to handle messages to as they come in.
	cm := make(chan *pubsub.Message)
	// Handle individual messages in a goroutine.
	go func() {
		for {
			select {
			case msg := <-cm:
				fmt.Fprintf(w, "Got message :%q\n", string(msg.Data))
				msg.Ack()
			case <-ctx.Done():
				return
			}
		}
	}()

	// Receive blocks until the passed in context is done.
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		cm <- msg
	})
	if err != nil && status.Code(err) != codes.Canceled {
		return fmt.Errorf("Receive: %v", err)
	}
	close(cm)

	return nil
}

node.js

Antes de testar esta amostra, siga as instruções de configuração do Node.js no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Node.js (em inglês).

// Imports the Google Cloud client library
const pubsub = require('@google-cloud/pubsub');

const client = new pubsub.v1.SubscriberClient();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const projectName = 'your-project';
// const subscriptionName = 'your-subscription';

const formattedSubscription = client.subscriptionPath(
  projectName,
  subscriptionName
);
// The maximum number of messages returned for this request.
// Pub/Sub may return fewer than the number specified.
const maxMessages = 1;
const newAckDeadlineSeconds = 30;
const request = {
  subscription: formattedSubscription,
  maxMessages: maxMessages,
};

let isProcessed = false;

// The worker function is meant to be non-blocking. It starts a long-
// running process, such as writing the message to a table, which may
// take longer than the default 10-sec acknowledge deadline.
function worker(message) {
  console.log(`Processing "${message.message.data}"...`);

  setTimeout(() => {
    console.log(`Finished procesing "${message.message.data}".`);
    isProcessed = true;
  }, 30000);
}

// The subscriber pulls a specified number of messages.
const [response] = await client.pull(request);
// Obtain the first message.
const message = response.receivedMessages[0];
// Send the message to the worker function.
worker(message);

let waiting = true;
while (waiting) {
  await new Promise(r => setTimeout(r, 10000));
  // If the message has been processed..
  if (isProcessed) {
    const ackRequest = {
      subscription: formattedSubscription,
      ackIds: [message.ackId],
    };

    //..acknowledges the message.
    await client.acknowledge(ackRequest);
    console.log(`Acknowledged: "${message.message.data}".`);
    // Exit after the message is acknowledged.
    waiting = false;
    console.log(`Done.`);
  } else {
    // If the message is not yet processed..
    const modifyAckRequest = {
      subscription: formattedSubscription,
      ackIds: [message.ackId],
      ackDeadlineSeconds: newAckDeadlineSeconds,
    };

    //..reset its ack deadline.
    await client.modifyAckDeadline(modifyAckRequest);

    console.log(
      `Reset ack deadline for "${message.message.data}" for ${newAckDeadlineSeconds}s.`
    );
  }
}

python

Antes de testar esta amostra, siga as instruções de configuração do Python no Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub para Python (em inglês).

import logging
import multiprocessing
import random
import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name
)

NUM_MESSAGES = 2
ACK_DEADLINE = 30
SLEEP_TIME = 10

# The subscriber pulls a specific number of messages.
response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)

multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)

def worker(msg):
    """Simulates a long-running process."""
    RUN_TIME = random.randint(1, 60)
    logger.info(
        "{}: Running {} for {}s".format(
            time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME
        )
    )
    time.sleep(RUN_TIME)

# `processes` stores process as key and ack id and message as values.
processes = dict()
for message in response.received_messages:
    process = multiprocessing.Process(target=worker, args=(message,))
    processes[process] = (message.ack_id, message.message.data)
    process.start()

while processes:
    for process in list(processes):
        ack_id, msg_data = processes[process]
        # If the process is still running, reset the ack deadline as
        # specified by ACK_DEADLINE once every while as specified
        # by SLEEP_TIME.
        if process.is_alive():
            # `ack_deadline_seconds` must be between 10 to 600.
            subscriber.modify_ack_deadline(
                subscription_path,
                [ack_id],
                ack_deadline_seconds=ACK_DEADLINE,
            )
            logger.info(
                "{}: Reset ack deadline for {} for {}s".format(
                    time.strftime("%X", time.gmtime()),
                    msg_data,
                    ACK_DEADLINE,
                )
            )

        # If the processs is finished, acknowledges using `ack_id`.
        else:
            subscriber.acknowledge(subscription_path, [ack_id])
            logger.info(
                "{}: Acknowledged {}".format(
                    time.strftime("%X", time.gmtime()), msg_data
                )
            )
            processes.pop(process)

    # If there are still processes running, sleeps the thread.
    if processes:
        time.sleep(SLEEP_TIME)

print(
    "Received and acknowledged {} messages. Done.".format(
        len(response.received_messages)
    )
)

Escalonamento

Pode ser necessário implementar um mecanismo de escalonamento no seu aplicativo de assinante para acompanhar o volume das mensagens. A maneira de fazer isso depende do seu ambiente, mas geralmente ela se baseia nas métricas de backlog oferecidas pelo serviço de monitoramento do Stackdriver. Para ver detalhes sobre como fazer isso para o Compute Engine, consulte Como escalonar com base em métricas do Cloud Monitoring.

Acesse a seção Pub/Sub da página lista de métricas do GCP para saber quais métricas podem ser monitoradas programaticamente.

Assim como ocorre com qualquer serviço distribuído, de vez em quando você terá de repetir cada solicitação.

Como lidar com duplicatas e forçar novas tentativas

Quando você não confirma uma mensagem antes do vencimento do prazo de confirmação, o Pub/Sub reenvia a mensagem. Como resultado, o Pub/Sub pode enviar mensagens duplicadas. Use o Stackdriver para monitorar operações de confirmação com o código de resposta expired para detectar essa condição. Para conseguir esses dados, selecione a métrica Confirmar operações de mensagens e agrupe ou filtre pelo rótulo response_code. Observe que response_code é um rótulo do sistema em uma métrica. Ele não é uma métrica.

Use o Stackdriver para pesquisar prazos de confirmação de mensagens vencidos

Para reduzir a taxa de duplicação, prolongue o prazo da mensagem.

  • As bibliotecas de cliente prolongam o prazo automaticamente, mas há limites padrão de quanto o prazo pode ser prolongado.
  • Use o método modifyAckDeadline para estender o prazo de confirmação se você estiver criando sua própria biblioteca de cliente.

Como alternativa, para forçar o Pub/Suba repetir uma mensagem, defina modifyAckDeadline para 0.