Publica mensajes en temas

En este documento, se proporciona información sobre la publicación de mensajes.

Una aplicación de publicador crea y envía mensajes a un tema. Pub/Sub ofrece entrega de mensajes al menos una vez y orden de mejor esfuerzo a los suscriptores existentes.

El flujo general de una aplicación de publicador es el siguiente:

  1. Creación de un mensaje que contenga tus datos
  2. Envía una solicitud al servidor de Pub/Sub para publicar el mensaje en el tema especificado.

Antes de comenzar

Antes de configurar el flujo de trabajo de publicación, asegúrate de haber completado las siguientes tareas:

Funciones obligatorias

Si quieres obtener los permisos que necesitas para publicar mensajes en un tema, pídele a tu administrador que te otorgue el rol de IAM Publicador de Pub/Sub (roles/pubsub.publisher) en el tema. Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso.

Es posible que también puedas obtener los permisos necesarios a través de los roles personalizados o de otros roles predefinidos.

Necesitas permisos adicionales para crear o actualizar temas y suscripciones.

Formato del mensaje

Un mensaje consiste en campos con los datos y los metadatos del mensaje. Especifica al menos uno de los siguientes elementos en el mensaje:

El servicio Pub/Sub agrega los siguientes campos al mensaje:

  • Un ID de mensaje único para el tema
  • Una marca de tiempo del momento en que el servicio de Pub/Sub recibe el mensaje

Para obtener más información sobre los mensajes, consulta Formato de los mensajes.

Publica mensajes

Puedes publicar mensajes con la consola de Google Cloud, Google Cloud CLI, la API de Pub/Sub y las bibliotecas cliente. Las bibliotecas cliente pueden publicar mensajes de manera asíncrona.

En los siguientes ejemplos, se muestra cómo publicar un mensaje en un tema.

Console

Para publicar un mensaje, sigue estos pasos:

  1. En la consola de Google Cloud, ve a la página Temas de Pub/Sub.

    Ir a la página Temas de Pub/Sub

  2. Haz clic en el ID del tema.

  3. En la página Detalles del tema en Mensajes, haz clic en Publicar mensaje.

  4. En el campo Cuerpo del mensaje, ingresa los datos del mensaje.

  5. Haz clic en Publish (Publicar).

gcloud

Para publicar un mensaje, usa el comando gcloud pubsub topics publish:

gcloud pubsub topics publish TOPIC_ID \
  --message=MESSAGE_DATA \
  [--attribute=KEY="VALUE",...]

Reemplaza lo siguiente:

  • TOPIC_ID: El ID del tema
  • MESSAGE_DATA: Una string con los datos del mensaje
  • KEY: La clave de un atributo de mensaje
  • VALUE: El valor de la clave del atributo del mensaje

REST

Para publicar un mensaje, envía una solicitud POST como la siguiente:

POST  https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID:publish
Content-Type: application/json
Authorization: Bearer $(gcloud auth application-default print-access-token)

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto con el tema
  • TOPIC_ID: El ID del tema

Especifica los siguientes campos en el cuerpo de la solicitud:

{
  "messages": [
    {
      "attributes": {
        "KEY": "VALUE",
        ...
      },
      "data": "MESSAGE_DATA",
    }
  ]
}

Reemplaza lo siguiente:

  • KEY: La clave de un atributo de mensaje
  • VALUE: El valor de la clave del atributo del mensaje
  • MESSAGE_DATA: Una string codificada en base64 con los datos del mensaje

El mensaje debe contener un campo de datos no vacío o al menos un atributo.

Si la solicitud se realiza correctamente, la respuesta es un objeto JSON con el ID de mensaje. El siguiente ejemplo es una respuesta con un ID de mensaje:

{
  "messageIds": [
    "19916711285",
  ]
}

C++

Antes de probar esta muestra, sigue las instrucciones de configuración de C++ en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C++.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  auto message_id = publisher.Publish(
      pubsub::MessageBuilder{}.SetData("Hello World!").Build());
  auto done = message_id.then([](future<StatusOr<std::string>> f) {
    auto id = f.get();
    if (!id) throw std::move(id).status();
    std::cout << "Hello World! published with id=" << *id << "\n";
  });
  // Block until the message is published
  done.get();
}

C#

Antes de probar esta muestra, sigue las instrucciones de configuración de C# en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C#.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishMessagesAsyncSample
{
    public async Task<int> PublishMessagesAsync(string projectId, string topicId, IEnumerable<string> messageTexts)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        int publishedMessageCount = 0;
        var publishTasks = messageTexts.Select(async text =>
        {
            try
            {
                string message = await publisher.PublishAsync(text);
                Console.WriteLine($"Published message {message}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {text}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}

Go

Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

import (
	"context"
	"fmt"
	"io"
	"strconv"
	"sync"
	"sync/atomic"

	"cloud.google.com/go/pubsub"
)

func publishThatScales(w io.Writer, projectID, topicID string, n int) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	var wg sync.WaitGroup
	var totalErrors uint64
	t := client.Topic(topicID)

	for i := 0; i < n; i++ {
		result := t.Publish(ctx, &pubsub.Message{
			Data: []byte("Message " + strconv.Itoa(i)),
		})

		wg.Add(1)
		go func(i int, res *pubsub.PublishResult) {
			defer wg.Done()
			// The Get method blocks until a server-generated ID or
			// an error is returned for the published message.
			id, err := res.Get(ctx)
			if err != nil {
				// Error handling code can be added here.
				fmt.Fprintf(w, "Failed to publish: %v", err)
				atomic.AddUint64(&totalErrors, 1)
				return
			}
			fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id)
		}(i, result)
	}

	wg.Wait()

	if totalErrors > 0 {
		return fmt.Errorf("%d of %d messages did not publish successfully", totalErrors, n)
	}
	return nil
}

Java

Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class PublishWithErrorHandlerExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithErrorHandlerExample(projectId, topicId);
  }

  public static void publishWithErrorHandlerExample(String projectId, String topicId)
      throws IOException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;

    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).build();

      List<String> messages = Arrays.asList("first message", "second message");

      for (final String message : messages) {
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> future = publisher.publish(pubsubMessage);

        // Add an asynchronous callback to handle success / failure
        ApiFutures.addCallback(
            future,
            new ApiFutureCallback<String>() {

              @Override
              public void onFailure(Throwable throwable) {
                if (throwable instanceof ApiException) {
                  ApiException apiException = ((ApiException) throwable);
                  // details on the API exception
                  System.out.println(apiException.getStatusCode().getCode());
                  System.out.println(apiException.isRetryable());
                }
                System.out.println("Error publishing message : " + message);
              }

              @Override
              public void onSuccess(String messageId) {
                // Once published, returns server-assigned message ids (unique within the topic)
                System.out.println("Published message ID: " + messageId);
              }
            },
            MoreExecutors.directExecutor());
      }
    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishMessage(topicNameOrId, data) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  try {
    const messageId = await pubSubClient
      .topic(topicNameOrId)
      .publishMessage({data: dataBuffer});
    console.log(`Message ${messageId} published.`);
  } catch (error) {
    console.error(`Received error while publishing: ${error.message}`);
    process.exitCode = 1;
  }
}

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishMessage(topicNameOrId: string, data: string) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  try {
    const messageId = await pubSubClient
      .topic(topicNameOrId)
      .publishMessage({data: dataBuffer});
    console.log(`Message ${messageId} published.`);
  } catch (error) {
    console.error(
      `Received error while publishing: ${(error as Error).message}`
    );
    process.exitCode = 1;
  }
}

PHP

Antes de probar esta muestra, sigue las instrucciones de configuración de PHP en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para PHP.

use Google\Cloud\PubSub\MessageBuilder;
use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->topic($topicName);
    $topic->publish((new MessageBuilder)->setData($message)->build());

    print('Message published' . PHP_EOL);
}

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

"""Publishes multiple messages to a Pub/Sub topic with an error handler."""
from concurrent import futures
from google.cloud import pubsub_v1
from typing import Callable

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []

def get_callback(
    publish_future: pubsub_v1.publisher.futures.Future, data: str
) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
    def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
        try:
            # Wait 60 seconds for the publish call to succeed.
            print(publish_future.result(timeout=60))
        except futures.TimeoutError:
            print(f"Publishing {data} timed out.")

    return callback

for i in range(10):
    data = str(i)
    # When you publish a message, the client returns a future.
    publish_future = publisher.publish(topic_path, data.encode("utf-8"))
    # Non-blocking. Publish failures are handled in the callback function.
    publish_future.add_done_callback(get_callback(publish_future, data))
    publish_futures.append(publish_future)

# Wait for all the publish futures to resolve before exiting.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with error handler to {topic_path}.")

Rita

Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Ruby.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id

begin
  topic.publish_async "This is a test message." do |result|
    raise "Failed to publish the message." unless result.succeeded?
    puts "Message published asynchronously."
  end

  # Stop the async_publisher to send all queued messages immediately.
  topic.async_publisher.stop.wait!
rescue StandardError => e
  puts "Received error while publishing: #{e.message}"
end

Después de que publicas un mensaje, el servicio de Pub/Sub le muestra el ID del mensaje al publicador.

Usa atributos para publicar un mensaje

Puedes incorporar atributos personalizados como metadatos en mensajes de Pub/Sub. Los atributos se usan para proporcionar información adicional sobre el mensaje, como su prioridad, origen o destino. Los atributos también se pueden usar para filtrar mensajes en la suscripción.

Sigue estos lineamientos para usar atributos en tus mensajes:

  • Los atributos pueden ser cadenas de texto o cadenas de bytes.

  • Puedes tener un máximo de 100 atributos por mensaje.

  • Las claves de atributo no deben comenzar con goog y no deben exceder los 256 bytes.

  • Los valores del atributo no deben superar los 1,024 bytes.

El esquema del mensaje se puede representar de la siguiente manera:

{
  "data": string,
  "attributes": {
    string: string,
    ...
  },
  "messageId": string,
  "publishTime": string,
  "orderingKey": string
}

En el caso de los duplicados del lado de la publicación, es posible ver diferentes valores de publishTime para el mismo mensaje original del cliente, incluso con el mismo messageId.

El esquema PubsubMessage de JSON se publica como parte de la documentación de REST y RPC. Puedes usar atributos personalizados para las marcas de tiempo de eventos.

En los siguientes ejemplos, se muestra cómo publicar un mensaje con atributos en un tema.

Console

Para publicar un mensaje con atributos, sigue estos pasos:

  1. En la consola de Google Cloud, ve a la página Temas.

    Ir a la página Temas de Pub/Sub

  2. Haz clic en el tema en el que deseas publicar mensajes.

  3. En la página de detalles del tema, haz clic en Mensajes.

  4. Haz clic en Publish message.

  5. En el campo Cuerpo del mensaje, ingresa los datos del mensaje.

  6. En Atributos del mensaje, haz clic en Agregar un atributo.

  7. Ingresa un par clave-valor.

  8. Si es necesario, agrega más atributos.

  9. Haz clic en Publish (Publicar).

gcloud

gcloud pubsub topics publish my-topic --message="hello" \
  --attribute="origin=gcloud-sample,username=gcp,eventTime='2021-01-01T12:00:00Z'"

C++

Antes de probar esta muestra, sigue las instrucciones de configuración de C++ en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C++.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  std::vector<future<void>> done;
  for (int i = 0; i != 10; ++i) {
    auto message_id = publisher.Publish(
        pubsub::MessageBuilder{}
            .SetData("Hello World! [" + std::to_string(i) + "]")
            .SetAttribute("origin", "cpp-sample")
            .SetAttribute("username", "gcp")
            .Build());
    done.push_back(message_id.then([i](future<StatusOr<std::string>> f) {
      auto id = f.get();
      if (!id) throw std::move(id).status();
      std::cout << "Message " << i << " published with id=" << *id << "\n";
    }));
  }
  publisher.Flush();
  // Block until all the messages are published (optional)
  for (auto& f : done) f.get();
}

C#

Antes de probar esta muestra, sigue las instrucciones de configuración de C# en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C#.


using Google.Cloud.PubSub.V1;
using Google.Protobuf;
using System;
using System.Threading.Tasks;

public class PublishMessageWithCustomAttributesAsyncSample
{
    public async Task PublishMessageWithCustomAttributesAsync(string projectId, string topicId, string messageText)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        var pubsubMessage = new PubsubMessage
        {
            // The data is any arbitrary ByteString. Here, we're using text.
            Data = ByteString.CopyFromUtf8(messageText),
            // The attributes provide metadata in a string-to-string dictionary.
            Attributes =
            {
                { "year", "2020" },
                { "author", "unknown" }
            }
        };
        string message = await publisher.PublishAsync(pubsubMessage);
        Console.WriteLine($"Published message {message}");
    }
}

Go

Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func publishCustomAttributes(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	t := client.Topic(topicID)
	result := t.Publish(ctx, &pubsub.Message{
		Data: []byte("Hello world!"),
		Attributes: map[string]string{
			"origin":   "golang",
			"username": "gcp",
		},
	})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %w", err)
	}
	fmt.Fprintf(w, "Published message with custom attributes; msg ID: %v\n", id)
	return nil
}

Java

Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.


import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithCustomAttributesExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithCustomAttributesExample(projectId, topicId);
  }

  public static void publishWithCustomAttributesExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;

    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).build();

      String message = "first message";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage =
          PubsubMessage.newBuilder()
              .setData(data)
              .putAllAttributes(ImmutableMap.of("year", "2020", "author", "unknown"))
              .build();

      // Once published, returns a server-assigned message id (unique within the topic)
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published a message with custom attributes: " + messageId);

    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishMessageWithCustomAttributes(topicNameOrId, data) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Add two custom attributes, origin and username, to the message
  const customAttributes = {
    origin: 'nodejs-sample',
    username: 'gcp',
  };

  const messageId = await pubSubClient
    .topic(topicNameOrId)
    .publishMessage({data: dataBuffer, attributes: customAttributes});
  console.log(`Message ${messageId} published.`);
}

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    # Add two attributes, origin and username, to the message
    future = publisher.publish(
        topic_path, data, origin="python-sample", username="gcp"
    )
    print(future.result())

print(f"Published messages with custom attributes to {topic_path}.")

Rita

Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Ruby.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id
# Add two attributes, origin and username, to the message
topic.publish_async "This is a test message.",
                    origin:   "ruby-sample",
                    username: "gcp" do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message with custom attributes published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

Cómo usar claves de ordenamiento para publicar un mensaje

Si deseas recibir mensajes en orden en tus clientes suscriptores, debes configurar tus clientes publicadores para que publiquen mensajes con claves de ordenamiento.

Para comprender el concepto de claves de ordenamiento, consulta Mensajes de pedidos.

A continuación, se incluye una lista de consideraciones clave para los mensajes ordenados para clientes publicadores:

  • Realizar pedidos en un solo cliente publicador: Cuando un solo cliente publicador publica mensajes con la misma clave de ordenamiento en la misma región, el cliente suscriptor recibe esos mensajes en el orden exacto en el que se publicaron. Por ejemplo, si un cliente publicador publica los mensajes 1, 2 y 3 con la clave de ordenamiento A, el cliente suscriptor los recibe en el pedido 1, 2 y 3.

  • Realizar pedidos en varios clientes publicadores: El orden de los mensajes que reciben los clientes suscriptores es coherente con el orden en que se publicaron en la misma región, incluso cuando varios clientes publicadores usan la misma clave de ordenamiento. Sin embargo, los clientes publicadores no tienen conocimiento de este pedido.

    Por ejemplo, si los clientes del publicador X e Y publican mensajes con la clave de orden A y Pub/Sub recibe el mensaje de X antes que el de Y, todos los clientes suscriptores reciben el mensaje de X antes que el de Y. Si se requiere un orden estricto de los mensajes entre diferentes clientes publicadores, esos clientes deben implementar un mecanismo de coordinación adicional para asegurarse de no publicar mensajes con la misma clave de ordenamiento de manera simultánea. Por ejemplo, se puede usar un servicio de bloqueo para mantener la propiedad de una clave de ordenamiento durante la publicación.

  • Pedidos entre regiones: El ordenamiento de los mensajes solo se espera para los mensajes publicados en la misma región. Por lo tanto, asegúrate de que tus clientes publicadores usen los extremos del servicio de ubicación para publicar mensajes en la misma región para la misma clave de ordenamiento. Luego, los clientes suscriptores pueden recibir estos mensajes en orden.

  • Fallas de publicación: Cuando falla la publicación con una clave de ordenamiento, fallan los mensajes en cola de la misma clave de ordenamiento del publicador, incluidas las solicitudes de publicación futuras de esta clave de ordenamiento. Deberás reanudar la publicación con claves de orden cuando se produzcan tales fallas. Para ver un ejemplo de cómo reanudar la operación de publicación, consulta Cómo reintentar las solicitudes con claves de ordenamiento.

Puedes publicar mensajes con claves de ordenamiento mediante la consola de Google Cloud, Google Cloud CLI, la API de Pub/Sub o las bibliotecas cliente.

Console

Para publicar un mensaje con atributos, sigue estos pasos:

  1. En la consola de Google Cloud, ve a la página Temas.

    Ir a la página Temas de Pub/Sub

  2. Haz clic en el tema en el que deseas publicar mensajes.

  3. En la página de detalles del tema, haz clic en Mensajes.

  4. Haz clic en Publish message.

  5. En el campo Cuerpo del mensaje, ingresa los datos del mensaje.

  6. En el campo Ordenamiento de mensajes, ingresa una clave de ordenamiento.

  7. Haz clic en Publish (Publicar).

gcloud

Para publicar un mensaje con una clave de ordenamiento, usa el comando gcloud pubsub topics publish y la marca --ordering-key:

gcloud pubsub topics publish TOPIC_ID \
  --message=MESSAGE_DATA \
  --ordering-key=ORDERING_KEY

Reemplaza lo siguiente:

  • TOPIC_ID: El ID del tema
  • MESSAGE_DATA: Una string con los datos del mensaje
  • ORDERING_KEY: Una string con clave de ordenamiento

REST

Para publicar un mensaje con una clave de ordenamiento, envía una solicitud POST como la siguiente:

POST  https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID:publish
Content-Type: application/json
Authorization: Bearer $(gcloud auth application-default print-access-token)

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto con el tema
  • TOPIC_ID: El ID del tema

Especifica los siguientes campos en el cuerpo de la solicitud:

{
  "messages": [
    {
      "attributes": {
        "KEY": "VALUE",
        ...
      },
      "data": "MESSAGE_DATA",
      "ordering_key": "ORDERING_KEY",
    }
  ]
}

Reemplaza lo siguiente:

  • KEY: La clave de un atributo de mensaje
  • VALUE: El valor de la clave del atributo del mensaje
  • MESSAGE_DATA: Una string codificada en base64 con los datos del mensaje
  • ORDERING_KEY: Una string con clave de ordenamiento

El mensaje debe contener un campo de datos no vacío o al menos un atributo.

Si la solicitud se realiza correctamente, la respuesta es un objeto JSON con el ID de mensaje. El siguiente ejemplo es una respuesta con un ID de mensaje:

{
  "messageIds": [
    "19916711285",
  ]
}

C++

Antes de probar esta muestra, sigue las instrucciones de configuración de C++ en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C++.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  struct SampleData {
    std::string ordering_key;
    std::string data;
  } data[] = {
      {"key1", "message1"}, {"key2", "message2"}, {"key1", "message3"},
      {"key1", "message4"}, {"key1", "message5"},
  };
  std::vector<future<void>> done;
  for (auto& datum : data) {
    auto message_id =
        publisher.Publish(pubsub::MessageBuilder{}
                              .SetData("Hello World! [" + datum.data + "]")
                              .SetOrderingKey(datum.ordering_key)
                              .Build());
    std::string ack_id = datum.ordering_key + "#" + datum.data;
    done.push_back(message_id.then([ack_id](future<StatusOr<std::string>> f) {
      auto id = f.get();
      if (!id) throw std::move(id).status();
      std::cout << "Message " << ack_id << " published with id=" << *id
                << "\n";
    }));
  }
  publisher.Flush();
  // Block until all the messages are published (optional)
  for (auto& f : done) f.get();
}

C#

Antes de probar esta muestra, sigue las instrucciones de configuración de C# en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C#.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishOrderedMessagesAsyncSample
{
    public async Task<int> PublishOrderedMessagesAsync(string projectId, string topicId, IEnumerable<(string, string)> keysAndMessages)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);

        var customSettings = new PublisherClient.Settings
        {
            EnableMessageOrdering = true
        };

        PublisherClient publisher = await new PublisherClientBuilder
        {
            TopicName = topicName,
            // Sending messages to the same region ensures they are received in order even when multiple publishers are used.
            Endpoint = "us-east1-pubsub.googleapis.com:443",
            Settings = customSettings
        }.BuildAsync();

        int publishedMessageCount = 0;
        var publishTasks = keysAndMessages.Select(async keyAndMessage =>
        {
            try
            {
                string message = await publisher.PublishAsync(keyAndMessage.Item1, keyAndMessage.Item2);
                Console.WriteLine($"Published message {message}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {keyAndMessage.Item2}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}

Go

Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

import (
	"context"
	"fmt"
	"io"
	"sync"
	"sync/atomic"

	"cloud.google.com/go/pubsub"
	"google.golang.org/api/option"
)

func publishWithOrderingKey(w io.Writer, projectID, topicID string) {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()

	// Sending messages to the same region ensures they are received in order
	// even when multiple publishers are used.
	client, err := pubsub.NewClient(ctx, projectID,
		option.WithEndpoint("us-east1-pubsub.googleapis.com:443"))
	if err != nil {
		fmt.Fprintf(w, "pubsub.NewClient: %v", err)
		return
	}
	defer client.Close()

	var wg sync.WaitGroup
	var totalErrors uint64
	t := client.Topic(topicID)
	t.EnableMessageOrdering = true

	messages := []struct {
		message     string
		orderingKey string
	}{
		{
			message:     "message1",
			orderingKey: "key1",
		},
		{
			message:     "message2",
			orderingKey: "key2",
		},
		{
			message:     "message3",
			orderingKey: "key1",
		},
		{
			message:     "message4",
			orderingKey: "key2",
		},
	}

	for _, m := range messages {
		res := t.Publish(ctx, &pubsub.Message{
			Data:        []byte(m.message),
			OrderingKey: m.orderingKey,
		})

		wg.Add(1)
		go func(res *pubsub.PublishResult) {
			defer wg.Done()
			// The Get method blocks until a server-generated ID or
			// an error is returned for the published message.
			_, err := res.Get(ctx)
			if err != nil {
				// Error handling code can be added here.
				fmt.Printf("Failed to publish: %s\n", err)
				atomic.AddUint64(&totalErrors, 1)
				return
			}
		}(res)
	}

	wg.Wait()

	if totalErrors > 0 {
		fmt.Fprintf(w, "%d of 4 messages did not publish successfully", totalErrors)
		return
	}

	fmt.Fprint(w, "Published 4 messages with ordering keys successfully\n")
}

Java

Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class PublishWithOrderingKeys {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Choose an existing topic.
    String topicId = "your-topic-id";

    publishWithOrderingKeysExample(projectId, topicId);
  }

  public static void publishWithOrderingKeysExample(String projectId, String topicId)
      throws IOException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    // Create a publisher and set message ordering to true.
    Publisher publisher =
        Publisher.newBuilder(topicName)
            // Sending messages to the same region ensures they are received in order
            // even when multiple publishers are used.
            .setEndpoint("us-east1-pubsub.googleapis.com:443")
            .setEnableMessageOrdering(true)
            .build();

    try {
      Map<String, String> messages = new LinkedHashMap<String, String>();
      messages.put("message1", "key1");
      messages.put("message2", "key2");
      messages.put("message3", "key1");
      messages.put("message4", "key2");

      for (Map.Entry<String, String> entry : messages.entrySet()) {
        ByteString data = ByteString.copyFromUtf8(entry.getKey());
        PubsubMessage pubsubMessage =
            PubsubMessage.newBuilder().setData(data).setOrderingKey(entry.getValue()).build();
        ApiFuture<String> future = publisher.publish(pubsubMessage);

        // Add an asynchronous callback to handle publish success / failure.
        ApiFutures.addCallback(
            future,
            new ApiFutureCallback<String>() {

              @Override
              public void onFailure(Throwable throwable) {
                if (throwable instanceof ApiException) {
                  ApiException apiException = ((ApiException) throwable);
                  // Details on the API exception.
                  System.out.println(apiException.getStatusCode().getCode());
                  System.out.println(apiException.isRetryable());
                }
                System.out.println("Error publishing message : " + pubsubMessage.getData());
              }

              @Override
              public void onSuccess(String messageId) {
                // Once published, returns server-assigned message ids (unique within the topic).
                System.out.println(pubsubMessage.getData() + " : " + messageId);
              }
            },
            MoreExecutors.directExecutor());
      }
    } finally {
      // When finished with the publisher, shutdown to free up resources.
      publisher.shutdown();
      publisher.awaitTermination(1, TimeUnit.MINUTES);
    }
  }
}

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// const orderingKey = 'key1';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub({
  // Sending messages to the same region ensures they are received in order
  // even when multiple publishers are used.
  apiEndpoint: 'us-east1-pubsub.googleapis.com:443',
});

async function publishOrderedMessage(topicNameOrId, data, orderingKey) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Be sure to set an ordering key that matches other messages
  // you want to receive in order, relative to each other.
  const message = {
    data: dataBuffer,
    orderingKey: orderingKey,
  };

  const publishOptions = {
    messageOrdering: true,
  };

  // Publishes the message
  const messageId = await pubSubClient
    .topic(topicNameOrId, publishOptions)
    .publishMessage(message);

  console.log(`Message ${messageId} published.`);

  return messageId;
}

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"}
publisher = pubsub_v1.PublisherClient(
    publisher_options=publisher_options, client_options=client_options
)
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)

for message in [
    ("message1", "key1"),
    ("message2", "key2"),
    ("message3", "key1"),
    ("message4", "key2"),
]:
    # Data must be a bytestring
    data = message[0].encode("utf-8")
    ordering_key = message[1]
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
    print(future.result())

print(f"Published messages with ordering keys to {topic_path}.")

Rita

Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Ruby.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::Pubsub.new endpoint: "us-east1-pubsub.googleapis.com:443"

# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_id, async: {
  max_bytes:    1_000_000,
  max_messages: 20
}
topic.enable_message_ordering!
10.times do |i|
  topic.publish_async "This is message ##{i}.",
                      ordering_key: "ordering-key"
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop!
puts "Messages published with ordering key."

Cómo supervisar a un publicador

Cloud Monitoring proporciona una serie de métricas para supervisar temas.

Para supervisar un tema y mantener un publicador en buen estado, consulta Mantén un publicador en buen estado.

¿Qué sigue?