TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Michael Vandeberg
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/boostorg/capy
9 : //
10 :
11 : #include <boost/capy/ex/thread_pool.hpp>
12 : #include <boost/capy/continuation.hpp>
13 : #include <boost/capy/ex/frame_allocator.hpp>
14 : #include <boost/capy/test/thread_name.hpp>
15 : #include <algorithm>
16 : #include <atomic>
17 : #include <condition_variable>
18 : #include <cstdio>
19 : #include <mutex>
20 : #include <thread>
21 : #include <vector>
22 :
23 : /*
24 : Thread pool implementation using a shared work queue.
25 :
26 : Work items are continuations linked via their intrusive next pointer,
27 : stored in a single queue protected by a mutex. No per-post heap
28 : allocation: the continuation is owned by the caller and linked
29 : directly. Worker threads wait on a condition_variable until work
30 : is available or stop is requested.
31 :
32 : Threads are started lazily on first post() via std::call_once to avoid
33 : spawning threads for pools that are constructed but never used. Each
34 : thread is named with a configurable prefix plus index for debugger
35 : visibility.
36 :
37 : Work tracking: on_work_started/on_work_finished maintain an atomic
38 : outstanding_work_ counter. join() blocks until this counter reaches
39 : zero, then signals workers to stop and joins threads.
40 :
Shutdown paths:
- join(): waits for outstanding work to drain, then stops workers.
- stop(): immediately signals workers to exit; queued work is abandoned.
- Destructor: stop() then join() (abandon + wait for threads).
45 : */
46 :
47 : namespace boost {
48 : namespace capy {
49 :
50 : //------------------------------------------------------------------------------
51 :
class thread_pool::impl
{
    // Intrusive queue of continuations via continuation::next.
    // No per-post allocation: the continuation is owned by the caller.
    continuation* head_ = nullptr;
    continuation* tail_ = nullptr;

    // Append c at the tail of the queue. Requires mutex_ to be
    // held, or that no worker threads are running.
    void push(continuation* c) noexcept
    {
        c->next = nullptr;
        if(tail_)
            tail_->next = c;
        else
            head_ = c;
        tail_ = c;
    }

    // Detach and return the front continuation, or nullptr when
    // the queue is empty. Same locking requirement as push().
    continuation* pop() noexcept
    {
        if(!head_)
            return nullptr;
        continuation* c = head_;
        head_ = head_->next;
        if(!head_)
            tail_ = nullptr;
        return c;
    }

    // True when no continuation is queued. Same locking
    // requirement as push()/pop().
    bool empty() const noexcept
    {
        return head_ == nullptr;
    }

    std::mutex mutex_;                 // guards the queue, stop_, joined_
    std::condition_variable work_cv_;  // signaled on new work or stop request
    std::condition_variable done_cv_;  // signaled when outstanding work drains to zero
    std::vector<std::thread> threads_; // workers; populated once by ensure_started()
    std::atomic<std::size_t> outstanding_work_{0};
    bool stop_{false};                 // workers exit when true; written under mutex_
    bool joined_{false};               // latched by the first join(); under mutex_
    std::size_t num_threads_;
    char thread_name_prefix_[13]{}; // 12 chars max + null terminator
    std::once_flag start_flag_;     // one-shot lazy thread start

public:
    ~impl() = default;

    // Destroy abandoned coroutine frames. Must be called
    // before execution_context::shutdown()/destroy() so
    // that suspended-frame destructors (e.g. delay_awaitable
    // calling timer_service::cancel()) run while services
    // are still valid.
    void
    drain_abandoned() noexcept
    {
        // No locking here: ~thread_pool() calls this only after
        // all worker threads have been joined.
        while(auto* c = pop())
        {
            auto h = c->h;
            // Never destroy the shared noop frame.
            if(h && h != std::noop_coroutine())
                h.destroy();
        }
    }

    // num_threads == 0 selects hardware_concurrency() (at least 1).
    // Worker threads are named "<prefix><index>".
    impl(std::size_t num_threads, std::string_view thread_name_prefix)
        : num_threads_(num_threads)
    {
        if(num_threads_ == 0)
            num_threads_ = std::max(
                std::thread::hardware_concurrency(), 1u);

        // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
        auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
        thread_name_prefix_[n] = '\0';
    }

    // Enqueue c for execution on a worker thread. The continuation
    // is linked intrusively; the caller retains ownership.
    void
    post(continuation& c)
    {
        ensure_started(); // lazily spawn workers on first post
        {
            std::lock_guard<std::mutex> lock(mutex_);
            push(&c);
        }
        // Notify outside the lock so the woken worker does not
        // immediately block on mutex_.
        work_cv_.notify_one();
    }

    void
    on_work_started() noexcept
    {
        outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
    }

    // Decrement the work count. On the final decrement, wake a
    // waiting join() via done_cv_ and, if join() already ran,
    // latch stop_ so the workers exit.
    void
    on_work_finished() noexcept
    {
        if(outstanding_work_.fetch_sub(
            1, std::memory_order_acq_rel) == 1)
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if(joined_ && !stop_)
                stop_ = true;
            done_cv_.notify_all();
            work_cv_.notify_all();
        }
    }

    // Wait for outstanding work to drain, then stop and join the
    // worker threads. NOTE(review): a second join() racing with
    // the first returns immediately, possibly before the first
    // has finished joining the threads — confirm this is the
    // intended contract.
    void
    join() noexcept
    {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            if(joined_)
                return;
            joined_ = true;

            if(outstanding_work_.load(
                std::memory_order_acquire) == 0)
            {
                // Nothing pending: stop workers right away.
                stop_ = true;
                work_cv_.notify_all();
            }
            else
            {
                // Wait for the last on_work_finished() (or a
                // concurrent stop()) to set stop_.
                done_cv_.wait(lock, [this]{
                    return stop_;
                });
            }
        }

        for(auto& t : threads_)
            if(t.joinable())
                t.join();
    }

    // Request immediate shutdown: workers exit as soon as they
    // observe stop_; continuations still queued are abandoned
    // (reclaimed later by drain_abandoned()).
    void
    stop() noexcept
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        work_cv_.notify_all();
        done_cv_.notify_all(); // unblock a join() waiting on done_cv_
    }

private:
    // Start the worker threads exactly once, on first post().
    void
    ensure_started()
    {
        std::call_once(start_flag_, [this]{
            threads_.reserve(num_threads_);
            for(std::size_t i = 0; i < num_threads_; ++i)
                threads_.emplace_back([this, i]{ run(i); });
        });
    }

    // Worker loop: block until work or stop, then resume one
    // continuation per iteration with the lock released.
    void
    run(std::size_t index)
    {
        // Build name; set_current_thread_name truncates to platform limits.
        char name[16];
        std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
        set_current_thread_name(name);

        for(;;)
        {
            continuation* c = nullptr;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                work_cv_.wait(lock, [this]{
                    return !empty() ||
                        stop_;
                });
                // stop_ takes priority over queued work: remaining
                // items are deliberately abandoned (see stop()).
                if(stop_)
                    return;
                c = pop();
            }
            if(c)
                safe_resume(c->h);
        }
    }
};
234 :
235 : //------------------------------------------------------------------------------
236 :
thread_pool::
~thread_pool()
{
    // Teardown order matters:
    // 1) stop():  signal workers to exit, abandoning queued work.
    // 2) join():  wait for all worker threads to terminate.
    // 3) drain_abandoned(): destroy abandoned coroutine frames
    //    while the context's services are still valid.
    // 4) shutdown()/destroy(): tear down execution_context state.
    impl_->stop();
    impl_->join();
    impl_->drain_abandoned();
    shutdown();
    destroy();
    delete impl_;
}
247 :
// num_threads == 0 selects hardware_concurrency() (at least 1).
// Worker threads are named "<thread_name_prefix><index>", with the
// prefix truncated to 12 characters (see impl's constructor).
thread_pool::
thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
    : impl_(new impl(num_threads, thread_name_prefix))
{
    // Coroutine frames are allocated with std::allocator by default.
    this->set_frame_allocator(std::allocator<void>{});
}
254 :
// Block until outstanding work drains, then stop and join the
// worker threads. Subsequent calls return immediately.
void
thread_pool::
join() noexcept
{
    impl_->join();
}
261 :
// Signal workers to exit immediately; continuations still queued
// are abandoned (destroyed later by the destructor's drain).
void
thread_pool::
stop() noexcept
{
    impl_->stop();
}
268 :
269 : //------------------------------------------------------------------------------
270 :
271 : thread_pool::executor_type
272 163 : thread_pool::
273 : get_executor() const noexcept
274 : {
275 163 : return executor_type(
276 163 : const_cast<thread_pool&>(*this));
277 : }
278 :
// Record one unit of outstanding work; join() will not stop the
// workers while the pool's work count is nonzero.
void
thread_pool::executor_type::
on_work_started() const noexcept
{
    pool_->impl_->on_work_started();
}
285 :
// Release one unit of outstanding work; the final release may
// wake a pending join() and initiate worker shutdown.
void
thread_pool::executor_type::
on_work_finished() const noexcept
{
    pool_->impl_->on_work_finished();
}
292 :
// Schedule c for resumption on a pool worker thread. The caller
// retains ownership of the continuation; it is linked intrusively
// with no allocation.
void
thread_pool::executor_type::
post(continuation& c) const
{
    pool_->impl_->post(c);
}
299 :
300 : } // capy
301 : } // boost
|