C++ 客户端库中的后台线程

本指南介绍了 C++ 客户端库使用的线程模型,并介绍了如何替换应用中的默认线程池。

目标

  • 描述 C++ 客户端库的默认线程模型。
  • 说明如何针对需要替换这些默认值的应用替换这些默认值。

为什么客户端库使用后台线程?

客户端库中的大多数函数都使用调用该函数的线程来完成所有工作,包括服务的所有 RPC 和/或刷新访问令牌以进行身份验证。

从本质上讲,异步函数无法使用当前线程来完成其工作。某个单独的线程必须等待工作完成并处理响应。

在长时间运行的操作中阻塞发起调用的线程也是一种浪费,因为服务可能需要几分钟或更长时间才能完成工作。对于此类操作,客户端库使用后台线程定期轮询长时间运行的操作的状态。

哪些函数和库需要后台线程?

针对某种类型 T 返回 future<T> 的函数使用后台线程等待工作完成。

并非所有客户端库都具有异步函数或长时间运行的操作。不需要这些线程的库不会创建任何后台线程。

您可能会注意到应用中存在其他线程,但这些线程可能是由 C++ 客户端库的依赖项(例如 gRPC)创建的。这些线程通常不太有趣,因为它们不会在这些线程中运行应用代码,并且它们仅提供辅助功能。

这些后台线程对应用有何影响?

这些线程照常与应用的其余部分竞争 CPU 和内存资源。如果需要,您可以创建自己的线程池,以精确控制这些线程使用的任何资源。有关详细信息,请参见下文。

我的代码是否会在这些线程中运行?

可以。当您将回调附加到 future<T> 时,回调几乎总是由其中一个后台线程执行。不会发生这种情况的唯一情况是,在您附加回调时已满足 future<T>。在这种情况下,回调会在附加回调的线程的上下文中立即运行。

例如,假设一个应用使用 Pub/Sub 客户端库。 Publish() 调用会返回 future,并且应用可以执行一些工作后附加回调:

namespace pubsub = ::google::cloud::pubsub;
namespace g = google::cloud;

void Callback(g::future<g::StatusOr<std::string>>);

void F(pubsub::Publisher publisher) {
  auto my_future = publisher.Publish(
      pubsub::MessageBuilder("Hello World!").Build());
  // do some work.
  my_future.then(Callback);
}

如果在调用 .then() 函数之前满足 my_future,则立即调用回调函数。如果要保证代码在单独的线程中运行,您需要使用自己的线程池,并在 .then() 中提供一个 Callable 函数,以将执行操作转发到线程池。

默认线程池

对于需要后台线程的库,Make*Connection() 会创建一个默认线程池。除非您替换线程池,否则每个 *Connection 对象都有单独的线程池。

大多数库中的默认线程池都包含一个线程。很少需要更多线程,因为使用后台线程轮询长时间运行的操作的状态。这些调用在一定程度上存在时间短暂且占用的 CPU 资源非常少,因此单个后台线程可以处理数百个待处理的长时间运行的操作,很少有应用有这么多操作。

其他异步操作可能需要更多资源。 如果需要,可以使用 GrpcBackgroundThreadPoolSizeOption 更改默认的后台线程池大小。

Pub/Sub 库预计会执行更多工作,因为 Pub/Sub 应用每秒接收或发送数千条消息很常见。因此,在 64 位架构上,该库默认为每个核心一个线程。在 32 位架构上(或在以 32 位模式下编译时,即使在 64 位架构上运行时),此默认值会更改为仅 4 个线程。

提供您自己的线程池

您可以为后台线程提供自己的线程池。创建 CompletionQueue 对象,向其附加线程,并在初始化客户端时配置 GrpcCompletionQueueOption。例如:

namespace admin = ::google::cloud::spanner_admin;
namespace g = ::google::cloud;

void F() {
  // You will need to create threads
  auto cq = g::CompletionQueue();
  std::vector<std::jthread> threads;
  for (int i = 0; i != 10; ++i) {
    threads.emplace_back([](auto cq) { cq.Run(); }, cq);
  }
  auto client = admin::InstanceAdminClient(admin::MakeInstanceAdminConnection(
      g::Options{}.set<g::GrpcCompletionQueueOption>(cq)));
  // Use `client` as usual
}

您可以在多个客户端之间共享同一个 CompletionQueue 对象,即使对于不同的服务也是如此:

namespace admin = ::google::cloud::spanner_admin;
namespace admin = ::google::cloud::pubsub;
namespace g = ::google::cloud;

void F(pubsub::Topic const& topic1, pubsub::Topic const& topic2) {
  // You will need to create threads
  auto cq = g::CompletionQueue();
  std::vector<std::jthread> threads;
  for (int i = 0; i != 10; ++i) {
    threads.emplace_back([](auto cq) { cq.Run(); }, cq);
  }
  auto client = admin::InstanceAdminClient(admin::MakeInstanceAdminConnection(
      g::Options{}.set<g::GrpcCompletionQueue>(cq)));
  auto p1 = pubsub::Publisher(pubsub::MakePublisherConnection(
      topic1, g::Options{}.set<g::GrpcCompletionQueueOption>(cq)));
  auto p2 = pubsub::Publisher(pubsub::MakePublisherConnection(
      topic2, g::Options{}.set<g::GrpcCompletionQueueOption>(cq)));
  // Use `client`, `p1`, and `p2` as usual
}

后续步骤