Elaborare più messaggi con il controllo della contemporaneità

Il controllo della concorrenza è una funzionalità disponibile nella libreria client di alto livello Pub/Sub. Puoi anche implementare il tuo controllo della contemporaneità quando utilizzi una libreria a basso livello.

Il supporto del controllo della contemporaneità dipende dal linguaggio di programmazione della libreria client. Per le implementazioni di linguaggi che supportano thread paralleli, come C++, Go e Java, le librerie client fanno una scelta predefinita per il numero di thread.

Questa scelta potrebbe non essere ottimale per la tua applicazione. Ad esempio, se la tua applicazione di abbonamento non è al passo con il volume di messaggi in entrata e non è vincolata alla CPU, devi aumentare il numero di thread. Per operazioni di elaborazione dei messaggi che richiedono un'elevata intensità di risorse della CPU, potrebbe essere opportuno ridurre il numero di thread.

Questa pagina spiega il concetto di controllo della contemporaneità e come configurare la funzionalità per i clienti sottoscrittori. Per configurare i client publisher per controllo della contemporaneità, consulta Controllo della concorrenza.

Configurazioni di controllo della concorrenza

I valori predefiniti per le variabili di controllo della contemporaneità e i nomi delle variabili possono variare in base alle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API. Ad esempio, nella libreria client Java, i metodi per configurare controllo della contemporaneità sono setParallelPullCount(), setExecutorProvider(), setSystemExecutorProvider() e setChannelProvider().

  • setParallelPullCount() ti consente di decidere quanti stream aprire. Puoi aprire più stream se il client sottoscrittore è in grado di gestire più dati di quelli inviati su un singolo stream, ovvero 10 MB/s.

  • setExecutorProvider() ti consente di personalizzare il provider dell'executor utilizzato per elaborare i messaggi. Ad esempio, puoi modificare il provider dell'executor in modo che restituisca un singolo executor condiviso con un numero limitato di thread su più client di abbonati. Questa configurazione consente di limitare il numero di thread creati. Il numero totale di thread utilizzati per controllo della contemporaneità dipende dal provider dell'executor passato nella libreria client e dal numero di pull paralleli.

  • setSystemExecutorProvider() ti consente di personalizzare il provider dell'executor usato per la gestione dei leasing. In genere, non configuri questo valore, a meno che non voglia utilizzare lo stesso provider di esecuzioni in setExecutorProvider e setSystemExecutorProvider. Ad esempio, puoi utilizzare lo stesso fornitore di agenti se hai un numero di abbonamenti con un basso throughput. L'utilizzo dello stesso valore limita il numero di thread nel client.

  • setChannelProvider() ti consente di personalizzare il provider di canali utilizzato per aprire le connessioni a Pub/Sub. In genere, non configuri questo valore a meno che tu non voglia utilizzare lo stesso canale su più client di abbonati. Il riutilizzo di un canale su troppi clienti potrebbe comportare errori GOAWAY o ENHANCE_YOUR_CALM. Se vedi questi errori nei log della tua applicazione o in Cloud Logs, crea altri canali.

Esempi di codice per controllo della contemporaneità

C++

Prima di provare questo esempio, segui le istruzioni di configurazione C++ riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub C++.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
  // Create a subscriber with 16 threads handling I/O work, by default the
  // library creates `std::thread::hardware_concurrency()` threads.
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
      Options{}
          .set<pubsub::MaxConcurrencyOption>(8)
          .set<GrpcBackgroundThreadPoolSizeOption>(16)));

  // Create a subscription where up to 8 messages are handled concurrently. By
  // default the library uses `std::thread::hardware_concurrency()` as the
  // maximum number of concurrent callbacks.
  auto session = subscriber.Subscribe(
      [](pubsub::Message const& m, pubsub::AckHandler h) {
        // This handler executes in the I/O threads, applications could use,
        // std::async(), a thread-pool, or any other mechanism to transfer the
        // execution to other threads.
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
  return std::make_pair(subscriber, std::move(session));
};

Vai

Prima di provare questo esempio, segui le istruzioni di configurazione di Go riportate nella guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Go.

import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub"
)

func pullMsgsConcurrencyControl(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)
	// Must set ReceiveSettings.Synchronous to false (or leave as default) to enable
	// concurrency pulling of messages. Otherwise, NumGoroutines will be set to 1.
	sub.ReceiveSettings.Synchronous = false
	// NumGoroutines determines the number of goroutines sub.Receive will spawn to pull
	// messages.
	sub.ReceiveSettings.NumGoroutines = 16
	// MaxOutstandingMessages limits the number of concurrent handlers of messages.
	// In this case, up to 8 unacked messages can be handled concurrently.
	// Note, even in synchronous mode, messages pulled in a batch can still be handled
	// concurrently.
	sub.ReceiveSettings.MaxOutstandingMessages = 8

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var received int32

	// Receive blocks until the context is cancelled or an error occurs.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		atomic.AddInt32(&received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive returned error: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Java Pub/Sub.


import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithConcurrencyControlExample(projectId, subscriptionId);
  }

  public static void subscribeWithConcurrencyControlExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      // Provides an executor service for processing messages. The default `executorProvider` used
      // by the subscriber has a default thread count of 5.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setParallelPullCount` determines how many StreamingPull streams the subscriber will open
      // to receive message. It defaults to 1. `setExecutorProvider` configures an executor for the
      // subscriber to process messages. Here, the subscriber is configured to open 2 streams for
      // receiving messages, each stream creates a new executor with 4 threads to help process the
      // message callbacks. In total 2x4=8 threads are used for message processing.
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setParallelPullCount(2)
              .setExecutorProvider(executorProvider)
              .build();

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Ruby

Prima di provare questo esempio, segui le istruzioni di configurazione di Ruby riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Ruby Pub/Sub.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id
# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
subscriber   = subscription.listen streams: 2, threads: {
  callback: 4,
  push:     2
} do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

Passaggi successivi

Scopri le altre opzioni di pubblicazione che puoi configurare per un abbonamento: