Control de simultaneidad

En este documento se proporciona información sobre cómo usar el control de simultaneidad con los mensajes publicados en un tema.

El control de simultaneidad le ayuda a anular el número predeterminado de subprocesos de entrada/salida en segundo plano que usa la biblioteca de cliente para publicar mensajes. De esta forma, los clientes editores pueden enviar mensajes en paralelo.

El control de simultaneidad es una función disponible en la biblioteca de cliente de alto nivel de Pub/Sub. También puedes implementar tu propio control de simultaneidad cuando uses una biblioteca de bajo nivel.

La compatibilidad con el control de simultaneidad depende del lenguaje de programación de la biblioteca de cliente. En el caso de las implementaciones de lenguajes que admiten subprocesos paralelos, como C++, Go y Java, las bibliotecas de cliente eligen un número de subprocesos predeterminado.

En esta página se explica el concepto de control de simultaneidad y cómo configurar la función para tus clientes editores. Para configurar tus clientes de suscriptor para el control de la simultaneidad, consulta Procesar más mensajes con el control de la simultaneidad.

Antes de empezar

Antes de configurar el flujo de trabajo de publicación, asegúrate de que has completado las siguientes tareas:

Roles obligatorios

Para obtener los permisos que necesitas para publicar mensajes en un tema, pide a tu administrador que te conceda el rol de gestión de identidades y accesos Editor de Pub/Sub (roles/pubsub.publisher) en el tema. Para obtener más información sobre cómo conceder roles, consulta el artículo Gestionar el acceso a proyectos, carpetas y organizaciones.

También puedes conseguir los permisos necesarios a través de roles personalizados u otros roles predefinidos.

Necesitas permisos adicionales para crear o actualizar temas y suscripciones.

Configuraciones de control de simultaneidad

Los valores predeterminados de las variables de control de simultaneidad y los nombres de las variables pueden variar en las bibliotecas de cliente. Por ejemplo, en la biblioteca de cliente de Java, los métodos para configurar el control de simultaneidad son setExecutorProvider() y setChannelProvider(). Para obtener más información, consulta la documentación de referencia de la API.

  • setExecutorProvider() te permite personalizar el proveedor de ejecutores que se usa para procesar las respuestas de publicación. Por ejemplo, puedes cambiar el proveedor de ejecutores por uno que devuelva un único ejecutor compartido con un número limitado de hilos en varios clientes de editores. Esta configuración ayuda a limitar el número de hilos creados.

  • setChannelProvider() te permite personalizar el proveedor de canales que se usa para abrir conexiones a Pub/Sub. Por lo general, no se configura este valor a menos que quieras usar el mismo canal en varios clientes editores. Si se reutiliza un canal en demasiados clientes, pueden producirse errores GOAWAY o ENHANCE_YOUR_CALM. Si ves estos errores en los registros de tu aplicación o en Cloud Logs, crea más canales.

Ejemplos de código para el control de la simultaneidad

C++

Antes de probar este ejemplo, sigue las instrucciones de configuración de C++ que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de C++ de Pub/Sub.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // Override the default number of background (I/O) threads. By default the
  // library uses `std::thread::hardware_concurrency()` threads.
  auto options = Options{}.set<GrpcBackgroundThreadPoolSizeOption>(8);
  auto publisher = pubsub::Publisher(
      pubsub::MakePublisherConnection(std::move(topic), std::move(options)));

  std::vector<future<void>> ids;
  for (char const* data : {"1", "2", "3", "go!"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
}

Go

En el siguiente ejemplo se usa la versión principal de la biblioteca de cliente de Pub/Sub de Go (v2). Si sigues usando la biblioteca v1, consulta la guía de migración a la versión 2. Para ver una lista de ejemplos de código de la versión 1, consulta los ejemplos de código obsoletos.

Antes de probar este ejemplo, sigue las instrucciones de configuración de Go que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Go de Pub/Sub.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
)

func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
	// If a topic ID is provided, the project ID from the client is used.
	publisher := client.Publisher(topicID)
	publisher.PublishSettings.NumGoroutines = 1

	result := publisher.Publish(ctx, &pubsub.Message{Data: []byte(msg)})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %w", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración de Java que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Pub/Sub.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithConcurrencyControlExample(projectId, topicId);
  }

  public static void publishWithConcurrencyControlExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

    try {
      // Provides an executor service for processing messages. The default
      // `executorProvider` used by the publisher has a default thread count of
      // 5 * the number of processors available to the Java virtual machine.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setExecutorProvider` configures an executor for the publisher.
      publisher = Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();

      // schedule publishing one message at a time : messages get automatically batched
      for (int i = 0; i < 100; i++) {
        String message = "message " + i;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        messageIdFutures.add(messageIdFuture);
      }
    } finally {
      // Wait on any pending publish requests.
      List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

      System.out.println("Published " + messageIds.size() + " messages with concurrency control.");

      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Ruby

En el siguiente ejemplo se usa la biblioteca de cliente de Ruby Pub/Sub v3. Si sigues usando la biblioteca v2, consulta la guía de migración a la versión 3. Para ver una lista de ejemplos de código de Ruby v2, consulta los ejemplos de código obsoletos.

Antes de probar este ejemplo, sigue las instrucciones de configuración de Ruby que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Ruby de Pub/Sub.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::PubSub.new
publisher = pubsub.publisher topic_id, async: {
  threads: {
    # Use exactly one thread for publishing message and exactly one thread
    # for executing callbacks
    publish:  1,
    callback: 1
  }
}

publisher.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
publisher.async_publisher.stop.wait!

Siguientes pasos