1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
10  
#ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
11  
#define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
11  
#define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
 
14 +
#include <boost/capy/ex/frame_allocator.hpp>
14  

15  

15  
#include <coroutine>
16  
#include <coroutine>
16  
#include <cstddef>
17  
#include <cstddef>
17  
#include <exception>
18  
#include <exception>
18  

19  

19  
namespace boost {
20  
namespace boost {
20  
namespace capy {
21  
namespace capy {
21  
namespace detail {
22  
namespace detail {
22  

23  

23  
class strand_queue;
24  
class strand_queue;
24  

25  

25  
//----------------------------------------------------------
26  
//----------------------------------------------------------
26  

27  

27  
// Metadata stored before the coroutine frame
28  
// Metadata stored before the coroutine frame
28  
struct frame_prefix
29  
struct frame_prefix
29  
{
30  
{
30  
    frame_prefix* next;
31  
    frame_prefix* next;
31  
    strand_queue* queue;
32  
    strand_queue* queue;
32  
    std::size_t alloc_size;
33  
    std::size_t alloc_size;
33  
};
34  
};
34  

35  

35  
//----------------------------------------------------------
36  
//----------------------------------------------------------
36  

37  

37  
/** Wrapper coroutine for strand queue dispatch operations.
38  
/** Wrapper coroutine for strand queue dispatch operations.
38  

39  

39  
    This coroutine wraps a target coroutine handle and resumes
40  
    This coroutine wraps a target coroutine handle and resumes
40  
    it when dispatched. The wrapper ensures control returns to
41  
    it when dispatched. The wrapper ensures control returns to
41  
    the dispatch loop after the target suspends or completes.
42  
    the dispatch loop after the target suspends or completes.
42  

43  

43  
    The promise contains an intrusive list node for queue
44  
    The promise contains an intrusive list node for queue
44  
    storage and supports a custom allocator that recycles
45  
    storage and supports a custom allocator that recycles
45  
    coroutine frames via a free list.
46  
    coroutine frames via a free list.
46  
*/
47  
*/
47  
struct strand_op
48  
struct strand_op
48  
{
49  
{
49  
    struct promise_type
50  
    struct promise_type
50  
    {
51  
    {
51  
        promise_type* next = nullptr;
52  
        promise_type* next = nullptr;
52  

53  

53  
        void*
54  
        void*
54  
        operator new(
55  
        operator new(
55  
            std::size_t size,
56  
            std::size_t size,
56  
            strand_queue& q,
57  
            strand_queue& q,
57  
            std::coroutine_handle<void>);
58  
            std::coroutine_handle<void>);
58  

59  

59  
        void
60  
        void
60  
        operator delete(void* p, std::size_t);
61  
        operator delete(void* p, std::size_t);
61  

62  

62  
        strand_op
63  
        strand_op
63  
        get_return_object() noexcept
64  
        get_return_object() noexcept
64  
        {
65  
        {
65  
            return {std::coroutine_handle<promise_type>::from_promise(*this)};
66  
            return {std::coroutine_handle<promise_type>::from_promise(*this)};
66  
        }
67  
        }
67  

68  

68  
        std::suspend_always
69  
        std::suspend_always
69  
        initial_suspend() noexcept
70  
        initial_suspend() noexcept
70  
        {
71  
        {
71  
            return {};
72  
            return {};
72  
        }
73  
        }
73  

74  

74  
        std::suspend_always
75  
        std::suspend_always
75  
        final_suspend() noexcept
76  
        final_suspend() noexcept
76  
        {
77  
        {
77  
            return {};
78  
            return {};
78  
        }
79  
        }
79  

80  

80  
        void
81  
        void
81  
        return_void() noexcept
82  
        return_void() noexcept
82  
        {
83  
        {
83  
        }
84  
        }
84  

85  

85  
        void
86  
        void
86  
        unhandled_exception()
87  
        unhandled_exception()
87  
        {
88  
        {
88  
            std::terminate();
89  
            std::terminate();
89  
        }
90  
        }
90  
    };
91  
    };
91  

92  

92  
    std::coroutine_handle<promise_type> h_;
93  
    std::coroutine_handle<promise_type> h_;
93  
};
94  
};
94  

95  

95  
//----------------------------------------------------------
96  
//----------------------------------------------------------
96  

97  

97  
/** Single-threaded dispatch queue for coroutine handles.
98  
/** Single-threaded dispatch queue for coroutine handles.
98  

99  

99  
    This queue stores coroutine handles and resumes them
100  
    This queue stores coroutine handles and resumes them
100  
    sequentially when dispatch() is called. Each pushed
101  
    sequentially when dispatch() is called. Each pushed
101  
    handle is wrapped in a strand_op coroutine that ensures
102  
    handle is wrapped in a strand_op coroutine that ensures
102  
    control returns to the dispatch loop after the target
103  
    control returns to the dispatch loop after the target
103  
    suspends or completes.
104  
    suspends or completes.
104  

105  

105  
    The queue uses an intrusive singly-linked list through
106  
    The queue uses an intrusive singly-linked list through
106  
    the promise type to avoid separate node allocations.
107  
    the promise type to avoid separate node allocations.
107  
    A free list recycles wrapper coroutine frames to reduce
108  
    A free list recycles wrapper coroutine frames to reduce
108  
    allocation overhead during repeated push/dispatch cycles.
109  
    allocation overhead during repeated push/dispatch cycles.
109  

110  

110  
    @par Thread Safety
111  
    @par Thread Safety
111  
    This class is not thread-safe. All operations must be
112  
    This class is not thread-safe. All operations must be
112  
    called from a single thread.
113  
    called from a single thread.
113  
*/
114  
*/
114  
class strand_queue
115  
class strand_queue
115  
{
116  
{
116  
    using promise_type = strand_op::promise_type;
117  
    using promise_type = strand_op::promise_type;
117  

118  

118  
    promise_type* head_ = nullptr;
119  
    promise_type* head_ = nullptr;
119  
    promise_type* tail_ = nullptr;
120  
    promise_type* tail_ = nullptr;
120  
    frame_prefix* free_list_ = nullptr;
121  
    frame_prefix* free_list_ = nullptr;
121  

122  

122  
    friend struct strand_op::promise_type;
123  
    friend struct strand_op::promise_type;
123  

124  

124  
    static
125  
    static
125  
    strand_op
126  
    strand_op
126  
    make_strand_op(
127  
    make_strand_op(
127  
        strand_queue& q,
128  
        strand_queue& q,
128  
        std::coroutine_handle<void> target)
129  
        std::coroutine_handle<void> target)
129  
    {
130  
    {
130  
        (void)q;
131  
        (void)q;
131 -
        target.resume();
132 +
        safe_resume(target);
132  
        co_return;
133  
        co_return;
133  
    }
134  
    }
134  

135  

135  
public:
136  
public:
136  
    strand_queue() = default;
137  
    strand_queue() = default;
137  

138  

138  
    strand_queue(strand_queue const&) = delete;
139  
    strand_queue(strand_queue const&) = delete;
139  
    strand_queue& operator=(strand_queue const&) = delete;
140  
    strand_queue& operator=(strand_queue const&) = delete;
140  

141  

141  
    /** Destructor.
142  
    /** Destructor.
142  

143  

143  
        Destroys any pending wrappers without resuming them,
144  
        Destroys any pending wrappers without resuming them,
144  
        then frees all memory in the free list.
145  
        then frees all memory in the free list.
145  
    */
146  
    */
146  
    ~strand_queue()
147  
    ~strand_queue()
147  
    {
148  
    {
148  
        // Destroy pending wrappers
149  
        // Destroy pending wrappers
149  
        while(head_)
150  
        while(head_)
150  
        {
151  
        {
151  
            promise_type* p = head_;
152  
            promise_type* p = head_;
152  
            head_ = p->next;
153  
            head_ = p->next;
153  

154  

154  
            auto h = std::coroutine_handle<promise_type>::from_promise(*p);
155  
            auto h = std::coroutine_handle<promise_type>::from_promise(*p);
155  
            h.destroy();
156  
            h.destroy();
156  
        }
157  
        }
157  

158  

158  
        // Free the free list memory
159  
        // Free the free list memory
159  
        while(free_list_)
160  
        while(free_list_)
160  
        {
161  
        {
161  
            frame_prefix* prefix = free_list_;
162  
            frame_prefix* prefix = free_list_;
162  
            free_list_ = prefix->next;
163  
            free_list_ = prefix->next;
163  
            ::operator delete(prefix);
164  
            ::operator delete(prefix);
164  
        }
165  
        }
165  
    }
166  
    }
166  

167  

167  
    /** Returns true if there are no pending operations.
168  
    /** Returns true if there are no pending operations.
168  
    */
169  
    */
169  
    bool
170  
    bool
170  
    empty() const noexcept
171  
    empty() const noexcept
171  
    {
172  
    {
172  
        return head_ == nullptr;
173  
        return head_ == nullptr;
173  
    }
174  
    }
174  

175  

175  
    /** Push a coroutine handle to the queue.
176  
    /** Push a coroutine handle to the queue.
176  

177  

177  
        Creates a wrapper coroutine and appends it to the
178  
        Creates a wrapper coroutine and appends it to the
178  
        queue. The wrapper will resume the target handle
179  
        queue. The wrapper will resume the target handle
179  
        when dispatch() processes it.
180  
        when dispatch() processes it.
180  

181  

181  
        @param h The coroutine handle to dispatch.
182  
        @param h The coroutine handle to dispatch.
182  
    */
183  
    */
183  
    void
184  
    void
184  
    push(std::coroutine_handle<void> h)
185  
    push(std::coroutine_handle<void> h)
185  
    {
186  
    {
186  
        strand_op op = make_strand_op(*this, h);
187  
        strand_op op = make_strand_op(*this, h);
187  

188  

188  
        promise_type* p = &op.h_.promise();
189  
        promise_type* p = &op.h_.promise();
189  
        p->next = nullptr;
190  
        p->next = nullptr;
190  

191  

191  
        if(tail_)
192  
        if(tail_)
192  
            tail_->next = p;
193  
            tail_->next = p;
193  
        else
194  
        else
194  
            head_ = p;
195  
            head_ = p;
195  
        tail_ = p;
196  
        tail_ = p;
196  
    }
197  
    }
197  

198  

198  
    /** Resume all queued coroutines in sequence.
199  
    /** Resume all queued coroutines in sequence.
199  

200  

200  
        Processes each wrapper in FIFO order, resuming its
201  
        Processes each wrapper in FIFO order, resuming its
201  
        target coroutine. After each target suspends or
202  
        target coroutine. After each target suspends or
202  
        completes, the wrapper is destroyed and its frame
203  
        completes, the wrapper is destroyed and its frame
203  
        is added to the free list for reuse.
204  
        is added to the free list for reuse.
204  

205  

205  
        Coroutines resumed during dispatch may push new
206  
        Coroutines resumed during dispatch may push new
206  
        handles, which will also be processed in the same
207  
        handles, which will also be processed in the same
207  
        dispatch call.
208  
        dispatch call.
208  

209  

209  
        @warning Not thread-safe. Do not call while another
210  
        @warning Not thread-safe. Do not call while another
210  
            thread may be calling push().
211  
            thread may be calling push().
211  
    */
212  
    */
212  
    void
213  
    void
213  
    dispatch()
214  
    dispatch()
214  
    {
215  
    {
215  
        while(head_)
216  
        while(head_)
216  
        {
217  
        {
217  
            promise_type* p = head_;
218  
            promise_type* p = head_;
218  
            head_ = p->next;
219  
            head_ = p->next;
219  
            if(!head_)
220  
            if(!head_)
220  
                tail_ = nullptr;
221  
                tail_ = nullptr;
221  

222  

222  
            auto h = std::coroutine_handle<promise_type>::from_promise(*p);
223  
            auto h = std::coroutine_handle<promise_type>::from_promise(*p);
223 -
            h.resume();
224 +
            safe_resume(h);
224  
            h.destroy();
225  
            h.destroy();
225  
        }
226  
        }
226  
    }
227  
    }
227  

228  

228  
    /** Batch of taken items for thread-safe dispatch. */
229  
    /** Batch of taken items for thread-safe dispatch. */
229  
    struct taken_batch
230  
    struct taken_batch
230  
    {
231  
    {
231  
        promise_type* head = nullptr;
232  
        promise_type* head = nullptr;
232  
        promise_type* tail = nullptr;
233  
        promise_type* tail = nullptr;
233  
    };
234  
    };
234  

235  

235  
    /** Take all pending items atomically.
236  
    /** Take all pending items atomically.
236  

237  

237  
        Removes all items from the queue and returns them
238  
        Removes all items from the queue and returns them
238  
        as a batch. The queue is left empty.
239  
        as a batch. The queue is left empty.
239  

240  

240  
        @return The batch of taken items.
241  
        @return The batch of taken items.
241  
    */
242  
    */
242  
    taken_batch
243  
    taken_batch
243  
    take_all() noexcept
244  
    take_all() noexcept
244  
    {
245  
    {
245  
        taken_batch batch{head_, tail_};
246  
        taken_batch batch{head_, tail_};
246  
        head_ = tail_ = nullptr;
247  
        head_ = tail_ = nullptr;
247  
        return batch;
248  
        return batch;
248  
    }
249  
    }
249  

250  

250  
    /** Dispatch a batch of taken items.
251  
    /** Dispatch a batch of taken items.
251  

252  

252  
        @param batch The batch to dispatch.
253  
        @param batch The batch to dispatch.
253  

254  

254  
        @note This is thread-safe w.r.t. push() because it doesn't
255  
        @note This is thread-safe w.r.t. push() because it doesn't
255  
            access the queue's free_list_. Frames are deleted directly
256  
            access the queue's free_list_. Frames are deleted directly
256  
            rather than recycled.
257  
            rather than recycled.
257  
    */
258  
    */
258  
    static
259  
    static
259  
    void
260  
    void
260  
    dispatch_batch(taken_batch& batch)
261  
    dispatch_batch(taken_batch& batch)
261  
    {
262  
    {
262  
        while(batch.head)
263  
        while(batch.head)
263  
        {
264  
        {
264  
            promise_type* p = batch.head;
265  
            promise_type* p = batch.head;
265  
            batch.head = p->next;
266  
            batch.head = p->next;
266  

267  

267  
            auto h = std::coroutine_handle<promise_type>::from_promise(*p);
268  
            auto h = std::coroutine_handle<promise_type>::from_promise(*p);
268 -
            h.resume();
269 +
            safe_resume(h);
269  
            // Don't use h.destroy() - it would call operator delete which
270  
            // Don't use h.destroy() - it would call operator delete which
270  
            // accesses the queue's free_list_ (race with push).
271  
            // accesses the queue's free_list_ (race with push).
271  
            // Instead, manually free the frame without recycling.
272  
            // Instead, manually free the frame without recycling.
272  
            // h.address() returns the frame base (what operator new returned).
273  
            // h.address() returns the frame base (what operator new returned).
273  
            frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1;
274  
            frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1;
274  
            ::operator delete(prefix);
275  
            ::operator delete(prefix);
275  
        }
276  
        }
276  
        batch.tail = nullptr;
277  
        batch.tail = nullptr;
277  
    }
278  
    }
278  
};
279  
};
279  

280  

280  
//----------------------------------------------------------
281  
//----------------------------------------------------------
281  

282  

282  
inline
283  
inline
283  
void*
284  
void*
284  
strand_op::promise_type::operator new(
285  
strand_op::promise_type::operator new(
285  
    std::size_t size,
286  
    std::size_t size,
286  
    strand_queue& q,
287  
    strand_queue& q,
287  
    std::coroutine_handle<void>)
288  
    std::coroutine_handle<void>)
288  
{
289  
{
289  
    // Total size includes prefix
290  
    // Total size includes prefix
290  
    std::size_t alloc_size = size + sizeof(frame_prefix);
291  
    std::size_t alloc_size = size + sizeof(frame_prefix);
291  
    void* raw;
292  
    void* raw;
292  
    
293  
    
293  
    // Try to reuse from free list
294  
    // Try to reuse from free list
294  
    if(q.free_list_)
295  
    if(q.free_list_)
295  
    {
296  
    {
296  
        frame_prefix* prefix = q.free_list_;
297  
        frame_prefix* prefix = q.free_list_;
297  
        q.free_list_ = prefix->next;
298  
        q.free_list_ = prefix->next;
298  
        raw = prefix;
299  
        raw = prefix;
299  
    }
300  
    }
300  
    else
301  
    else
301  
    {
302  
    {
302  
        raw = ::operator new(alloc_size);
303  
        raw = ::operator new(alloc_size);
303  
    }
304  
    }
304  

305  

305  
    // Initialize prefix
306  
    // Initialize prefix
306  
    frame_prefix* prefix = static_cast<frame_prefix*>(raw);
307  
    frame_prefix* prefix = static_cast<frame_prefix*>(raw);
307  
    prefix->next = nullptr;
308  
    prefix->next = nullptr;
308  
    prefix->queue = &q;
309  
    prefix->queue = &q;
309  
    prefix->alloc_size = alloc_size;
310  
    prefix->alloc_size = alloc_size;
310  

311  

311  
    // Return pointer AFTER the prefix (this is where coroutine frame goes)
312  
    // Return pointer AFTER the prefix (this is where coroutine frame goes)
312  
    return prefix + 1;
313  
    return prefix + 1;
313  
}
314  
}
314  

315  

315  
inline
316  
inline
316  
void
317  
void
317  
strand_op::promise_type::operator delete(void* p, std::size_t)
318  
strand_op::promise_type::operator delete(void* p, std::size_t)
318  
{
319  
{
319  
    // Calculate back to get the prefix
320  
    // Calculate back to get the prefix
320  
    frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1;
321  
    frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1;
321  

322  

322  
    // Add to free list
323  
    // Add to free list
323  
    prefix->next = prefix->queue->free_list_;
324  
    prefix->next = prefix->queue->free_list_;
324  
    prefix->queue->free_list_ = prefix;
325  
    prefix->queue->free_list_ = prefix;
325  
}
326  
}
326  

327  

327  
} // namespace detail
328  
} // namespace detail
328  
} // namespace capy
329  
} // namespace capy
329  
} // namespace boost
330  
} // namespace boost
330  

331  

331  
#endif
332  
#endif