Publishing messages

This document provides information about publishing messages. To learn about creating, deleting, and administering topics and subscriptions, see Managing topics and subscriptions. To learn about receiving messages, see the Subscriber Guide.

A publisher application creates and sends messages to a topic. Cloud Pub/Sub provides at-least-once delivery of each message to existing subscribers, as well as best-effort ordering of messages, as explained in the Subscriptions overview.

The general flow for a publisher application is:

  1. Create a message containing your data.
  2. Send a request to the Cloud Pub/Sub server to publish the message to the desired topic.

Prerequisites

Refer to the client libraries getting started guide to set up your environment in the programming language of your choice.

Publishing messages to a topic

When using JSON over REST, message data must be base64-encoded. The entire request, including one or more messages, must be smaller than 10 MB after decoding. Note that the message payload must not be empty; it must contain either a non-empty data field or at least one attribute.
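
As a minimal illustration (a sketch in Python using only the standard library; the payload string is just an example, and it decodes to the same value as the data field in the Protocol sample below), this is how the data field of a REST publish request can be base64-encoded:

import base64
import json

# Example payload; message data must be non-empty unless the message
# carries at least one attribute.
payload = "Hello Cloud Pub/Sub! Here is my message!"

# JSON over REST requires the data field to be base64-encoded.
encoded_data = base64.b64encode(payload.encode("utf-8")).decode("ascii")

# Body of a POST to .../topics/mytopic:publish
request_body = json.dumps({"messages": [{"data": encoded_data}]})
print(request_body)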

Depending on your programming language, the client libraries can publish messages synchronously or asynchronously. Asynchronous publishing allows for batching and higher throughput in your application.

All client libraries support publishing messages asynchronously. See the API Reference documentation for your chosen programming language to find out whether its client library also supports publishing messages synchronously, if that is your preference.

A server-generated ID (unique within the topic) is returned when a message is published successfully.

Protocol

Request:

POST https://pubsub.googleapis.com/v1/projects/myproject/topics/mytopic:publish
{
  "messages": [
    {
      "attributes": {
        "iana.org/language_tag": "en"
      },
      "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ=="
    }
  ]
}
Response:
200 OK
{
  "messageIds": [
    "19916711285"
  ]
}

Command line

gcloud pubsub topics publish my-topic --message "hello"

C#

PublisherClient publisher = await PublisherClient.CreateAsync(
    new TopicName(projectId, topicId));
// Publish a message (messageText is the string payload to send).
string messageId = await publisher.PublishAsync(messageText);

Go

var wg sync.WaitGroup
var totalErrors uint64
t := client.Topic(topic)

for i := 0; i < n; i++ {
	result := t.Publish(ctx, &pubsub.Message{
		Data: []byte("Message " + strconv.Itoa(i)),
	})

	wg.Add(1)
	go func(i int, res *pubsub.PublishResult) {
		defer wg.Done()
		// The Get method blocks until a server-generated ID or
		// an error is returned for the published message.
		id, err := res.Get(ctx)
		if err != nil {
			// Error handling code can be added here.
			log.Output(1, fmt.Sprintf("Failed to publish: %v", err))
			atomic.AddUint64(&totalErrors, 1)
			return
		}
		fmt.Printf("Published message %d; msg ID: %v\n", i, id)
	}(i, result)
}

wg.Wait()

if totalErrors > 0 {
	return fmt.Errorf("%d of %d messages did not publish successfully",
		totalErrors, n)
}
return nil

Java

ProjectTopicName topicName = ProjectTopicName.of("my-project-id", "my-topic-id");
Publisher publisher = null;

try {
  // Create a publisher instance with default settings bound to the topic
  publisher = Publisher.newBuilder(topicName).build();

  List<String> messages = Arrays.asList("first message", "second message");

  for (final String message : messages) {
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

    // Once published, returns a server-assigned message id (unique within the topic)
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Add an asynchronous callback to handle success / failure
    ApiFutures.addCallback(
        future,
        new ApiFutureCallback<String>() {

          @Override
          public void onFailure(Throwable throwable) {
            if (throwable instanceof ApiException) {
              ApiException apiException = ((ApiException) throwable);
              // details on the API exception
              System.out.println(apiException.getStatusCode().getCode());
              System.out.println(apiException.isRetryable());
            }
            System.out.println("Error publishing message : " + message);
          }

          @Override
          public void onSuccess(String messageId) {
            // Once published, returns server-assigned message ids (unique within the topic)
            System.out.println(messageId);
          }
        },
        MoreExecutors.directExecutor());
  }
} finally {
  if (publisher != null) {
    // When finished with the publisher, shutdown to free up resources.
    publisher.shutdown();
    publisher.awaitTermination(1, TimeUnit.MINUTES);
  }
}

Node.js

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

const messageId = await pubsub.topic(topicName).publish(dataBuffer);
console.log(`Message ${messageId} published.`);

PHP

use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $topic->publish(['data' => $message]);
    print('Message published' . PHP_EOL);
}

Python

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    # When you publish a message, the client returns a Future.
    message_future = publisher.publish(topic_path, data=data)
    message_future.add_done_callback(callback)

print('Published message IDs:')

# We must keep the main thread from exiting to allow it to process
# messages in the background.
while True:
    time.sleep(60)

Ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
topic.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

Custom attributes

You can embed custom attributes as metadata in Pub/Sub messages. Attributes can be text strings or byte strings. The message schema can be represented as follows:

{
  "data": string,
  "attributes": {
    string: string,
    ...
  },
  "messageId": string,
  "publishTime": string
}

The PubsubMessage JSON schema is published as part of the REST and RPC documentation.

Go

t := client.Topic(topic)
result := t.Publish(ctx, &pubsub.Message{
	Data: []byte("Hello world!"),
	Attributes: map[string]string{
		"origin":   "golang",
		"username": "gcp",
	},
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published message with custom attributes; msg ID: %v\n", id)

Node.js

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
// Add two custom attributes, origin and username, to the message
const customAttributes = {
  origin: 'nodejs-sample',
  username: 'gcp',
};

const messageId = await pubsub
  .topic(topicName)
  .publish(dataBuffer, customAttributes);
console.log(`Message ${messageId} published.`);

Python

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    # Add two attributes, origin and username, to the message
    publisher.publish(
        topic_path, data, origin='python-sample', username='gcp')

print('Published messages with custom attributes.')

Ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
# Add two attributes, origin and username, to the message
topic.publish_async "This is a test message.",
                    origin:   "ruby-sample",
                    username: "gcp" do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message with custom attributes published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

Batching messages to balance latency and throughput

The Cloud Pub/Sub client libraries batch multiple messages into a single call to the service. Larger batch sizes increase message throughput (the rate of messages sent per CPU). The cost of batching is latency for individual messages, which are queued in memory until their corresponding batch is filled and ready to be sent over the network. To minimize latency, batching should be turned off. This is particularly important for applications that publish a single message as part of a request-response sequence. A common example of this pattern occurs in serverless, event-driven applications that use Cloud Functions or App Engine.

Messages can be batched based on request size (in bytes), message count, and time. You can override the default settings as shown in this sample:

C#

PublisherClient publisher = await PublisherClient.CreateAsync(
    new TopicName(projectId, topicId),
    settings: new PublisherClient.Settings
    {
        BatchingSettings = new Google.Api.Gax.BatchingSettings(
            elementCountThreshold: 100,
            byteCountThreshold: 10240,
            delayThreshold: TimeSpan.FromSeconds(3))
    });
// PublisherClient collects messages into appropriately sized
// batches.
var publishTasks =
    messageTexts.Select(text => publisher.PublishAsync(text));
foreach (Task<string> task in publishTasks)
{
    string message = await task;
    await Console.Out.WriteLineAsync($"Published message {message}");
}

Go

t := client.Topic(topic)
t.PublishSettings = pubsub.PublishSettings{
	ByteThreshold:  5000,
	CountThreshold: 10,
	DelayThreshold: 100 * time.Millisecond,
}
result := t.Publish(ctx, &pubsub.Message{Data: msg})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

Java

// Batch settings control how the publisher batches messages
long requestBytesThreshold = 5000L; // default : 1 byte
long messageCountBatchSize = 10L; // default : 1 message

Duration publishDelayThreshold = Duration.ofMillis(100); // default : 1 ms

// Publish request get triggered based on request size, messages count & time since last publish
BatchingSettings batchingSettings =
    BatchingSettings.newBuilder()
        .setElementCountThreshold(messageCountBatchSize)
        .setRequestByteThreshold(requestBytesThreshold)
        .setDelayThreshold(publishDelayThreshold)
        .build();

Publisher publisher =
    Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();

Node.js

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });
// const maxMessages = 10;
// const maxWaitTime = 10000;

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

const [messageId] = await pubsub
  .topic(topicName, {
    batching: {
      maxMessages: maxMessages,
      maxMilliseconds: maxWaitTime,
    },
  })
  .publish(dataBuffer);
console.log(`Message ${messageId} published.`);

Python

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

# Configure the batch to publish as soon as there is one kilobyte
# of data or one second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
    max_bytes=1024,  # One kilobyte
    max_latency=1,  # One second
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    publisher.publish(topic_path, data=data)

print('Published messages.')

Ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
topic.publish do |batch|
  10.times do |i|
    batch.publish "This is message \##{i}."
  end
end

puts "Messages published in batch."
# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_name, async: {
  max_bytes:    1_000_000,
  max_messages: 20
}
10.times do |i|
  topic.publish_async "This is message \##{i}."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
puts "Messages published asynchronously in batch."

Retrying requests

When a publish fails, it is automatically retried, except for errors that do not warrant retries. This sample code demonstrates creating a publisher with custom retry settings (note that not all client libraries support custom retry settings; see the API Reference documentation for your chosen language):

Java

// Retry settings control how the publisher handles retryable failures
Duration retryDelay = Duration.ofMillis(5); // default: 5 ms
double retryDelayMultiplier = 2.0; // back off for repeated failures, default: 2.0
Duration maxRetryDelay = Duration.ofSeconds(600); // default : Long.MAX_VALUE
Duration totalTimeout = Duration.ofSeconds(10); // default: 10 seconds
Duration initialRpcTimeout = Duration.ofSeconds(10); // default: 10 seconds
Duration maxRpcTimeout = Duration.ofSeconds(10); // default: 10 seconds

RetrySettings retrySettings =
    RetrySettings.newBuilder()
        .setInitialRetryDelay(retryDelay)
        .setRetryDelayMultiplier(retryDelayMultiplier)
        .setMaxRetryDelay(maxRetryDelay)
        .setTotalTimeout(totalTimeout)
        .setInitialRpcTimeout(initialRpcTimeout)
        .setMaxRpcTimeout(maxRpcTimeout)
        .build();

Publisher publisher = Publisher.newBuilder(topicName).setRetrySettings(retrySettings).build();

Node.js

// Imports the Google Cloud client library
const {v1} = require('@google-cloud/pubsub');

// Creates a publisher client
const client = new v1.PublisherClient({
  // optional auth parameters
});

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const projectId = 'my-project-id'
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

const formattedTopic = client.topicPath(projectId, topicName);
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const messagesElement = {
  data: dataBuffer,
};
const messages = [messagesElement];
// Build the request
const request = {
  topic: formattedTopic,
  messages: messages,
};

// Retry settings control how the publisher handles retryable failures
// Default values are shown
const retrySettings = {
  retryCodes: [
    10, // 'ABORTED'
    1, // 'CANCELLED',
    4, // 'DEADLINE_EXCEEDED'
    13, // 'INTERNAL'
    8, // 'RESOURCE_EXHAUSTED'
    14, // 'UNAVAILABLE'
    2, // 'UNKNOWN'
  ],
  backoffSettings: {
    initialRetryDelayMillis: 100,
    retryDelayMultiplier: 1.3,
    maxRetryDelayMillis: 60000,
    initialRpcTimeoutMillis: 12000,
    rpcTimeoutMultiplier: 1.0,
    maxRpcTimeoutMillis: 30000,
    totalTimeoutMillis: 600000,
  },
};

const [response] = await client.publish(request, {retry: retrySettings});
console.log(`Message ${response.messageIds} published.`);

Concurrency control

Concurrency control is not available in all programming languages. Refer to the API Reference documentation for more information.

The following sample demonstrates how to control concurrency in a publisher:

Go

t := client.Topic(topic)
t.PublishSettings = pubsub.PublishSettings{
	NumGoroutines: 1,
}
result := t.Publish(ctx, &pubsub.Message{Data: msg})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

Java

// create a publisher with a single threaded executor
ExecutorProvider executorProvider =
    InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();
Publisher publisher =
    Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();

Python

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

# When you publish a message, the client returns a Future. This Future
# can be used to track when the message is published.
futures = []

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    message_future = publisher.publish(topic_path, data=data)
    futures.append(message_future)

print('Published message IDs:')
for future in futures:
    # result() blocks until the message is published.
    print(future.result())

Ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name, async: {
  threads: {
    # Use exactly one thread for publishing message and exactly one thread
    # for executing callbacks
    publish:  1,
    callback: 1
  }
}
topic.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!