동시 실행 제어를 사용하여 추가 메시지 처리

동시 실행 제어는 Pub/Sub 상위 수준 클라이언트 라이브러리에서 사용할 수 있는 기능입니다. 하위 수준 라이브러리를 사용할 때 고유한 동시 실행 제어를 구현할 수도 있습니다.

동시 실행 제어 지원은 클라이언트 라이브러리의 프로그래밍 언어에 따라 달라집니다. C++, Go, Java와 같은 동시 스레드를 지원하는 언어 구현의 경우 클라이언트 라이브러리에서 스레드 수를 기본으로 선택합니다.

이는 애플리케이션에 가장 적합한 선택이 아닐 수도 있습니다. 예를 들어 구독자 애플리케이션에서 받은 메시지 볼륨을 따라잡지 못하고 CPU의 제한을 받지 않는 경우에는 스레드 수를 늘려야 합니다. CPU 집약적인 메시지 처리 작업이라면 스레드 수를 줄여야 할 수도 있습니다.

동시 실행 제어 구성

동시 실행 제어 변수의 기본값과 변수 이름은 클라이언트 라이브러리마다 다를 수 있습니다. 예를 들어 Java 클라이언트 라이브러리에서 동시 실행 제어를 구성하는 메서드는 setSystemExecutorProvider(), setExecutorProvider(), setParallelPullCount()입니다.

  • setParallelPullCount()를 사용하면 열 스트림 수를 결정할 수 있습니다. 구독자 클라이언트에서 단일 스트림에 전송되는 데이터(10MBps)보다 더 많은 데이터를 처리할 수 있으면 더 많은 스트림을 열 수 있습니다.

  • setExecutorProvider()를 사용하면 메시지 처리에 사용되는 실행자 제공업체를 맞춤설정할 수 있습니다. 예를 들어 실행자 제공업체를 여러 구독자 클라이언트에서 스레드 수가 제한된 단일 공유 실행자를 반환하는 실행자 제공업체로 변경할 수 있습니다. 이 구성은 생성되는 스레드 수를 제한하는 데 도움이 됩니다.

  • setSystemExecutorProvider()를 사용하면 임대 기간 관리에 사용되는 실행자 제공업체를 맞춤설정할 수 있습니다. 일반적으로 setExecutorProvidersetSystemExecutorProvider에 같은 실행자 제공업체를 사용하지 않는 한 이 값을 구성하지 않습니다. 예를 들어 처리량이 낮은 구독이 많은 경우 동일한 실행자 제공업체를 사용할 수 있습니다. 같은 값을 사용하면 클라이언트의 스레드 수가 제한됩니다.

동시 실행 제어에 사용되는 총 스레드 수는 클라이언트 라이브러리에 전달된 실행자 제공업체와 동시 가져오기 수에 따라 다릅니다.

동시 실행 제어용 코드 샘플

C++

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참고 문서를 확인하세요.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
  // Create a subscriber with 16 threads handling I/O work, by default the
  // library creates `std::thread::hardware_concurrency()` threads.
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
      Options{}
          .set<pubsub::MaxConcurrencyOption>(8)
          .set<GrpcBackgroundThreadPoolSizeOption>(16)));

  // Create a subscription where up to 8 messages are handled concurrently. By
  // default the library uses `std::thread::hardware_concurrency()` as the
  // maximum number of concurrent callbacks.
  auto session = subscriber.Subscribe(
      [](pubsub::Message const& m, pubsub::AckHandler h) {
        // This handler executes in the I/O threads, applications could use,
        // std::async(), a thread-pool, or any other mechanism to transfer the
        // execution to other threads.
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
  return std::make_pair(subscriber, std::move(session));
};

Go

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참고 문서를 참조하세요.

import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub"
)

func pullMsgsConcurrencyControl(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)
	// Must set ReceiveSettings.Synchronous to false (or leave as default) to enable
	// concurrency pulling of messages. Otherwise, NumGoroutines will be set to 1.
	sub.ReceiveSettings.Synchronous = false
	// NumGoroutines determines the number of goroutines sub.Receive will spawn to pull
	// messages.
	sub.ReceiveSettings.NumGoroutines = 16
	// MaxOutstandingMessages limits the number of concurrent handlers of messages.
	// In this case, up to 8 unacked messages can be handled concurrently.
	// Note, even in synchronous mode, messages pulled in a batch can still be handled
	// concurrently.
	sub.ReceiveSettings.MaxOutstandingMessages = 8

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var received int32

	// Receive blocks until the context is cancelled or an error occurs.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		atomic.AddInt32(&received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive returned error: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil
}

자바

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 자바 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Java API 참고 문서를 참조하세요.


import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithConcurrencyControlExample(projectId, subscriptionId);
  }

  public static void subscribeWithConcurrencyControlExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      // Provides an executor service for processing messages. The default `executorProvider` used
      // by the subscriber has a default thread count of 5.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setParallelPullCount` determines how many StreamingPull streams the subscriber will open
      // to receive message. It defaults to 1. `setExecutorProvider` configures an executor for the
      // subscriber to process messages. Here, the subscriber is configured to open 2 streams for
      // receiving messages, each stream creates a new executor with 4 threads to help process the
      // message callbacks. In total 2x4=8 threads are used for message processing.
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setParallelPullCount(2)
              .setExecutorProvider(executorProvider)
              .build();

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Ruby

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참고 문서를 참조하세요.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id
# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
subscriber   = subscription.listen streams: 2, threads: {
  callback: 4,
  push:     2
} do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

다음 단계

구독에 구성할 수 있는 다른 전송 옵션에 대해 알아보기