Mit Gleichzeitigkeitssteuerung mehr Nachrichten verarbeiten

Die Parallelitätssteuerung ist eine Funktion der Pub/Sub-Clientbibliothek. Sie können auch Ihre eigene Parallelitätssteuerung implementieren, wenn Sie eine Low-Level-Bibliothek verwenden.

Die Unterstützung für die Gleichzeitigkeitssteuerung hängt von der Programmiersprache der Clientbibliothek ab. Bei Sprachimplementierungen, die parallele Threads wie C++, Go und Java unterstützen, legen die Clientbibliotheken die Anzahl der Threads jeweils anhand ihrer Standardeinstellung fest.

Diese Einstellung ist für Ihre Anwendung möglicherweise nicht optimal. Wenn Ihre Abonnentenanwendung beispielsweise nicht mit dem eingehenden Nachrichtenvolumen mithält und nicht CPU-gebunden ist, müssen Sie die Anzahl der Threads erhöhen. Bei CPU-intensiver Nachrichtenverarbeitung kann es sinnvoll sein, die Anzahl der Threads zu verringern.

Auf dieser Seite wird das Konzept der Parallelitätssteuerung erläutert und beschrieben, wie du die Funktion für deine Abonnenten-Clients einrichtest. Informationen zum Konfigurieren deiner Publisher-Clients für die Gleichzeitigkeitssteuerung findest du unter Gleichzeitigkeitssteuerung.

Konfigurationen für die Parallelitätssteuerung

Die Standardwerte für die Variablen zur Parallelitätssteuerung und die Namen der Variablen können sich je nach Clientbibliothek unterscheiden. Weitere Informationen finden Sie in der API-Referenzdokumentation. In der Java-Clientbibliothek sind das beispielsweise die Methoden setParallelPullCount(), setExecutorProvider(), setSystemExecutorProvider() und setChannelProvider().

  • Mit setParallelPullCount() kannst du festlegen, wie viele Streams geöffnet werden sollen. Du kannst weitere Streams öffnen, wenn dein Abonnentenclient mehr Daten verarbeiten kann als über einen einzelnen Stream gesendet werden, also 10 MB/s.

  • Mit setExecutorProvider() können Sie den Executor-Anbieter anpassen, der für die Verarbeitung von Nachrichten verwendet wird. Du kannst den Executor-Anbieter beispielsweise in einen ändern, der einen einzelnen, freigegebenen Executor mit einer begrenzten Anzahl von Threads für mehrere Abonnenten-Clients zurückgibt. Mit dieser Konfiguration lässt sich die Anzahl der erstellten Threads begrenzen. Die Gesamtzahl der Threads, die für die Parallelität verwendet werden, hängt vom in der Clientbibliothek übergebenen Executor-Anbieter und der Anzahl der parallelen Pulls ab.

  • Mit setSystemExecutorProvider() können Sie den Executor-Anbieter anpassen, der für die Freigabeverwaltung verwendet wird. Normalerweise konfigurieren Sie diesen Wert nur, wenn Sie in setExecutorProvider und setSystemExecutorProvider denselben Executor-Anbieter verwenden möchten. Sie können beispielsweise denselben Ausführenden verwenden, wenn Sie mehrere Abos mit geringem Durchsatz haben. Wenn Sie denselben Wert verwenden, wird die Anzahl der Threads im Client begrenzt.

  • Mit setChannelProvider() können Sie den Channelanbieter anpassen, der zum Öffnen von Verbindungen zu Pub/Sub verwendet wird. Normalerweise konfigurierst du diesen Wert nur, wenn du denselben Kanal für mehrere Abonnenten-Clients verwenden möchtest. Wenn ein Kanal für zu viele Kunden wiederverwendet wird, kann das zu GOAWAY- oder ENHANCE_YOUR_CALM-Fehlern führen. Wenn Sie diese Fehler in den Logs Ihrer Anwendung oder in Cloud Logs sehen, erstellen Sie weitere Kanäle.

Codebeispiele für die Parallelitätssteuerung

C++

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
  // Create a subscriber with 16 threads handling I/O work, by default the
  // library creates `std::thread::hardware_concurrency()` threads.
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
      Options{}
          .set<pubsub::MaxConcurrencyOption>(8)
          .set<GrpcBackgroundThreadPoolSizeOption>(16)));

  // Create a subscription where up to 8 messages are handled concurrently. By
  // default the library uses `std::thread::hardware_concurrency()` as the
  // maximum number of concurrent callbacks.
  auto session = subscriber.Subscribe(
      [](pubsub::Message const& m, pubsub::AckHandler h) {
        // This handler executes in the I/O threads, applications could use,
        // std::async(), a thread-pool, or any other mechanism to transfer the
        // execution to other threads.
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
  return std::make_pair(subscriber, std::move(session));
};

Go

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub"
)

func pullMsgsConcurrencyControl(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)
	// Must set ReceiveSettings.Synchronous to false (or leave as default) to enable
	// concurrency pulling of messages. Otherwise, NumGoroutines will be set to 1.
	sub.ReceiveSettings.Synchronous = false
	// NumGoroutines determines the number of goroutines sub.Receive will spawn to pull
	// messages.
	sub.ReceiveSettings.NumGoroutines = 16
	// MaxOutstandingMessages limits the number of concurrent handlers of messages.
	// In this case, up to 8 unacked messages can be handled concurrently.
	// Note, even in synchronous mode, messages pulled in a batch can still be handled
	// concurrently.
	sub.ReceiveSettings.MaxOutstandingMessages = 8

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var received int32

	// Receive blocks until the context is cancelled or an error occurs.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		atomic.AddInt32(&received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive returned error: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Java API.


import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithConcurrencyControlExample(projectId, subscriptionId);
  }

  public static void subscribeWithConcurrencyControlExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      // Provides an executor service for processing messages. The default `executorProvider` used
      // by the subscriber has a default thread count of 5.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setParallelPullCount` determines how many StreamingPull streams the subscriber will open
      // to receive message. It defaults to 1. `setExecutorProvider` configures an executor for the
      // subscriber to process messages. Here, the subscriber is configured to open 2 streams for
      // receiving messages, each stream creates a new executor with 4 threads to help process the
      // message callbacks. In total 2x4=8 threads are used for message processing.
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setParallelPullCount(2)
              .setExecutorProvider(executorProvider)
              .build();

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Ruby

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Ruby in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Ruby API.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id
# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
subscriber   = subscription.listen streams: 2, threads: {
  callback: 4,
  push:     2
} do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

Nächste Schritte

Weitere Auslieferungsoptionen, die Sie für ein Abo konfigurieren können: