Como receber mensagens usando pull

O Cloud Pub/Sub é compatível com entrega de mensagens por push e pull. Para uma visão geral e comparação das assinaturas de pull e de push, consulte a Visão geral do assinante. Este documento descreve a entrega por pull. Para ver uma discussão da entrega por push, consulte o Guia do assinante de push.

Pull assíncrono

O uso do pull assíncrono oferece uma maior capacidade em seu aplicativo, não exigindo que ele bloqueie novas mensagens. As mensagens podem ser recebidas no aplicativo usando-se um detector de mensagens de longa duração e reconhecidas uma de cada vez, conforme mostrado no exemplo abaixo. Os clientes Java, Python, .NET, Go e Ruby usam a API de serviço streamingPull para implementar a API do cliente assíncrona com eficiência.

Nem todas as bibliotecas de cliente são compatíveis com mensagens de pull de maneira assíncrona. Para saber mais sobre mensagens de pull síncrono, consulte Pull síncrono.

Para ver mais informações, consulte a documentação de referência da API da sua linguagem de programação.

C#

Antes de testar este exemplo, siga as instruções de configuração do C# no Guia de início rápido do Cloud Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Cloud Pub/Sub para C#.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberClient subscriber = await SubscriberClient.CreateAsync(
    subscriptionName);
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
await Task.Delay(3000);
await subscriber.StopAsync(CancellationToken.None);

Go

Antes de testar este exemplo, siga as instruções de configuração do Go no Guia de início rápido do Cloud Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Cloud Pub/Sub para Go (em inglês).

// Consume 10 messages.
var mu sync.Mutex
received := 0
sub := client.Subscription(subName)
cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
	msg.Ack()
	fmt.Printf("Got message: %q\n", string(msg.Data))
	mu.Lock()
	defer mu.Unlock()
	received++
	if received == 10 {
		cancel()
	}
})
if err != nil {
	return err
}

Java

Antes de testar essa amostra, siga as instruções de configuração do Java no guia de início rápido do Cloud Pub/Sub usando bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API do Cloud Pub/Sub para Java.

String projectId = "my-project-id";
String subscriptionId = "my-subscription-id";

ProjectSubscriptionName subscriptionName =
    ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver
MessageReceiver receiver =
    new MessageReceiver() {
      @Override
      public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        // handle incoming message, then ack/nack the received message
        System.out.println("Id : " + message.getMessageId());
        System.out.println("Data : " + message.getData().toStringUtf8());
        consumer.ack();
      }
    };

Subscriber subscriber = null;
try {
  // Create a subscriber for "my-subscription-id" bound to the message receiver
  subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
  subscriber.startAsync();
  // ...
} finally {
  // stop receiving messages
  if (subscriber != null) {
    subscriber.stopAsync();
  }
}

Node.js

Antes de testar este exemplo, siga as instruções de configuração do Node.js no Guia de início rápido do Cloud Pub/Sub: como usar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Cloud Pub/Sub para Node.js.

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'my-sub';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = message => {
  console.log(`Received message ${message.id}:`);
  console.log(`\tData: ${message.data}`);
  console.log(`\tAttributes: ${message.attributes}`);
  messageCount += 1;

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Listen for new messages until timeout is hit
subscription.on(`message`, messageHandler);

setTimeout(() => {
  subscription.removeListener('message', messageHandler);
  console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);

Python

Antes de testar este exemplo, siga as instruções de configuração do Python no Guia de início rápido do Cloud Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Cloud Pub/Sub para Python.

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_name}`
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message))
    message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking. We must keep the main thread from
# exiting to allow it to process messages asynchronously in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

Como processar atributos personalizados

Nesta amostra, você verá como enviar mensagens pull de maneira assíncrona e recuperar os atributos personalizados dos metadados:

Python

Antes de testar este exemplo, siga as instruções de configuração do Python no Guia de início rápido do Cloud Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Cloud Pub/Sub para Python.

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message.data))
    if message.attributes:
        print('Attributes:')
        for key in message.attributes:
            value = message.attributes.get(key)
            print('{}: {}'.format(key, value))
    message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

Ruby

Antes de testar este exemplo, siga as instruções de configuração do Ruby no Guia de início rápido do Cloud Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Cloud Pub/Sub para Ruby.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  unless received_message.attributes.empty?
    puts "Attributes:"
    received_message.attributes.each do |key, value|
      puts "#{key}: #{value}"
    end
  end
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

Como detectar erros

Esta amostra exibe como lidar com erros que ocorrem durante a assinatura nas mensagens:

Go

Antes de testar este exemplo, siga as instruções de configuração do Go no Guia de início rápido do Cloud Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Cloud Pub/Sub para Go (em inglês).

// If the service returns a non-retryable error, Receive returns that error after
// all of the outstanding calls to the handler have returned.
err := client.Subscription(subName).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Java

Antes de testar essa amostra, siga as instruções de configuração do Java no guia de início rápido do Cloud Pub/Sub usando bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API do Cloud Pub/Sub para Java.

subscriber.addListener(
    new Subscriber.Listener() {
      public void failed(Subscriber.State from, Throwable failure) {
        // Handle error.
      }
    },
    MoreExecutors.directExecutor());

Node.js

Antes de testar este exemplo, siga as instruções de configuração do Node.js no Guia de início rápido do Cloud Pub/Sub: como usar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Cloud Pub/Sub para Node.js.

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'my-sub';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
const messageHandler = function(message) {
  // Do something with the message
  console.log(`Message: ${message}`);

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Create an event handler to handle errors
const errorHandler = function(error) {
  // Do something with the error
  console.error(`ERROR: ${error}`);
};

// Listen for new messages/errors until timeout is hit
subscription.on(`message`, messageHandler);
subscription.on(`error`, errorHandler);

setTimeout(() => {
  subscription.removeListener(`message`, messageHandler);
  subscription.removeListener(`error`, errorHandler);
}, timeout * 1000);

Python

Antes de testar este exemplo, siga as instruções de configuração do Python no Guia de início rápido do Cloud Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Cloud Pub/Sub para Python.

from google.cloud import pubsub_v1

# TODO project           = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pubsub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message))
    message.ack()

future = subscriber.subscribe(subscription_path, callback=callback)

# Blocks the thread while messages are coming in through the stream. Any
# exceptions that crop up on the thread will be set on the future.
try:
    # When timeout is unspecified, the result method waits indefinitely.
    future.result(timeout=30)
except Exception as e:
    print(
        'Listening for messages on {} threw an Exception: {}.'.format(
            subscription_name, e))

Ruby

Antes de testar este exemplo, siga as instruções de configuração do Ruby no Guia de início rápido do Cloud Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Cloud Pub/Sub para Ruby.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end
# Propagate expection from child threads to the main thread as soon as it is
# raised. Exceptions happened in the callback thread are collected in the
# callback thread pool and do not propagate to the main thread
Thread.abort_on_exception = true

begin
  subscriber.start
  # Let the main thread sleep for 60 seconds so the thread for listening
  # messages does not quit
  sleep 60
  subscriber.stop.wait!
rescue Exception => ex
  puts "Exception #{ex.inspect}: #{ex.message}"
  raise "Stopped listening for messages."
end

Controle de fluxo de mensagens

O cliente assinante pode processar e reconhecer as mensagens mais lentamente do que o Cloud Pub/Sub as envia. Neste caso:

  • É possível que um cliente tenha um backlog de mensagens por não ter capacidade para processar o volume de mensagens recebidas, mas outro cliente na rede tem. Este pode reduzir o backlog da assinatura, mas não consegue fazê-lo porque o primeiro cliente mantém um lease nas mensagens recebidas. Isso reduz a taxa de processamento geral, porque as mensagens ficam presas no primeiro cliente.
  • Como a biblioteca de cliente estende repetidamente o prazo de confirmação para as mensagens acumuladas, elas continuam a consumir recursos de memória, CPU e largura de banda. Dessa forma, o cliente assinante pode ficar sem recursos (como memória). Isso pode afetar negativamente a capacidade e a latência das mensagens de processamento.

Para atenuar os problemas acima, controle a taxa com que o assinante recebe as mensagens com os recursos de controle de fluxo do assinante. Esses recursos são ilustrados nas amostras a seguir:

C#

Antes de testar este exemplo, siga as instruções de configuração do C# no Guia de início rápido do Cloud Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Cloud Pub/Sub para C#.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberClient subscriber = await SubscriberClient.CreateAsync(
    subscriptionName,
    settings: new SubscriberClient.Settings()
    {
        AckExtensionWindow = TimeSpan.FromSeconds(4),
        Scheduler = Google.Api.Gax.SystemScheduler.Instance,
        AckDeadline = TimeSpan.FromSeconds(10),
        FlowControlSettings = new Google.Api.Gax
            .FlowControlSettings(
            maxOutstandingElementCount: 100,
            maxOutstandingByteCount: 10240)
    });
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
await Task.Delay(3000);
await subscriber.StopAsync(CancellationToken.None);

Go

Antes de testar este exemplo, siga as instruções de configuração do Go no Guia de início rápido do Cloud Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Cloud Pub/Sub para Go (em inglês).

sub := client.Subscription(subName)
sub.ReceiveSettings.MaxOutstandingMessages = 10
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Java

Antes de testar essa amostra, siga as instruções de configuração do Java no guia de início rápido do Cloud Pub/Sub usando bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API do Cloud Pub/Sub para Java.

FlowControlSettings flowControlSettings =
    FlowControlSettings.newBuilder()
        .setMaxOutstandingElementCount(10_000L)
        .setMaxOutstandingRequestBytes(1_000_000_000L)
        .build();
Subscriber subscriber =
    Subscriber.newBuilder(subscriptionName, receiver)
        .setFlowControlSettings(flowControlSettings)
        .build();

Node.js

Antes de testar este exemplo, siga as instruções de configuração do Node.js no Guia de início rápido do Cloud Pub/Sub: como usar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Cloud Pub/Sub para Node.js.

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const subscriptionName = 'my-sub';
// const maxInProgress = 5;
// const maxBytes = 10000;

const topic = pubsub.topic(topicName);

const options = {
  flowControl: {
    maxBytes: maxBytes,
    maxMessages: maxInProgress,
  },
};

const subscription = topic.subscription(subscriptionName, options);

// Creates a new subscription
// Note that flow control configurations are not persistent
const [newSubscription] = await subscription.get({
  autoCreate: true,
});
console.log(
  `Subscription ${
    newSubscription.name
  } created with a maximum of ${maxInProgress} unprocessed messages.`
);

Python

Antes de testar este exemplo, siga as instruções de configuração do Python no Guia de início rápido do Cloud Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Cloud Pub/Sub para Python.

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message.data))
    message.ack()

# Limit the subscriber to only have ten outstanding messages at a time.
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
subscriber.subscribe(
    subscription_path, callback=callback, flow_control=flow_control)

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

Ruby

Antes de testar este exemplo, siga as instruções de configuração do Ruby no Guia de início rápido do Cloud Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Cloud Pub/Sub para Ruby.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen inventory: 10 do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!
De modo mais geral, a necessidade do controle de fluxo indica que as mensagens são publicadas com uma taxa maior do que são consumidas. Se esse for um estado constante, em vez de um pico transitório no volume de mensagens, pense em aumentar o número de instâncias do cliente assinante.

Controle de simultaneidade

A compatibilidade com a simultaneidade depende da linguagem de programação. Para implementações de linguagem que aceitem linhas de execução paralelas, como Java e Go, as bibliotecas de cliente fazem uma escolha padrão do número de linhas de execução. Essa escolha pode não ser ideal para o aplicativo. Por exemplo, caso ache que o aplicativo assinante não está acompanhando o volume de mensagens recebidas nem está ligado à CPU, aumente a contagem de linhas de execução. Para operações de processamento de mensagens intensivas da CPU, diminua o número de linhas de execução.

O exemplo a seguir ilustra como controlar a simultaneidade em um assinante:

Java

Antes de testar essa amostra, siga as instruções de configuração do Java no guia de início rápido do Cloud Pub/Sub usando bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API do Cloud Pub/Sub para Java.

// provide a separate executor service for polling
ExecutorProvider executorProvider =
    InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();

Subscriber subscriber =
    Subscriber.newBuilder(subscriptionName, receiver)
        .setExecutorProvider(executorProvider)
        .build();

Ruby

Antes de testar este exemplo, siga as instruções de configuração do Ruby no Guia de início rápido do Cloud Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Cloud Pub/Sub para Ruby.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
subscriber   = subscription.listen streams: 2, threads: {
  callback: 4,
  push:     2
} do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

A compatibilidade com a simultaneidade depende da linguagem de programação. Consulte a documentação de referência da API para mais informações.

StreamingPull

O serviço Cloud Pub/Sub tem duas APIs para recuperar mensagens:

Sempre que possível, as bibliotecas de cliente do Cloud usam o StreamingPull, um RPC de streaming bidirecional, para máxima capacidade e menor latência. Talvez você nunca use a API StreamingPull diretamente, mas é importante entender algumas propriedades cruciais dela e como ela é diferente do método pull tradicional.

O método pull depende de um modelo de solicitação/resposta:

  1. O aplicativo envia uma solicitação por mensagens.
  2. O servidor responde com zero ou mais mensagens e fecha a conexão.

A API do serviço StreamingPull depende de uma conexão bidirecional permanente para receber várias mensagens à medida que elas se tornam disponíveis, enviar confirmações e modificar prazos de confirmação:

  1. O cliente envia uma solicitação para o serviço a fim de estabelecer uma conexão.
  2. O cliente usa essa conexão para trocar dados de mensagens.
  3. A solicitação, ou seja, a conexão bidirecional, é encerrada pelo cliente ou pelo servidor.

StreamingPull tem uma taxa de erro de 100%, o que é esperado

Os streams do StreamingPull sempre são encerrados com um código de erro recuperável. Diferentemente do que ocorre em RPCs regulares, esse erro é apenas um indicativo de que o stream foi interrompido, não de que há falha nas solicitações. Portanto, ainda que a API StreamingPull tenha uma taxa de erro de 100% que pareça surpreendente, ela foi projetada dessa maneira. Para diagnosticar erros do StreamingPull, recomendamos concentrar-se nas métricas das operações de mensagens do StreamingPull, não nas métricas das solicitações dele.

StreamingPull: como lidar com grandes backlogs de pequenas mensagens

A pilha gRPC StreamingPull está otimizada para alta capacidade e, portanto, armazena mensagens em buffers. Isso pode ter algumas consequências se você está tentando processar grandes backlogs de pequenas mensagens, em vez de um stream estável de novas mensagens. Nessas condições, você pode ver mensagens entregues várias vezes, e o balanço de carga delas pode não ser realizado com eficácia entre os clientes.

O buffer entre o serviço Cloud Pub/Sub e o espaço de usuário da biblioteca de cliente é de aproximadamente 10 MB. Para entender o impacto desse buffer no comportamento da biblioteca de cliente, analise este exemplo:

  • Há um backlog de 10.000 mensagens de 1 KB em uma assinatura.
  • Cada mensagem leva um segundo para ser processada sequencialmente por uma instância de cliente com um único thread.
  • Para estabelecer uma conexão do StreamingPull com o serviço para essa assinatura, a primeira instância de cliente receberá um buffer com todas as 10.000 mensagens.
  • Leva 10.000 segundos (quase três horas) para processar o buffer.
  • Nesse período, algumas das mensagens excedem o prazo de confirmação e são reenviadas para o mesmo cliente, resultando em duplicatas.
  • Quando várias instâncias de cliente estão em execução, as mensagens presas no buffer não estarão disponíveis em nenhuma instância além da primeira.

Essa situação não ocorrerá se as mensagens chegarem a uma taxa estável, e não como um único lote grande. O serviço nunca tem todos os 10 MB de mensagens de uma só vez, por isso o balanceamento de carga das mensagens pode ser realizado com eficácia em vários assinantes.

Para lidar com a situação, use uma assinatura de push ou uma API de pull, atualmente disponível em algumas das bibliotecas de cliente do Cloud (consulte a seção Pull síncrono) e em todas as bibliotecas de cliente da API. Para saber mais, consulte a documentação das bibliotecas de cliente.

Pull síncrono

Há casos em que o pull assíncrono não é adequado para seu aplicativo. Por exemplo, a lógica do aplicativo pode depender de um padrão de pesquisa para recuperar mensagens ou exigir um limite preciso em um número de mensagens recuperadas pelo cliente a qualquer momento. A compatibilidade do serviço e da maioria das bibliotecas de cliente com esses aplicativos depende do método de pull síncrono.

Veja um exemplo de código para enviar por pull e confirmar um número fixo de mensagens:

Protocolo

Solicitação:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull
{
  "returnImmediately": "false",
  "maxMessages": "1"
}

Resposta:

200 OK
{
  "receivedMessages": [
    {
      "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
      "message": {
        "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
        "messageId": "19917247034"
      }
    }
  ]
}

Solicitação:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge
{
  "ackIds": [
    "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
  ]
}

Resposta:

200 OK

C#

Antes de testar este exemplo, siga as instruções de configuração do C# no Guia de início rápido do Cloud Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Cloud Pub/Sub para C#.

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberServiceApiClient subscriberClient =
    SubscriberServiceApiClient.Create();
// Pull messages from server,
// allowing an immediate response if there are no messages.
PullResponse response = subscriberClient.Pull(
    subscriptionName, returnImmediately: true, maxMessages: 20);
// Print out each received message.
foreach (ReceivedMessage msg in response.ReceivedMessages)
{
    string text = Encoding.UTF8.GetString(msg.Message.Data.ToArray());
    Console.WriteLine($"Message {msg.Message.MessageId}: {text}");
}
// If acknowledgement required, send to server.
if (acknowledge)
{
    subscriberClient.Acknowledge(subscriptionName,
        response.ReceivedMessages.Select(msg => msg.AckId));
}

Java

Antes de testar essa amostra, siga as instruções de configuração do Java no guia de início rápido do Cloud Pub/Sub usando bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API do Cloud Pub/Sub para Java.

SubscriberStubSettings subscriberStubSettings =
    SubscriberStubSettings.newBuilder()
        .setTransportChannelProvider(
            SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                .setMaxInboundMessageSize(20 << 20) // 20MB
                .build())
        .build();

try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
  // String projectId = "my-project-id";
  // String subscriptionId = "my-subscription-id";
  // int numOfMessages = 10;   // max number of messages to be pulled
  String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
  PullRequest pullRequest =
      PullRequest.newBuilder()
          .setMaxMessages(numOfMessages)
          .setReturnImmediately(false) // return immediately if messages are not available
          .setSubscription(subscriptionName)
          .build();

  // use pullCallable().futureCall to asynchronously perform this operation
  PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
  List<String> ackIds = new ArrayList<>();
  for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
    // handle received message
    // ...
    ackIds.add(message.getAckId());
  }
  // acknowledge received messages
  AcknowledgeRequest acknowledgeRequest =
      AcknowledgeRequest.newBuilder()
          .setSubscription(subscriptionName)
          .addAllAckIds(ackIds)
          .build();
  // use acknowledgeCallable().futureCall to asynchronously perform this operation
  subscriber.acknowledgeCallable().call(acknowledgeRequest);
  return pullResponse.getReceivedMessagesList();
}

Node.js

Antes de testar este exemplo, siga as instruções de configuração do Node.js no Guia de início rápido do Cloud Pub/Sub: como usar as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Cloud Pub/Sub para Node.js.

// Imports the Google Cloud client library
const pubsub = require('@google-cloud/pubsub');

const client = new pubsub.v1.SubscriberClient();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const projectName = 'your-project';
// const subscriptionName = 'your-subscription';

const formattedSubscription = client.subscriptionPath(
  projectName,
  subscriptionName
);
// The maximum number of messages returned for this request.
// Pub/Sub may return fewer than the number specified.
const maxMessages = 1;
const newAckDeadlineSeconds = 30;
const request = {
  subscription: formattedSubscription,
  maxMessages: maxMessages,
};

let isProcessed = false;

// The worker function is meant to be non-blocking. It starts a long-
// running process, such as writing the message to a table, which may
// take longer than the default 10-sec acknowledge deadline.
function worker(message) {
  console.log(`Processing "${message.message.data}"...`);

  setTimeout(() => {
    console.log(`Finished procesing "${message.message.data}".`);
    isProcessed = true;
  }, 30000);
}

// The subscriber pulls a specified number of messages.
const [response] = await client.pull(request);
// Obtain the first message.
const message = response.receivedMessages[0];
// Send the message to the worker function.
worker(message);

let waiting = true;
while (waiting) {
  await new Promise(r => setTimeout(r, 10000));
  // If the message has been processed..
  if (isProcessed) {
    const ackRequest = {
      subscription: formattedSubscription,
      ackIds: [message.ackId],
    };

    //..acknowledges the message.
    await client.acknowledge(ackRequest);
    console.log(`Acknowledged: "${message.message.data}".`);
    // Exit after the message is acknowledged.
    waiting = false;
    console.log(`Done.`);
  } else {
    // If the message is not yet processed..
    const modifyAckRequest = {
      subscription: formattedSubscription,
      ackIds: [message.ackId],
      ackDeadlineSeconds: newAckDeadlineSeconds,
    };

    //..reset its ack deadline.
    await client.modifyAckDeadline(modifyAckRequest);

    console.log(
      `Reset ack deadline for "${
        message.message.data
      }" for ${newAckDeadlineSeconds}s.`
    );
  }
}

PHP

Antes de testar este exemplo, siga as instruções de configuração do PHP no Guia de início rápido do Cloud Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Cloud Pub/Sub para PHP.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Python

Antes de testar este exemplo, siga as instruções de configuração do Python no Guia de início rápido do Cloud Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Cloud Pub/Sub para Python.

import logging
import multiprocessing
import random
import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

NUM_MESSAGES = 2
ACK_DEADLINE = 30
SLEEP_TIME = 10

# The subscriber pulls a specific number of messages.
response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)

multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)

def worker(msg):
    """Simulates a long-running process."""
    RUN_TIME = random.randint(1, 60)
    logger.info('{}: Running {} for {}s'.format(
        time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME))
    time.sleep(RUN_TIME)

# `processes` stores process as key and ack id and message as values.
processes = dict()
for message in response.received_messages:
    process = multiprocessing.Process(target=worker, args=(message,))
    processes[process] = (message.ack_id, message.message.data)
    process.start()

while processes:
    for process in list(processes):
        ack_id, msg_data = processes[process]
        # If the process is still running, reset the ack deadline as
        # specified by ACK_DEADLINE once every while as specified
        # by SLEEP_TIME.
        if process.is_alive():
            # `ack_deadline_seconds` must be between 10 to 600.
            subscriber.modify_ack_deadline(
                subscription_path,
                [ack_id],
                ack_deadline_seconds=ACK_DEADLINE)
            logger.info('{}: Reset ack deadline for {} for {}s'.format(
                time.strftime("%X", time.gmtime()),
                msg_data, ACK_DEADLINE))

        # If the processs is finished, acknowledges using `ack_id`.
        else:
            subscriber.acknowledge(subscription_path, [ack_id])
            logger.info("{}: Acknowledged {}".format(
                time.strftime("%X", time.gmtime()), msg_data))
            processes.pop(process)

    # If there are still processes running, sleeps the thread.
    if processes:
        time.sleep(SLEEP_TIME)

print("Received and acknowledged {} messages. Done.".format(NUM_MESSAGES))

Ruby

Antes de testar este exemplo, siga as instruções de configuração do Ruby no Guia de início rápido do Cloud Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Cloud Pub/Sub para Ruby.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscription.pull.each do |message|
  puts "Message pulled: #{message.data}"
  message.acknowledge!
end

Para conseguir baixa latência de entrega de mensagens com pull síncrono, é importante ter muitas solicitações de pull pendentes ao mesmo tempo. À medida que a taxa de transferência do tópico aumenta, mais solicitações de pull são necessárias. Em geral, o pull assíncrono é indicado para aplicativos sensíveis à latência.

Nem todas as bibliotecas de cliente são compatíveis com um número fixo de mensagens de pull síncrono. Consulte a documentação de referência do API da linguagem de programação escolhida para ver mais detalhes.

Escalonamento

Pode ser necessário implementar um mecanismo de escalonamento no seu aplicativo de assinante para acompanhar o volume das mensagens. A maneira de fazer isso depende do seu ambiente, mas geralmente ela se baseia nas métricas de backlog oferecidas pelo serviço do Stackdriver Monitoring. Para ver detalhes sobre como fazer isso para o Compute Engine, consulte Como escalonar com base em métricas do Cloud Monitoring.

Pesquise "PubSub" na página Métricas compatíveis do Cloud Monitoring para ver quais métricas podem ser monitoradas de modo programático.

Assim como ocorre com qualquer serviço distribuído, de vez em quando você terá de repetir cada solicitação.

Como lidar com mensagens duplicadas

O Cloud Pub/Sub pode enviar mensagens duplicadas. Por exemplo, quando você não confirma uma mensagem antes do prazo de confirmação, o Cloud Pub/Sub reenvia a mensagem. Use o Stackdriver para monitorar operações de confirmação com o código de resposta expired para detectar essa condição. Uma maneira de conseguir esses dados é usando a métrica Operações de mensagem de confirmação, agrupada pelo response_code.

Use o Stackdriver para pesquisar prazos de confirmação de mensagens vencidos

Para reduzir a taxa de duplicação, prolongue o prazo da mensagem:

  • As bibliotecas de cliente prolongam o prazo automaticamente, mas há limites padrão de quanto o prazo pode ser prolongado.
  • Se estiver criando uma biblioteca de cliente própria, use o método modifyAckDeadline para prolongar o prazo de confirmação.
Esta página foi útil? Conte sua opinião sobre:

Enviar comentários sobre…