Publicar mensagens em tópicos

Este documento fornece informações sobre a publicação de mensagens.

Uma aplicação de publicador cria e envia mensagens para um tópico. O Pub/Sub oferece o fornecimento de mensagens, pelo menos, uma vez e a ordenação de melhor esforço aos subscritores existentes.

O fluxo geral de uma candidatura de publicador é o seguinte:

  1. Crie uma mensagem com os seus dados.
  2. Enviar um pedido ao servidor Pub/Sub para publicar a mensagem no tópico especificado.

Antes de começar

Antes de configurar o fluxo de trabalho de publicação, certifique-se de que concluiu as seguintes tarefas:

Funções necessárias

Para receber as autorizações de que precisa para publicar mensagens num tópico, peça ao seu administrador para lhe conceder a função de IAM Publicador do Pub/Sub (roles/pubsub.publisher) no tópico. Para mais informações sobre a atribuição de funções, consulte o artigo Faça a gestão do acesso a projetos, pastas e organizações.

Também pode conseguir as autorizações necessárias através de funções personalizadas ou outras funções predefinidas.

Precisa de autorizações adicionais para criar ou atualizar tópicos e subscrições.

Formato da mensagem

Uma mensagem consiste em campos com os dados e os metadados da mensagem. Especifique, pelo menos, uma das seguintes opções na mensagem:

O serviço Pub/Sub adiciona os seguintes campos à mensagem:

  • Um ID da mensagem exclusivo do tópico
  • Uma data/hora de quando o serviço Pub/Sub recebe a mensagem

Para saber mais sobre as mensagens, consulte o artigo Formato de mensagem.

Publique mensagens

Pode publicar mensagens com a Google Cloud consola, a CLI do Google Cloud, a API Pub/Sub e as bibliotecas cliente. As bibliotecas de cliente podem publicar mensagens de forma assíncrona.

Os exemplos seguintes demonstram como publicar uma mensagem num tópico.

Consola

Para publicar uma mensagem, siga estes passos:

  1. Na Google Cloud consola, aceda à página Tópicos do Pub/Sub.

    Aceda à página de tópicos do Pub/Sub

  2. Clique no ID do tópico.

  3. Na página Detalhes do tópico em Mensagens, clique em Publicar mensagem.

  4. No campo Corpo da mensagem, introduza os dados da mensagem.

  5. Clique em Publicar.

gcloud

Para publicar uma mensagem, use o comando gcloud pubsub topics publish:

gcloud pubsub topics publish TOPIC_ID \
  --message=MESSAGE_DATA \
  [--attribute=KEY="VALUE",...]

Substitua o seguinte:

  • TOPIC_ID: o ID do tópico
  • MESSAGE_DATA: uma string com os dados da mensagem
  • KEY: a chave de um atributo de mensagem
  • VALUE: o valor da chave do atributo da mensagem

REST

Para publicar uma mensagem, envie um pedido POST semelhante ao seguinte:

POST  https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID:publish
Content-Type: application/json
Authorization: Bearer $(gcloud auth application-default print-access-token)

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto do projeto com o tópico
  • TOPIC_ID: o ID do tópico

Especifique os seguintes campos no corpo do pedido:

{
  "messages": [
    {
      "attributes": {
        "KEY": "VALUE",
        ...
      },
      "data": "MESSAGE_DATA",
    }
  ]
}

Substitua o seguinte:

  • KEY: a chave de um atributo de mensagem
  • VALUE: o valor da chave do atributo da mensagem
  • MESSAGE_DATA: uma string codificada em base64 com os dados da mensagem

A mensagem tem de conter um campo de dados não vazio ou, pelo menos, um atributo.

Se o pedido for bem-sucedido, a resposta é um objeto JSON com o ID da mensagem. O exemplo seguinte é uma resposta com um ID de mensagem:

{
  "messageIds": [
    "19916711285",
  ]
}

C++

Antes de experimentar este exemplo, siga as instruções de configuração do C++ no artigo Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API C++ do Pub/Sub.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  auto message_id = publisher.Publish(
      pubsub::MessageBuilder{}.SetData("Hello World!").Build());
  auto done = message_id.then([](future<StatusOr<std::string>> f) {
    auto id = f.get();
    if (!id) throw std::move(id).status();
    std::cout << "Hello World! published with id=" << *id << "\n";
  });
  // Block until the message is published
  done.get();
}

C#

Antes de experimentar este exemplo, siga as instruções de configuração do C# em Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API C# do Pub/Sub.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishMessagesAsyncSample
{
    public async Task<int> PublishMessagesAsync(string projectId, string topicId, IEnumerable<string> messageTexts)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        int publishedMessageCount = 0;
        var publishTasks = messageTexts.Select(async text =>
        {
            try
            {
                string message = await publisher.PublishAsync(text);
                Console.WriteLine($"Published message {message}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {text}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        // PublisherClient instance should be shutdown after use.
        // The TimeSpan specifies for how long to attempt to publish locally queued messages.
        await publisher.ShutdownAsync(TimeSpan.FromSeconds(15));
        return publishedMessageCount;
    }
}

Ir

O exemplo seguinte usa a versão principal da biblioteca de cliente Go Pub/Sub (v2). Se ainda estiver a usar a biblioteca v1, consulte o guia de migração para a v2. Para ver uma lista de exemplos de código da v1, consulte os exemplos de código descontinuados.

Antes de experimentar este exemplo, siga as instruções de configuração do Go em Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do Pub/Sub.

import (
	"context"
	"fmt"
	"io"
	"strconv"
	"sync"
	"sync/atomic"

	"cloud.google.com/go/pubsub/v2"
)

func publishThatScales(w io.Writer, projectID, topicID string, n int) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	var wg sync.WaitGroup
	var totalErrors uint64

	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
	// If a topic ID is provided, the project ID from the client is used.
	// Reuse this publisher for all publish calls to send messages in batches.
	publisher := client.Publisher(topicID)

	for i := 0; i < n; i++ {
		result := publisher.Publish(ctx, &pubsub.Message{
			Data: []byte("Message " + strconv.Itoa(i)),
		})

		wg.Add(1)
		go func(i int, res *pubsub.PublishResult) {
			defer wg.Done()
			// The Get method blocks until a server-generated ID or
			// an error is returned for the published message.
			id, err := res.Get(ctx)
			if err != nil {
				// Error handling code can be added here.
				fmt.Fprintf(w, "Failed to publish: %v", err)
				atomic.AddUint64(&totalErrors, 1)
				return
			}
			fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id)
		}(i, result)
	}

	wg.Wait()

	if totalErrors > 0 {
		return fmt.Errorf("%d of %d messages did not publish successfully", totalErrors, n)
	}
	return nil
}

Java

Antes de experimentar este exemplo, siga as instruções de configuração do Java no artigo Início rápido: usar bibliotecas cliente. Para mais informações, consulte a documentação de referência da API Java do Pub/Sub.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class PublishWithErrorHandlerExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithErrorHandlerExample(projectId, topicId);
  }

  public static void publishWithErrorHandlerExample(String projectId, String topicId)
      throws IOException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;

    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).build();

      List<String> messages = Arrays.asList("first message", "second message");

      for (final String message : messages) {
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> future = publisher.publish(pubsubMessage);

        // Add an asynchronous callback to handle success / failure
        ApiFutures.addCallback(
            future,
            new ApiFutureCallback<String>() {

              @Override
              public void onFailure(Throwable throwable) {
                if (throwable instanceof ApiException) {
                  ApiException apiException = ((ApiException) throwable);
                  // details on the API exception
                  System.out.println(apiException.getStatusCode().getCode());
                  System.out.println(apiException.isRetryable());
                }
                System.out.println("Error publishing message : " + message);
              }

              @Override
              public void onSuccess(String messageId) {
                // Once published, returns server-assigned message ids (unique within the topic)
                System.out.println("Published message ID: " + messageId);
              }
            },
            MoreExecutors.directExecutor());
      }
    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Antes de experimentar este exemplo, siga as instruções de configuração do Node.js em Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Node.js do Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishMessage(topicNameOrId, data) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  try {
    const messageId = await topic.publishMessage({data: dataBuffer});
    console.log(`Message ${messageId} published.`);
  } catch (error) {
    console.error(`Received error while publishing: ${error.message}`);
    process.exitCode = 1;
  }
}

Node.js

Antes de experimentar este exemplo, siga as instruções de configuração do Node.js em Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Node.js do Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishMessage(topicNameOrId: string, data: string) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  try {
    const messageId = await topic.publishMessage({data: dataBuffer});
    console.log(`Message ${messageId} published.`);
  } catch (error) {
    console.error(
      `Received error while publishing: ${(error as Error).message}`,
    );
    process.exitCode = 1;
  }
}

PHP

Antes de experimentar este exemplo, siga as instruções de configuração do PHP no artigo Início rápido: usar bibliotecas cliente. Para mais informações, consulte a documentação de referência da API PHP Pub/Sub.

use Google\Cloud\PubSub\MessageBuilder;
use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->topic($topicName);
    $topic->publish((new MessageBuilder)->setData($message)->build());

    print('Message published' . PHP_EOL);
}

Python

Antes de experimentar este exemplo, siga as instruções de configuração do Python em Início rápido: usar bibliotecas cliente. Para mais informações, consulte a documentação de referência da API Python Pub/Sub.

"""Publishes multiple messages to a Pub/Sub topic with an error handler."""
from concurrent import futures
from google.cloud import pubsub_v1
from typing import Callable

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []

def get_callback(
    publish_future: pubsub_v1.publisher.futures.Future, data: str
) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
    def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
        try:
            # Wait 60 seconds for the publish call to succeed.
            print(publish_future.result(timeout=60))
        except futures.TimeoutError:
            print(f"Publishing {data} timed out.")

    return callback

for i in range(10):
    data = str(i)
    # When you publish a message, the client returns a future.
    publish_future = publisher.publish(topic_path, data.encode("utf-8"))
    # Non-blocking. Publish failures are handled in the callback function.
    publish_future.add_done_callback(get_callback(publish_future, data))
    publish_futures.append(publish_future)

# Wait for all the publish futures to resolve before exiting.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with error handler to {topic_path}.")

Ruby

O exemplo seguinte usa a biblioteca cliente Ruby Pub/Sub v3. Se ainda estiver a usar a biblioteca v2, consulte o guia de migração para a v3. Para ver uma lista de exemplos de código do Ruby v2, consulte os exemplos de código descontinuados.

Antes de experimentar este exemplo, siga as instruções de configuração do Ruby em Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Ruby Pub/Sub.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::PubSub.new
publisher = pubsub.publisher topic_id

begin
  publisher.publish_async "This is a test message." do |result|
    raise "Failed to publish the message." unless result.succeeded?
    puts "Message published asynchronously."
  end
  # Stop the async_publisher to send all queued messages immediately.
  publisher.async_publisher.stop.wait!
rescue StandardError => e
  puts "Received error while publishing: #{e.message}"
end

Depois de publicar uma mensagem, o serviço Pub/Sub devolve o ID da mensagem ao publicador.

Use atributos para publicar uma mensagem

Pode incorporar atributos personalizados como metadados em mensagens do Pub/Sub. Os atributos são usados para fornecer informações adicionais sobre a mensagem, como a respetiva prioridade, origem ou destino. Também é possível usar atributos para filtrar mensagens na subscrição.

Siga estas diretrizes para usar atributos nas suas mensagens:

  • Os atributos podem ser strings de texto ou strings de bytes.

  • Pode ter, no máximo, 100 atributos por mensagem.

  • As chaves de atributos não podem começar com goog e não podem exceder 256 bytes.

  • Os valores dos atributos não podem exceder 1024 bytes.

O esquema de mensagens pode ser representado da seguinte forma:

{
  "data": string,
  "attributes": {
    string: string,
    ...
  },
  "messageId": string,
  "publishTime": string,
  "orderingKey": string
}

No caso de duplicados do lado da publicação, é possível ver valores publishTime diferentes para a mesma mensagem original do lado do cliente, mesmo com o mesmo messageId.

O esquema JSON PubsubMessage é publicado como parte da documentação REST e RPC. Pode usar atributos personalizados para as datas/horas dos eventos.

Os exemplos seguintes demonstram como publicar uma mensagem com atributos num tópico.

Consola

Para publicar uma mensagem com atributos, siga estes passos:

  1. Na Google Cloud consola, aceda à página Tópicos.

    Aceda à página de tópicos do Pub/Sub

  2. Clique no tópico para o qual quer publicar mensagens.

  3. Na página de detalhes do tópico, clique em Mensagens.

  4. Clique em Publicar mensagem.

  5. No campo Corpo da mensagem, introduza os dados da mensagem.

  6. Em Atributos de mensagens, clique em Adicionar um atributo.

  7. Introduza um par de chave-valor.

  8. Adicione mais atributos, se necessário.

  9. Clique em Publicar.

gcloud

gcloud pubsub topics publish my-topic --message="hello" \
  --attribute="origin=gcloud-sample,username=gcp,eventTime='2021-01-01T12:00:00Z'"

C++

Antes de experimentar este exemplo, siga as instruções de configuração do C++ no artigo Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API C++ do Pub/Sub.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  std::vector<future<void>> done;
  for (int i = 0; i != 10; ++i) {
    auto message_id = publisher.Publish(
        pubsub::MessageBuilder{}
            .SetData("Hello World! [" + std::to_string(i) + "]")
            .SetAttribute("origin", "cpp-sample")
            .SetAttribute("username", "gcp")
            .Build());
    done.push_back(message_id.then([i](future<StatusOr<std::string>> f) {
      auto id = f.get();
      if (!id) throw std::move(id).status();
      std::cout << "Message " << i << " published with id=" << *id << "\n";
    }));
  }
  publisher.Flush();
  // Block until all the messages are published (optional)
  for (auto& f : done) f.get();
}

C#

Antes de experimentar este exemplo, siga as instruções de configuração do C# em Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API C# do Pub/Sub.


using Google.Cloud.PubSub.V1;
using Google.Protobuf;
using System;
using System.Threading.Tasks;

public class PublishMessageWithCustomAttributesAsyncSample
{
    public async Task PublishMessageWithCustomAttributesAsync(string projectId, string topicId, string messageText)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        var pubsubMessage = new PubsubMessage
        {
            // The data is any arbitrary ByteString. Here, we're using text.
            Data = ByteString.CopyFromUtf8(messageText),
            // The attributes provide metadata in a string-to-string dictionary.
            Attributes =
            {
                { "year", "2020" },
                { "author", "unknown" }
            }
        };
        string message = await publisher.PublishAsync(pubsubMessage);
        Console.WriteLine($"Published message {message}");
        // PublisherClient instance should be shutdown after use.
        // The TimeSpan specifies for how long to attempt to publish locally queued messages.
        await publisher.ShutdownAsync(TimeSpan.FromSeconds(15));
    }
}

Ir

O exemplo seguinte usa a versão principal da biblioteca de cliente Go Pub/Sub (v2). Se ainda estiver a usar a biblioteca v1, consulte o guia de migração para a v2. Para ver uma lista de exemplos de código da v1, consulte os exemplos de código descontinuados.

Antes de experimentar este exemplo, siga as instruções de configuração do Go em Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do Pub/Sub.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
)

func publishCustomAttributes(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
	// If a topic ID is provided, the project ID from the client is used.
	// Reuse this publisher for all publish calls to send messages in batches.
	publisher := client.Publisher(topicID)
	result := publisher.Publish(ctx, &pubsub.Message{
		Data: []byte("Hello world!"),
		Attributes: map[string]string{
			"origin":   "golang",
			"username": "gcp",
		},
	})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %w", err)
	}
	fmt.Fprintf(w, "Published message with custom attributes; msg ID: %v\n", id)
	return nil
}

Java

Antes de experimentar este exemplo, siga as instruções de configuração do Java no artigo Início rápido: usar bibliotecas cliente. Para mais informações, consulte a documentação de referência da API Java do Pub/Sub.


import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithCustomAttributesExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithCustomAttributesExample(projectId, topicId);
  }

  public static void publishWithCustomAttributesExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;

    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).build();

      String message = "first message";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage =
          PubsubMessage.newBuilder()
              .setData(data)
              .putAllAttributes(ImmutableMap.of("year", "2020", "author", "unknown"))
              .build();

      // Once published, returns a server-assigned message id (unique within the topic)
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published a message with custom attributes: " + messageId);

    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Antes de experimentar este exemplo, siga as instruções de configuração do Node.js em Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Node.js do Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishMessageWithCustomAttributes(topicNameOrId, data) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Add two custom attributes, origin and username, to the message
  const customAttributes = {
    origin: 'nodejs-sample',
    username: 'gcp',
  };

  // Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId);

  const messageId = await topic.publishMessage({
    data: dataBuffer,
    attributes: customAttributes,
  });
  console.log(`Message ${messageId} published.`);
}

Python

Antes de experimentar este exemplo, siga as instruções de configuração do Python em Início rápido: usar bibliotecas cliente. Para mais informações, consulte a documentação de referência da API Python Pub/Sub.

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    # Add two attributes, origin and username, to the message
    future = publisher.publish(
        topic_path, data, origin="python-sample", username="gcp"
    )
    print(future.result())

print(f"Published messages with custom attributes to {topic_path}.")

Ruby

O exemplo seguinte usa a biblioteca cliente Ruby Pub/Sub v3. Se ainda estiver a usar a biblioteca v2, consulte o guia de migração para a v3. Para ver uma lista de exemplos de código do Ruby v2, consulte os exemplos de código descontinuados.

Antes de experimentar este exemplo, siga as instruções de configuração do Ruby em Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Ruby Pub/Sub.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::PubSub.new
publisher = pubsub.publisher topic_id

# Add two attributes, origin and username, to the message
publisher.publish_async "This is a test message.",
                        origin:   "ruby-sample",
                        username: "gcp" do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message with custom attributes published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
publisher.async_publisher.stop.wait!

Use as teclas de ordenação para publicar uma mensagem

Para receber mensagens por ordem nos clientes subscritores, tem de configurar os clientes publicadores para publicarem mensagens com chaves de ordenação.

Para compreender o conceito de ordenação de chaves, consulte o artigo Ordene mensagens.

Segue-se uma lista de considerações importantes para a ordenação de mensagens para clientes de publicadores:

  • Ordenação num único cliente publicador: quando um único cliente publicador publica mensagens com a mesma chave de ordenação na mesma região, o cliente subscritor recebe essas mensagens na ordem exata em que foram publicadas. Por exemplo, se um cliente publicador publicar as mensagens 1, 2 e 3 com a chave de ordenação A, o cliente subscritor recebe-as pela ordem 1, 2 e 3.

  • Ordenação em vários clientes publicadores: a ordem das mensagens recebidas pelos clientes subscritores é consistente com a ordem em que foram publicadas na mesma região, mesmo quando vários clientes publicadores usam a mesma chave de ordenação. No entanto, os próprios clientes publicadores não têm conhecimento desta encomenda.

    Por exemplo, se os clientes publicadores X e Y publicarem mensagens com a chave de ordenação A, e a mensagem de X for recebida pelo Pub/Sub antes da mensagem de Y, todos os clientes subscritores recebem a mensagem de X antes da mensagem de Y. Se for necessária uma ordem rigorosa das mensagens em diferentes clientes publicadores, esses clientes têm de implementar um mecanismo de coordenação adicional para garantir que não publicam mensagens com a mesma chave de ordenação em simultâneo. Por exemplo, pode usar um serviço de bloqueio para manter a propriedade de uma chave de ordenação durante a publicação.

  • Encomendar em várias regiões: a garantia de entrega ordenada só se aplica quando as publicações de uma chave de ordenação estão na mesma região. Se a sua aplicação de publicador publicar mensagens com a mesma chave de ordenação em diferentes regiões, não é possível aplicar a ordem nessas publicações. Os subscritores podem estabelecer ligação a qualquer região, e a garantia de ordenação é mantida.

    Quando executa a sua aplicação no Google Cloud, por predefinição, esta liga-se ao ponto final do Pub/Sub na mesma região. Por conseguinte, executar a sua aplicação numa única região dentro de Google Cloud geralmente garante que está a interagir com uma única região.

    Quando executa a sua aplicação de publicador fora deGoogle Cloud ou em várias regiões, pode garantir que está a estabelecer ligação a uma única região através de um ponto final de localização quando configura o seu cliente do Pub/Sub. Todos os pontos finais de localização do Pub/Sub apontam para regiões únicas. Para saber mais acerca dos pontos finais de localização, consulte os pontos finais do Pub/Sub. Para ver uma lista de todos os pontos finais de localização do Pub/Sub, consulte o artigo Lista de pontos finais de localização.

  • Falhas na publicação: quando a publicação com uma chave de ordenação falha, as mensagens em fila com a mesma chave de ordenação na publicação falham, incluindo pedidos de publicação futuros desta chave de ordenação. Tem de retomar a publicação com chaves de ordenação quando ocorrem essas falhas. Para ver um exemplo de como retomar a operação de publicação, consulte Volte a tentar pedidos com chaves de ordenação.

Pode publicar mensagens com chaves de ordenação através da Google Cloud consola, da CLI do Google Cloud, da API Pub/Sub ou das bibliotecas cliente.

Consola

Para publicar uma mensagem com atributos, siga estes passos:

  1. Na Google Cloud consola, aceda à página Tópicos.

    Aceda à página de tópicos do Pub/Sub

  2. Clique no tópico para o qual quer publicar mensagens.

  3. Na página de detalhes do tópico, clique em Mensagens.

  4. Clique em Publicar mensagem.

  5. No campo Corpo da mensagem, introduza os dados da mensagem.

  6. No campo Ordem das mensagens, introduza uma chave de ordenação.

  7. Clique em Publicar.

gcloud

Para publicar uma mensagem com uma chave de ordenação, use o comando gcloud pubsub topics publish e o sinalizador --ordering-key:

gcloud pubsub topics publish TOPIC_ID \
  --message=MESSAGE_DATA \
  --ordering-key=ORDERING_KEY

Substitua o seguinte:

  • TOPIC_ID: o ID do tópico
  • MESSAGE_DATA: uma string com os dados da mensagem
  • ORDERING_KEY: uma string com uma chave de ordenação

REST

Para publicar uma mensagem com uma chave de ordenação, envie um pedido POST como o seguinte:

POST  https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID:publish
Content-Type: application/json
Authorization: Bearer $(gcloud auth application-default print-access-token)

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto do projeto com o tópico
  • TOPIC_ID: o ID do tópico

Especifique os seguintes campos no corpo do pedido:

{
  "messages": [
    {
      "attributes": {
        "KEY": "VALUE",
        ...
      },
      "data": "MESSAGE_DATA",
      "ordering_key": "ORDERING_KEY",
    }
  ]
}

Substitua o seguinte:

  • KEY: a chave de um atributo de mensagem
  • VALUE: o valor da chave do atributo da mensagem
  • MESSAGE_DATA: uma string codificada em base64 com os dados da mensagem
  • ORDERING_KEY: uma string com uma chave de ordenação

A mensagem tem de conter um campo de dados não vazio ou, pelo menos, um atributo.

Se o pedido for bem-sucedido, a resposta é um objeto JSON com o ID da mensagem. O exemplo seguinte é uma resposta com um ID de mensagem:

{
  "messageIds": [
    "19916711285",
  ]
}

C++

Antes de experimentar este exemplo, siga as instruções de configuração do C++ no artigo Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API C++ do Pub/Sub.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  struct SampleData {
    std::string ordering_key;
    std::string data;
  } data[] = {
      {"key1", "message1"}, {"key2", "message2"}, {"key1", "message3"},
      {"key1", "message4"}, {"key1", "message5"},
  };
  std::vector<future<void>> done;
  for (auto& datum : data) {
    auto message_id =
        publisher.Publish(pubsub::MessageBuilder{}
                              .SetData("Hello World! [" + datum.data + "]")
                              .SetOrderingKey(datum.ordering_key)
                              .Build());
    std::string ack_id = datum.ordering_key + "#" + datum.data;
    done.push_back(message_id.then([ack_id](future<StatusOr<std::string>> f) {
      auto id = f.get();
      if (!id) throw std::move(id).status();
      std::cout << "Message " << ack_id << " published with id=" << *id
                << "\n";
    }));
  }
  publisher.Flush();
  // Block until all the messages are published (optional)
  for (auto& f : done) f.get();
}

C#

Antes de experimentar este exemplo, siga as instruções de configuração do C# em Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API C# do Pub/Sub.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishOrderedMessagesAsyncSample
{
    public async Task<int> PublishOrderedMessagesAsync(string projectId, string topicId, IEnumerable<(string, string)> keysAndMessages)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);

        var customSettings = new PublisherClient.Settings
        {
            EnableMessageOrdering = true
        };

        PublisherClient publisher = await new PublisherClientBuilder
        {
            TopicName = topicName,
            // Sending messages to the same region ensures they are received in order even when multiple publishers are used.
            Endpoint = "us-east1-pubsub.googleapis.com:443",
            Settings = customSettings
        }.BuildAsync();

        int publishedMessageCount = 0;
        var publishTasks = keysAndMessages.Select(async keyAndMessage =>
        {
            try
            {
                string message = await publisher.PublishAsync(keyAndMessage.Item1, keyAndMessage.Item2);
                Console.WriteLine($"Published message {message}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {keyAndMessage.Item2}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        // PublisherClient instance should be shutdown after use.
        // The TimeSpan specifies for how long to attempt to publish locally queued messages.
        await publisher.ShutdownAsync(TimeSpan.FromSeconds(15));
        return publishedMessageCount;
    }
}

Ir

O exemplo seguinte usa a versão principal da biblioteca de cliente Go Pub/Sub (v2). Se ainda estiver a usar a biblioteca v1, consulte o guia de migração para a v2. Para ver uma lista de exemplos de código da v1, consulte os exemplos de código descontinuados.

Antes de experimentar este exemplo, siga as instruções de configuração do Go em Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do Pub/Sub.

import (
	"context"
	"fmt"
	"io"
	"sync"
	"sync/atomic"

	"cloud.google.com/go/pubsub/v2"
	"google.golang.org/api/option"
)

func publishWithOrderingKey(w io.Writer, projectID, topicID string) {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()

	// Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering key are in the same region.
	// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
	client, err := pubsub.NewClient(ctx, projectID,
		option.WithEndpoint("us-east1-pubsub.googleapis.com:443"))
	if err != nil {
		fmt.Fprintf(w, "pubsub.NewClient: %v", err)
		return
	}
	defer client.Close()

	var wg sync.WaitGroup
	var totalErrors uint64

	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
	// If a topic ID is provided, the project ID from the client is used.
	// Reuse this publisher for all publish calls to send messages in batches.
	publisher := client.Publisher(topicID)
	publisher.EnableMessageOrdering = true

	messages := []struct {
		message     string
		orderingKey string
	}{
		{
			message:     "message1",
			orderingKey: "key1",
		},
		{
			message:     "message2",
			orderingKey: "key2",
		},
		{
			message:     "message3",
			orderingKey: "key1",
		},
		{
			message:     "message4",
			orderingKey: "key2",
		},
	}

	for _, m := range messages {
		result := publisher.Publish(ctx, &pubsub.Message{
			Data:        []byte(m.message),
			OrderingKey: m.orderingKey,
		})

		wg.Add(1)
		go func(res *pubsub.PublishResult) {
			defer wg.Done()
			// The Get method blocks until a server-generated ID or
			// an error is returned for the published message.
			_, err := res.Get(ctx)
			if err != nil {
				// Error handling code can be added here.
				fmt.Printf("Failed to publish: %s\n", err)
				atomic.AddUint64(&totalErrors, 1)
				return
			}
		}(result)
	}

	wg.Wait()

	if totalErrors > 0 {
		fmt.Fprintf(w, "%d of 4 messages did not publish successfully", totalErrors)
		return
	}

	fmt.Fprint(w, "Published 4 messages with ordering keys successfully\n")
}

Java

Antes de experimentar este exemplo, siga as instruções de configuração do Java no artigo Início rápido: usar bibliotecas cliente. Para mais informações, consulte a documentação de referência da API Java do Pub/Sub.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class PublishWithOrderingKeys {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Choose an existing topic.
    String topicId = "your-topic-id";

    publishWithOrderingKeysExample(projectId, topicId);
  }

  public static void publishWithOrderingKeysExample(String projectId, String topicId)
      throws IOException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    // Create a publisher and set message ordering to true.
    Publisher publisher =
        Publisher.newBuilder(topicName)
            // Sending messages to the same region ensures they are received in order
            // even when multiple publishers are used.
            .setEndpoint("us-east1-pubsub.googleapis.com:443")
            .setEnableMessageOrdering(true)
            .build();

    try {
      Map<String, String> messages = new LinkedHashMap<String, String>();
      messages.put("message1", "key1");
      messages.put("message2", "key2");
      messages.put("message3", "key1");
      messages.put("message4", "key2");

      for (Map.Entry<String, String> entry : messages.entrySet()) {
        ByteString data = ByteString.copyFromUtf8(entry.getKey());
        PubsubMessage pubsubMessage =
            PubsubMessage.newBuilder().setData(data).setOrderingKey(entry.getValue()).build();
        ApiFuture<String> future = publisher.publish(pubsubMessage);

        // Add an asynchronous callback to handle publish success / failure.
        ApiFutures.addCallback(
            future,
            new ApiFutureCallback<String>() {

              @Override
              public void onFailure(Throwable throwable) {
                if (throwable instanceof ApiException) {
                  ApiException apiException = ((ApiException) throwable);
                  // Details on the API exception.
                  System.out.println(apiException.getStatusCode().getCode());
                  System.out.println(apiException.isRetryable());
                }
                System.out.println("Error publishing message : " + pubsubMessage.getData());
              }

              @Override
              public void onSuccess(String messageId) {
                // Once published, returns server-assigned message ids (unique within the topic).
                System.out.println(pubsubMessage.getData() + " : " + messageId);
              }
            },
            MoreExecutors.directExecutor());
      }
    } finally {
      // When finished with the publisher, shutdown to free up resources.
      publisher.shutdown();
      publisher.awaitTermination(1, TimeUnit.MINUTES);
    }
  }
}

Node.js

Antes de experimentar este exemplo, siga as instruções de configuração do Node.js em Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Node.js do Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// const orderingKey = 'key1';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub({
  // Sending messages to the same region ensures they are received in order
  // even when multiple publishers are used.
  apiEndpoint: 'us-east1-pubsub.googleapis.com:443',
});

async function publishOrderedMessage(topicNameOrId, data, orderingKey) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Be sure to set an ordering key that matches other messages
  // you want to receive in order, relative to each other.
  const message = {
    data: dataBuffer,
    orderingKey: orderingKey,
  };

  // Cache topic objects (publishers) and reuse them.
  //
  // Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering
  // key are in the same region. For list of locational endpoints for Pub/Sub, see:
  // https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
  const publishOptions = {
    messageOrdering: true,
  };
  const topic = pubSubClient.topic(topicNameOrId, publishOptions);

  // Publishes the message
  const messageId = await topic.publishMessage(message);

  console.log(`Message ${messageId} published.`);

  return messageId;
}

Python

Antes de experimentar este exemplo, siga as instruções de configuração do Python em Início rápido: usar bibliotecas cliente. Para mais informações, consulte a documentação de referência da API Python Pub/Sub.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"}
publisher = pubsub_v1.PublisherClient(
    publisher_options=publisher_options, client_options=client_options
)
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)

for message in [
    ("message1", "key1"),
    ("message2", "key2"),
    ("message3", "key1"),
    ("message4", "key2"),
]:
    # Data must be a bytestring
    data = message[0].encode("utf-8")
    ordering_key = message[1]
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
    print(future.result())

print(f"Published messages with ordering keys to {topic_path}.")

Ruby

O exemplo seguinte usa a biblioteca cliente Ruby Pub/Sub v3. Se ainda estiver a usar a biblioteca v2, consulte o guia de migração para a v3. Para ver uma lista de exemplos de código do Ruby v2, consulte os exemplos de código descontinuados.

Antes de experimentar este exemplo, siga as instruções de configuração do Ruby em Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Ruby Pub/Sub.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::PubSub.new
# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
publisher = pubsub.publisher topic_id, async: {
  max_bytes:    1_000_000,
  max_messages: 20
}

publisher.enable_message_ordering!
10.times do |i|
  publisher.publish_async "This is message ##{i}.",
                          ordering_key: "ordering-key"
end

# Stop the async_publisher to send all queued messages immediately.
publisher.async_publisher.stop!
puts "Messages published with ordering key."

Monitorize um publicador

O Cloud Monitoring oferece várias métricas para monitorizar tópicos.

Para monitorizar um tópico e manter um publicador em boas condições, consulte o artigo Mantenha um publicador em boas condições.

O que se segue?