LCOV - code coverage report
Current view: top level - /jenkins/workspace/boost-root/libs/capy/src/ex - thread_pool.cpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 100.0 % 128 128
Test Date: 2026-03-24 19:38:46 Functions: 100.0 % 25 25

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : // Copyright (c) 2026 Michael Vandeberg
       4                 : //
       5                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7                 : //
       8                 : // Official repository: https://github.com/boostorg/capy
       9                 : //
      10                 : 
      11                 : #include <boost/capy/ex/thread_pool.hpp>
      12                 : #include <boost/capy/continuation.hpp>
      13                 : #include <boost/capy/ex/frame_allocator.hpp>
      14                 : #include <boost/capy/test/thread_name.hpp>
      15                 : #include <algorithm>
      16                 : #include <atomic>
      17                 : #include <condition_variable>
      18                 : #include <cstdio>
      19                 : #include <mutex>
      20                 : #include <thread>
      21                 : #include <vector>
      22                 : 
      23                 : /*
      24                 :     Thread pool implementation using a shared work queue.
      25                 : 
      26                 :     Work items are continuations linked via their intrusive next pointer,
      27                 :     stored in a single queue protected by a mutex. No per-post heap
      28                 :     allocation: the continuation is owned by the caller and linked
      29                 :     directly. Worker threads wait on a condition_variable until work
      30                 :     is available or stop is requested.
      31                 : 
      32                 :     Threads are started lazily on first post() via std::call_once to avoid
      33                 :     spawning threads for pools that are constructed but never used. Each
      34                 :     thread is named with a configurable prefix plus index for debugger
      35                 :     visibility.
      36                 : 
      37                 :     Work tracking: on_work_started/on_work_finished maintain an atomic
      38                 :     outstanding_work_ counter. join() blocks until this counter reaches
      39                 :     zero, then signals workers to stop and joins threads.
      40                 : 
      41                 :     Two shutdown paths:
      42                 :     - join(): waits for outstanding work to drain, then stops workers.
      43                 :     - stop(): immediately signals workers to exit; queued work is abandoned.
      44                 :     - Destructor: stop() then join() (abandon + wait for threads).
      45                 : */
      46                 : 
      47                 : namespace boost {
      48                 : namespace capy {
      49                 : 
      50                 : //------------------------------------------------------------------------------
      51                 : 
      52                 : class thread_pool::impl
      53                 : {
      54                 :     // Intrusive queue of continuations via continuation::next.
      55                 :     // No per-post allocation: the continuation is owned by the caller.
      56                 :     continuation* head_ = nullptr;
      57                 :     continuation* tail_ = nullptr;
      58                 : 
      59 HIT         830 :     void push(continuation* c) noexcept
      60                 :     {
      61             830 :         c->next = nullptr;
      62             830 :         if(tail_)
      63             617 :             tail_->next = c;
      64                 :         else
      65             213 :             head_ = c;
      66             830 :         tail_ = c;
      67             830 :     }
      68                 : 
      69             987 :     continuation* pop() noexcept
      70                 :     {
      71             987 :         if(!head_)
      72             157 :             return nullptr;
      73             830 :         continuation* c = head_;
      74             830 :         head_ = head_->next;
      75             830 :         if(!head_)
      76             213 :             tail_ = nullptr;
      77             830 :         return c;
      78                 :     }
      79                 : 
      80            1026 :     bool empty() const noexcept
      81                 :     {
      82            1026 :         return head_ == nullptr;
      83                 :     }
      84                 : 
      85                 :     std::mutex mutex_;
      86                 :     std::condition_variable work_cv_;
      87                 :     std::condition_variable done_cv_;
      88                 :     std::vector<std::thread> threads_;
      89                 :     std::atomic<std::size_t> outstanding_work_{0};
      90                 :     bool stop_{false};
      91                 :     bool joined_{false};
      92                 :     std::size_t num_threads_;
      93                 :     char thread_name_prefix_[13]{};  // 12 chars max + null terminator
      94                 :     std::once_flag start_flag_;
      95                 : 
      96                 : public:
      97             157 :     ~impl() = default;
      98                 : 
      99                 :     // Destroy abandoned coroutine frames. Must be called
     100                 :     // before execution_context::shutdown()/destroy() so
     101                 :     // that suspended-frame destructors (e.g. delay_awaitable
     102                 :     // calling timer_service::cancel()) run while services
     103                 :     // are still valid.
     104                 :     void
     105             157 :     drain_abandoned() noexcept
     106                 :     {
     107             356 :         while(auto* c = pop())
     108                 :         {
     109             199 :             auto h = c->h;
     110             199 :             if(h && h != std::noop_coroutine())
     111             147 :                 h.destroy();
     112             199 :         }
     113             157 :     }
     114                 : 
     115             157 :     impl(std::size_t num_threads, std::string_view thread_name_prefix)
     116             157 :         : num_threads_(num_threads)
     117                 :     {
     118             157 :         if(num_threads_ == 0)
     119               4 :             num_threads_ = std::max(
     120               2 :                 std::thread::hardware_concurrency(), 1u);
     121                 : 
     122                 :         // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
     123             157 :         auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
     124             157 :         thread_name_prefix_[n] = '\0';
     125             157 :     }
     126                 : 
     127                 :     void
     128             830 :     post(continuation& c)
     129                 :     {
     130             830 :         ensure_started();
     131                 :         {
     132             830 :             std::lock_guard<std::mutex> lock(mutex_);
     133             830 :             push(&c);
     134             830 :         }
     135             830 :         work_cv_.notify_one();
     136             830 :     }
     137                 : 
     138                 :     void
     139             345 :     on_work_started() noexcept
     140                 :     {
     141             345 :         outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
     142             345 :     }
     143                 : 
     144                 :     void
     145             345 :     on_work_finished() noexcept
     146                 :     {
     147             345 :         if(outstanding_work_.fetch_sub(
     148             345 :             1, std::memory_order_acq_rel) == 1)
     149                 :         {
     150              85 :             std::lock_guard<std::mutex> lock(mutex_);
     151              85 :             if(joined_ && !stop_)
     152               4 :                 stop_ = true;
     153              85 :             done_cv_.notify_all();
     154              85 :             work_cv_.notify_all();
     155              85 :         }
     156             345 :     }
     157                 : 
     158                 :     void
     159             168 :     join() noexcept
     160                 :     {
     161                 :         {
     162             168 :             std::unique_lock<std::mutex> lock(mutex_);
     163             168 :             if(joined_)
     164              11 :                 return;
     165             157 :             joined_ = true;
     166                 : 
     167             157 :             if(outstanding_work_.load(
     168             157 :                 std::memory_order_acquire) == 0)
     169                 :             {
     170             101 :                 stop_ = true;
     171             101 :                 work_cv_.notify_all();
     172                 :             }
     173                 :             else
     174                 :             {
     175              56 :                 done_cv_.wait(lock, [this]{
     176              61 :                     return stop_;
     177                 :                 });
     178                 :             }
     179             168 :         }
     180                 : 
     181             336 :         for(auto& t : threads_)
     182             179 :             if(t.joinable())
     183             179 :                 t.join();
     184                 :     }
     185                 : 
     186                 :     void
     187             159 :     stop() noexcept
     188                 :     {
     189                 :         {
     190             159 :             std::lock_guard<std::mutex> lock(mutex_);
     191             159 :             stop_ = true;
     192             159 :         }
     193             159 :         work_cv_.notify_all();
     194             159 :         done_cv_.notify_all();
     195             159 :     }
     196                 : 
     197                 : private:
     198                 :     void
     199             830 :     ensure_started()
     200                 :     {
     201             830 :         std::call_once(start_flag_, [this]{
     202             101 :             threads_.reserve(num_threads_);
     203             280 :             for(std::size_t i = 0; i < num_threads_; ++i)
     204             358 :                 threads_.emplace_back([this, i]{ run(i); });
     205             101 :         });
     206             830 :     }
     207                 : 
     208                 :     void
     209             179 :     run(std::size_t index)
     210                 :     {
     211                 :         // Build name; set_current_thread_name truncates to platform limits.
     212                 :         char name[16];
     213             179 :         std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
     214             179 :         set_current_thread_name(name);
     215                 : 
     216                 :         for(;;)
     217                 :         {
     218             810 :             continuation* c = nullptr;
     219                 :             {
     220             810 :                 std::unique_lock<std::mutex> lock(mutex_);
     221             810 :                 work_cv_.wait(lock, [this]{
     222            1329 :                     return !empty() ||
     223            1329 :                         stop_;
     224                 :                 });
     225             810 :                 if(stop_)
     226             358 :                     return;
     227             631 :                 c = pop();
     228             810 :             }
     229             631 :             if(c)
     230             631 :                 safe_resume(c->h);
     231             631 :         }
     232                 :     }
     233                 : };
     234                 : 
     235                 : //------------------------------------------------------------------------------
     236                 : 
     237             157 : thread_pool::
     238                 : ~thread_pool()
     239                 : {
     240             157 :     impl_->stop();
     241             157 :     impl_->join();
     242             157 :     impl_->drain_abandoned();
     243             157 :     shutdown();
     244             157 :     destroy();
     245             157 :     delete impl_;
     246             157 : }
     247                 : 
     248             157 : thread_pool::
     249             157 : thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
     250             157 :     : impl_(new impl(num_threads, thread_name_prefix))
     251                 : {
     252             157 :     this->set_frame_allocator(std::allocator<void>{});
     253             157 : }
     254                 : 
     255                 : void
     256              11 : thread_pool::
     257                 : join() noexcept
     258                 : {
     259              11 :     impl_->join();
     260              11 : }
     261                 : 
     262                 : void
     263               2 : thread_pool::
     264                 : stop() noexcept
     265                 : {
     266               2 :     impl_->stop();
     267               2 : }
     268                 : 
     269                 : //------------------------------------------------------------------------------
     270                 : 
     271                 : thread_pool::executor_type
     272             163 : thread_pool::
     273                 : get_executor() const noexcept
     274                 : {
     275             163 :     return executor_type(
     276             163 :         const_cast<thread_pool&>(*this));
     277                 : }
     278                 : 
     279                 : void
     280             345 : thread_pool::executor_type::
     281                 : on_work_started() const noexcept
     282                 : {
     283             345 :     pool_->impl_->on_work_started();
     284             345 : }
     285                 : 
     286                 : void
     287             345 : thread_pool::executor_type::
     288                 : on_work_finished() const noexcept
     289                 : {
     290             345 :     pool_->impl_->on_work_finished();
     291             345 : }
     292                 : 
     293                 : void
     294             830 : thread_pool::executor_type::
     295                 : post(continuation& c) const
     296                 : {
     297             830 :     pool_->impl_->post(c);
     298             830 : }
     299                 : 
     300                 : } // capy
     301                 : } // boost
        

Generated by: LCOV version 2.3