Gleichzeitigkeitserkennung

In diesem Dokument erfahren Sie mehr über die Verwendung der Parallelitätssteuerung bei Nachrichten, die für ein Thema veröffentlicht werden.

Mit der Parallelitätssteuerung können Sie die Standardanzahl der Hintergrund-I/O-Threads überschreiben, die von der Clientbibliothek zum Veröffentlichen von Nachrichten verwendet werden. So können die Publisher-Clients Nachrichten parallel senden.

Die Parallelitätssteuerung ist eine Funktion der Pub/Sub-Clientbibliothek. Sie können auch Ihre eigene Parallelitätssteuerung implementieren, wenn Sie eine Low-Level-Bibliothek verwenden.

Die Unterstützung für die Parallelitätssteuerung hängt von der Programmiersprache der Clientbibliothek ab. Bei Sprachimplementierungen, die parallele Threads wie C++, Go und Java unterstützen, legen die Clientbibliotheken die Anzahl der Threads jeweils anhand ihrer Standardeinstellung fest.

Auf dieser Seite wird das Konzept der Parallelitätssteuerung erläutert und beschrieben, wie du die Funktion für deine Publisher-Kunden einrichtest. Informationen zum Konfigurieren Ihrer Abonnentenclients für die Gleichzeitigkeitssteuerung finden Sie unter Mehr Nachrichten mit Gleichzeitigkeitssteuerung verarbeiten.

Hinweise

Bevor du den Veröffentlichungsworkflow konfigurierst, musst du die folgenden Aufgaben erledigt haben:

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Pub/Sub-Publisher (roles/pubsub.publisher) für das Thema zuzuweisen, damit Sie die Berechtigungen erhalten, die Sie zum Veröffentlichen von Nachrichten in einem Thema benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Sie benötigen zusätzliche Berechtigungen, um Themen und Abos zu erstellen oder zu aktualisieren.

Konfigurationen für die Parallelitätssteuerung

Die Standardwerte für die Variablen zur Parallelitätssteuerung und die Namen der Variablen können sich je nach Clientbibliothek unterscheiden. In der Java-Clientbibliothek sind das beispielsweise die Methoden setExecutorProvider() und setChannelProvider(). Weitere Informationen finden Sie in der API-Referenzdokumentation.

  • Mit setExecutorProvider() können Sie den Executor-Anbieter anpassen, der für die Verarbeitung von Veröffentlichungsantworten verwendet wird. Du kannst beispielsweise den Executor-Anbieter in einen ändern, der einen einzelnen, freigegebenen Executor mit einer begrenzten Anzahl von Threads für mehrere Publisher-Clients zurückgibt. Mit dieser Konfiguration lässt sich die Anzahl der erstellten Threads begrenzen.

  • Mit setChannelProvider() können Sie den Channelanbieter anpassen, der zum Öffnen von Verbindungen zu Pub/Sub verwendet wird. Normalerweise konfigurieren Sie diesen Wert nur, wenn Sie denselben Channel für mehrere Publisher-Clients verwenden möchten. Wenn ein Kanal für zu viele Kunden wiederverwendet wird, kann das zu GOAWAY- oder ENHANCE_YOUR_CALM-Fehlern führen. Wenn Sie diese Fehler in den Logs Ihrer Anwendung oder in Cloud Logs sehen, erstellen Sie weitere Kanäle.

Codebeispiele für die Parallelitätssteuerung

C++

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // Override the default number of background (I/O) threads. By default the
  // library uses `std::thread::hardware_concurrency()` threads.
  auto options = Options{}.set<GrpcBackgroundThreadPoolSizeOption>(8);
  auto publisher = pubsub::Publisher(
      pubsub::MakePublisherConnection(std::move(topic), std::move(options)));

  std::vector<future<void>> ids;
  for (char const* data : {"1", "2", "3", "go!"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
}

Go

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	t := client.Topic(topicID)
	t.PublishSettings.NumGoroutines = 1

	result := t.Publish(ctx, &pubsub.Message{Data: []byte(msg)})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %w", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Java API.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithConcurrencyControlExample(projectId, topicId);
  }

  public static void publishWithConcurrencyControlExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

    try {
      // Provides an executor service for processing messages. The default
      // `executorProvider` used by the publisher has a default thread count of
      // 5 * the number of processors available to the Java virtual machine.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setExecutorProvider` configures an executor for the publisher.
      publisher = Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();

      // schedule publishing one message at a time : messages get automatically batched
      for (int i = 0; i < 100; i++) {
        String message = "message " + i;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        messageIdFutures.add(messageIdFuture);
      }
    } finally {
      // Wait on any pending publish requests.
      List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

      System.out.println("Published " + messageIds.size() + " messages with concurrency control.");

      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Ruby

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Ruby in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Ruby API.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id, async: {
  threads: {
    # Use exactly one thread for publishing message and exactly one thread
    # for executing callbacks
    publish:  1,
    callback: 1
  }
}
topic.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

Nächste Schritte