同時実行制御

このドキュメントでは、トピックにパブリッシュされたメッセージで同時実行制御を使用する方法について説明します。

同時実行制御を使用すると、クライアント ライブラリのパブリッシュ メッセージで使用されるデフォルトのバックグラウンド(I/O)スレッド数をオーバーライドできます。これにより、パブリッシャー クライアントはメッセージを並行して送信できます。

同時実行制御は、Pub/Sub の高レベル クライアント ライブラリで使用できる機能です。低レベル ライブラリを使用している場合は、独自の同時実行制御を実装することもできます。

同時実行制御のサポートは、クライアント ライブラリのプログラミング言語によって異なります。C++、Go、Java などの並列スレッドをサポートする言語を実装する場合、クライアント ライブラリはデフォルトのスレッド数を選択します。

このページでは、同時実行制御のコンセプトと、パブリッシャー クライアント向けにこの機能を設定する方法について説明します。同時実行制御用にサブスクライバー クライアントを構成するには、同時実行制御でより多くのメッセージを処理するをご覧ください。

始める前に

パブリッシュ ワークフローを構成する前に、次のタスクを完了していることを確認してください。

必要なロール

トピックにメッセージをパブリッシュするために必要な権限を取得するには、トピックに対する Pub/Sub パブリッシャーroles/pubsub.publisher)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織へのアクセスを管理するをご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

トピックとサブスクリプションを作成または更新するには、追加の権限が必要です。

同時実行制御の構成

同時実行制御変数のデフォルト値と変数の名前は、クライアント ライブラリによって異なる場合があります。たとえば、Java クライアント ライブラリでは、同時実行制御を構成するメソッドは setExecutorProvider()setChannelProvider() です。詳細については、API リファレンス ドキュメントをご覧ください。

  • setExecutorProvider() を使用すると、パブリッシュ レスポンスを処理するために使用されるエグゼキュータ プロバイダをカスタマイズできます。たとえば、エグゼキュータ プロバイダを、複数のパブリッシャー クライアントにわたってスレッド数が制限された単一の共有エグゼキュータを返すプロバイダに変更できます。この構成は、作成されるスレッドの数を制限することに役立ちます。

  • setChannelProvider() を使用すると、Pub/Sub への接続の開始に使用するチャネル プロバイダをカスタマイズできます。通常、複数のパブリッシャー クライアントで同じチャネルを使用する場合を除き、この値は構成しません。チャネルを多くのクライアントで再利用すると、GOAWAY エラーまたは ENHANCE_YOUR_CALM エラーが発生する可能性があります。アプリケーションのログまたは Cloud Logging にこれらのエラーが表示された場合は、チャンネルを追加します。

同時実行制御のコードサンプル

C++

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API リファレンス ドキュメントをご覧ください。

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // Override the default number of background (I/O) threads. By default the
  // library uses `std::thread::hardware_concurrency()` threads.
  auto options = Options{}.set<GrpcBackgroundThreadPoolSizeOption>(8);
  auto publisher = pubsub::Publisher(
      pubsub::MakePublisherConnection(std::move(topic), std::move(options)));

  std::vector<future<void>> ids;
  for (char const* data : {"1", "2", "3", "go!"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
}

Go

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	t := client.Topic(topicID)
	t.PublishSettings.NumGoroutines = 1

	result := t.Publish(ctx, &pubsub.Message{Data: []byte(msg)})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %w", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

Java

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithConcurrencyControlExample(projectId, topicId);
  }

  public static void publishWithConcurrencyControlExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

    try {
      // Provides an executor service for processing messages. The default
      // `executorProvider` used by the publisher has a default thread count of
      // 5 * the number of processors available to the Java virtual machine.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setExecutorProvider` configures an executor for the publisher.
      publisher = Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();

      // schedule publishing one message at a time : messages get automatically batched
      for (int i = 0; i < 100; i++) {
        String message = "message " + i;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        messageIdFutures.add(messageIdFuture);
      }
    } finally {
      // Wait on any pending publish requests.
      List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

      System.out.println("Published " + messageIds.size() + " messages with concurrency control.");

      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Ruby

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API リファレンス ドキュメントをご覧ください。

# topic_id = "your-topic-id"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id, async: {
  threads: {
    # Use exactly one thread for publishing message and exactly one thread
    # for executing callbacks
    publish:  1,
    callback: 1
  }
}
topic.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

次のステップ