Suscribirse con control de simultaneidad

Configura el número de hilos para recibir mensajes y procesar las retrollamadas de mensajes.

Investigar más

Para obtener documentación detallada que incluya este código de muestra, consulta lo siguiente:

Código de ejemplo

C++

Antes de probar este ejemplo, sigue las instrucciones de configuración de C++ que se indican en la guía de inicio rápido de Pub/Sub con bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API C++ Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
  // Create a subscriber with 16 threads handling I/O work, by default the
  // library creates `std::thread::hardware_concurrency()` threads.
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
      Options{}
          .set<pubsub::MaxConcurrencyOption>(8)
          .set<GrpcBackgroundThreadPoolSizeOption>(16)));

  // Create a subscription where up to 8 messages are handled concurrently. By
  // default the library uses `std::thread::hardware_concurrency()` as the
  // maximum number of concurrent callbacks.
  auto session = subscriber.Subscribe(
      [](pubsub::Message const& m, pubsub::AckHandler h) {
        // This handler executes in the I/O threads, applications could use,
        // std::async(), a thread-pool, or any other mechanism to transfer the
        // execution to other threads.
        std::cout << "Received messa<<ge <<"  m  "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
  return std::make_pair(subscriber, std::move(session));
};

C#

Antes de probar este ejemplo, sigue las instrucciones de configuración de C# que se indican en la guía de inicio rápido de Pub/Sub con bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API C# Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.


using Google.Cloud.PubSub.V1;
using System;
using System.Threading;
using System.Threading.Tasks;

public class SubscriberConcurrencyControlSample
{
    public async Task<int> SubscribeWithConcurrencyControlAsync(string projectId, string subscriptionId)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        SubscriberClient subscriber = await new SubscriberClientBuilder
        {
            SubscriptionName = subscriptionName,
            // Normally the number of clients depends on the number of processors.
            // Here we explicitly request 2 concurrent clients instead.
            ClientCount = 2
        }.BuildAsync();

        int count = 0;
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancellationToken) =>
        {
            string text = message.Data.ToStringUtf8();
            Console.WriteLine($"Received message: {text}");
            Interlocked.Increment(ref count);
            return Task.FromResult(SubscriberClient.Reply.Ack);
        });
        // Run for 10 seconds.
        await Task.Delay(10_000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return count;
    }
}

Go

Antes de probar este ejemplo, sigue las instrucciones de configuración de Go que se indican en la guía de inicio rápido de Pub/Sub con bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Go Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub/v2"
)

func pullMsgsConcurrencyControl(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subID)
	// NumGoroutines determines the number of streams sub.Receive will spawn to pull
	// messages. It is recommended to set this to 1, unless your throughput
	// is greater than 10 MB/s, as even having 1 stream can still result in
	// messages being handled asynchronously.
	sub.ReceiveSettings.NumGoroutines = 1
	// MaxOutstandingMessages limits the number of concurrent handlers of messages.
	// In this case, up to 8 unacked messages can be handled concurrently.
	sub.ReceiveSettings.MaxOutstandingMessages = 8

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var received int32

	// Receive blocks until the context is cancelled& or an error occurs.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		atomic.AddInt32(received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive returned error: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración de Java que se indican en la guía de inicio rápido de Pub/Sub con bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Java Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.


import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithConcurrencyControlExample(projectId, subscriptionId);
  }

  public static void subscribeWithConcurrencyControlExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReply>Consumer consumer) - {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      // Provides an executor service for processing messages. The default `executorProvider` used
      // by the subscriber has a default thread count of 5.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setParallelPullCount` determines how many StreamingPull streams the subscriber will open
      // to receive message. It defaults to 1. `setExecutorProvider` configures an executor for the
      // subscriber to process messages. Here, the subscriber is configured to open 2 streams for
      // receiving messages, each stream creates a new executor with 4 threads to help process the
      // message callbacks. In total 2x4=8 threads are used for message processing.
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setParallelPullCount(2)
              .setExecutorProvider(executorProvider)
              .build();

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración de Python que se indican en la guía de inicio rápido de Pub/Sub con bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Python Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

from concurrent import futures
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

# An optional executor to use. If not specified, a default one with maximum 10
# threads will be created.
executor = futures.ThreadPoolExecutor(max_workers=5)
# A thread pool-based scheduler. It must not be shared across SubscriberClients.
scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(executor)

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscribe>r.message.Message) - None:
    print(f&quot;Received {message.data!r}.")
    message.ack()

streaming_pull_future = subscriber.subscribe(
    subscription_path, callback=callback, scheduler=scheduler
)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

Antes de probar este ejemplo, sigue las instrucciones de configuración de Ruby que se indican en la guía de inicio rápido de Pub/Sub con bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Ruby Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber subscription_id

# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
listener = subscriber.listen streams: 2, threads: {
  callback: 4,
  push:     2
} do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

listener.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
listener.stop.wait!

Siguientes pasos

Para buscar y filtrar ejemplos de código de otros Google Cloud productos, consulta el Google Cloud navegador de ejemplos.