LCOV - code coverage report
Current view: top level - /jenkins/workspace/boost-root/libs/capy/src/ex/detail - strand_queue.hpp (source / functions)
Test: coverage_remapped.info
Test Date: 2026-03-24 19:38:46
Coverage summary (Total / Hit / Missed):
  Lines:     76.6 % (64 / 49 / 15)
  Functions: 92.3 % (13 / 12 / 1)

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
      11                 : #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/ex/frame_allocator.hpp>
      15                 : 
      16                 : #include <coroutine>
      17                 : #include <cstddef>
      18                 : #include <exception>
      19                 : 
      20                 : namespace boost {
      21                 : namespace capy {
      22                 : namespace detail {
      23                 : 
      24                 : class strand_queue;
      25                 : 
      26                 : //----------------------------------------------------------
      27                 : 
      // Bookkeeping header stored directly in front of each wrapper
      // coroutine frame.
      //
      // strand_op::promise_type::operator new writes one of these at the
      // start of every allocation and hands `prefix + 1` to the coroutine
      // machinery as the frame address; operator delete and
      // strand_queue::dispatch_batch step back with `- 1` to find it again.
      //
      // The struct is aligned to __STDCPP_DEFAULT_NEW_ALIGNMENT__ so that
      // its size is padded to a multiple of that alignment. Without the
      // padding the three members occupy 24 bytes on LP64 targets, and the
      // address after the prefix would no longer have the alignment an
      // allocation function is required to provide for the frame.
      struct alignas(__STDCPP_DEFAULT_NEW_ALIGNMENT__) frame_prefix
      {
          // Next recycled block while this block sits on a free list;
          // reset to nullptr when the block is handed out.
          frame_prefix* next;

          // Queue recorded by operator new; operator delete pushes the
          // block onto this queue's free list.
          class strand_queue* queue;

          // Number of bytes requested for the block (prefix + frame).
          std::size_t alloc_size;
      };
      35                 : 
      36                 : //----------------------------------------------------------
      37                 : 
      38                 : /** Wrapper coroutine for strand queue dispatch operations.
      39                 : 
      40                 :     This coroutine wraps a target coroutine handle and resumes
      41                 :     it when dispatched. The wrapper ensures control returns to
      42                 :     the dispatch loop after the target suspends or completes.
      43                 : 
      44                 :     The promise contains an intrusive list node for queue
      45                 :     storage and supports a custom allocator that recycles
      46                 :     coroutine frames via a free list.
      47                 : */
      48                 : struct strand_op
      49                 : {
      50                 :     struct promise_type
      51                 :     {
      52                 :         promise_type* next = nullptr;
      53                 : 
      54                 :         void*
      55                 :         operator new(
      56                 :             std::size_t size,
      57                 :             strand_queue& q,
      58                 :             std::coroutine_handle<void>);
      59                 : 
      60                 :         void
      61                 :         operator delete(void* p, std::size_t);
      62                 : 
      63                 :         strand_op
      64 HIT         332 :         get_return_object() noexcept
      65                 :         {
      66             332 :             return {std::coroutine_handle<promise_type>::from_promise(*this)};
      67                 :         }
      68                 : 
      69                 :         std::suspend_always
      70             332 :         initial_suspend() noexcept
      71                 :         {
      72             332 :             return {};
      73                 :         }
      74                 : 
      75                 :         std::suspend_always
      76             332 :         final_suspend() noexcept
      77                 :         {
      78             332 :             return {};
      79                 :         }
      80                 : 
      81                 :         void
      82             332 :         return_void() noexcept
      83                 :         {
      84             332 :         }
      85                 : 
      86                 :         void
      87                 :         unhandled_exception()
      88                 :         {
      89                 :             std::terminate();
      90                 :         }
      91                 :     };
      92                 : 
      93                 :     std::coroutine_handle<promise_type> h_;
      94                 : };
      95                 : 
      96                 : //----------------------------------------------------------
      97                 : 
      98                 : /** Single-threaded dispatch queue for coroutine handles.
      99                 : 
     100                 :     This queue stores coroutine handles and resumes them
     101                 :     sequentially when dispatch() is called. Each pushed
     102                 :     handle is wrapped in a strand_op coroutine that ensures
     103                 :     control returns to the dispatch loop after the target
     104                 :     suspends or completes.
     105                 : 
     106                 :     The queue uses an intrusive singly-linked list through
     107                 :     the promise type to avoid separate node allocations.
     108                 :     A free list recycles wrapper coroutine frames to reduce
     109                 :     allocation overhead during repeated push/dispatch cycles.
     110                 : 
     111                 :     @par Thread Safety
     112                 :     This class is not thread-safe. All operations must be
     113                 :     called from a single thread.
     114                 : */
     115                 : class strand_queue
     116                 : {
     117                 :     using promise_type = strand_op::promise_type;
     118                 : 
     119                 :     promise_type* head_ = nullptr;
     120                 :     promise_type* tail_ = nullptr;
     121                 :     frame_prefix* free_list_ = nullptr;
     122                 : 
     123                 :     friend struct strand_op::promise_type;
     124                 : 
     125                 :     static
     126                 :     strand_op
     127             332 :     make_strand_op(
     128                 :         strand_queue& q,
     129                 :         std::coroutine_handle<void> target)
     130                 :     {
     131                 :         (void)q;
     132                 :         safe_resume(target);
     133                 :         co_return;
     134             664 :     }
     135                 : 
     136                 : public:
     137            4853 :     strand_queue() = default;
     138                 : 
     139                 :     strand_queue(strand_queue const&) = delete;
     140                 :     strand_queue& operator=(strand_queue const&) = delete;
     141                 : 
     142                 :     /** Destructor.
     143                 : 
     144                 :         Destroys any pending wrappers without resuming them,
     145                 :         then frees all memory in the free list.
     146                 :     */
     147            4853 :     ~strand_queue()
     148                 :     {
     149                 :         // Destroy pending wrappers
     150            4853 :         while(head_)
     151                 :         {
     152 MIS           0 :             promise_type* p = head_;
     153               0 :             head_ = p->next;
     154                 : 
     155               0 :             auto h = std::coroutine_handle<promise_type>::from_promise(*p);
     156               0 :             h.destroy();
     157                 :         }
     158                 : 
     159                 :         // Free the free list memory
     160 HIT        4853 :         while(free_list_)
     161                 :         {
     162 MIS           0 :             frame_prefix* prefix = free_list_;
     163               0 :             free_list_ = prefix->next;
     164               0 :             ::operator delete(prefix);
     165                 :         }
     166 HIT        4853 :     }
     167                 : 
     168                 :     /** Returns true if there are no pending operations.
     169                 :     */
     170                 :     bool
     171              23 :     empty() const noexcept
     172                 :     {
     173              23 :         return head_ == nullptr;
     174                 :     }
     175                 : 
     176                 :     /** Push a coroutine handle to the queue.
     177                 : 
     178                 :         Creates a wrapper coroutine and appends it to the
     179                 :         queue. The wrapper will resume the target handle
     180                 :         when dispatch() processes it.
     181                 : 
     182                 :         @param h The coroutine handle to dispatch.
     183                 :     */
     184                 :     void
     185             332 :     push(std::coroutine_handle<void> h)
     186                 :     {
     187             332 :         strand_op op = make_strand_op(*this, h);
     188                 : 
     189             332 :         promise_type* p = &op.h_.promise();
     190             332 :         p->next = nullptr;
     191                 : 
     192             332 :         if(tail_)
     193             309 :             tail_->next = p;
     194                 :         else
     195              23 :             head_ = p;
     196             332 :         tail_ = p;
     197             332 :     }
     198                 : 
     199                 :     /** Resume all queued coroutines in sequence.
     200                 : 
     201                 :         Processes each wrapper in FIFO order, resuming its
     202                 :         target coroutine. After each target suspends or
     203                 :         completes, the wrapper is destroyed and its frame
     204                 :         is added to the free list for reuse.
     205                 : 
     206                 :         Coroutines resumed during dispatch may push new
     207                 :         handles, which will also be processed in the same
     208                 :         dispatch call.
     209                 : 
     210                 :         @warning Not thread-safe. Do not call while another
     211                 :             thread may be calling push().
     212                 :     */
     213                 :     void
     214                 :     dispatch()
     215                 :     {
     216                 :         while(head_)
     217                 :         {
     218                 :             promise_type* p = head_;
     219                 :             head_ = p->next;
     220                 :             if(!head_)
     221                 :                 tail_ = nullptr;
     222                 : 
     223                 :             auto h = std::coroutine_handle<promise_type>::from_promise(*p);
     224                 :             safe_resume(h);
     225                 :             h.destroy();
     226                 :         }
     227                 :     }
     228                 : 
     229                 :     /** Batch of taken items for thread-safe dispatch. */
     230                 :     struct taken_batch
     231                 :     {
     232                 :         promise_type* head = nullptr;
     233                 :         promise_type* tail = nullptr;
     234                 :     };
     235                 : 
     236                 :     /** Take all pending items atomically.
     237                 : 
     238                 :         Removes all items from the queue and returns them
     239                 :         as a batch. The queue is left empty.
     240                 : 
     241                 :         @return The batch of taken items.
     242                 :     */
     243                 :     taken_batch
     244              23 :     take_all() noexcept
     245                 :     {
     246              23 :         taken_batch batch{head_, tail_};
     247              23 :         head_ = tail_ = nullptr;
     248              23 :         return batch;
     249                 :     }
     250                 : 
     251                 :     /** Dispatch a batch of taken items.
     252                 : 
     253                 :         @param batch The batch to dispatch.
     254                 : 
     255                 :         @note This is thread-safe w.r.t. push() because it doesn't
     256                 :             access the queue's free_list_. Frames are deleted directly
     257                 :             rather than recycled.
     258                 :     */
     259                 :     static
     260                 :     void
     261              23 :     dispatch_batch(taken_batch& batch)
     262                 :     {
     263             355 :         while(batch.head)
     264                 :         {
     265             332 :             promise_type* p = batch.head;
     266             332 :             batch.head = p->next;
     267                 : 
     268             332 :             auto h = std::coroutine_handle<promise_type>::from_promise(*p);
     269             332 :             safe_resume(h);
     270                 :             // Don't use h.destroy() - it would call operator delete which
     271                 :             // accesses the queue's free_list_ (race with push).
     272                 :             // Instead, manually free the frame without recycling.
     273                 :             // h.address() returns the frame base (what operator new returned).
     274             332 :             frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1;
     275             332 :             ::operator delete(prefix);
     276                 :         }
     277              23 :         batch.tail = nullptr;
     278              23 :     }
     279                 : };
     280                 : 
     281                 : //----------------------------------------------------------
     282                 : 
     283                 : inline
     284                 : void*
     285             332 : strand_op::promise_type::operator new(
     286                 :     std::size_t size,
     287                 :     strand_queue& q,
     288                 :     std::coroutine_handle<void>)
     289                 : {
     290                 :     // Total size includes prefix
     291             332 :     std::size_t alloc_size = size + sizeof(frame_prefix);
     292                 :     void* raw;
     293                 :     
     294                 :     // Try to reuse from free list
     295             332 :     if(q.free_list_)
     296                 :     {
     297 MIS           0 :         frame_prefix* prefix = q.free_list_;
     298               0 :         q.free_list_ = prefix->next;
     299               0 :         raw = prefix;
     300                 :     }
     301                 :     else
     302                 :     {
     303 HIT         332 :         raw = ::operator new(alloc_size);
     304                 :     }
     305                 : 
     306                 :     // Initialize prefix
     307             332 :     frame_prefix* prefix = static_cast<frame_prefix*>(raw);
     308             332 :     prefix->next = nullptr;
     309             332 :     prefix->queue = &q;
     310             332 :     prefix->alloc_size = alloc_size;
     311                 : 
     312                 :     // Return pointer AFTER the prefix (this is where coroutine frame goes)
     313             332 :     return prefix + 1;
     314                 : }
     315                 : 
     316                 : inline
     317                 : void
     318 MIS           0 : strand_op::promise_type::operator delete(void* p, std::size_t)
     319                 : {
     320                 :     // Calculate back to get the prefix
     321               0 :     frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1;
     322                 : 
     323                 :     // Add to free list
     324               0 :     prefix->next = prefix->queue->free_list_;
     325               0 :     prefix->queue->free_list_ = prefix;
     326               0 : }
     327                 : 
     328                 : } // namespace detail
     329                 : } // namespace capy
     330                 : } // namespace boost
     331                 : 
     332                 : #endif
        

Generated by: LCOV version 2.3