Publishing messages

This document provides information about publishing messages.

A publisher application creates and sends messages to a topic. Pub/Sub offers existing subscribers at-least-once delivery of each message, as well as best-effort ordering of messages, as explained on the Subscriber overview page.

The general flow for a publisher application is:

  1. Create a message containing your data.
  2. Send a request to the Pub/Sub server to publish the message to the desired topic.

Prerequisites

See the client libraries getting started guide to set up your environment in the programming language of your choice.

Publishing messages to a topic

When using JSON over REST, message data must be base64-encoded. The entire request, including one or more messages, must be smaller than 10MB after decoding. Note that the message payload must not be empty; it must contain either a non-empty data field or at least one attribute.
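
For example, the encoded data value used in the protocol sample below can be produced as follows (a minimal Python sketch; the message text is illustrative):

import base64

# Pub/Sub REST expects the "data" field to be base64-encoded.
message = "Hello Cloud Pub/Sub! Here is my message!"
print(base64.b64encode(message.encode("utf-8")).decode("utf-8"))
# SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==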

Depending on your choice of programming language, the client libraries can publish messages synchronously or asynchronously. Asynchronous publishing allows for batching and higher throughput in your application.

All client libraries support publishing messages asynchronously. See the API reference documentation for your chosen programming language to find out whether its client library also supports publishing messages synchronously, if that is your preferred option.

A server-generated ID (unique within the topic) is returned when a message is published successfully.

Protocol

Request:

The request must be authenticated with an access token in the Authorization header. To obtain an access token for the current Application Default Credentials, run the following command: gcloud auth application-default print-access-token.

POST https://pubsub.googleapis.com/v1/projects/myproject/topics/mytopic:publish
Authorization: Bearer ACCESS_TOKEN

Specify the following fields in the request body:

{
  "messages": [
    {
      "attributes": {
        "key": "iana.org/language_tag",
        "value": "en"
      },
      "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ=="
    }
  ]
}

Response:

200 OK
{
  "messageIds": [
    "19916711285"
  ]
}
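
For example, assuming the gcloud CLI is installed and authenticated, the exchange above can be reproduced with Python's standard library alone (a sketch; myproject and mytopic are placeholders):

import base64
import json
import subprocess
import urllib.request

# Fetch an access token for the current Application Default Credentials.
token = subprocess.check_output(
    ["gcloud", "auth", "application-default", "print-access-token"],
    text=True,
).strip()

body = json.dumps({
    "messages": [
        {"data": base64.b64encode(b"Hello Cloud Pub/Sub! Here is my message!").decode()}
    ]
}).encode()

req = urllib.request.Request(
    "https://pubsub.googleapis.com/v1/projects/myproject/topics/mytopic:publish",
    data=body,
    headers={"Authorization": "Bearer " + token,
             "Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as resp:
    print(json.load(resp))  # e.g. {'messageIds': ['19916711285']}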

Command line

gcloud pubsub topics publish my-topic --message "hello"

C#

PublisherClient publisher = await PublisherClient.CreateAsync(
    new TopicName(projectId, topicId));

Go

import (
	"context"
	"fmt"
	"io"
	"strconv"
	"sync"
	"sync/atomic"

	"cloud.google.com/go/pubsub"
)

func publishThatScales(w io.Writer, projectID, topicID string, n int) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	var wg sync.WaitGroup
	var totalErrors uint64
	t := client.Topic(topicID)

	for i := 0; i < n; i++ {
		result := t.Publish(ctx, &pubsub.Message{
			Data: []byte("Message " + strconv.Itoa(i)),
		})

		wg.Add(1)
		go func(i int, res *pubsub.PublishResult) {
			defer wg.Done()
			// The Get method blocks until a server-generated ID or
			// an error is returned for the published message.
			id, err := res.Get(ctx)
			if err != nil {
				// Error handling code can be added here.
				fmt.Fprintf(w, "Failed to publish: %v", err)
				atomic.AddUint64(&totalErrors, 1)
				return
			}
			fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id)
		}(i, result)
	}

	wg.Wait()

	if totalErrors > 0 {
		return fmt.Errorf("%d of %d messages did not publish successfully", totalErrors, n)
	}
	return nil
}

Java

ProjectTopicName topicName = ProjectTopicName.of("my-project-id", "my-topic-id");
Publisher publisher = null;

try {
  // Create a publisher instance with default settings bound to the topic
  publisher = Publisher.newBuilder(topicName).build();

  List<String> messages = Arrays.asList("first message", "second message");

  for (final String message : messages) {
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

    // Once published, returns a server-assigned message id (unique within the topic)
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Add an asynchronous callback to handle success / failure
    ApiFutures.addCallback(
        future,
        new ApiFutureCallback<String>() {

          @Override
          public void onFailure(Throwable throwable) {
            if (throwable instanceof ApiException) {
              ApiException apiException = ((ApiException) throwable);
              // details on the API exception
              System.out.println(apiException.getStatusCode().getCode());
              System.out.println(apiException.isRetryable());
            }
            System.out.println("Error publishing message : " + message);
          }

          @Override
          public void onSuccess(String messageId) {
            // Once published, returns server-assigned message ids (unique within the topic)
            System.out.println(messageId);
          }
        },
        MoreExecutors.directExecutor());
  }
} finally {
  if (publisher != null) {
    // When finished with the publisher, shutdown to free up resources.
    publisher.shutdown();
    publisher.awaitTermination(1, TimeUnit.MINUTES);
  }
}

Node.js

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

const messageId = await pubsub.topic(topicName).publish(dataBuffer);
console.log(`Message ${messageId} published.`);

PHP

use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $topic->publish(['data' => $message]);
    print('Message published' . PHP_EOL);
}

Python

"""Publishes multiple messages to a Pub/Sub topic with an error handler."""
import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

futures = dict()

def get_callback(f, data):
    def callback(f):
        try:
            print(f.result())
        except Exception:  # noqa
            print("Please handle {} for {}.".format(f.exception(), data))
        finally:
            # Remove the future from the dict in all cases, so the wait
            # loop below terminates even if some messages fail to publish.
            futures.pop(data)

    return callback

for i in range(10):
    data = str(i)
    futures.update({data: None})
    # When you publish a message, the client returns a future.
    future = publisher.publish(
        topic_path, data=data.encode("utf-8")  # data must be a bytestring.
    )
    futures[data] = future
    # Publish failures are handled in the callback function.
    future.add_done_callback(get_callback(future, data))

# Wait for all the publish futures to resolve before exiting.
while futures:
    time.sleep(5)

print("Published message with error handler.")

Ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
topic.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

Custom attributes

You can embed custom attributes as metadata in Pub/Sub messages. Attributes can be text strings or byte strings. The message schema can be represented as follows:

{
  "data": string,
  "attributes": {
    string: string,
    ...
  },
  "messageId": string,
  "publishTime": string
}

The PubsubMessage JSON schema is published as part of the REST and RPC documentation.
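
For example, a publish request body carrying two custom attributes could look like this (the data value is the base64 encoding of "Hello world!", mirroring the samples below):

{
  "messages": [
    {
      "data": "SGVsbG8gd29ybGQh",
      "attributes": {
        "origin": "golang",
        "username": "gcp"
      }
    }
  ]
}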

Go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func publishCustomAttributes(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	t := client.Topic(topicID)
	result := t.Publish(ctx, &pubsub.Message{
		Data: []byte("Hello world!"),
		Attributes: map[string]string{
			"origin":   "golang",
			"username": "gcp",
		},
	})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %v", err)
	}
	fmt.Fprintf(w, "Published message with custom attributes; msg ID: %v\n", id)
	return nil
}

Node.js

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
// Add two custom attributes, origin and username, to the message
const customAttributes = {
  origin: 'nodejs-sample',
  username: 'gcp',
};

const messageId = await pubsub
  .topic(topicName)
  .publish(dataBuffer, customAttributes);
console.log(`Message ${messageId} published.`);

Python

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u"Message number {}".format(n)
    # Data must be a bytestring
    data = data.encode("utf-8")
    # Add two attributes, origin and username, to the message
    future = publisher.publish(
        topic_path, data, origin="python-sample", username="gcp"
    )
    print(future.result())

print("Published messages with custom attributes.")

Ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
# Add two attributes, origin and username, to the message
topic.publish_async "This is a test message.",
                    origin:   "ruby-sample",
                    username: "gcp" do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message with custom attributes published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

Batching messages to balance latency and throughput

The Pub/Sub client libraries batch multiple messages into a single call to the service. Larger batch sizes increase message throughput (the rate of messages sent per CPU). The cost of batching is latency for individual messages, which are queued in memory until their batch is filled and ready to be sent over the network. To minimize latency, batching should be turned off. This is particularly important for applications that publish a single message as part of a request-response sequence. A common example of this pattern occurs in serverless, event-driven applications that use Cloud Functions or App Engine.
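
With the Python client library, for instance, batching can be effectively disabled by setting the message count threshold to one, so that each publish triggers an immediate send (a sketch; max_messages is a field of BatchSettings):

from google.cloud import pubsub_v1

# Send each message as soon as it is published, trading throughput for latency.
batch_settings = pubsub_v1.types.BatchSettings(max_messages=1)
publisher = pubsub_v1.PublisherClient(batch_settings)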

Messages can be batched based on request size (in bytes), message count, and time. You can override the default settings as shown in this sample:

C#

PublisherClient publisher = await PublisherClient.CreateAsync(
    new TopicName(projectId, topicId),
    settings: new PublisherClient.Settings
    {
        BatchingSettings = new Google.Api.Gax.BatchingSettings(
            elementCountThreshold: 100,
            byteCountThreshold: 10240,
            delayThreshold: TimeSpan.FromSeconds(3))
    });
// PublisherClient collects messages into appropriately sized
// batches.
var publishTasks =
    messageTexts.Select(text => publisher.PublishAsync(text));
foreach (Task<string> task in publishTasks)
{
    string message = await task;
    await Console.Out.WriteLineAsync($"Published message {message}");
}

Go

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func publishWithSettings(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	t := client.Topic(topicID)
	t.PublishSettings.ByteThreshold = 5000
	t.PublishSettings.CountThreshold = 10
	t.PublishSettings.DelayThreshold = 100 * time.Millisecond

	result := t.Publish(ctx, &pubsub.Message{Data: []byte(msg)})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %v", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

Java

// Batch settings control how the publisher batches messages
long requestBytesThreshold = 5000L; // default : 1 byte
long messageCountBatchSize = 10L; // default : 1 message

Duration publishDelayThreshold = Duration.ofMillis(100); // default : 1 ms

// Publish request get triggered based on request size, messages count & time since last publish
BatchingSettings batchingSettings =
    BatchingSettings.newBuilder()
        .setElementCountThreshold(messageCountBatchSize)
        .setRequestByteThreshold(requestBytesThreshold)
        .setDelayThreshold(publishDelayThreshold)
        .build();

Publisher publisher =
    Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();

Node.js

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });
// const maxMessages = 10;
// const maxWaitTime = 10000;

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

const batchPublisher = pubsub.topic(topicName, {
  batching: {
    maxMessages: maxMessages,
    maxMilliseconds: maxWaitTime,
  },
});

for (let i = 0; i < 10; i++) {
  (async () => {
    const messageId = await batchPublisher.publish(dataBuffer);
    console.log(`Message ${messageId} published.`);
  })();
}

PHP

use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * The publisher should be used in conjunction with the `google-cloud-batch`
 * daemon, which should be running in the background.
 *
 * To start the daemon, from your project root call `vendor/bin/google-cloud-batch daemon`.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message    The message to publish.
 */
function publish_message_batch($projectId, $topicName, $message)
{
    // Check if the batch daemon is running.
    if (getenv('IS_BATCH_DAEMON_RUNNING') !== 'true') {
        trigger_error(
            'The batch daemon is not running. Call ' .
            '`vendor/bin/google-cloud-batch daemon` from ' .
            'your project root to start the daemon.',
            E_USER_NOTICE
        );
    }

    $batchOptions = [
        'batchSize' => 100, // Max messages for each batch.
        'callPeriod' => 0.01, // Max time in seconds between each batch publish.
    ];

    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $publisher = $topic->batchPublisher([
        'batchOptions' => $batchOptions
    ]);

    for ($i = 0; $i < 10; $i++) {
        $publisher->publish(['data' => $message]);
    }

    print('Messages enqueued for publication.' . PHP_EOL);
}

Python

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

# Configure the batch to publish as soon as there is one kilobyte
# of data or one second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
    max_bytes=1024, max_latency=1  # One kilobyte  # One second
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u"Message number {}".format(n)
    # Data must be a bytestring
    data = data.encode("utf-8")
    future = publisher.publish(topic_path, data=data)
    print(future.result())

print("Published messages with batch settings.")

Ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
topic.publish do |batch|
  10.times do |i|
    batch.publish "This is message \##{i}."
  end
end

puts "Messages published in batch."
# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_name, async: {
  max_bytes:    1_000_000,
  max_messages: 20
}
10.times do |i|
  topic.publish_async "This is message \##{i}."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
puts "Messages published asynchronously in batch."

Retrying requests

When a publish fails, it is automatically retried, except for errors that do not warrant retries. This sample code demonstrates how to create a publisher with custom retry settings (note that not all client libraries support custom retry settings; see the API reference documentation for your chosen language):

Java

// Retry settings control how the publisher handles retryable failures
Duration retryDelay = Duration.ofMillis(100); // default: 100 ms
double retryDelayMultiplier = 2.0; // back off for repeated failures, default: 1.3
Duration maxRetryDelay = Duration.ofSeconds(60); // default : 1 minute
Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 5 seconds
double rpcTimeoutMultiplier = 1.0; // default: 1.0
Duration maxRpcTimeout = Duration.ofSeconds(600); // default: 10 minutes
Duration totalTimeout = Duration.ofSeconds(600); // default: 10 minutes

RetrySettings retrySettings =
    RetrySettings.newBuilder()
        .setInitialRetryDelay(retryDelay)
        .setRetryDelayMultiplier(retryDelayMultiplier)
        .setMaxRetryDelay(maxRetryDelay)
        .setInitialRpcTimeout(initialRpcTimeout)
        .setRpcTimeoutMultiplier(rpcTimeoutMultiplier)
        .setMaxRpcTimeout(maxRpcTimeout)
        .setTotalTimeout(totalTimeout)
        .build();

Publisher publisher = Publisher.newBuilder(topicName).setRetrySettings(retrySettings).build();

Node.js

// Imports the Google Cloud client library
const {v1} = require('@google-cloud/pubsub');

// Creates a publisher client
const client = new v1.PublisherClient({
  // optional auth parameters
});

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const projectId = 'my-project-id'
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

const formattedTopic = client.topicPath(projectId, topicName);
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const messagesElement = {
  data: dataBuffer,
};
const messages = [messagesElement];
// Build the request
const request = {
  topic: formattedTopic,
  messages: messages,
};

// Retry settings control how the publisher handles retryable failures
// Default values are shown
const retrySettings = {
  retryCodes: [
    10, // 'ABORTED'
    1, // 'CANCELLED',
    4, // 'DEADLINE_EXCEEDED'
    13, // 'INTERNAL'
    8, // 'RESOURCE_EXHAUSTED'
    14, // 'UNAVAILABLE'
    2, // 'UNKNOWN'
  ],
  backoffSettings: {
    initialRetryDelayMillis: 100,
    retryDelayMultiplier: 1.3,
    maxRetryDelayMillis: 60000,
    initialRpcTimeoutMillis: 5000,
    rpcTimeoutMultiplier: 1.0,
    maxRpcTimeoutMillis: 600000,
    totalTimeoutMillis: 600000,
  },
};

const [response] = await client.publish(request, {retry: retrySettings});
console.log(`Message ${response.messageIds} published.`);

Python

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

# Configure the retry settings. Defaults will be overwritten.
retry_settings = {
    "interfaces": {
        "google.pubsub.v1.Publisher": {
            "retry_codes": {
                "publish": [
                    "ABORTED",
                    "CANCELLED",
                    "DEADLINE_EXCEEDED",
                    "INTERNAL",
                    "RESOURCE_EXHAUSTED",
                    "UNAVAILABLE",
                    "UNKNOWN",
                ]
            },
            "retry_params": {
                "messaging": {
                    "initial_retry_delay_millis": 100,  # default: 100
                    "retry_delay_multiplier": 1.3,  # default: 1.3
                    "max_retry_delay_millis": 60000,  # default: 60000
                    "initial_rpc_timeout_millis": 5000,  # default: 25000
                    "rpc_timeout_multiplier": 1.0,  # default: 1.0
                    "max_rpc_timeout_millis": 600000,  # default: 30000
                    "total_timeout_millis": 600000,  # default: 600000
                }
            },
            "methods": {
                "Publish": {
                    "retry_codes_name": "publish",
                    "retry_params_name": "messaging",
                }
            },
        }
    }
}

publisher = pubsub_v1.PublisherClient(client_config=retry_settings)
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u"Message number {}".format(n)
    # Data must be a bytestring
    data = data.encode("utf-8")
    future = publisher.publish(topic_path, data=data)
    print(future.result())

print("Published messages with retry settings.")

Retry settings control both the total number of retries and exponential backoff, that is, how long the client waits between retries. The initial RPC timeout is how long the client waits for the initial RPC to succeed before retrying. The total timeout is how long the client waits before it stops retrying. To retry publish requests, the initial RPC timeout must be shorter than the total timeout.

Once the first RPC fails or times out, the exponential backoff logic determines when subsequent retries occur. On each retry, the RPC timeout increases by the RPC timeout multiplier, up to the maximum RPC timeout. In addition, the retry delay settings determine how long the client waits between receiving an error or timing out and initiating the next request.
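
As a rough illustration of how these settings interact, the following sketch (plain Python, not the client library) paces retries using the default values shown in the Java sample above:

# Defaults from the Java sample: 100 ms initial retry delay, 1.3 multiplier,
# 60 s max retry delay, 5 s initial RPC timeout, 1.0 RPC timeout multiplier.
delay, rpc_timeout, elapsed = 0.1, 5.0, 0.0
total_timeout = 600.0
for attempt in range(1, 6):
    print(f"attempt {attempt}: wait up to {rpc_timeout:.1f}s for the RPC, "
          f"then back off {delay:.2f}s before retrying")
    elapsed += rpc_timeout + delay
    if elapsed >= total_timeout:
        break  # the client stops retrying once the total timeout is exhausted
    delay = min(delay * 1.3, 60.0)
    rpc_timeout = min(rpc_timeout * 1.0, 600.0)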

Concurrency control

Concurrency control is not supported by all programming languages. See the API reference documentation for your chosen language for more information.

The following sample demonstrates how to control concurrency in a publisher:

Go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	t := client.Topic(topicID)
	t.PublishSettings.NumGoroutines = 1

	result := t.Publish(ctx, &pubsub.Message{Data: []byte(msg)})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %v", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

Java

// create a publisher with a single threaded executor
ExecutorProvider executorProvider =
    InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();
Publisher publisher =
    Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();

Python

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u"Message number {}".format(n)
    # Data must be a bytestring
    data = data.encode("utf-8")
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data)
    print(future.result())

print("Published messages with futures.")

Ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name, async: {
  threads: {
    # Use exactly one thread for publishing message and exactly one thread
    # for executing callbacks
    publish:  1,
    callback: 1
  }
}
topic.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!