TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
11 : #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/ex/frame_allocator.hpp>
15 :
16 : #include <coroutine>
17 : #include <cstddef>
18 : #include <exception>
19 :
20 : namespace boost {
21 : namespace capy {
22 : namespace detail {
23 :
24 : class strand_queue;
25 :
26 : //----------------------------------------------------------
27 :
28 : // Metadata stored before the coroutine frame
29 : struct frame_prefix
30 : {
31 : frame_prefix* next;
32 : strand_queue* queue;
33 : std::size_t alloc_size;
34 : };
35 :
36 : //----------------------------------------------------------
37 :
38 : /** Wrapper coroutine for strand queue dispatch operations.
39 :
40 : This coroutine wraps a target coroutine handle and resumes
41 : it when dispatched. The wrapper ensures control returns to
42 : the dispatch loop after the target suspends or completes.
43 :
44 : The promise contains an intrusive list node for queue
45 : storage and supports a custom allocator that recycles
46 : coroutine frames via a free list.
47 : */
48 : struct strand_op
49 : {
50 : struct promise_type
51 : {
52 : promise_type* next = nullptr;
53 :
54 : void*
55 : operator new(
56 : std::size_t size,
57 : strand_queue& q,
58 : std::coroutine_handle<void>);
59 :
60 : void
61 : operator delete(void* p, std::size_t);
62 :
63 : strand_op
64 HIT 332 : get_return_object() noexcept
65 : {
66 332 : return {std::coroutine_handle<promise_type>::from_promise(*this)};
67 : }
68 :
69 : std::suspend_always
70 332 : initial_suspend() noexcept
71 : {
72 332 : return {};
73 : }
74 :
75 : std::suspend_always
76 332 : final_suspend() noexcept
77 : {
78 332 : return {};
79 : }
80 :
81 : void
82 332 : return_void() noexcept
83 : {
84 332 : }
85 :
86 : void
87 : unhandled_exception()
88 : {
89 : std::terminate();
90 : }
91 : };
92 :
93 : std::coroutine_handle<promise_type> h_;
94 : };
95 :
96 : //----------------------------------------------------------
97 :
98 : /** Single-threaded dispatch queue for coroutine handles.
99 :
100 : This queue stores coroutine handles and resumes them
101 : sequentially when dispatch() is called. Each pushed
102 : handle is wrapped in a strand_op coroutine that ensures
103 : control returns to the dispatch loop after the target
104 : suspends or completes.
105 :
106 : The queue uses an intrusive singly-linked list through
107 : the promise type to avoid separate node allocations.
108 : A free list recycles wrapper coroutine frames to reduce
109 : allocation overhead during repeated push/dispatch cycles.
110 :
111 : @par Thread Safety
112 : This class is not thread-safe. All operations must be
113 : called from a single thread.
114 : */
115 : class strand_queue
116 : {
117 : using promise_type = strand_op::promise_type;
118 :
119 : promise_type* head_ = nullptr;
120 : promise_type* tail_ = nullptr;
121 : frame_prefix* free_list_ = nullptr;
122 :
123 : friend struct strand_op::promise_type;
124 :
125 : static
126 : strand_op
127 332 : make_strand_op(
128 : strand_queue& q,
129 : std::coroutine_handle<void> target)
130 : {
131 : (void)q;
132 : safe_resume(target);
133 : co_return;
134 664 : }
135 :
136 : public:
137 4853 : strand_queue() = default;
138 :
139 : strand_queue(strand_queue const&) = delete;
140 : strand_queue& operator=(strand_queue const&) = delete;
141 :
142 : /** Destructor.
143 :
144 : Destroys any pending wrappers without resuming them,
145 : then frees all memory in the free list.
146 : */
147 4853 : ~strand_queue()
148 : {
149 : // Destroy pending wrappers
150 4853 : while(head_)
151 : {
152 MIS 0 : promise_type* p = head_;
153 0 : head_ = p->next;
154 :
155 0 : auto h = std::coroutine_handle<promise_type>::from_promise(*p);
156 0 : h.destroy();
157 : }
158 :
159 : // Free the free list memory
160 HIT 4853 : while(free_list_)
161 : {
162 MIS 0 : frame_prefix* prefix = free_list_;
163 0 : free_list_ = prefix->next;
164 0 : ::operator delete(prefix);
165 : }
166 HIT 4853 : }
167 :
168 : /** Returns true if there are no pending operations.
169 : */
170 : bool
171 23 : empty() const noexcept
172 : {
173 23 : return head_ == nullptr;
174 : }
175 :
176 : /** Push a coroutine handle to the queue.
177 :
178 : Creates a wrapper coroutine and appends it to the
179 : queue. The wrapper will resume the target handle
180 : when dispatch() processes it.
181 :
182 : @param h The coroutine handle to dispatch.
183 : */
184 : void
185 332 : push(std::coroutine_handle<void> h)
186 : {
187 332 : strand_op op = make_strand_op(*this, h);
188 :
189 332 : promise_type* p = &op.h_.promise();
190 332 : p->next = nullptr;
191 :
192 332 : if(tail_)
193 309 : tail_->next = p;
194 : else
195 23 : head_ = p;
196 332 : tail_ = p;
197 332 : }
198 :
199 : /** Resume all queued coroutines in sequence.
200 :
201 : Processes each wrapper in FIFO order, resuming its
202 : target coroutine. After each target suspends or
203 : completes, the wrapper is destroyed and its frame
204 : is added to the free list for reuse.
205 :
206 : Coroutines resumed during dispatch may push new
207 : handles, which will also be processed in the same
208 : dispatch call.
209 :
210 : @warning Not thread-safe. Do not call while another
211 : thread may be calling push().
212 : */
213 : void
214 : dispatch()
215 : {
216 : while(head_)
217 : {
218 : promise_type* p = head_;
219 : head_ = p->next;
220 : if(!head_)
221 : tail_ = nullptr;
222 :
223 : auto h = std::coroutine_handle<promise_type>::from_promise(*p);
224 : safe_resume(h);
225 : h.destroy();
226 : }
227 : }
228 :
229 : /** Batch of taken items for thread-safe dispatch. */
230 : struct taken_batch
231 : {
232 : promise_type* head = nullptr;
233 : promise_type* tail = nullptr;
234 : };
235 :
236 : /** Take all pending items atomically.
237 :
238 : Removes all items from the queue and returns them
239 : as a batch. The queue is left empty.
240 :
241 : @return The batch of taken items.
242 : */
243 : taken_batch
244 23 : take_all() noexcept
245 : {
246 23 : taken_batch batch{head_, tail_};
247 23 : head_ = tail_ = nullptr;
248 23 : return batch;
249 : }
250 :
251 : /** Dispatch a batch of taken items.
252 :
253 : @param batch The batch to dispatch.
254 :
255 : @note This is thread-safe w.r.t. push() because it doesn't
256 : access the queue's free_list_. Frames are deleted directly
257 : rather than recycled.
258 : */
259 : static
260 : void
261 23 : dispatch_batch(taken_batch& batch)
262 : {
263 355 : while(batch.head)
264 : {
265 332 : promise_type* p = batch.head;
266 332 : batch.head = p->next;
267 :
268 332 : auto h = std::coroutine_handle<promise_type>::from_promise(*p);
269 332 : safe_resume(h);
270 : // Don't use h.destroy() - it would call operator delete which
271 : // accesses the queue's free_list_ (race with push).
272 : // Instead, manually free the frame without recycling.
273 : // h.address() returns the frame base (what operator new returned).
274 332 : frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1;
275 332 : ::operator delete(prefix);
276 : }
277 23 : batch.tail = nullptr;
278 23 : }
279 : };
280 :
281 : //----------------------------------------------------------
282 :
283 : inline
284 : void*
285 332 : strand_op::promise_type::operator new(
286 : std::size_t size,
287 : strand_queue& q,
288 : std::coroutine_handle<void>)
289 : {
290 : // Total size includes prefix
291 332 : std::size_t alloc_size = size + sizeof(frame_prefix);
292 : void* raw;
293 :
294 : // Try to reuse from free list
295 332 : if(q.free_list_)
296 : {
297 MIS 0 : frame_prefix* prefix = q.free_list_;
298 0 : q.free_list_ = prefix->next;
299 0 : raw = prefix;
300 : }
301 : else
302 : {
303 HIT 332 : raw = ::operator new(alloc_size);
304 : }
305 :
306 : // Initialize prefix
307 332 : frame_prefix* prefix = static_cast<frame_prefix*>(raw);
308 332 : prefix->next = nullptr;
309 332 : prefix->queue = &q;
310 332 : prefix->alloc_size = alloc_size;
311 :
312 : // Return pointer AFTER the prefix (this is where coroutine frame goes)
313 332 : return prefix + 1;
314 : }
315 :
316 : inline
317 : void
318 MIS 0 : strand_op::promise_type::operator delete(void* p, std::size_t)
319 : {
320 : // Calculate back to get the prefix
321 0 : frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1;
322 :
323 : // Add to free list
324 0 : prefix->next = prefix->queue->free_list_;
325 0 : prefix->queue->free_list_ = prefix;
326 0 : }
327 :
328 : } // namespace detail
329 : } // namespace capy
330 : } // namespace boost
331 :
332 : #endif
|