src/ex/thread_pool.cpp

100.0% Lines (128/128) 100.0% List of functions (25/25)
thread_pool.cpp
f(x) Functions (25)
Function Calls Lines Blocks
boost::capy::thread_pool::impl::push(boost::capy::continuation*) :59 830x 100.0% 100.0% boost::capy::thread_pool::impl::pop() :69 987x 100.0% 100.0% boost::capy::thread_pool::impl::empty() const :80 1026x 100.0% 100.0% boost::capy::thread_pool::impl::~impl() :97 157x 100.0% 100.0% boost::capy::thread_pool::impl::drain_abandoned() :105 157x 100.0% 100.0% boost::capy::thread_pool::impl::impl(unsigned long, std::basic_string_view<char, std::char_traits<char> >) :115 157x 100.0% 72.0% boost::capy::thread_pool::impl::post(boost::capy::continuation&) :128 830x 100.0% 100.0% boost::capy::thread_pool::impl::on_work_started() :139 345x 100.0% 100.0% boost::capy::thread_pool::impl::on_work_finished() :145 345x 100.0% 100.0% boost::capy::thread_pool::impl::join() :159 168x 100.0% 85.0% boost::capy::thread_pool::impl::join()::{lambda()#1}::operator()() const :175 61x 100.0% 100.0% boost::capy::thread_pool::impl::stop() :187 159x 100.0% 100.0% boost::capy::thread_pool::impl::ensure_started() :199 830x 100.0% 100.0% boost::capy::thread_pool::impl::ensure_started()::{lambda()#1}::operator()() const :201 101x 100.0% 100.0% boost::capy::thread_pool::impl::ensure_started()::{lambda()#1}::operator()() const::{lambda()#1}::operator()() const :204 179x 100.0% 100.0% boost::capy::thread_pool::impl::run(unsigned long) :209 179x 100.0% 84.0% boost::capy::thread_pool::impl::run(unsigned long)::{lambda()#1}::operator()() const :221 1026x 100.0% 100.0% boost::capy::thread_pool::~thread_pool() :237 157x 100.0% 100.0% boost::capy::thread_pool::thread_pool(unsigned long, std::basic_string_view<char, std::char_traits<char> >) :248 157x 100.0% 55.0% boost::capy::thread_pool::join() :256 11x 100.0% 100.0% boost::capy::thread_pool::stop() :263 2x 100.0% 100.0% boost::capy::thread_pool::get_executor() const :272 163x 100.0% 100.0% boost::capy::thread_pool::executor_type::on_work_started() const :280 345x 100.0% 100.0% boost::capy::thread_pool::executor_type::on_work_finished() const :287 345x 100.0% 
100.0% boost::capy::thread_pool::executor_type::post(boost::capy::continuation&) const :294 830x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Michael Vandeberg
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/boostorg/capy
9 //
10
11 #include <boost/capy/ex/thread_pool.hpp>
12 #include <boost/capy/continuation.hpp>
13 #include <boost/capy/ex/frame_allocator.hpp>
14 #include <boost/capy/test/thread_name.hpp>
15 #include <algorithm>
16 #include <atomic>
17 #include <condition_variable>
18 #include <cstdio>
19 #include <mutex>
20 #include <thread>
21 #include <vector>
22
23 /*
24 Thread pool implementation using a shared work queue.
25
26 Work items are continuations linked via their intrusive next pointer,
27 stored in a single queue protected by a mutex. No per-post heap
28 allocation: the continuation is owned by the caller and linked
29 directly. Worker threads wait on a condition_variable until work
30 is available or stop is requested.
31
32 Threads are started lazily on first post() via std::call_once to avoid
33 spawning threads for pools that are constructed but never used. Each
34 thread is named with a configurable prefix plus index for debugger
35 visibility.
36
37 Work tracking: on_work_started/on_work_finished maintain an atomic
38 outstanding_work_ counter. join() blocks until this counter reaches
39 zero, then signals workers to stop and joins threads.
40
41 Two shutdown paths, plus the destructor which combines them:
42 - join(): waits for outstanding work to drain, then stops workers.
43 - stop(): immediately signals workers to exit; queued work is abandoned.
44 - Destructor: stop() then join() (abandon + wait for threads).
45 */
46
47 namespace boost {
48 namespace capy {
49
50 //------------------------------------------------------------------------------
51
52 class thread_pool::impl
53 {
54 // Intrusive queue of continuations via continuation::next.
55 // No per-post allocation: the continuation is owned by the caller.
56 continuation* head_ = nullptr;
57 continuation* tail_ = nullptr;
58
59 830x void push(continuation* c) noexcept
60 {
61 830x c->next = nullptr;
62 830x if(tail_)
63 617x tail_->next = c;
64 else
65 213x head_ = c;
66 830x tail_ = c;
67 830x }
68
69 987x continuation* pop() noexcept
70 {
71 987x if(!head_)
72 157x return nullptr;
73 830x continuation* c = head_;
74 830x head_ = head_->next;
75 830x if(!head_)
76 213x tail_ = nullptr;
77 830x return c;
78 }
79
80 1026x bool empty() const noexcept
81 {
82 1026x return head_ == nullptr;
83 }
84
85 std::mutex mutex_;
86 std::condition_variable work_cv_;
87 std::condition_variable done_cv_;
88 std::vector<std::thread> threads_;
89 std::atomic<std::size_t> outstanding_work_{0};
90 bool stop_{false};
91 bool joined_{false};
92 std::size_t num_threads_;
93 char thread_name_prefix_[13]{}; // 12 chars max + null terminator
94 std::once_flag start_flag_;
95
96 public:
97 157x ~impl() = default;
98
99 // Destroy abandoned coroutine frames. Must be called
100 // before execution_context::shutdown()/destroy() so
101 // that suspended-frame destructors (e.g. delay_awaitable
102 // calling timer_service::cancel()) run while services
103 // are still valid.
104 void
105 157x drain_abandoned() noexcept
106 {
107 356x while(auto* c = pop())
108 {
109 199x auto h = c->h;
110 199x if(h && h != std::noop_coroutine())
111 147x h.destroy();
112 199x }
113 157x }
114
115 157x impl(std::size_t num_threads, std::string_view thread_name_prefix)
116 157x : num_threads_(num_threads)
117 {
118 157x if(num_threads_ == 0)
119 4x num_threads_ = std::max(
120 2x std::thread::hardware_concurrency(), 1u);
121
122 // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
123 157x auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
124 157x thread_name_prefix_[n] = '\0';
125 157x }
126
127 void
128 830x post(continuation& c)
129 {
130 830x ensure_started();
131 {
132 830x std::lock_guard<std::mutex> lock(mutex_);
133 830x push(&c);
134 830x }
135 830x work_cv_.notify_one();
136 830x }
137
138 void
139 345x on_work_started() noexcept
140 {
141 345x outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
142 345x }
143
144 void
145 345x on_work_finished() noexcept
146 {
147 345x if(outstanding_work_.fetch_sub(
148 345x 1, std::memory_order_acq_rel) == 1)
149 {
150 85x std::lock_guard<std::mutex> lock(mutex_);
151 85x if(joined_ && !stop_)
152 4x stop_ = true;
153 85x done_cv_.notify_all();
154 85x work_cv_.notify_all();
155 85x }
156 345x }
157
158 void
159 168x join() noexcept
160 {
161 {
162 168x std::unique_lock<std::mutex> lock(mutex_);
163 168x if(joined_)
164 11x return;
165 157x joined_ = true;
166
167 157x if(outstanding_work_.load(
168 157x std::memory_order_acquire) == 0)
169 {
170 101x stop_ = true;
171 101x work_cv_.notify_all();
172 }
173 else
174 {
175 56x done_cv_.wait(lock, [this]{
176 61x return stop_;
177 });
178 }
179 168x }
180
181 336x for(auto& t : threads_)
182 179x if(t.joinable())
183 179x t.join();
184 }
185
186 void
187 159x stop() noexcept
188 {
189 {
190 159x std::lock_guard<std::mutex> lock(mutex_);
191 159x stop_ = true;
192 159x }
193 159x work_cv_.notify_all();
194 159x done_cv_.notify_all();
195 159x }
196
197 private:
198 void
199 830x ensure_started()
200 {
201 830x std::call_once(start_flag_, [this]{
202 101x threads_.reserve(num_threads_);
203 280x for(std::size_t i = 0; i < num_threads_; ++i)
204 358x threads_.emplace_back([this, i]{ run(i); });
205 101x });
206 830x }
207
208 void
209 179x run(std::size_t index)
210 {
211 // Build name; set_current_thread_name truncates to platform limits.
212 char name[16];
213 179x std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
214 179x set_current_thread_name(name);
215
216 for(;;)
217 {
218 810x continuation* c = nullptr;
219 {
220 810x std::unique_lock<std::mutex> lock(mutex_);
221 810x work_cv_.wait(lock, [this]{
222 1329x return !empty() ||
223 1329x stop_;
224 });
225 810x if(stop_)
226 358x return;
227 631x c = pop();
228 810x }
229 631x if(c)
230 631x safe_resume(c->h);
231 631x }
232 }
233 };
234
235 //------------------------------------------------------------------------------
236
237 157x thread_pool::
238 ~thread_pool()
239 {
240 157x impl_->stop();
241 157x impl_->join();
242 157x impl_->drain_abandoned();
243 157x shutdown();
244 157x destroy();
245 157x delete impl_;
246 157x }
247
248 157x thread_pool::
249 157x thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
250 157x : impl_(new impl(num_threads, thread_name_prefix))
251 {
252 157x this->set_frame_allocator(std::allocator<void>{});
253 157x }
254
255 void
256 11x thread_pool::
257 join() noexcept
258 {
259 11x impl_->join();
260 11x }
261
262 void
263 2x thread_pool::
264 stop() noexcept
265 {
266 2x impl_->stop();
267 2x }
268
269 //------------------------------------------------------------------------------
270
271 thread_pool::executor_type
272 163x thread_pool::
273 get_executor() const noexcept
274 {
275 163x return executor_type(
276 163x const_cast<thread_pool&>(*this));
277 }
278
279 void
280 345x thread_pool::executor_type::
281 on_work_started() const noexcept
282 {
283 345x pool_->impl_->on_work_started();
284 345x }
285
286 void
287 345x thread_pool::executor_type::
288 on_work_finished() const noexcept
289 {
290 345x pool_->impl_->on_work_finished();
291 345x }
292
293 void
294 830x thread_pool::executor_type::
295 post(continuation& c) const
296 {
297 830x pool_->impl_->post(c);
298 830x }
299
300 } // capy
301 } // boost
302