このガイドでは、C++ クライアント ライブラリで使用されるスレッドモデルについて説明し、アプリケーション内のデフォルトのスレッドプールをオーバーライドする方法を説明します。
目標
- C++ クライアント ライブラリのデフォルトのスレッドモデルについて説明します。
- 必要に応じてアプリケーションのデフォルトをオーバーライドする方法を説明します。
クライアント ライブラリがバックグラウンド スレッドを使用する理由
クライアント ライブラリのほとんどの関数は、関数を呼び出すスレッドを使用して、サービスに対する RPC や認証アクセス トークンの更新などのすべての作業を行います。
非同期関数は、本質的に現在のスレッドを使用して処理を完了できません。一部の別のスレッドは、作業が完了してレスポンスを処理するのを待つ必要があります。
また、長時間実行されるオペレーションでは呼び出し元のスレッドをブロックするのも無駄です。この場合、サービスがオペレーションを完了するまでに数分かかる場合があります。このようなオペレーションでは、クライアント ライブラリがバックグラウンド スレッドを使用して、長時間実行オペレーションの状態を定期的にポーリングします。
バックグラウンド スレッドを必要とする関数とライブラリは何ですか?
一部の T
型の future<T>
を返す関数は、バックグラウンド スレッドを使用して作業が完了するまで待機します。
すべてのクライアント ライブラリに非同期関数や長時間実行オペレーションがあるわけではありません。これらを必要としないライブラリは、バックグラウンド スレッドを作成しません。
アプリケーション内で追加のスレッドが見つかることもありますが、これらは、gRPC など C++ クライアント ライブラリの依存関係によって作成されることがあります。通常、これらのスレッドはそれほど興味深いものではありません。アプリケーション コードはこれらのスレッドで実行されることはなく、補助的な関数のみを提供しているためです。
これらのバックグラウンド スレッドはアプリケーションにどのように影響しますか?
通常、これらのスレッドは CPU リソースやメモリリソースをアプリケーションの残りの部分と競合させます。必要に応じて、独自のスレッドプールを作成して、これらのスレッドが使用するリソースを細かく制御できます。詳しくは、以下をご覧ください。
これらのいずれかのスレッドでコードが実行されることはありますか?
はい。コールバックを future<T>
にアタッチすると、コールバックはほとんどの場合、バックグラウンド スレッドのいずれかによって実行されます。これは、コールバックをアタッチするまでにすでに future<T>
が満たされている場合にのみ発生します。その場合、コールバックは、コールバックを接続するスレッドのコンテキストですぐに実行されます。
たとえば、Pub/Sub クライアント ライブラリを使用するアプリケーションを考えてみましょう。
Publish()
呼び出しは Future を返します。アプリケーションはいくつかの作業を実行した後、コールバックをアタッチできます。
namespace pubsub = ::google::cloud::pubsub;
namespace g = google::cloud;
void Callback(g::future<g::StatusOr<std::string>>);
void F(pubsub::Publisher publisher) {
auto my_future = publisher.Publish(
pubsub::MessageBuilder("Hello World!").Build());
// do some work.
my_future.then(Callback);
}
.then()
関数が呼び出される前に my_future
が満たされている場合、コールバックはすぐに呼び出されます。コードを別のスレッドで確実に実行するようにするには、独自のスレッドプールを使用し、.then()
に呼び出し可能を指定して、実行をスレッドプールに転送します。
デフォルトのスレッドプール
バックグラウンド スレッドを必要とするライブラリの場合、Make*Connection()
によってデフォルトのスレッドプールが作成されます。スレッドプールをオーバーライドしない限り、各 *Connection
オブジェクトには個別のスレッドプールがあります。
ほとんどのライブラリのデフォルトのスレッドプールには、単一のスレッドが含まれています。長時間実行オペレーションの状態をポーリングするためにバックグラウンド スレッドが使用されるため、より多くのスレッドが必要になることはほとんどありません。これらの呼び出しは適度に存続時間が短く、CPU をほとんど消費しません。そのため、1 つのバックグラウンド スレッドで保留の数百の長時間実行オペレーションを処理できます。それほど多くのオペレーションを持つアプリケーションはあまりありません。
他の非同期オペレーションでは、より多くのリソースが必要になる場合があります。
必要に応じて、GrpcBackgroundThreadPoolSizeOption
を使用してデフォルトのバックグラウンド スレッドプールのサイズを変更します。
Pub/Sub ライブラリでは、通常 1 秒あたり数千件のメッセージを受信または送信するため、この作業を大幅に増やす必要があります。その結果、このライブラリでは、デフォルトで 64 ビット アーキテクチャのコアごとに 1 つのスレッドが作成されます。32 ビット アーキテクチャでは(または 64 ビット アーキテクチャで実行している場合でも、32 ビットモードでコンパイルされた場合)、このデフォルトは 4 スレッドのみに変更されます。
独自のスレッドプールを指定する
バックグラウンド スレッドに独自のスレッドプールを指定できます。CompletionQueue
オブジェクトを作成し、それにスレッドをアタッチして、クライアントの初期化時に GrpcCompletionQueueOption
を構成します。次に例を示します。
namespace admin = ::google::cloud::spanner_admin;
namespace g = ::google::cloud;
void F() {
// You will need to create threads
auto cq = g::CompletionQueue();
std::vector<std::jthread> threads;
for (int i = 0; i != 10; ++i) {
threads.emplace_back([](auto cq) { cq.Run(); }, cq);
}
auto client = admin::InstanceAdminClient(admin::MakeInstanceAdminConnection(
g::Options{}.set<g::GrpcCompletionQueueOption>(cq)));
// Use `client` as usual
}
異なるサービス間でも、同じ CompletionQueue
オブジェクトを複数のクライアントで共有できます。
namespace admin = ::google::cloud::spanner_admin;
namespace admin = ::google::cloud::pubsub;
namespace g = ::google::cloud;
void F(pubsub::Topic const& topic1, pubsub::Topic const& topic2) {
// You will need to create threads
auto cq = g::CompletionQueue();
std::vector<std::jthread> threads;
for (int i = 0; i != 10; ++i) {
threads.emplace_back([](auto cq) { cq.Run(); }, cq);
}
auto client = admin::InstanceAdminClient(admin::MakeInstanceAdminConnection(
g::Options{}.set<g::GrpcCompletionQueue>(cq)));
auto p1 = pubsub::Publisher(pubsub::MakePublisherConnection(
topic1, g::Options{}.set<g::GrpcCompletionQueueOption>(cq)));
auto p2 = pubsub::Publisher(pubsub::MakePublisherConnection(
topic2, g::Options{}.set<g::GrpcCompletionQueueOption>(cq)));
// Use `client`, `p1`, and `p2` as usual
}
次のステップ
- 一般的なライブラリ構成オプションの詳細については、クライアント ライブラリの構成をご覧ください。
- Pub/Sub クライアント ライブラリの完全なリファレンス ドキュメントについては、Cloud Pub/Sub C++ クライアント ライブラリをご覧ください。