1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
10  
#ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
11  
#define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
11  
#define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
12  

12  

13  
#include <boost/corosio/detail/config.hpp>
13  
#include <boost/corosio/detail/config.hpp>
14  
#include <boost/corosio/detail/intrusive.hpp>
14  
#include <boost/corosio/detail/intrusive.hpp>
15  
#include <boost/capy/ex/execution_context.hpp>
15  
#include <boost/capy/ex/execution_context.hpp>
16  

16  

17  
#include <condition_variable>
17  
#include <condition_variable>
18  
#include <mutex>
18  
#include <mutex>
19  
#include <stdexcept>
19  
#include <stdexcept>
20  
#include <thread>
20  
#include <thread>
21  
#include <vector>
21  
#include <vector>
22  

22  

23  
namespace boost::corosio::detail {
23  
namespace boost::corosio::detail {
24  

24  

25  
/** Base class for thread pool work items.
25  
/** Base class for thread pool work items.
26  

26  

27  
    Derive from this to create work that can be posted to a
27  
    Derive from this to create work that can be posted to a
28  
    @ref thread_pool. Uses static function pointer dispatch,
28  
    @ref thread_pool. Uses static function pointer dispatch,
29  
    consistent with the IOCP `op` pattern.
29  
    consistent with the IOCP `op` pattern.
30  

30  

31  
    @par Example
31  
    @par Example
32  
    @code
32  
    @code
33  
    struct my_work : pool_work_item
33  
    struct my_work : pool_work_item
34  
    {
34  
    {
35  
        int* result;
35  
        int* result;
36  
        static void execute( pool_work_item* w ) noexcept
36  
        static void execute( pool_work_item* w ) noexcept
37  
        {
37  
        {
38  
            auto* self = static_cast<my_work*>( w );
38  
            auto* self = static_cast<my_work*>( w );
39  
            *self->result = 42;
39  
            *self->result = 42;
40  
        }
40  
        }
41  
    };
41  
    };
42  

42  

43  
    my_work w;
43  
    my_work w;
44  
    w.func_ = &my_work::execute;
44  
    w.func_ = &my_work::execute;
45  
    w.result = &r;
45  
    w.result = &r;
46  
    pool.post( &w );
46  
    pool.post( &w );
47  
    @endcode
47  
    @endcode
48  
*/
48  
*/
49  
struct pool_work_item : intrusive_queue<pool_work_item>::node
49  
struct pool_work_item : intrusive_queue<pool_work_item>::node
50  
{
50  
{
51  
    /// Static dispatch function signature.
51  
    /// Static dispatch function signature.
52  
    using func_type = void (*)(pool_work_item*) noexcept;
52  
    using func_type = void (*)(pool_work_item*) noexcept;
53  

53  

54  
    /// Completion handler invoked by the worker thread.
54  
    /// Completion handler invoked by the worker thread.
55  
    func_type func_ = nullptr;
55  
    func_type func_ = nullptr;
56  
};
56  
};
57  

57  

58  
/** Shared thread pool for dispatching blocking operations.
58  
/** Shared thread pool for dispatching blocking operations.
59  

59  

60  
    Provides a fixed pool of reusable worker threads for operations
60  
    Provides a fixed pool of reusable worker threads for operations
61  
    that cannot be integrated with async I/O (e.g. blocking DNS
61  
    that cannot be integrated with async I/O (e.g. blocking DNS
62  
    calls). Registered as an `execution_context::service` so it
62  
    calls). Registered as an `execution_context::service` so it
63  
    is a singleton per io_context.
63  
    is a singleton per io_context.
64  

64  

65  
    Threads are created eagerly in the constructor. The default
65  
    Threads are created eagerly in the constructor. The default
66  
    thread count is 1.
66  
    thread count is 1.
67  

67  

68  
    @par Thread Safety
68  
    @par Thread Safety
69  
    All public member functions are thread-safe.
69  
    All public member functions are thread-safe.
70  

70  

71  
    @par Shutdown
71  
    @par Shutdown
72  
    Sets a shutdown flag, notifies all threads, and joins them.
72  
    Sets a shutdown flag, notifies all threads, and joins them.
73  
    In-flight blocking calls complete naturally before the thread
73  
    In-flight blocking calls complete naturally before the thread
74  
    exits.
74  
    exits.
75  
*/
75  
*/
76  
class thread_pool final : public capy::execution_context::service
76  
class thread_pool final : public capy::execution_context::service
77  
{
77  
{
78  
    std::mutex mutex_;
78  
    std::mutex mutex_;
79  
    std::condition_variable cv_;
79  
    std::condition_variable cv_;
80  
    intrusive_queue<pool_work_item> work_queue_;
80  
    intrusive_queue<pool_work_item> work_queue_;
81  
    std::vector<std::thread> threads_;
81  
    std::vector<std::thread> threads_;
82  
    bool shutdown_ = false;
82  
    bool shutdown_ = false;
83  

83  

84  
    void worker_loop();
84  
    void worker_loop();
85  

85  

86  
public:
86  
public:
87  
    using key_type = thread_pool;
87  
    using key_type = thread_pool;
88  

88  

89  
    /** Construct the thread pool service.
89  
    /** Construct the thread pool service.
90  

90  

91  
        Eagerly creates all worker threads.
91  
        Eagerly creates all worker threads.
92  

92  

93  
        @par Exception Safety
93  
        @par Exception Safety
94  
        Strong guarantee. If thread creation fails, all
94  
        Strong guarantee. If thread creation fails, all
95  
        already-created threads are shut down and joined
95  
        already-created threads are shut down and joined
96  
        before the exception propagates.
96  
        before the exception propagates.
97  

97  

98  
        @param ctx Reference to the owning execution_context.
98  
        @param ctx Reference to the owning execution_context.
99  
        @param num_threads Number of worker threads. Must be
99  
        @param num_threads Number of worker threads. Must be
100  
               at least 1.
100  
               at least 1.
101  

101  

102  
        @throws std::logic_error If `num_threads` is 0.
102  
        @throws std::logic_error If `num_threads` is 0.
103  
    */
103  
    */
104  
    explicit thread_pool(capy::execution_context& ctx, unsigned num_threads = 1)
104  
    explicit thread_pool(capy::execution_context& ctx, unsigned num_threads = 1)
105  
    {
105  
    {
106  
        (void)ctx;
106  
        (void)ctx;
107  
        if (!num_threads)
107  
        if (!num_threads)
108  
            throw std::logic_error("thread_pool requires at least 1 thread");
108  
            throw std::logic_error("thread_pool requires at least 1 thread");
109  
        threads_.reserve(num_threads);
109  
        threads_.reserve(num_threads);
110  
        try
110  
        try
111  
        {
111  
        {
112  
            for (unsigned i = 0; i < num_threads; ++i)
112  
            for (unsigned i = 0; i < num_threads; ++i)
113  
                threads_.emplace_back([this] { worker_loop(); });
113  
                threads_.emplace_back([this] { worker_loop(); });
114  
        }
114  
        }
115  
        catch (...)
115  
        catch (...)
116  
        {
116  
        {
117  
            shutdown();
117  
            shutdown();
118  
            throw;
118  
            throw;
119  
        }
119  
        }
120  
    }
120  
    }
121  

121  

122  
    ~thread_pool() override = default;
122  
    ~thread_pool() override = default;
123  

123  

124  
    thread_pool(thread_pool const&)            = delete;
124  
    thread_pool(thread_pool const&)            = delete;
125  
    thread_pool& operator=(thread_pool const&) = delete;
125  
    thread_pool& operator=(thread_pool const&) = delete;
126  

126  

127  
    /** Enqueue a work item for execution on the thread pool.
127  
    /** Enqueue a work item for execution on the thread pool.
128  

128  

129  
        Zero-allocation: the caller owns the work item's storage.
129  
        Zero-allocation: the caller owns the work item's storage.
130  

130  

131  
        @param w The work item to execute. Must remain valid until
131  
        @param w The work item to execute. Must remain valid until
132  
                 its `func_` has been called.
132  
                 its `func_` has been called.
133  

133  

134  
        @return `true` if the item was enqueued, `false` if the
134  
        @return `true` if the item was enqueued, `false` if the
135  
                pool has already shut down.
135  
                pool has already shut down.
136  
    */
136  
    */
137  
    bool post(pool_work_item* w) noexcept;
137  
    bool post(pool_work_item* w) noexcept;
138  

138  

139  
    /** Shut down the thread pool.
139  
    /** Shut down the thread pool.
140  

140  

141  
        Signals all threads to exit after draining any
141  
        Signals all threads to exit after draining any
142  
        remaining queued work, then joins them.
142  
        remaining queued work, then joins them.
143  
    */
143  
    */
144  
    void shutdown() override;
144  
    void shutdown() override;
145  
};
145  
};
146  

146  

147  
inline void
147  
inline void
148  
thread_pool::worker_loop()
148  
thread_pool::worker_loop()
149  
{
149  
{
150  
    for (;;)
150  
    for (;;)
151  
    {
151  
    {
152  
        pool_work_item* w;
152  
        pool_work_item* w;
153  
        {
153  
        {
154  
            std::unique_lock<std::mutex> lock(mutex_);
154  
            std::unique_lock<std::mutex> lock(mutex_);
155  
            cv_.wait(
155  
            cv_.wait(
156  
                lock, [this] { return shutdown_ || !work_queue_.empty(); });
156  
                lock, [this] { return shutdown_ || !work_queue_.empty(); });
157  

157  

158  
            w = work_queue_.pop();
158  
            w = work_queue_.pop();
159  
            if (!w)
159  
            if (!w)
160  
            {
160  
            {
161  
                if (shutdown_)
161  
                if (shutdown_)
162  
                    return;
162  
                    return;
163  
                continue;
163  
                continue;
164  
            }
164  
            }
165  
        }
165  
        }
166  
        w->func_(w);
166  
        w->func_(w);
167  
    }
167  
    }
168  
}
168  
}
169  

169  

170  
inline bool
170  
inline bool
171  
thread_pool::post(pool_work_item* w) noexcept
171  
thread_pool::post(pool_work_item* w) noexcept
172  
{
172  
{
173  
    {
173  
    {
174  
        std::lock_guard<std::mutex> lock(mutex_);
174  
        std::lock_guard<std::mutex> lock(mutex_);
175  
        if (shutdown_)
175  
        if (shutdown_)
176  
            return false;
176  
            return false;
177  
        work_queue_.push(w);
177  
        work_queue_.push(w);
178  
    }
178  
    }
179  
    cv_.notify_one();
179  
    cv_.notify_one();
180  
    return true;
180  
    return true;
181  
}
181  
}
182  

182  

183  
inline void
183  
inline void
184  
thread_pool::shutdown()
184  
thread_pool::shutdown()
185  
{
185  
{
186  
    {
186  
    {
187  
        std::lock_guard<std::mutex> lock(mutex_);
187  
        std::lock_guard<std::mutex> lock(mutex_);
188  
        shutdown_ = true;
188  
        shutdown_ = true;
189  
    }
189  
    }
190  
    cv_.notify_all();
190  
    cv_.notify_all();
191  

191  

192  
    for (auto& t : threads_)
192  
    for (auto& t : threads_)
193  
    {
193  
    {
194  
        if (t.joinable())
194  
        if (t.joinable())
195  
            t.join();
195  
            t.join();
196  
    }
196  
    }
197  
    threads_.clear();
197  
    threads_.clear();
198  

198  

199  
    {
199  
    {
200  
        std::lock_guard<std::mutex> lock(mutex_);
200  
        std::lock_guard<std::mutex> lock(mutex_);
201  
        while (work_queue_.pop())
201  
        while (work_queue_.pop())
202  
            ;
202  
            ;
203  
    }
203  
    }
204  
}
204  
}
205  

205  

206  
} // namespace boost::corosio::detail
206  
} // namespace boost::corosio::detail
207  

207  

208  
#endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
208  
#endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP