Mensajes en lote

La mensajería por lotes crea un cliente editor con ajustes de procesamiento por lotes personalizados y lo usa para publicar algunos mensajes.

En este documento se proporciona información sobre cómo usar la mensajería por lotes con mensajes publicados en un tema.

Antes de empezar

Antes de configurar el flujo de trabajo de publicación, asegúrate de que has completado las siguientes tareas:

Roles obligatorios

Para obtener los permisos que necesitas para publicar mensajes en un tema, pide a tu administrador que te conceda el rol de gestión de identidades y accesos Editor de Pub/Sub (roles/pubsub.publisher) en el tema. Para obtener más información sobre cómo conceder roles, consulta el artículo Gestionar el acceso a proyectos, carpetas y organizaciones.

También puedes conseguir los permisos necesarios a través de roles personalizados u otros roles predefinidos.

Necesitas permisos adicionales para crear o actualizar temas y suscripciones.

Usar la mensajería por lotes

Consulta los siguientes ejemplos de código para saber cómo configurar los ajustes de mensajería por lotes de tu editor.

C++

Antes de probar este ejemplo, sigue las instrucciones de configuración de C++ que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de C++ de Pub/Sub.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // By default, the publisher will flush a batch after 10ms, after it
  // contains more than 100 message, or after it contains more than 1MiB of
  // data, whichever comes first. This changes those defaults.
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      std::move(topic),
      Options{}
          .set<pubsub::MaxHoldTimeOption>(std::chrono::milliseconds(20))
          .set<pubsub::MaxBatchBytesOption>(4 * 1024 * 1024L)
          .set<pubsub::MaxBatchMessagesOption>(200)));

  std::vector<future<void>> ids;
  for (char const* data : {"1", "2", "3", "go!"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
}

C#

Antes de probar este ejemplo, sigue las instrucciones de configuración de C# que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de C# de Pub/Sub.


using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishBatchedMessagesAsyncSample
{
    public async Task<int> PublishBatchMessagesAsync(string projectId, string topicId, IEnumerable<string> messageTexts)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);

        // Default Settings:
        // byteCountThreshold: 1000000
        // elementCountThreshold: 100
        // delayThreshold: 10 milliseconds
        var customSettings = new PublisherClient.Settings
        {
            BatchingSettings = new BatchingSettings(
                elementCountThreshold: 50,
                byteCountThreshold: 10240,
                delayThreshold: TimeSpan.FromMilliseconds(500))
        };

        PublisherClient publisher = await new PublisherClientBuilder
        {
            TopicName = topicName,
            Settings = customSettings
        }.BuildAsync();

        int publishedMessageCount = 0;
        var publishTasks = messageTexts.Select(async text =>
        {
            try
            {
                string message = await publisher.PublishAsync(text);
                Console.WriteLine($"Published message {message}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error occurred when publishing message {text}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        // PublisherClient instance should be shutdown after use.
        // The TimeSpan specifies for how long to attempt to publish locally queued messages.
        await publisher.ShutdownAsync(TimeSpan.FromSeconds(15));
        return publishedMessageCount;
    }
}

Go

En el siguiente ejemplo se usa la versión principal de la biblioteca de cliente de Pub/Sub de Go (v2). Si sigues usando la biblioteca v1, consulta la guía de migración a la versión 2. Para ver una lista de ejemplos de código de la versión 1, consulta los ejemplos de código obsoletos.

Antes de probar este ejemplo, sigue las instrucciones de configuración de Go que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Go de Pub/Sub.

import (
	"context"
	"fmt"
	"io"
	"strconv"
	"time"

	"cloud.google.com/go/pubsub/v2"
)

func publishWithSettings(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
	// If a topic ID is provided, the project ID from the client is used.
	publisher := client.Publisher(topicID)
	publisher.PublishSettings.ByteThreshold = 5000
	publisher.PublishSettings.CountThreshold = 10
	publisher.PublishSettings.DelayThreshold = 100 * time.Millisecond

	var results []*pubsub.PublishResult
	var resultErrors []error
	for i := 0; i < 10; i++ {
		result := publisher.Publish(ctx, &pubsub.Message{
			Data: []byte("Message " + strconv.Itoa(i)),
		})
		results = append(results, result)
	}
	// The Get method blocks until a server-generated ID or
	// an error is returned for the published message.
	for i, res := range results {
		id, err := res.Get(ctx)
		if err != nil {
			resultErrors = append(resultErrors, err)
			fmt.Fprintf(w, "Failed to publish: %v", err)
			continue
		}
		fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id)
	}
	if len(resultErrors) != 0 {
		return fmt.Errorf("Get: %v", resultErrors[len(resultErrors)-1])
	}
	fmt.Fprintf(w, "Published messages with batch settings.")
	return nil
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración de Java que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Pub/Sub.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.threeten.bp.Duration;

public class PublishWithBatchSettingsExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithBatchSettingsExample(projectId, topicId);
  }

  public static void publishWithBatchSettingsExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

    try {
      // Batch settings control how the publisher batches messages
      long requestBytesThreshold = 5000L; // default : 1000 bytes
      long messageCountBatchSize = 100L; // default : 100 message

      Duration publishDelayThreshold = Duration.ofMillis(100); // default : 1 ms

      // Publish request get triggered based on request size, messages count & time since last
      // publish, whichever condition is met first.
      BatchingSettings batchingSettings =
          BatchingSettings.newBuilder()
              .setElementCountThreshold(messageCountBatchSize)
              .setRequestByteThreshold(requestBytesThreshold)
              .setDelayThreshold(publishDelayThreshold)
              .build();

      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();

      // schedule publishing one message at a time : messages get automatically batched
      for (int i = 0; i < 100; i++) {
        String message = "message " + i;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        messageIdFutures.add(messageIdFuture);
      }
    } finally {
      // Wait on any pending publish requests.
      List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

      System.out.println("Published " + messageIds.size() + " messages with batch settings.");

      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicName = 'YOUR_TOPIC_NAME';
// const data = JSON.stringify({foo: 'bar'});
// const maxMessages = 10;
// const maxWaitTime = 10;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishBatchedMessages(
  topicNameOrId,
  data,
  maxMessages,
  maxWaitTime,
) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Cache topic objects (publishers) and reuse them.
  const publishOptions = {
    batching: {
      maxMessages: maxMessages,
      maxMilliseconds: maxWaitTime * 1000,
    },
  };
  const batchPublisher = pubSubClient.topic(topicNameOrId, publishOptions);

  const promises = [];
  for (let i = 0; i < 10; i++) {
    promises.push(
      (async () => {
        const messageId = await batchPublisher.publishMessage({
          data: dataBuffer,
        });
        console.log(`Message ${messageId} published.`);
      })(),
    );
  }
  await Promise.all(promises);
}

Node.js

Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicName = 'YOUR_TOPIC_NAME';
// const data = JSON.stringify({foo: 'bar'});
// const maxMessages = 10;
// const maxWaitTime = 10;

// Imports the Google Cloud client library
import {PublishOptions, PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishBatchedMessages(
  topicNameOrId: string,
  data: string,
  maxMessages: number,
  maxWaitTime: number,
) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Cache topic objects (publishers) and reuse them.
  const publishOptions: PublishOptions = {
    batching: {
      maxMessages: maxMessages,
      maxMilliseconds: maxWaitTime * 1000,
    },
  };
  const batchPublisher = pubSubClient.topic(topicNameOrId, publishOptions);

  const promises: Promise<void>[] = [];
  for (let i = 0; i < 10; i++) {
    promises.push(
      (async () => {
        const messageId = await batchPublisher.publishMessage({
          data: dataBuffer,
        });
        console.log(`Message ${messageId} published.`);
      })(),
    );
  }
  await Promise.all(promises);
}

PHP

Antes de probar este ejemplo, sigue las instrucciones de configuración de PHP que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Pub/Sub para PHP.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * The publisher should be used in conjunction with the `google-cloud-batch`
 * daemon, which should be running in the background.
 *
 * To start the daemon, from your project root call `vendor/bin/google-cloud-batch daemon`.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message    The message to publish.
 */
function publish_message_batch($projectId, $topicName, $message)
{
    // Check if the batch daemon is running.
    if (getenv('IS_BATCH_DAEMON_RUNNING') !== 'true') {
        trigger_error(
            'The batch daemon is not running. Call ' .
            '`vendor/bin/google-cloud-batch daemon` from ' .
            'your project root to start the daemon.',
            E_USER_NOTICE
        );
    }

    $batchOptions = [
        'batchSize' => 100, // Max messages for each batch.
        'callPeriod' => 0.01, // Max time in seconds between each batch publish.
    ];

    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $publisher = $topic->batchPublisher([
        'batchOptions' => $batchOptions
    ]);

    for ($i = 0; $i < 10; $i++) {
        $publisher->publish(['data' => $message]);
    }

    print('Messages enqueued for publication.' . PHP_EOL);
}

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración de Python que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Python de Pub/Sub.

from concurrent import futures
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

# Configure the batch to publish as soon as there are 10 messages
# or 1 KiB of data, or 1 second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
    max_messages=10,  # default 100
    max_bytes=1024,  # default 1 MB
    max_latency=1,  # default 10 ms
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []

# Resolve the publish future in a separate thread.
def callback(future: pubsub_v1.publisher.futures.Future) -> None:
    message_id = future.result()
    print(message_id)

for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    publish_future = publisher.publish(topic_path, data)
    # Non-blocking. Allow the publisher client to batch multiple messages.
    publish_future.add_done_callback(callback)
    publish_futures.append(publish_future)

futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with batch settings to {topic_path}.")

Ruby

En el siguiente ejemplo se usa la biblioteca de cliente de Ruby Pub/Sub v3. Si sigues usando la biblioteca v2, consulta la guía de migración a la versión 3. Para ver una lista de ejemplos de código de Ruby v2, consulta los ejemplos de código obsoletos.

Antes de probar este ejemplo, sigue las instrucciones de configuración de Ruby que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Ruby de Pub/Sub.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::PubSub.new
# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
publisher = pubsub.publisher topic_id, async: {
  max_bytes:    1_000_000,
  max_messages: 20
}

10.times do |i|
  publisher.publish_async "This is message ##{i}."
end

# Stop the async_publisher to send all queued messages immediately.
publisher.async_publisher.stop.wait!
puts "Messages published asynchronously in batch."

Inhabilitar el envío de mensajes en lote

Para desactivar el procesamiento por lotes en tu biblioteca de cliente, asigna el valor max_messages a 1.

Mensajería por lotes y entrega ordenada

Con la entrega ordenada, si no se confirma ningún mensaje del lote, significa que todos los mensajes del lote, incluidos los que se enviaron antes del mensaje que no se confirmó, se volverán a entregar.

Cuotas y límites de los mensajes por lotes

Antes de configurar la mensajería por lotes, ten en cuenta el efecto de factores como la cuota de rendimiento de publicación y el tamaño máximo de un lote. Las bibliotecas de cliente de alto nivel se aseguran de que las solicitudes por lotes se mantengan dentro de los límites especificados.

  • 1000 bytes es el tamaño mínimo de solicitud que se tiene en cuenta a efectos de costes, aunque el tamaño real del mensaje sea inferior a 1000 bytes.
  • Pub/Sub tiene un límite de 10 MB o 1000 mensajes por solicitud de publicación por lotes.

Para obtener más información, consulta las cuotas y los límites de Pub/Sub.

Siguientes pasos

Para saber cómo configurar las opciones de publicación avanzadas, consulta lo siguiente: