Como publicar mensagens

Neste documento, você verá informações sobre a publicação de mensagens.

Um aplicativo do editor cria e envia mensagens para um tópico. O Pub/Sub oferece entrega da mensagem pelo menos uma vez e ordenação dos assinantes existentes no modelo de "melhor esforço" (best-effort), como explicado na Visão geral do assinante.

Este é o fluxo geral de um aplicativo do editor:

  1. Criar uma mensagem contendo seus dados.
  2. Enviar uma solicitação para o servidor do Pub/Sub publicar a mensagem no tópico desejado.

Configurar

Consulte o Guia de primeiros passos das bibliotecas de cliente para configurar seu ambiente na linguagem de programação que preferir.

Publicar mensagens em um tópico

Para usar JSON com REST, é necessário que os dados da mensagem sejam codificados em base64. Toda a solicitação, incluindo uma ou mais mensagens, precisa ter menos do que 10 MB após a decodificação. Observe que o payload da mensagem não pode estar vazio. Ele precisa conter um campo de dados não vazio ou pelo menos um atributo.

Dependendo da escolha da linguagem de programação, as bibliotecas de cliente podem publicar mensagens de maneira síncrona ou assíncrona. A publicação assíncrona permite o processamento por lotes e uma maior capacidade no aplicativo.

Todas as bibliotecas de cliente são compatíveis com a publicação de mensagens de maneira assíncrona. Consulte a documentação de referência de APIs da linguagem de programação escolhida para conferir se a biblioteca de cliente também é compatível com a publicação de mensagens de maneira síncrona, se essa for sua opção preferida.

Um ID gerado pelo servidor, exclusivo dentro do tópico, é retornado quando uma mensagem é publicada.

Protocolo

Solicitação:

A solicitação precisa ser autenticada com um token de acesso no cabeçalho Authorization. Para conseguir um token de acesso para o Application Default Credentials: gcloud auth application-default print-access-token.

    POST     https://pubsub.googleapis.com/v1/projects/myproject/topics/mytopic:publish
    Authorization: Bearer ACCESS_TOKEN
    

Especifique os campos a seguir no corpo da solicitação:

    {
      "messages": [
        {
          "attributes": {
            "key": "iana.org/language_tag",
            "value": "en"
          },
          "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ=="
        }
      ]
    }
    

Resposta:

200 OK
    {
      "messageIds": [
        "19916711285"
      ]
    }
    

linha de comando

    gcloud pubsub topics publish my-topic --message "hello"
    

C#

PublisherClient publisher = await PublisherClient.CreateAsync(
        new TopicName(projectId, topicId));
    var publishTasks =
        messageTexts.Select(async text =>
        {
            try
            {
                string message = await publisher.PublishAsync(text);
                await Console.Out.WriteLineAsync($"Published message {message}");
            }
            catch (Exception exception)
            {
                await Console.Out.WriteLineAsync($"An error ocurred when publishing message {text}:");
                await Console.Out.WriteLineAsync(exception.Message);
            }
        });
    await Task.WhenAll(publishTasks);

go

import (
    	"context"
    	"fmt"
    	"io"
    	"strconv"
    	"sync"
    	"sync/atomic"

    	"cloud.google.com/go/pubsub"
    )

    func publishThatScales(w io.Writer, projectID, topicID string, n int) error {
    	// projectID := "my-project-id"
    	// topicID := "my-topic"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %v", err)
    	}

    	var wg sync.WaitGroup
    	var totalErrors uint64
    	t := client.Topic(topicID)

    	for i := 0; i < n; i++ {
    		result := t.Publish(ctx, &pubsub.Message{
    			Data: []byte("Message " + strconv.Itoa(i)),
    		})

    		wg.Add(1)
    		go func(i int, res *pubsub.PublishResult) {
    			defer wg.Done()
    			// The Get method blocks until a server-generated ID or
    			// an error is returned for the published message.
    			id, err := res.Get(ctx)
    			if err != nil {
    				// Error handling code can be added here.
    				fmt.Fprintf(w, "Failed to publish: %v", err)
    				atomic.AddUint64(&totalErrors, 1)
    				return
    			}
    			fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id)
    		}(i, result)
    	}

    	wg.Wait()

    	if totalErrors > 0 {
    		return fmt.Errorf("%d of %d messages did not publish successfully", totalErrors, n)
    	}
    	return nil
    }
    

java

ProjectTopicName topicName = ProjectTopicName.of("my-project-id", "my-topic-id");
    Publisher publisher = null;

    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).build();

      List<String> messages = Arrays.asList("first message", "second message");

      for (final String message : messages) {
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> future = publisher.publish(pubsubMessage);

        // Add an asynchronous callback to handle success / failure
        ApiFutures.addCallback(
            future,
            new ApiFutureCallback<String>() {

              @Override
              public void onFailure(Throwable throwable) {
                if (throwable instanceof ApiException) {
                  ApiException apiException = ((ApiException) throwable);
                  // details on the API exception
                  System.out.println(apiException.getStatusCode().getCode());
                  System.out.println(apiException.isRetryable());
                }
                System.out.println("Error publishing message : " + message);
              }

              @Override
              public void onSuccess(String messageId) {
                // Once published, returns server-assigned message ids (unique within the topic)
                System.out.println(messageId);
              }
            },
            MoreExecutors.directExecutor());
      }
    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }

node.js

/**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const topicName = 'YOUR_TOPIC_NAME';
    // const data = JSON.stringify({foo: 'bar'});

    // Imports the Google Cloud client library
    const {PubSub} = require('@google-cloud/pubsub');

    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();

    async function publishMessage() {
      /**
       * TODO(developer): Uncomment the following lines to run the sample.
       */
      // const topicName = 'my-topic';

      // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
      const dataBuffer = Buffer.from(data);

      const messageId = await pubSubClient.topic(topicName).publish(dataBuffer);
      console.log(`Message ${messageId} published.`);
    }

    publishMessage().catch(console.error);

php

use Google\Cloud\PubSub\PubSubClient;

    /**
     * Publishes a message for a Pub/Sub topic.
     *
     * @param string $projectId  The Google project ID.
     * @param string $topicName  The Pub/Sub topic name.
     * @param string $message  The message to publish.
     */
    function publish_message($projectId, $topicName, $message)
    {
        $pubsub = new PubSubClient([
            'projectId' => $projectId,
        ]);
        $topic = $pubsub->topic($topicName);
        $topic->publish(['data' => $message]);
        print('Message published' . PHP_EOL);
    }

python

"""Publishes multiple messages to a Pub/Sub topic with an error handler."""
    import time

    from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO topic_name = "Your Pub/Sub topic name"

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_name)

    futures = dict()

    def get_callback(f, data):
        def callback(f):
            try:
                print(f.result())
                futures.pop(data)
            except:  # noqa
                print("Please handle {} for {}.".format(f.exception(), data))

        return callback

    for i in range(10):
        data = str(i)
        futures.update({data: None})
        # When you publish a message, the client returns a future.
        future = publisher.publish(
            topic_path, data=data.encode("utf-8")  # data must be a bytestring.
        )
        futures[data] = future
        # Publish failures shall be handled in the callback function.
        future.add_done_callback(get_callback(future, data))

    # Wait for all the publish futures to resolve before exiting.
    while futures:
        time.sleep(5)

    print("Published message with error handler.")

ruby

# project_id = "Your Google Cloud Project ID"
    # topic_name = "Your Pubsub topic name"
    require "google/cloud/pubsub"

    pubsub = Google::Cloud::Pubsub.new project: project_id

    topic = pubsub.topic topic_name
    topic.publish_async "This is a test message." do |result|
      raise "Failed to publish the message." unless result.succeeded?
      puts "Message published asynchronously."
    end

    # Stop the async_publisher to send all queued messages immediately.
    topic.async_publisher.stop.wait!

Novos atributos personalizados

É possível incorporar atributos personalizados como metadados nas mensagens do Pub/Sub. Os atributos podem ser strings de texto ou de bytes. O esquema da mensagem pode ser representado da seguinte forma:

    {
      "data": string,
      "attributes": {
        string: string,
        ...
      },
      "messageId": string,
      "publishTime": string
    }
    

O esquema JSON do PubsubMessage é publicado como parte da documentação do REST e RPC.

go

import (
    	"context"
    	"fmt"
    	"io"

    	"cloud.google.com/go/pubsub"
    )

    func publishCustomAttributes(w io.Writer, projectID, topicID string) error {
    	// projectID := "my-project-id"
    	// topicID := "my-topic"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %v", err)
    	}

    	t := client.Topic(topicID)
    	result := t.Publish(ctx, &pubsub.Message{
    		Data: []byte("Hello world!"),
    		Attributes: map[string]string{
    			"origin":   "golang",
    			"username": "gcp",
    		},
    	})
    	// Block until the result is returned and a server-generated
    	// ID is returned for the published message.
    	id, err := result.Get(ctx)
    	if err != nil {
    		return fmt.Errorf("Get: %v", err)
    	}
    	fmt.Fprintf(w, "Published message with custom attributes; msg ID: %v\n", id)
    	return nil
    }
    

node.js

/**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const topicName = 'YOUR_TOPIC_NAME';
    // const data = JSON.stringify({foo: 'bar'});

    // Imports the Google Cloud client library
    const {PubSub} = require('@google-cloud/pubsub');

    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();

    async function publishMessageWithCustomAttributes() {
      // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
      const dataBuffer = Buffer.from(data);

      // Add two custom attributes, origin and username, to the message
      const customAttributes = {
        origin: 'nodejs-sample',
        username: 'gcp',
      };

      const messageId = await pubSubClient
        .topic(topicName)
        .publish(dataBuffer, customAttributes);
      console.log(`Message ${messageId} published.`);
    }

    publishMessageWithCustomAttributes().catch(console.error);

python

from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO topic_name = "Your Pub/Sub topic name"

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_name)

    for n in range(1, 10):
        data = u"Message number {}".format(n)
        # Data must be a bytestring
        data = data.encode("utf-8")
        # Add two attributes, origin and username, to the message
        future = publisher.publish(
            topic_path, data, origin="python-sample", username="gcp"
        )
        print(future.result())

    print("Published messages with custom attributes.")

ruby

# project_id = "Your Google Cloud Project ID"
    # topic_name = "Your Pubsub topic name"
    require "google/cloud/pubsub"

    pubsub = Google::Cloud::Pubsub.new project: project_id

    topic = pubsub.topic topic_name
    # Add two attributes, origin and username, to the message
    topic.publish_async "This is a test message.",
                        origin:   "ruby-sample",
                        username: "gcp" do |result|
      raise "Failed to publish the message." unless result.succeeded?
      puts "Message with custom attributes published asynchronously."
    end

    # Stop the async_publisher to send all queued messages immediately.
    topic.async_publisher.stop.wait!

Agrupamento para equilibrar a latência e a capacidade

As bibliotecas de cliente do Pub/Sub agrupam várias mensagens em uma única chamada para o serviço. Lotes maiores aumentam a capacidade das mensagens, ou seja, a taxa de mensagens enviadas por CPU. O custo do agrupamento é a latência das mensagens individuais, que são enfileiradas na memória até que o lote correspondente seja completamente preenchido e esteja pronto para ser enviado pela rede. Para minimizar a latência, desative o agrupamento. Isso é particularmente importante para aplicativos que publicam uma única mensagem como parte de uma sequência de solicitação e resposta. Um exemplo comum desse padrão pode ser observado em aplicativos sem servidor e orientados a eventos que usam o Cloud Functions ou o App Engine.

As mensagens podem colocadas em lote com base no tamanho da solicitação (em bytes), número de mensagens e tempo. É possível modificar as configurações padrão como mostrado no exemplo abaixo:

c#

PublisherClient publisher = await PublisherClient.CreateAsync(
        new TopicName(projectId, topicId),
        settings: new PublisherClient.Settings
        {
            BatchingSettings = new Google.Api.Gax.BatchingSettings(
                elementCountThreshold: 100,
                byteCountThreshold: 10240,
                delayThreshold: TimeSpan.FromSeconds(3))
        });
    // PublisherClient collects messages into appropriately sized
    // batches.
    var publishTasks =
        messageTexts.Select(async text =>
        {
            try
            {
                string message = await publisher.PublishAsync(text);
                await Console.Out.WriteLineAsync($"Published message {message}");
            }
            catch (Exception exception)
            {
                await Console.Out.WriteLineAsync($"An error ocurred when publishing message {text}:");
                await Console.Out.WriteLineAsync(exception.Message);
            }
        });
    await Task.WhenAll(publishTasks);

go

import (
    	"context"
    	"fmt"
    	"io"
    	"strconv"
    	"time"

    	"cloud.google.com/go/pubsub"
    )

    func publishWithSettings(w io.Writer, projectID, topicID string) error {
    	// projectID := "my-project-id"
    	// topicID := "my-topic"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %v", err)
    	}
    	var results []*pubsub.PublishResult
    	var resultErrors []error
    	t := client.Topic(topicID)
    	t.PublishSettings.ByteThreshold = 5000
    	t.PublishSettings.CountThreshold = 10
    	t.PublishSettings.DelayThreshold = 100 * time.Millisecond

    	for i := 0; i < 10; i++ {
    		result := t.Publish(ctx, &pubsub.Message{
    			Data: []byte("Message " + strconv.Itoa(i)),
    		})
    		results = append(results, result)
    	}
    	// The Get method blocks until a server-generated ID or
    	// an error is returned for the published message.
    	for i, res := range results {
    		id, err := res.Get(ctx)
    		if err != nil {
    			resultErrors = append(resultErrors, err)
    			fmt.Fprintf(w, "Failed to publish: %v", err)
    			continue
    		}
    		fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id)
    	}
    	if len(resultErrors) != 0 {
    		return fmt.Errorf("Get: %v", resultErrors[len(resultErrors)-1])
    	}
    	fmt.Fprintf(w, "Published messages with batch settings.")
    	return nil
    }
    

java

// Batch settings control how the publisher batches messages
    long requestBytesThreshold = 5000L; // default : 1 byte
    long messageCountBatchSize = 10L; // default : 1 message

    Duration publishDelayThreshold = Duration.ofMillis(100); // default : 1 ms

    // Publish request get triggered based on request size, messages count & time since last publish
    BatchingSettings batchingSettings =
        BatchingSettings.newBuilder()
            .setElementCountThreshold(messageCountBatchSize)
            .setRequestByteThreshold(requestBytesThreshold)
            .setDelayThreshold(publishDelayThreshold)
            .build();

    Publisher publisher =
        Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();

node.js

/**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const topicName = 'YOUR_TOPIC_NAME';
    // const data = JSON.stringify({foo: 'bar'});
    // const maxMessages = 10;
    // const maxWaitTime = 10;

    // Imports the Google Cloud client library
    const {PubSub} = require('@google-cloud/pubsub');

    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();

    async function publishBatchedMessages() {
      // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
      const dataBuffer = Buffer.from(data);

      const batchPublisher = pubSubClient.topic(topicName, {
        batching: {
          maxMessages: maxMessages,
          maxMilliseconds: maxWaitTime * 1000,
        },
      });

      for (let i = 0; i < 10; i++) {
        (async () => {
          const messageId = await batchPublisher.publish(dataBuffer);
          console.log(`Message ${messageId} published.`);
        })();
      }
    }

    publishBatchedMessages().catch(console.error);

php

use Google\Cloud\PubSub\PubSubClient;

    /**
     * Publishes a message for a Pub/Sub topic.
     *
     * The publisher should be used in conjunction with the `google-cloud-batch`
     * daemon, which should be running in the background.
     *
     * To start the daemon, from your project root call `vendor/bin/google-cloud-batch daemon`.
     *
     * @param string $projectId  The Google project ID.
     * @param string $topicName  The Pub/Sub topic name.
     * @param string $message    The message to publish.
     */
    function publish_message_batch($projectId, $topicName, $message)
    {
        // Check if the batch daemon is running.
        if (getenv('IS_BATCH_DAEMON_RUNNING') !== 'true') {
            trigger_error(
                'The batch daemon is not running. Call ' .
                '`vendor/bin/google-cloud-batch daemon` from ' .
                'your project root to start the daemon.',
                E_USER_NOTICE
            );
        }

        $batchOptions = [
            'batchSize' => 100, // Max messages for each batch.
            'callPeriod' => 0.01, // Max time in seconds between each batch publish.
        ];

        $pubsub = new PubSubClient([
            'projectId' => $projectId,
        ]);
        $topic = $pubsub->topic($topicName);
        $publisher = $topic->batchPublisher([
            'batchOptions' => $batchOptions
        ]);

        for ($i = 0; $i < 10; $i++) {
            $publisher->publish(['data' => $message]);
        }

        print('Messages enqueued for publication.' . PHP_EOL);
    }

python

from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO topic_name = "Your Pub/Sub topic name"

    # Configure the batch to publish as soon as there is ten messages,
    # one kilobyte of data, or one second has passed.
    batch_settings = pubsub_v1.types.BatchSettings(
        max_messages=10,  # default 100
        max_bytes=1024,  # default 1 MB
        max_latency=1,  # default 10 ms
    )
    publisher = pubsub_v1.PublisherClient(batch_settings)
    topic_path = publisher.topic_path(project_id, topic_name)

    # Resolve the publish future in a separate thread.
    def callback(future):
        message_id = future.result()
        print(message_id)

    for n in range(1, 10):
        data = u"Message number {}".format(n)
        # Data must be a bytestring
        data = data.encode("utf-8")
        future = publisher.publish(topic_path, data=data)
        # Non-blocking. Allow the publisher client to batch multiple messages.
        future.add_done_callback(callback)

    print("Published messages with batch settings.")

ruby

# project_id = "Your Google Cloud Project ID"
    # topic_name = "Your Pubsub topic name"
    require "google/cloud/pubsub"

    pubsub = Google::Cloud::Pubsub.new project: project_id

    topic = pubsub.topic topic_name
    topic.publish do |batch|
      10.times do |i|
        batch.publish "This is message \##{i}."
      end
    end

    puts "Messages published in batch."
    # project_id = "Your Google Cloud Project ID"
    # topic_name = "Your Pubsub topic name"
    require "google/cloud/pubsub"

    pubsub = Google::Cloud::Pubsub.new project: project_id

    # Start sending messages in one request once the size of all queued messages
    # reaches 1 MB or the number of queued messages reaches 20
    topic = pubsub.topic topic_name, async: {
      max_bytes:    1_000_000,
      max_messages: 20
    }
    10.times do |i|
      topic.publish_async "This is message \##{i}."
    end

    # Stop the async_publisher to send all queued messages immediately.
    topic.async_publisher.stop.wait!
    puts "Messages published asynchronously in batch."

Como reenviar solicitações

As publicações com falha são automaticamente reenviadas, com a exceção de erros que não possibilitem novas tentativas. Esta amostra de código demonstra a criação de um editor com configurações de reenvio personalizadas. Nem todas as bibliotecas de cliente são compatíveis com essas configurações, por isso consulte a documentação de referência da API da linguagem escolhida.

java

// Retry settings control how the publisher handles retryable failures
    Duration retryDelay = Duration.ofMillis(100); // default: 100 ms
    double retryDelayMultiplier = 2.0; // back off for repeated failures, default: 1.3
    Duration maxRetryDelay = Duration.ofSeconds(60); // default : 1 minute
    Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 5 seconds
    double rpcTimeoutMultiplier = 1.0; // default: 1.0
    Duration maxRpcTimeout = Duration.ofSeconds(600); // default: 10 minutes
    Duration totalTimeout = Duration.ofSeconds(600); // default: 10 minutes

    RetrySettings retrySettings =
        RetrySettings.newBuilder()
            .setInitialRetryDelay(retryDelay)
            .setRetryDelayMultiplier(retryDelayMultiplier)
            .setMaxRetryDelay(maxRetryDelay)
            .setInitialRpcTimeout(initialRpcTimeout)
            .setRpcTimeoutMultiplier(rpcTimeoutMultiplier)
            .setMaxRpcTimeout(maxRpcTimeout)
            .setTotalTimeout(totalTimeout)
            .build();

    Publisher publisher = Publisher.newBuilder(topicName).setRetrySettings(retrySettings).build();

node.js

/**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const projectId = 'YOUR_PROJECT_ID'
    // const topicName = 'YOUR_TOPIC_NAME';
    // const data = JSON.stringify({foo: 'bar'});

    // Imports the Google Cloud client library. v1 is for the lower level
    // proto access.
    const {v1} = require('@google-cloud/pubsub');

    // Creates a publisher client.
    const publisherClient = new v1.PublisherClient({
      // optional auth parameters
    });

    async function publishWithRetrySettings() {
      const formattedTopic = publisherClient.topicPath(projectId, topicName);

      // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
      const dataBuffer = Buffer.from(data);
      const messagesElement = {
        data: dataBuffer,
      };
      const messages = [messagesElement];

      // Build the request
      const request = {
        topic: formattedTopic,
        messages: messages,
      };

      // Retry settings control how the publisher handles retryable failures
      // Default values are shown
      const retrySettings = {
        retryCodes: [
          10, // 'ABORTED'
          1, // 'CANCELLED',
          4, // 'DEADLINE_EXCEEDED'
          13, // 'INTERNAL'
          8, // 'RESOURCE_EXHAUSTED'
          14, // 'UNAVAILABLE'
          2, // 'UNKNOWN'
        ],
        backoffSettings: {
          initialRetryDelayMillis: 100,
          retryDelayMultiplier: 1.3,
          maxRetryDelayMillis: 60000,
          initialRpcTimeoutMillis: 5000,
          rpcTimeoutMultiplier: 1.0,
          maxRpcTimeoutMillis: 600000,
          totalTimeoutMillis: 600000,
        },
      };

      const [response] = await publisherClient.publish(request, {
        retry: retrySettings,
      });
      console.log(`Message ${response.messageIds} published.`);
    }

    publishWithRetrySettings().catch(console.error);

python

from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO topic_name = "Your Pub/Sub topic name"

    # Configure the retry settings. Defaults will be overwritten.
    retry_settings = {
        "interfaces": {
            "google.pubsub.v1.Publisher": {
                "retry_codes": {
                    "publish": [
                        "ABORTED",
                        "CANCELLED",
                        "DEADLINE_EXCEEDED",
                        "INTERNAL",
                        "RESOURCE_EXHAUSTED",
                        "UNAVAILABLE",
                        "UNKNOWN",
                    ]
                },
                "retry_params": {
                    "messaging": {
                        "initial_retry_delay_millis": 100,  # default: 100
                        "retry_delay_multiplier": 1.3,  # default: 1.3
                        "max_retry_delay_millis": 60000,  # default: 60000
                        "initial_rpc_timeout_millis": 5000,  # default: 25000
                        "rpc_timeout_multiplier": 1.0,  # default: 1.0
                        "max_rpc_timeout_millis": 600000,  # default: 30000
                        "total_timeout_millis": 600000,  # default: 600000
                    }
                },
                "methods": {
                    "Publish": {
                        "retry_codes_name": "publish",
                        "retry_params_name": "messaging",
                    }
                },
            }
        }
    }

    publisher = pubsub_v1.PublisherClient(client_config=retry_settings)
    topic_path = publisher.topic_path(project_id, topic_name)

    for n in range(1, 10):
        data = u"Message number {}".format(n)
        # Data must be a bytestring
        data = data.encode("utf-8")
        future = publisher.publish(topic_path, data=data)
        print(future.result())

    print("Published messages with retry settings.")

As configurações de nova tentativa controlam o número total de tentativas e espera exponencial (quanto tempo o cliente aguarda entre as novas tentativas subsequentes). O tempo limite de RPC inicial é o tempo que o cliente espera até que o RPC inicial seja bem-sucedido antes de tentar novamente. O tempo limite total é o tempo que o cliente aguarda antes de parar de tentar. Para repetir as solicitações de publicação, o tempo limite inicial do RPC deve ser menor que o tempo limite total.

Quando o primeiro RPC falha ou atinge o tempo limite, a lógica de espera exponencial determina quando serão realizadas as tentativas subsequentes. Em cada nova tentativa, o tempo limite do RPC aumenta de acordo com esse multiplicador, até atingir o tempo limite máximo do RPC. Além disso, a configuração de atraso de novas tentativas determina quanto tempo o cliente aguarda entre receber um erro ou atingir o tempo limite e iniciar a próxima solicitação.

Controle de simultaneidade

A compatibilidade com a simultaneidade depende da linguagem de programação. Consulte a documentação de referência de APIs para mais informações.

O exemplo a seguir ilustra como controlar a simultaneidade em um editor:

go

import (
    	"context"
    	"fmt"
    	"io"

    	"cloud.google.com/go/pubsub"
    )

    func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error {
    	// projectID := "my-project-id"
    	// topicID := "my-topic"
    	// msg := "Hello World"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %v", err)
    	}

    	t := client.Topic(topicID)
    	t.PublishSettings.NumGoroutines = 1

    	result := t.Publish(ctx, &pubsub.Message{Data: []byte(msg)})
    	// Block until the result is returned and a server-generated
    	// ID is returned for the published message.
    	id, err := result.Get(ctx)
    	if err != nil {
    		return fmt.Errorf("Get: %v", err)
    	}
    	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
    	return nil
    }
    

java

// create a publisher with a single threaded executor
    ExecutorProvider executorProvider =
        InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();
    Publisher publisher =
        Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();

ruby

# project_id = "Your Google Cloud Project ID"
    # topic_name = "Your Pubsub topic name"
    require "google/cloud/pubsub"

    pubsub = Google::Cloud::Pubsub.new project: project_id

    topic = pubsub.topic topic_name, async: {
      threads: {
        # Use exactly one thread for publishing message and exactly one thread
        # for executing callbacks
        publish:  1,
        callback: 1
      }
    }
    topic.publish_async "This is a test message." do |result|
      raise "Failed to publish the message." unless result.succeeded?
      puts "Message published asynchronously."
    end

    # Stop the async_publisher to send all queued messages immediately.
    topic.async_publisher.stop.wait!