src/ex/detail/strand_queue.hpp
76.6% Lines (49/64)
92.3% List of functions (12/13)
Functions (13)
Function
Calls
Lines
Blocks
boost::capy::detail::strand_op::promise_type::get_return_object()
:64
332x
100.0%
100.0%
boost::capy::detail::strand_op::promise_type::initial_suspend()
:70
332x
100.0%
100.0%
boost::capy::detail::strand_op::promise_type::final_suspend()
:76
332x
100.0%
100.0%
boost::capy::detail::strand_op::promise_type::return_void()
:82
332x
100.0%
100.0%
boost::capy::detail::strand_queue::make_strand_op(boost::capy::detail::strand_queue&, std::__n4861::coroutine_handle<void>)
:127
332x
100.0%
50.0%
boost::capy::detail::strand_queue::strand_queue()
:137
4853x
100.0%
100.0%
boost::capy::detail::strand_queue::~strand_queue()
:147
4853x
36.4%
56.0%
boost::capy::detail::strand_queue::empty() const
:171
23x
100.0%
100.0%
boost::capy::detail::strand_queue::push(std::__n4861::coroutine_handle<void>)
:185
332x
100.0%
100.0%
boost::capy::detail::strand_queue::take_all()
:244
23x
100.0%
100.0%
boost::capy::detail::strand_queue::dispatch_batch(boost::capy::detail::strand_queue::taken_batch&)
:261
23x
100.0%
100.0%
boost::capy::detail::strand_op::promise_type::operator new(unsigned long, boost::capy::detail::strand_queue&, std::__n4861::coroutine_handle<void>)
:285
332x
75.0%
83.0%
boost::capy::detail::strand_op::promise_type::operator delete(void*, unsigned long)
:318
0
0.0%
0.0%
| Line | TLA | Hits | Source Code |
|---|---|---|---|
| 1 | // | ||
| 2 | // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) | ||
| 3 | // | ||
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | ||
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | ||
| 6 | // | ||
| 7 | // Official repository: https://github.com/cppalliance/capy | ||
| 8 | // | ||
| 9 | |||
| 10 | #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP | ||
| 11 | #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP | ||
| 12 | |||
| 13 | #include <boost/capy/detail/config.hpp> | ||
| 14 | #include <boost/capy/ex/frame_allocator.hpp> | ||
| 15 | |||
| 16 | #include <coroutine> | ||
| 17 | #include <cstddef> | ||
| 18 | #include <exception> | ||
| 19 | |||
| 20 | namespace boost { | ||
| 21 | namespace capy { | ||
| 22 | namespace detail { | ||
| 23 | |||
| 24 | class strand_queue; | ||
| 25 | |||
| 26 | //---------------------------------------------------------- | ||
| 27 | |||
| 28 | // Metadata stored before the coroutine frame | ||
| 29 | struct frame_prefix | ||
| 30 | { | ||
| 31 | frame_prefix* next; | ||
| 32 | strand_queue* queue; | ||
| 33 | std::size_t alloc_size; | ||
| 34 | }; | ||
| 35 | |||
| 36 | //---------------------------------------------------------- | ||
| 37 | |||
| 38 | /** Wrapper coroutine for strand queue dispatch operations. | ||
| 39 | |||
| 40 | This coroutine wraps a target coroutine handle and resumes | ||
| 41 | it when dispatched. The wrapper ensures control returns to | ||
| 42 | the dispatch loop after the target suspends or completes. | ||
| 43 | |||
| 44 | The promise contains an intrusive list node for queue | ||
| 45 | storage and supports a custom allocator that recycles | ||
| 46 | coroutine frames via a free list. | ||
| 47 | */ | ||
| 48 | struct strand_op | ||
| 49 | { | ||
| 50 | struct promise_type | ||
| 51 | { | ||
| 52 | promise_type* next = nullptr; | ||
| 53 | |||
| 54 | void* | ||
| 55 | operator new( | ||
| 56 | std::size_t size, | ||
| 57 | strand_queue& q, | ||
| 58 | std::coroutine_handle<void>); | ||
| 59 | |||
| 60 | void | ||
| 61 | operator delete(void* p, std::size_t); | ||
| 62 | |||
| 63 | strand_op | ||
| 64 | 332x | get_return_object() noexcept | |
| 65 | { | ||
| 66 | 332x | return {std::coroutine_handle<promise_type>::from_promise(*this)}; | |
| 67 | } | ||
| 68 | |||
| 69 | std::suspend_always | ||
| 70 | 332x | initial_suspend() noexcept | |
| 71 | { | ||
| 72 | 332x | return {}; | |
| 73 | } | ||
| 74 | |||
| 75 | std::suspend_always | ||
| 76 | 332x | final_suspend() noexcept | |
| 77 | { | ||
| 78 | 332x | return {}; | |
| 79 | } | ||
| 80 | |||
| 81 | void | ||
| 82 | 332x | return_void() noexcept | |
| 83 | { | ||
| 84 | 332x | } | |
| 85 | |||
| 86 | void | ||
| 87 | unhandled_exception() | ||
| 88 | { | ||
| 89 | std::terminate(); | ||
| 90 | } | ||
| 91 | }; | ||
| 92 | |||
| 93 | std::coroutine_handle<promise_type> h_; | ||
| 94 | }; | ||
| 95 | |||
| 96 | //---------------------------------------------------------- | ||
| 97 | |||
| 98 | /** Single-threaded dispatch queue for coroutine handles. | ||
| 99 | |||
| 100 | This queue stores coroutine handles and resumes them | ||
| 101 | sequentially when dispatch() is called. Each pushed | ||
| 102 | handle is wrapped in a strand_op coroutine that ensures | ||
| 103 | control returns to the dispatch loop after the target | ||
| 104 | suspends or completes. | ||
| 105 | |||
| 106 | The queue uses an intrusive singly-linked list through | ||
| 107 | the promise type to avoid separate node allocations. | ||
| 108 | A free list recycles wrapper coroutine frames to reduce | ||
| 109 | allocation overhead during repeated push/dispatch cycles. | ||
| 110 | |||
| 111 | @par Thread Safety | ||
| 112 | This class is not thread-safe. All operations must be | ||
| 113 | called from a single thread. | ||
| 114 | */ | ||
| 115 | class strand_queue | ||
| 116 | { | ||
| 117 | using promise_type = strand_op::promise_type; | ||
| 118 | |||
| 119 | promise_type* head_ = nullptr; | ||
| 120 | promise_type* tail_ = nullptr; | ||
| 121 | frame_prefix* free_list_ = nullptr; | ||
| 122 | |||
| 123 | friend struct strand_op::promise_type; | ||
| 124 | |||
| 125 | static | ||
| 126 | strand_op | ||
| 127 | 332x | make_strand_op( | |
| 128 | strand_queue& q, | ||
| 129 | std::coroutine_handle<void> target) | ||
| 130 | { | ||
| 131 | (void)q; | ||
| 132 | safe_resume(target); | ||
| 133 | co_return; | ||
| 134 | 664x | } | |
| 135 | |||
| 136 | public: | ||
| 137 | 4853x | strand_queue() = default; | |
| 138 | |||
| 139 | strand_queue(strand_queue const&) = delete; | ||
| 140 | strand_queue& operator=(strand_queue const&) = delete; | ||
| 141 | |||
| 142 | /** Destructor. | ||
| 143 | |||
| 144 | Destroys any pending wrappers without resuming them, | ||
| 145 | then frees all memory in the free list. | ||
| 146 | */ | ||
| 147 | 4853x | ~strand_queue() | |
| 148 | { | ||
| 149 | // Destroy pending wrappers | ||
| 150 | 4853x | while(head_) | |
| 151 | { | ||
| 152 | ✗ | promise_type* p = head_; | |
| 153 | ✗ | head_ = p->next; | |
| 154 | |||
| 155 | ✗ | auto h = std::coroutine_handle<promise_type>::from_promise(*p); | |
| 156 | ✗ | h.destroy(); | |
| 157 | } | ||
| 158 | |||
| 159 | // Free the free list memory | ||
| 160 | 4853x | while(free_list_) | |
| 161 | { | ||
| 162 | ✗ | frame_prefix* prefix = free_list_; | |
| 163 | ✗ | free_list_ = prefix->next; | |
| 164 | ✗ | ::operator delete(prefix); | |
| 165 | } | ||
| 166 | 4853x | } | |
| 167 | |||
| 168 | /** Returns true if there are no pending operations. | ||
| 169 | */ | ||
| 170 | bool | ||
| 171 | 23x | empty() const noexcept | |
| 172 | { | ||
| 173 | 23x | return head_ == nullptr; | |
| 174 | } | ||
| 175 | |||
| 176 | /** Push a coroutine handle to the queue. | ||
| 177 | |||
| 178 | Creates a wrapper coroutine and appends it to the | ||
| 179 | queue. The wrapper will resume the target handle | ||
| 180 | when dispatch() processes it. | ||
| 181 | |||
| 182 | @param h The coroutine handle to dispatch. | ||
| 183 | */ | ||
| 184 | void | ||
| 185 | 332x | push(std::coroutine_handle<void> h) | |
| 186 | { | ||
| 187 | 332x | strand_op op = make_strand_op(*this, h); | |
| 188 | |||
| 189 | 332x | promise_type* p = &op.h_.promise(); | |
| 190 | 332x | p->next = nullptr; | |
| 191 | |||
| 192 | 332x | if(tail_) | |
| 193 | 309x | tail_->next = p; | |
| 194 | else | ||
| 195 | 23x | head_ = p; | |
| 196 | 332x | tail_ = p; | |
| 197 | 332x | } | |
| 198 | |||
| 199 | /** Resume all queued coroutines in sequence. | ||
| 200 | |||
| 201 | Processes each wrapper in FIFO order, resuming its | ||
| 202 | target coroutine. After each target suspends or | ||
| 203 | completes, the wrapper is destroyed and its frame | ||
| 204 | is added to the free list for reuse. | ||
| 205 | |||
| 206 | Coroutines resumed during dispatch may push new | ||
| 207 | handles, which will also be processed in the same | ||
| 208 | dispatch call. | ||
| 209 | |||
| 210 | @warning Not thread-safe. Do not call while another | ||
| 211 | thread may be calling push(). | ||
| 212 | */ | ||
| 213 | void | ||
| 214 | dispatch() | ||
| 215 | { | ||
| 216 | while(head_) | ||
| 217 | { | ||
| 218 | promise_type* p = head_; | ||
| 219 | head_ = p->next; | ||
| 220 | if(!head_) | ||
| 221 | tail_ = nullptr; | ||
| 222 | |||
| 223 | auto h = std::coroutine_handle<promise_type>::from_promise(*p); | ||
| 224 | safe_resume(h); | ||
| 225 | h.destroy(); | ||
| 226 | } | ||
| 227 | } | ||
| 228 | |||
| 229 | /** Batch of taken items for thread-safe dispatch. */ | ||
| 230 | struct taken_batch | ||
| 231 | { | ||
| 232 | promise_type* head = nullptr; | ||
| 233 | promise_type* tail = nullptr; | ||
| 234 | }; | ||
| 235 | |||
| 236 | /** Take all pending items atomically. | ||
| 237 | |||
| 238 | Removes all items from the queue and returns them | ||
| 239 | as a batch. The queue is left empty. | ||
| 240 | |||
| 241 | @return The batch of taken items. | ||
| 242 | */ | ||
| 243 | taken_batch | ||
| 244 | 23x | take_all() noexcept | |
| 245 | { | ||
| 246 | 23x | taken_batch batch{head_, tail_}; | |
| 247 | 23x | head_ = tail_ = nullptr; | |
| 248 | 23x | return batch; | |
| 249 | } | ||
| 250 | |||
| 251 | /** Dispatch a batch of taken items. | ||
| 252 | |||
| 253 | @param batch The batch to dispatch. | ||
| 254 | |||
| 255 | @note This is thread-safe w.r.t. push() because it doesn't | ||
| 256 | access the queue's free_list_. Frames are deleted directly | ||
| 257 | rather than recycled. | ||
| 258 | */ | ||
| 259 | static | ||
| 260 | void | ||
| 261 | 23x | dispatch_batch(taken_batch& batch) | |
| 262 | { | ||
| 263 | 355x | while(batch.head) | |
| 264 | { | ||
| 265 | 332x | promise_type* p = batch.head; | |
| 266 | 332x | batch.head = p->next; | |
| 267 | |||
| 268 | 332x | auto h = std::coroutine_handle<promise_type>::from_promise(*p); | |
| 269 | 332x | safe_resume(h); | |
| 270 | // Don't use h.destroy() - it would call operator delete which | ||
| 271 | // accesses the queue's free_list_ (race with push). | ||
| 272 | // Instead, manually free the frame without recycling. | ||
| 273 | // h.address() returns the frame base (what operator new returned). | ||
| 274 | 332x | frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1; | |
| 275 | 332x | ::operator delete(prefix); | |
| 276 | } | ||
| 277 | 23x | batch.tail = nullptr; | |
| 278 | 23x | } | |
| 279 | }; | ||
| 280 | |||
| 281 | //---------------------------------------------------------- | ||
| 282 | |||
| 283 | inline | ||
| 284 | void* | ||
| 285 | 332x | strand_op::promise_type::operator new( | |
| 286 | std::size_t size, | ||
| 287 | strand_queue& q, | ||
| 288 | std::coroutine_handle<void>) | ||
| 289 | { | ||
| 290 | // Total size includes prefix | ||
| 291 | 332x | std::size_t alloc_size = size + sizeof(frame_prefix); | |
| 292 | void* raw; | ||
| 293 | |||
| 294 | // Try to reuse from free list | ||
| 295 | 332x | if(q.free_list_) | |
| 296 | { | ||
| 297 | ✗ | frame_prefix* prefix = q.free_list_; | |
| 298 | ✗ | q.free_list_ = prefix->next; | |
| 299 | ✗ | raw = prefix; | |
| 300 | } | ||
| 301 | else | ||
| 302 | { | ||
| 303 | 332x | raw = ::operator new(alloc_size); | |
| 304 | } | ||
| 305 | |||
| 306 | // Initialize prefix | ||
| 307 | 332x | frame_prefix* prefix = static_cast<frame_prefix*>(raw); | |
| 308 | 332x | prefix->next = nullptr; | |
| 309 | 332x | prefix->queue = &q; | |
| 310 | 332x | prefix->alloc_size = alloc_size; | |
| 311 | |||
| 312 | // Return pointer AFTER the prefix (this is where coroutine frame goes) | ||
| 313 | 332x | return prefix + 1; | |
| 314 | } | ||
| 315 | |||
| 316 | inline | ||
| 317 | void | ||
| 318 | ✗ | strand_op::promise_type::operator delete(void* p, std::size_t) | |
| 319 | { | ||
| 320 | // Calculate back to get the prefix | ||
| 321 | ✗ | frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1; | |
| 322 | |||
| 323 | // Add to free list | ||
| 324 | ✗ | prefix->next = prefix->queue->free_list_; | |
| 325 | ✗ | prefix->queue->free_list_ = prefix; | |
| 326 | ✗ | } | |
| 327 | |||
| 328 | } // namespace detail | ||
| 329 | } // namespace capy | ||
| 330 | } // namespace boost | ||
| 331 | |||
| 332 | #endif | ||
| 333 |