同時実行制御を使用してより多くのメッセージを処理する

同時実行制御は、Pub/Sub の高レベル クライアント ライブラリで使用できる機能です。低レベル ライブラリを使用している場合は、独自の同時実行制御を実装することもできます。

同時実行制御のサポートは、クライアント ライブラリのプログラミング言語によって異なります。C++、Go、Java などの並列スレッドをサポートする言語を実装する場合、クライアント ライブラリはデフォルトのスレッド数を選択します。

しかし、この値がアプリケーションに最適ではない場合もあります。たとえば、サブスクライバー アプリケーションがメッセージの受信量に対応できず、CPU にバインドされていない場合は、スレッド数を増やす必要があります。CPU 使用率の高いメッセージ処理のオペレーションでは、スレッド数を減らすと、問題が解決する場合があります。

このページでは、同時実行制御のコンセプトと、サブスクライバー クライアントにこの機能を設定する方法について説明します。同時実行制御用にパブリッシャー クライアントを構成するには、同時実行制御をご覧ください。

同時実行制御の構成

同時実行制御変数のデフォルト値と変数の名前は、クライアント ライブラリによって異なる場合があります。詳細については、API リファレンス ドキュメントをご覧ください。たとえば、Java クライアント ライブラリでは、同時実行制御を構成するメソッドは setParallelPullCount()setExecutorProvider()setSystemExecutorProvider()setChannelProvider() です。

  • setParallelPullCount() を使用すると、開くストリームの数を決定できます。サブスクライバー クライアントが 1 つのストリームで送信されるデータ(10 MBps)よりも多くのデータを処理できる場合は、より多くのストリームを開くことができます。

  • setExecutorProvider() を使用すると、メッセージの処理に使用するエグゼキュータ プロバイダをカスタマイズできます。たとえば、エグゼキュータ プロバイダを、複数のサブスクライバー クライアントにわたってスレッド数が制限された単一の共有エグゼキュータを返すプロバイダに変更できます。この構成は、作成されるスレッドの数を制限することに役立ちます。同時実行制御に使用されるスレッドの合計数は、クライアント ライブラリで渡されるエグゼキュータ プロバイダと並列 pull 数によって変わります。

  • setSystemExecutorProvider() を使用すると、リース管理に使用するエグゼキュータ プロバイダをカスタマイズできます。通常、setExecutorProvidersetSystemExecutorProvider で同じエグゼキュータ プロバイダを使用する場合を除き、この値は構成しません。たとえば、スループットの低いサブスクリプションが多数ある場合は、同じエグゼキュータ プロバイダを使用できます。同じ値を使用すると、クライアント内のスレッド数が制限されます。

  • setChannelProvider() を使用すると、Pub/Sub への接続の開始に使用するチャネル プロバイダをカスタマイズできます。通常、複数のサブスクライバー クライアントで同じチャネルを使用する場合を除き、この値は構成しません。チャネルを多くのクライアントで再利用すると、GOAWAY エラーまたは ENHANCE_YOUR_CALM エラーが発生する可能性があります。アプリケーションのログまたは Cloud Logging にこれらのエラーが表示された場合は、チャンネルを追加します。

同時実行制御のコードサンプル

C++

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API リファレンス ドキュメントをご覧ください。

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
  // Create a subscriber with 16 threads handling I/O work, by default the
  // library creates `std::thread::hardware_concurrency()` threads.
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
      Options{}
          .set<pubsub::MaxConcurrencyOption>(8)
          .set<GrpcBackgroundThreadPoolSizeOption>(16)));

  // Create a subscription where up to 8 messages are handled concurrently. By
  // default the library uses `std::thread::hardware_concurrency()` as the
  // maximum number of concurrent callbacks.
  auto session = subscriber.Subscribe(
      [](pubsub::Message const& m, pubsub::AckHandler h) {
        // This handler executes in the I/O threads, applications could use,
        // std::async(), a thread-pool, or any other mechanism to transfer the
        // execution to other threads.
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
  return std::make_pair(subscriber, std::move(session));
};

Go

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub"
)

func pullMsgsConcurrencyControl(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)
	// Must set ReceiveSettings.Synchronous to false (or leave as default) to enable
	// concurrency pulling of messages. Otherwise, NumGoroutines will be set to 1.
	sub.ReceiveSettings.Synchronous = false
	// NumGoroutines determines the number of goroutines sub.Receive will spawn to pull
	// messages.
	sub.ReceiveSettings.NumGoroutines = 16
	// MaxOutstandingMessages limits the number of concurrent handlers of messages.
	// In this case, up to 8 unacked messages can be handled concurrently.
	// Note, even in synchronous mode, messages pulled in a batch can still be handled
	// concurrently.
	sub.ReceiveSettings.MaxOutstandingMessages = 8

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var received int32

	// Receive blocks until the context is cancelled or an error occurs.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		atomic.AddInt32(&received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive returned error: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil
}

Java

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。


import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithConcurrencyControlExample(projectId, subscriptionId);
  }

  public static void subscribeWithConcurrencyControlExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      // Provides an executor service for processing messages. The default `executorProvider` used
      // by the subscriber has a default thread count of 5.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setParallelPullCount` determines how many StreamingPull streams the subscriber will open
      // to receive message. It defaults to 1. `setExecutorProvider` configures an executor for the
      // subscriber to process messages. Here, the subscriber is configured to open 2 streams for
      // receiving messages, each stream creates a new executor with 4 threads to help process the
      // message callbacks. In total 2x4=8 threads are used for message processing.
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setParallelPullCount(2)
              .setExecutorProvider(executorProvider)
              .build();

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Ruby

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API リファレンス ドキュメントをご覧ください。

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id
# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
subscriber   = subscription.listen streams: 2, threads: {
  callback: 4,
  push:     2
} do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

次のステップ

サブスクリプションに構成できる他の配信オプションについて確認します。