Contrôle de simultanéité

Ce document fournit des informations sur l'utilisation du contrôle de la simultanéité avec les messages publiés sur un sujet.

Le contrôle de la concurrence vous permet de remplacer le nombre par défaut de threads en arrière-plan (E/S) utilisés par la bibliothèque cliente pour publier des messages. Cela permet aux clients éditeurs d'envoyer des messages en parallèle.

Le contrôle de la concurrence est une fonctionnalité disponible dans la bibliothèque cliente de haut niveau Pub/Sub. Vous pouvez également implémenter votre propre contrôle de la simultanéité lorsque vous utilisez une bibliothèque de bas niveau.

La prise en charge du contrôle de la simultanéité dépend du langage de programmation de la bibliothèque cliente. Pour les implémentations de langage prenant en charge les threads parallèles, tels que C++, Go et Java, les bibliothèques clientes font un choix par défaut pour le nombre de threads.

Cette page explique le concept de contrôle de la simultanéité et comment configurer cette fonctionnalité pour vos clients éditeurs. Pour configurer vos clients abonnés pour le contrôle de la simultanéité, consultez la section Traiter plus de messages avec le contrôle de la simultanéité.

Avant de commencer

Avant de configurer le workflow de publication, assurez-vous d'avoir effectué les tâches suivantes:

Rôles requis

Pour obtenir les autorisations nécessaires pour publier des messages dans un sujet, demandez à votre administrateur de vous accorder le rôle IAM Éditeur Pub/Sub (roles/pubsub.publisher) sur le sujet. Pour en savoir plus sur l'attribution de rôles, consultez la page Gérer l'accès aux projets, aux dossiers et aux organisations.

Vous pouvez également obtenir les autorisations requises via des rôles personnalisés ou d'autres rôles prédéfinis.

Vous avez besoin d'autorisations supplémentaires pour créer ou modifier des sujets et des abonnements.

Configurations de contrôle de simultanéité

Les valeurs par défaut des variables de contrôle de la concurrence et les noms des variables peuvent varier d'une bibliothèque cliente à l'autre. Par exemple, dans la bibliothèque cliente Java, les méthodes permettant de configurer le contrôle de la concurrence sont setExecutorProvider() et setChannelProvider(). Pour en savoir plus, consultez la documentation de référence sur les API.

  • setExecutorProvider() vous permet de personnaliser le fournisseur d'exécuteur utilisé pour traiter les réponses de publication. Par exemple, vous pouvez remplacer le fournisseur d'exécuteur par un fournisseur qui renvoie un seul exécuteur partagé avec un nombre limité de threads sur plusieurs clients d'éditeurs. Cette configuration permet de limiter le nombre de threads créés.

  • setChannelProvider() vous permet de personnaliser le fournisseur de canaux utilisé pour ouvrir des connexions à Pub/Sub. En règle générale, vous ne configurez pas cette valeur, sauf si vous souhaitez utiliser le même canal pour plusieurs clients éditeurs. Si vous réutilisez un canal pour trop de clients, des erreurs GOAWAY ou ENHANCE_YOUR_CALM peuvent se produire. Si ces erreurs s'affichent dans les journaux de votre application ou dans Cloud Logs, créez d'autres canaux.

Exemples de code pour le contrôle de la concurrence

C++

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C++ qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C++.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // Override the default number of background (I/O) threads. By default the
  // library uses `std::thread::hardware_concurrency()` threads.
  auto options = Options{}.set<GrpcBackgroundThreadPoolSizeOption>(8);
  auto publisher = pubsub::Publisher(
      pubsub::MakePublisherConnection(std::move(topic), std::move(options)));

  std::vector<future<void>> ids;
  for (char const* data : {"1", "2", "3", "go!"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
}

Go

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Go qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	t := client.Topic(topicID)
	t.PublishSettings.NumGoroutines = 1

	result := t.Publish(ctx, &pubsub.Message{Data: []byte(msg)})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %w", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Java qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Java.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithConcurrencyControlExample(projectId, topicId);
  }

  public static void publishWithConcurrencyControlExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

    try {
      // Provides an executor service for processing messages. The default
      // `executorProvider` used by the publisher has a default thread count of
      // 5 * the number of processors available to the Java virtual machine.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setExecutorProvider` configures an executor for the publisher.
      publisher = Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();

      // schedule publishing one message at a time : messages get automatically batched
      for (int i = 0; i < 100; i++) {
        String message = "message " + i;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        messageIdFutures.add(messageIdFuture);
      }
    } finally {
      // Wait on any pending publish requests.
      List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

      System.out.println("Published " + messageIds.size() + " messages with concurrency control.");

      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Ruby

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Ruby qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Ruby.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id, async: {
  threads: {
    # Use exactly one thread for publishing message and exactly one thread
    # for executing callbacks
    publish:  1,
    callback: 1
  }
}
topic.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

Étape suivante