Como publicar mensagens

Neste documento, você verá informações sobre a publicação de mensagens. Para informações sobre como criar, excluir e administrar tópicos e assinaturas, consulte Como gerenciar tópicos e assinaturas. Para mais informações sobre como receber mensagens, consulte o Guia do assinante.

Um aplicativo do editor cria e envia mensagens para um tópico. O Cloud Pub/Sub oferece uma entrega de mensagens pelo menos uma vez e tenta ao máximo entregar as mensagens em ordem para os assinantes atuais, conforme explicado na Visão geral do assinante.

Este é o fluxo geral de um aplicativo do editor:

  1. Criar uma mensagem contendo seus dados.
  2. Enviar uma solicitação ao servidor do Cloud Pub/Sub para publicar a mensagem no tópico pretendido.

Configurar

Consulte o Guia de primeiros passos das bibliotecas de cliente para configurar seu ambiente na linguagem de programação que preferir.

Publicar mensagens em um tópico

Para usar JSON com REST, é necessário que os dados da mensagem sejam codificados em base64. Toda a solicitação, incluindo uma ou mais mensagens, precisa ter menos do que 10 MB após a decodificação. Observe que o payload da mensagem não pode estar vazio. Ele precisa conter um campo de dados não vazio ou pelo menos um atributo.

Dependendo da escolha da linguagem de programação, as bibliotecas de cliente podem publicar mensagens de maneira síncrona ou assíncrona. A publicação assíncrona permite o processamento por lotes e uma maior capacidade no aplicativo.

Todas as bibliotecas de cliente são compatíveis com a publicação de mensagens de maneira assíncrona. Consulte a documentação de referência de APIs da linguagem de programação escolhida para conferir se a biblioteca de cliente também é compatível com a publicação de mensagens de maneira síncrona, se essa for sua opção preferida.

Um código gerado pelo servidor, exclusivo dentro do tópico, é retornado quando uma mensagem é publicada.

Protocolo

Solicitação:

POST     https://pubsub.googleapis.com/v1/projects/myproject/topics/mytopic:publish
{
  "messages": [
    {
      "attributes": {
        "key": "iana.org/language_tag",
        "value": "en"
      },
      "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ=="
    }
  ]
}
Saída:
200 OK
{
  "messageIds": [
    "19916711285"
  ]
}

linha de comando

gcloud pubsub topics publish my-topic --message "hello"

C#

PublisherClient publisher = await PublisherClient.CreateAsync(
    new TopicName(projectId, topicId));

go

var wg sync.WaitGroup
var totalErrors uint64
t := client.Topic(topic)

for i := 0; i < n; i++ {
	result := t.Publish(ctx, &pubsub.Message{
		Data: []byte("Message " + strconv.Itoa(i)),
	})

	wg.Add(1)
	go func(i int, res *pubsub.PublishResult) {
		defer wg.Done()
		// The Get method blocks until a server-generated ID or
		// an error is returned for the published message.
		id, err := res.Get(ctx)
		if err != nil {
			// Error handling code can be added here.
			log.Output(1, fmt.Sprintf("Failed to publish: %v", err))
			atomic.AddUint64(&totalErrors, 1)
			return
		}
		fmt.Printf("Published message %d; msg ID: %v\n", i, id)
	}(i, result)
}

wg.Wait()

if totalErrors > 0 {
	return errors.New(
		fmt.Sprintf("%d of %d messages did not publish successfully",
			totalErrors, n))
}
return nil

java

ProjectTopicName topicName = ProjectTopicName.of("my-project-id", "my-topic-id");
Publisher publisher = null;

try {
  // Create a publisher instance with default settings bound to the topic
  publisher = Publisher.newBuilder(topicName).build();

  List<String> messages = Arrays.asList("first message", "second message");

  for (final String message : messages) {
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

    // Once published, returns a server-assigned message id (unique within the topic)
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Add an asynchronous callback to handle success / failure
    ApiFutures.addCallback(
        future,
        new ApiFutureCallback<String>() {

          @Override
          public void onFailure(Throwable throwable) {
            if (throwable instanceof ApiException) {
              ApiException apiException = ((ApiException) throwable);
              // details on the API exception
              System.out.println(apiException.getStatusCode().getCode());
              System.out.println(apiException.isRetryable());
            }
            System.out.println("Error publishing message : " + message);
          }

          @Override
          public void onSuccess(String messageId) {
            // Once published, returns server-assigned message ids (unique within the topic)
            System.out.println(messageId);
          }
        },
        MoreExecutors.directExecutor());
  }
} finally {
  if (publisher != null) {
    // When finished with the publisher, shutdown to free up resources.
    publisher.shutdown();
    publisher.awaitTermination(1, TimeUnit.MINUTES);
  }
}

node.js

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

const messageId = await pubsub.topic(topicName).publish(dataBuffer);
console.log(`Message ${messageId} published.`);

php

use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $topic->publish(['data' => $message]);
    print('Message published' . PHP_EOL);
}

python

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    # When you publish a message, the client returns a Future.
    message_future = publisher.publish(topic_path, data=data)
    message_future.add_done_callback(callback)

print('Published message IDs:')

# We must keep the main thread from exiting to allow it to process
# messages in the background.
while True:
    time.sleep(60)

ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
topic.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

Novos atributos personalizados

É possível incorporar atributos personalizados, como metadados, nas mensagens do Pub/Sub. Os atributos podem ser strings de texto ou de bytes. O esquema da mensagem pode ser representado da seguinte forma:

{
  "data": string,
  "attributes": {
    string: string,
    ...
  },
  "messageId": string,
  "publishTime": string
}

O esquema JSON do PubsubMessage é publicado como parte da documentação REST e RPC.

go

t := client.Topic(topic)
result := t.Publish(ctx, &pubsub.Message{
	Data: []byte("Hello world!"),
	Attributes: map[string]string{
		"origin":   "golang",
		"username": "gcp",
	},
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published message with custom attributes; msg ID: %v\n", id)

node.js

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
// Add two custom attributes, origin and username, to the message
const customAttributes = {
  origin: 'nodejs-sample',
  username: 'gcp',
};

const messageId = await pubsub
  .topic(topicName)
  .publish(dataBuffer, customAttributes);
console.log(`Message ${messageId} published.`);

python

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    # Add two attributes, origin and username, to the message
    publisher.publish(
        topic_path, data, origin='python-sample', username='gcp')

print('Published messages with custom attributes.')

ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
# Add two attributes, origin and username, to the message
topic.publish_async "This is a test message.",
                    origin:   "ruby-sample",
                    username: "gcp" do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message with custom attributes published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

Como agrupar em lote para equilibrar a latência e a capacidade

As bibliotecas de clientes do Cloud Pub/Sub agrupam várias mensagens em uma única chamada. Lotes maiores aumentam a capacidade das mensagens, ou seja, a taxa de mensagens enviadas por CPU. O custo do agrupamento é a latência das mensagens individuais, que são enfileiradas na memória até que o lote correspondente seja completamente preenchido e esteja pronto para ser enviado pela rede. Para minimizar a latência, desative o agrupamento. Isso é particularmente importante para aplicativos que publicam uma única mensagem como parte de uma sequência de solicitação e resposta. Um exemplo comum desse padrão pode ser observado em aplicativos sem servidor e orientados a eventos que usam o Cloud Functions ou o App Engine.

As mensagens podem ser agrupadas com base no tamanho da solicitação em bytes, na quantidade de mensagens e no tempo. É possível modificar as configurações padrão como mostrado no exemplo abaixo:

C#

PublisherClient publisher = await PublisherClient.CreateAsync(
    new TopicName(projectId, topicId),
    settings: new PublisherClient.Settings
    {
        BatchingSettings = new Google.Api.Gax.BatchingSettings(
            elementCountThreshold: 100,
            byteCountThreshold: 10240,
            delayThreshold: TimeSpan.FromSeconds(3))
    });
// PublisherClient collects messages into appropriately sized
// batches.
var publishTasks =
    messageTexts.Select(text => publisher.PublishAsync(text));
foreach (Task<string> task in publishTasks)
{
    string message = await task;
    await Console.Out.WriteLineAsync($"Published message {message}");
}

go

t := client.Topic(topic)
t.PublishSettings = pubsub.PublishSettings{
	ByteThreshold:  5000,
	CountThreshold: 10,
	DelayThreshold: 100 * time.Millisecond,
}
result := t.Publish(ctx, &pubsub.Message{Data: msg})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

java

// Batch settings control how the publisher batches messages
long requestBytesThreshold = 5000L; // default : 1 byte
long messageCountBatchSize = 10L; // default : 1 message

Duration publishDelayThreshold = Duration.ofMillis(100); // default : 1 ms

// Publish request get triggered based on request size, messages count & time since last publish
BatchingSettings batchingSettings =
    BatchingSettings.newBuilder()
        .setElementCountThreshold(messageCountBatchSize)
        .setRequestByteThreshold(requestBytesThreshold)
        .setDelayThreshold(publishDelayThreshold)
        .build();

Publisher publisher =
    Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();

node.js

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });
// const maxMessages = 10;
// const maxWaitTime = 10000;

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

const [messageId] = await pubsub
  .topic(topicName, {
    batching: {
      maxMessages: maxMessages,
      maxMilliseconds: maxWaitTime,
    },
  })
  .publish(dataBuffer);
console.log(`Message ${messageId} published.`);

python

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

# Configure the batch to publish as soon as there is one kilobyte
# of data or one second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
    max_bytes=1024,  # One kilobyte
    max_latency=1,  # One second
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    publisher.publish(topic_path, data=data)

print('Published messages.')

ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
topic.publish do |batch|
  10.times do |i|
    batch.publish "This is message \##{i}."
  end
end

puts "Messages published in batch."
# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_name, async: {
  max_bytes:    1_000_000,
  max_messages: 20
}
10.times do |i|
  topic.publish_async "This is message \##{i}."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
puts "Messages published asynchronously in batch."

Como reenviar solicitações

As publicações com falha são automaticamente reenviadas, com a exceção de erros que não possibilitem novas tentativas. Este exemplo de código demonstra a criação de um editor com configurações de reenvio personalizadas. Nem todas as bibliotecas de cliente são compatíveis com essas configurações, por isso consulte a documentação de referência de APIs para a linguagem escolhida.

java

// Retry settings control how the publisher handles retryable failures
Duration retryDelay = Duration.ofMillis(5); // default: 5 ms
double retryDelayMultiplier = 2.0; // back off for repeated failures, default: 2.0
Duration maxRetryDelay = Duration.ofSeconds(600); // default : Long.MAX_VALUE
Duration totalTimeout = Duration.ofSeconds(10); // default: 10 seconds
Duration initialRpcTimeout = Duration.ofSeconds(10); // default: 10 seconds
Duration maxRpcTimeout = Duration.ofSeconds(10); // default: 10 seconds

RetrySettings retrySettings =
    RetrySettings.newBuilder()
        .setInitialRetryDelay(retryDelay)
        .setRetryDelayMultiplier(retryDelayMultiplier)
        .setMaxRetryDelay(maxRetryDelay)
        .setTotalTimeout(totalTimeout)
        .setInitialRpcTimeout(initialRpcTimeout)
        .setMaxRpcTimeout(maxRpcTimeout)
        .build();

Publisher publisher = Publisher.newBuilder(topicName).setRetrySettings(retrySettings).build();

node.js

// Imports the Google Cloud client library
const {v1} = require('@google-cloud/pubsub');

// Creates a publisher client
const client = new v1.PublisherClient({
  // optional auth parameters
});

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const projectId = 'my-project-id'
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

const formattedTopic = client.topicPath(projectId, topicName);
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const messagesElement = {
  data: dataBuffer,
};
const messages = [messagesElement];
// Build the request
const request = {
  topic: formattedTopic,
  messages: messages,
};

// Retry settings control how the publisher handles retryable failures
// Default values are shown
const retrySettings = {
  retryCodes: [
    10, // 'ABORTED'
    1, // 'CANCELLED',
    4, // 'DEADLINE_EXCEEDED'
    13, // 'INTERNAL'
    8, // 'RESOURCE_EXHAUSTED'
    14, // 'UNAVAILABLE'
    2, // 'UNKNOWN'
  ],
  backoffSettings: {
    initialRetryDelayMillis: 100,
    retryDelayMultiplier: 1.3,
    maxRetryDelayMillis: 60000,
    initialRpcTimeoutMillis: 12000,
    rpcTimeoutMultiplier: 1.0,
    maxRpcTimeoutMillis: 30000,
    totalTimeoutMillis: 600000,
  },
};

const [response] = await client.publish(request, {retry: retrySettings});
console.log(`Message ${response.messageIds} published.`);

Controle de simultaneidade

A compatibilidade com a simultaneidade depende da linguagem de programação. Consulte a documentação de referência de APIs para mais informações.

O exemplo a seguir ilustra como controlar a simultaneidade em um editor:

go

t := client.Topic(topic)
t.PublishSettings = pubsub.PublishSettings{
	NumGoroutines: 1,
}
result := t.Publish(ctx, &pubsub.Message{Data: msg})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

java

// create a publisher with a single threaded executor
ExecutorProvider executorProvider =
    InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();
Publisher publisher =
    Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();

python

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

# When you publish a message, the client returns a Future. This Future
# can be used to track when the message is published.
futures = []

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    message_future = publisher.publish(topic_path, data=data)
    futures.append(message_future)

print('Published message IDs:')
for future in futures:
    # result() blocks until the message is published.
    print(future.result())

ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name, async: {
  threads: {
    # Use exactly one thread for publishing message and exactly one thread
    # for executing callbacks
    publish:  1,
    callback: 1
  }
}
topic.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
Esta página foi útil? Conte sua opinião sobre:

Enviar comentários sobre…