//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
// Copyright (c) 2026 Michael Vandeberg
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/boostorg/capy
//
10  

10  

11  
#include <boost/capy/ex/thread_pool.hpp>
#include <boost/capy/continuation.hpp>
#include <boost/capy/ex/frame_allocator.hpp>
#include <boost/capy/test/thread_name.hpp>

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <coroutine>
#include <cstdio>
#include <memory>
#include <mutex>
#include <string_view>
#include <thread>
#include <vector>
21  

22  

/*
    Thread pool implementation using a shared work queue.

    Work items are continuations linked via their intrusive next pointer,
    stored in a single queue protected by a mutex. No per-post heap
    allocation: the continuation is owned by the caller and linked
    directly. Worker threads wait on a condition_variable until work
    is available or stop is requested.

    Threads are started lazily on first post() via std::call_once to avoid
    spawning threads for pools that are constructed but never used. Each
    thread is named with a configurable prefix plus index for debugger
    visibility.

    Work tracking: on_work_started/on_work_finished maintain an atomic
    outstanding_work_ counter. join() blocks until this counter reaches
    zero, then signals workers to stop and joins threads.

    Shutdown paths:
    - join(): waits for outstanding work to drain, then stops workers.
    - stop(): immediately signals workers to exit; queued work is abandoned.
    - Destructor: stop() then join() (abandon + wait for threads).
*/
45  

46  

46  
namespace boost {
47  
namespace boost {
47  
namespace capy {
48  
namespace capy {
48  

49  

49  
//------------------------------------------------------------------------------
50  
//------------------------------------------------------------------------------
50  

51  

51  
// Shared implementation state for thread_pool.
//
// Holds the intrusive work queue, the worker threads, and the
// synchronization used by join()/stop(). All queue access goes
// through mutex_, except drain_abandoned(), which the destructor
// calls only after every worker thread has been joined.
class thread_pool::impl
{
    // Intrusive queue of continuations via continuation::next.
    // No per-post allocation: the continuation is owned by the caller.
    continuation* head_ = nullptr;
    continuation* tail_ = nullptr;

    // Append c at the tail. Requires exclusive access: callers hold
    // mutex_, except drain_abandoned() which runs single-threaded.
    void push(continuation* c) noexcept
    {
        c->next = nullptr;
        if(tail_)
            tail_->next = c;
        else
            head_ = c;
        tail_ = c;
    }

    // Remove and return the head, or nullptr if the queue is empty.
    // Same access requirements as push().
    continuation* pop() noexcept
    {
        if(!head_)
            return nullptr;
        continuation* c = head_;
        head_ = head_->next;
        if(!head_)
            tail_ = nullptr;
        return c;
    }

    bool empty() const noexcept
    {
        return head_ == nullptr;
    }

    std::mutex mutex_;                  // guards queue, stop_, joined_
    std::condition_variable work_cv_;   // signaled on new work or stop
    std::condition_variable done_cv_;   // signaled when work count drains to zero
    std::vector<std::thread> threads_;  // workers; started lazily by ensure_started()
    std::atomic<std::size_t> outstanding_work_{0};
    bool stop_{false};                  // true once workers must exit
    bool joined_{false};                // true once join() has been entered
    std::size_t num_threads_;
    char thread_name_prefix_[13]{};  // 12 chars max + null terminator
    std::once_flag start_flag_;

public:
    ~impl() = default;

    // Destroy abandoned coroutine frames. Must be called
    // before execution_context::shutdown()/destroy() so
    // that suspended-frame destructors (e.g. delay_awaitable
    // calling timer_service::cancel()) run while services
    // are still valid.
    void
    drain_abandoned() noexcept
    {
        while(auto* c = pop())
        {
            auto h = c->h;
            if(h && h != std::noop_coroutine())
                h.destroy();
        }
    }

    // num_threads == 0 selects hardware concurrency (at least 1).
    impl(std::size_t num_threads, std::string_view thread_name_prefix)
        : num_threads_(num_threads)
    {
        if(num_threads_ == 0)
            num_threads_ = std::max(
                std::thread::hardware_concurrency(), 1u);

        // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
        auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
        thread_name_prefix_[n] = '\0';
    }

    // Enqueue a continuation and wake one worker.
    // Starts the worker threads on the first call.
    void
    post(continuation& c)
    {
        ensure_started();
        {
            std::lock_guard<std::mutex> lock(mutex_);
            push(&c);
        }
        // Notify outside the lock so the woken worker can
        // acquire mutex_ without immediately blocking.
        work_cv_.notify_one();
    }

    void
    on_work_started() noexcept
    {
        outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
    }

    // Decrement the work count. When it reaches zero and join()
    // is already waiting (joined_ set), request worker shutdown
    // and wake both the joiner and the workers.
    void
    on_work_finished() noexcept
    {
        if(outstanding_work_.fetch_sub(
            1, std::memory_order_acq_rel) == 1)
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if(joined_ && !stop_)
                stop_ = true;
            done_cv_.notify_all();
            work_cv_.notify_all();
        }
    }

    // Wait for outstanding work to drain, then stop and join all
    // worker threads. A second call returns immediately without
    // waiting (joined_ is already set).
    void
    join() noexcept
    {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            if(joined_)
                return;
            joined_ = true;

            if(outstanding_work_.load(
                std::memory_order_acquire) == 0)
            {
                // Nothing pending: stop workers right away.
                stop_ = true;
                work_cv_.notify_all();
            }
            else
            {
                // on_work_finished() sets stop_ (because joined_
                // is now true) when the counter reaches zero.
                done_cv_.wait(lock, [this]{
                    return stop_;
                });
            }
        }

        for(auto& t : threads_)
            if(t.joinable())
                t.join();
    }

    // Request immediate shutdown. Queued work is left in the queue;
    // the destructor destroys it via drain_abandoned().
    void
    stop() noexcept
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        work_cv_.notify_all();
        done_cv_.notify_all();
    }

private:
    // Start the worker threads exactly once (lazy start on first post).
    void
    ensure_started()
    {
        std::call_once(start_flag_, [this]{
            threads_.reserve(num_threads_);
            for(std::size_t i = 0; i < num_threads_; ++i)
                threads_.emplace_back([this, i]{ run(i); });
        });
    }

    // Worker loop: name the thread, then repeatedly wait for a
    // continuation and resume it outside the lock.
    void
    run(std::size_t index)
    {
        // Build name; set_current_thread_name truncates to platform limits.
        char name[16];
        std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
        set_current_thread_name(name);

        for(;;)
        {
            continuation* c = nullptr;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                work_cv_.wait(lock, [this]{
                    return !empty() ||
                        stop_;
                });
                // stop_ wins over remaining queued work:
                // stop() abandons the queue by design.
                if(stop_)
                    return;
                c = pop();
            }
            if(c)
                safe_resume(c->h);
        }
    }
};
233  

234  

234  
//------------------------------------------------------------------------------
235  
//------------------------------------------------------------------------------
235  

236  

236  
thread_pool::
~thread_pool()
{
    // Order matters:
    // 1. stop():  signal workers to exit; queued work is abandoned.
    // 2. join():  wait for all worker threads to finish.
    // 3. drain_abandoned(): destroy coroutine frames still queued,
    //    while execution_context services are still valid.
    // 4. shutdown()/destroy(): tear down the execution context.
    impl_->stop();
    impl_->join();
    impl_->drain_abandoned();
    shutdown();
    destroy();
    delete impl_;
}
246  

247  

247  
// num_threads == 0 selects hardware concurrency (see impl's ctor);
// thread_name_prefix is truncated to 12 characters.
thread_pool::
thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
    : impl_(new impl(num_threads, thread_name_prefix))
{
    // Install std::allocator as the default frame allocator.
    this->set_frame_allocator(std::allocator<void>{});
}
253  

254  

254  
// Block until outstanding work drains, then stop and join the workers.
void
thread_pool::
join() noexcept
{
    impl_->join();
}
260  

261  

261  
// Signal workers to exit immediately; queued work is abandoned
// (destroyed later by the destructor via drain_abandoned()).
void
thread_pool::
stop() noexcept
{
    impl_->stop();
}
267  

268  

268  
//------------------------------------------------------------------------------
269  
//------------------------------------------------------------------------------
269  

270  

270  
// Return an executor bound to this pool. The executor holds a
// mutable reference; const is cast away because the visible
// executor operations only forward to impl_.
thread_pool::executor_type
thread_pool::
get_executor() const noexcept
{
    auto& self = const_cast<thread_pool&>(*this);
    return executor_type(self);
}
277  

278  

278  
// Forward the work-count increment to the pool implementation.
void
thread_pool::executor_type::
on_work_started() const noexcept
{
    pool_->impl_->on_work_started();
}
284  

285  

285  
// Forward the work-count decrement to the pool implementation;
// may trigger shutdown if join() is waiting and this was the
// last outstanding unit of work.
void
thread_pool::executor_type::
on_work_finished() const noexcept
{
    pool_->impl_->on_work_finished();
}
291  

292  

292  
// Submit a continuation for execution on the pool. The caller
// retains ownership of c; no allocation is performed here.
void
thread_pool::executor_type::
post(continuation& c) const
{
    pool_->impl_->post(c);
}
298  

299  

299  
} // capy
300  
} // capy
300  
} // boost
301  
} // boost