Procesar más mensajes con control de simultaneidad

El control de simultaneidad es una función disponible en la biblioteca de cliente de alto nivel de Pub/Sub. También puedes implementar tu propio control de simultaneidad cuando uses una biblioteca de bajo nivel.

La compatibilidad con el control de simultaneidad depende del lenguaje de programación de la biblioteca de cliente. En el caso de las implementaciones de lenguajes que admiten subprocesos paralelos, como C++, Go y Java, las bibliotecas de cliente eligen un número de subprocesos predeterminado.

Puede que esta opción no sea la más adecuada para tu aplicación. Por ejemplo, si tu aplicación de suscriptor no puede seguir el ritmo del volumen de mensajes entrantes y no está limitada por la CPU, debes aumentar el número de subprocesos. En el caso de las operaciones de procesamiento de mensajes que requieren un uso intensivo de la CPU, puede ser adecuado reducir el número de subprocesos.

En esta página se explica el concepto de control de simultaneidad y cómo configurar la función para tus clientes suscriptores. Para configurar tus clientes editores para el control de la simultaneidad, consulta Control de la simultaneidad.

Configuraciones de control de simultaneidad

Los valores predeterminados de las variables de control de simultaneidad y los nombres de las variables pueden variar en las bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API. Por ejemplo, en la biblioteca de cliente de Java, los métodos para configurar el control de simultaneidad son setParallelPullCount(), setExecutorProvider(), setSystemExecutorProvider() y setChannelProvider().

  • setParallelPullCount() te permite decidir cuántas secuencias abrir. Puedes abrir más flujos si tu cliente de suscriptor puede gestionar más datos de los que se envían en un solo flujo, que son 10 MBps.

  • setExecutorProvider() te permite personalizar el proveedor de ejecutores que se usa para procesar mensajes. Por ejemplo, puedes cambiar el proveedor del ejecutor por uno que devuelva un único ejecutor compartido con un número limitado de hilos en varios clientes suscriptores. Esta configuración ayuda a limitar el número de hilos creados. El número total de subprocesos utilizados para el control de la simultaneidad depende del proveedor de ejecutores que se haya pasado en la biblioteca de cliente y del número de extracciones paralelas.

  • setSystemExecutorProvider() te permite personalizar el proveedor del ejecutor que se usa para la gestión de arrendamientos. Por lo general, no se configura este valor a menos que quieras usar el mismo proveedor de ejecutor en setExecutorProvider y setSystemExecutorProvider. Por ejemplo, puedes usar el mismo proveedor de ejecutores si tienes varias suscripciones de bajo rendimiento. Si se usa el mismo valor, se limita el número de hilos del cliente.

  • setChannelProvider() te permite personalizar el proveedor de canales que se usa para abrir conexiones a Pub/Sub. Normalmente, no se configura este valor a menos que quieras usar el mismo canal en varios clientes de suscriptor. Si se reutiliza un canal en demasiados clientes, pueden producirse errores GOAWAY o ENHANCE_YOUR_CALM. Si ves estos errores en los registros de tu aplicación o en Cloud Logs, crea más canales.

Ejemplos de código para el control de la simultaneidad

C++

Antes de probar este ejemplo, sigue las instrucciones de configuración de C++ que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de C++ de Pub/Sub.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
  // Create a subscriber with 16 threads handling I/O work, by default the
  // library creates `std::thread::hardware_concurrency()` threads.
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
      Options{}
          .set<pubsub::MaxConcurrencyOption>(8)
          .set<GrpcBackgroundThreadPoolSizeOption>(16)));

  // Create a subscription where up to 8 messages are handled concurrently. By
  // default the library uses `std::thread::hardware_concurrency()` as the
  // maximum number of concurrent callbacks.
  auto session = subscriber.Subscribe(
      [](pubsub::Message const& m, pubsub::AckHandler h) {
        // This handler executes in the I/O threads, applications could use,
        // std::async(), a thread-pool, or any other mechanism to transfer the
        // execution to other threads.
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
  return std::make_pair(subscriber, std::move(session));
};

Go

En el siguiente ejemplo se usa la versión principal de la biblioteca de cliente de Pub/Sub de Go (v2). Si sigues usando la biblioteca v1, consulta la guía de migración a la versión 2. Para ver una lista de ejemplos de código de la versión 1, consulta los ejemplos de código obsoletos.

Antes de probar este ejemplo, sigue las instrucciones de configuración de Go que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Go de Pub/Sub.

import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub/v2"
)

func pullMsgsConcurrencyControl(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subID)
	// NumGoroutines determines the number of streams sub.Receive will spawn to pull
	// messages. It is recommended to set this to 1, unless your throughput
	// is greater than 10 MB/s, as even having 1 stream can still result in
	// messages being handled asynchronously.
	sub.ReceiveSettings.NumGoroutines = 1
	// MaxOutstandingMessages limits the number of concurrent handlers of messages.
	// In this case, up to 8 unacked messages can be handled concurrently.
	sub.ReceiveSettings.MaxOutstandingMessages = 8

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var received int32

	// Receive blocks until the context is cancelled or an error occurs.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		atomic.AddInt32(&received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive returned error: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración de Java que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Pub/Sub.


import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithConcurrencyControlExample(projectId, subscriptionId);
  }

  public static void subscribeWithConcurrencyControlExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      // Provides an executor service for processing messages. The default `executorProvider` used
      // by the subscriber has a default thread count of 5.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setParallelPullCount` determines how many StreamingPull streams the subscriber will open
      // to receive message. It defaults to 1. `setExecutorProvider` configures an executor for the
      // subscriber to process messages. Here, the subscriber is configured to open 2 streams for
      // receiving messages, each stream creates a new executor with 4 threads to help process the
      // message callbacks. In total 2x4=8 threads are used for message processing.
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setParallelPullCount(2)
              .setExecutorProvider(executorProvider)
              .build();

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Ruby

En el siguiente ejemplo se usa la biblioteca de cliente de Ruby Pub/Sub v3. Si sigues usando la biblioteca v2, consulta la guía de migración a la versión 3. Para ver una lista de ejemplos de código de Ruby v2, consulta los ejemplos de código obsoletos.

Antes de probar este ejemplo, sigue las instrucciones de configuración de Ruby que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Ruby de Pub/Sub.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber subscription_id

# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
listener = subscriber.listen streams: 2, threads: {
  callback: 4,
  push:     2
} do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

listener.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
listener.stop.wait!

Siguientes pasos

Consulta las otras opciones de entrega que puedes configurar para una suscripción: