src/ex/detail/strand_queue.hpp

76.6% Lines (49/64) 92.3% List of functions (12/13)
strand_queue.hpp
f(x) Functions (13)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
11 #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/ex/frame_allocator.hpp>
15
16 #include <coroutine>
17 #include <cstddef>
18 #include <exception>
19
20 namespace boost {
21 namespace capy {
22 namespace detail {
23
24 class strand_queue;
25
26 //----------------------------------------------------------
27
28 // Metadata stored before the coroutine frame
29 struct frame_prefix
30 {
31 frame_prefix* next;
32 strand_queue* queue;
33 std::size_t alloc_size;
34 };
35
36 //----------------------------------------------------------
37
38 /** Wrapper coroutine for strand queue dispatch operations.
39
40 This coroutine wraps a target coroutine handle and resumes
41 it when dispatched. The wrapper ensures control returns to
42 the dispatch loop after the target suspends or completes.
43
44 The promise contains an intrusive list node for queue
45 storage and supports a custom allocator that recycles
46 coroutine frames via a free list.
47 */
48 struct strand_op
49 {
50 struct promise_type
51 {
52 promise_type* next = nullptr;
53
54 void*
55 operator new(
56 std::size_t size,
57 strand_queue& q,
58 std::coroutine_handle<void>);
59
60 void
61 operator delete(void* p, std::size_t);
62
63 strand_op
64 332x get_return_object() noexcept
65 {
66 332x return {std::coroutine_handle<promise_type>::from_promise(*this)};
67 }
68
69 std::suspend_always
70 332x initial_suspend() noexcept
71 {
72 332x return {};
73 }
74
75 std::suspend_always
76 332x final_suspend() noexcept
77 {
78 332x return {};
79 }
80
81 void
82 332x return_void() noexcept
83 {
84 332x }
85
86 void
87 unhandled_exception()
88 {
89 std::terminate();
90 }
91 };
92
93 std::coroutine_handle<promise_type> h_;
94 };
95
96 //----------------------------------------------------------
97
98 /** Single-threaded dispatch queue for coroutine handles.
99
100 This queue stores coroutine handles and resumes them
101 sequentially when dispatch() is called. Each pushed
102 handle is wrapped in a strand_op coroutine that ensures
103 control returns to the dispatch loop after the target
104 suspends or completes.
105
106 The queue uses an intrusive singly-linked list through
107 the promise type to avoid separate node allocations.
108 A free list recycles wrapper coroutine frames to reduce
109 allocation overhead during repeated push/dispatch cycles.
110
111 @par Thread Safety
112 This class is not thread-safe. All operations must be
113 called from a single thread.
114 */
115 class strand_queue
116 {
117 using promise_type = strand_op::promise_type;
118
119 promise_type* head_ = nullptr;
120 promise_type* tail_ = nullptr;
121 frame_prefix* free_list_ = nullptr;
122
123 friend struct strand_op::promise_type;
124
125 static
126 strand_op
127 332x make_strand_op(
128 strand_queue& q,
129 std::coroutine_handle<void> target)
130 {
131 (void)q;
132 safe_resume(target);
133 co_return;
134 664x }
135
136 public:
137 4853x strand_queue() = default;
138
139 strand_queue(strand_queue const&) = delete;
140 strand_queue& operator=(strand_queue const&) = delete;
141
142 /** Destructor.
143
144 Destroys any pending wrappers without resuming them,
145 then frees all memory in the free list.
146 */
147 4853x ~strand_queue()
148 {
149 // Destroy pending wrappers
150 4853x while(head_)
151 {
152 promise_type* p = head_;
153 head_ = p->next;
154
155 auto h = std::coroutine_handle<promise_type>::from_promise(*p);
156 h.destroy();
157 }
158
159 // Free the free list memory
160 4853x while(free_list_)
161 {
162 frame_prefix* prefix = free_list_;
163 free_list_ = prefix->next;
164 ::operator delete(prefix);
165 }
166 4853x }
167
168 /** Returns true if there are no pending operations.
169 */
170 bool
171 23x empty() const noexcept
172 {
173 23x return head_ == nullptr;
174 }
175
176 /** Push a coroutine handle to the queue.
177
178 Creates a wrapper coroutine and appends it to the
179 queue. The wrapper will resume the target handle
180 when dispatch() processes it.
181
182 @param h The coroutine handle to dispatch.
183 */
184 void
185 332x push(std::coroutine_handle<void> h)
186 {
187 332x strand_op op = make_strand_op(*this, h);
188
189 332x promise_type* p = &op.h_.promise();
190 332x p->next = nullptr;
191
192 332x if(tail_)
193 309x tail_->next = p;
194 else
195 23x head_ = p;
196 332x tail_ = p;
197 332x }
198
199 /** Resume all queued coroutines in sequence.
200
201 Processes each wrapper in FIFO order, resuming its
202 target coroutine. After each target suspends or
203 completes, the wrapper is destroyed and its frame
204 is added to the free list for reuse.
205
206 Coroutines resumed during dispatch may push new
207 handles, which will also be processed in the same
208 dispatch call.
209
210 @warning Not thread-safe. Do not call while another
211 thread may be calling push().
212 */
213 void
214 dispatch()
215 {
216 while(head_)
217 {
218 promise_type* p = head_;
219 head_ = p->next;
220 if(!head_)
221 tail_ = nullptr;
222
223 auto h = std::coroutine_handle<promise_type>::from_promise(*p);
224 safe_resume(h);
225 h.destroy();
226 }
227 }
228
229 /** Batch of taken items for thread-safe dispatch. */
230 struct taken_batch
231 {
232 promise_type* head = nullptr;
233 promise_type* tail = nullptr;
234 };
235
236 /** Take all pending items atomically.
237
238 Removes all items from the queue and returns them
239 as a batch. The queue is left empty.
240
241 @return The batch of taken items.
242 */
243 taken_batch
244 23x take_all() noexcept
245 {
246 23x taken_batch batch{head_, tail_};
247 23x head_ = tail_ = nullptr;
248 23x return batch;
249 }
250
251 /** Dispatch a batch of taken items.
252
253 @param batch The batch to dispatch.
254
255 @note This is thread-safe w.r.t. push() because it doesn't
256 access the queue's free_list_. Frames are deleted directly
257 rather than recycled.
258 */
259 static
260 void
261 23x dispatch_batch(taken_batch& batch)
262 {
263 355x while(batch.head)
264 {
265 332x promise_type* p = batch.head;
266 332x batch.head = p->next;
267
268 332x auto h = std::coroutine_handle<promise_type>::from_promise(*p);
269 332x safe_resume(h);
270 // Don't use h.destroy() - it would call operator delete which
271 // accesses the queue's free_list_ (race with push).
272 // Instead, manually free the frame without recycling.
273 // h.address() returns the frame base (what operator new returned).
274 332x frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1;
275 332x ::operator delete(prefix);
276 }
277 23x batch.tail = nullptr;
278 23x }
279 };
280
281 //----------------------------------------------------------
282
283 inline
284 void*
285 332x strand_op::promise_type::operator new(
286 std::size_t size,
287 strand_queue& q,
288 std::coroutine_handle<void>)
289 {
290 // Total size includes prefix
291 332x std::size_t alloc_size = size + sizeof(frame_prefix);
292 void* raw;
293
294 // Try to reuse from free list
295 332x if(q.free_list_)
296 {
297 frame_prefix* prefix = q.free_list_;
298 q.free_list_ = prefix->next;
299 raw = prefix;
300 }
301 else
302 {
303 332x raw = ::operator new(alloc_size);
304 }
305
306 // Initialize prefix
307 332x frame_prefix* prefix = static_cast<frame_prefix*>(raw);
308 332x prefix->next = nullptr;
309 332x prefix->queue = &q;
310 332x prefix->alloc_size = alloc_size;
311
312 // Return pointer AFTER the prefix (this is where coroutine frame goes)
313 332x return prefix + 1;
314 }
315
316 inline
317 void
318 strand_op::promise_type::operator delete(void* p, std::size_t)
319 {
320 // Calculate back to get the prefix
321 frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1;
322
323 // Add to free list
324 prefix->next = prefix->queue->free_list_;
325 prefix->queue->free_list_ = prefix;
326 }
327
328 } // namespace detail
329 } // namespace capy
330 } // namespace boost
331
332 #endif
333