TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
11 : #define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/corosio/detail/intrusive.hpp>
15 : #include <boost/capy/ex/execution_context.hpp>
16 :
17 : #include <condition_variable>
18 : #include <mutex>
19 : #include <stdexcept>
20 : #include <thread>
21 : #include <vector>
22 :
23 : namespace boost::corosio::detail {
24 :
25 : /** Base class for thread pool work items.
26 :
27 : Derive from this to create work that can be posted to a
28 : @ref thread_pool. Uses static function pointer dispatch,
29 : consistent with the IOCP `op` pattern.
30 :
31 : @par Example
32 : @code
33 : struct my_work : pool_work_item
34 : {
35 : int* result;
36 : static void execute( pool_work_item* w ) noexcept
37 : {
38 : auto* self = static_cast<my_work*>( w );
39 : *self->result = 42;
40 : }
41 : };
42 :
43 : my_work w;
44 : w.func_ = &my_work::execute;
45 : w.result = &r;
46 : pool.post( &w );
47 : @endcode
48 : */
49 : struct pool_work_item : intrusive_queue<pool_work_item>::node
50 : {
51 : /// Static dispatch function signature.
52 : using func_type = void (*)(pool_work_item*) noexcept;
53 :
54 : /// Completion handler invoked by the worker thread.
55 : func_type func_ = nullptr;
56 : };
57 :
58 : /** Shared thread pool for dispatching blocking operations.
59 :
60 : Provides a fixed pool of reusable worker threads for operations
61 : that cannot be integrated with async I/O (e.g. blocking DNS
62 : calls). Registered as an `execution_context::service` so it
63 : is a singleton per io_context.
64 :
65 : Threads are created eagerly in the constructor. The default
66 : thread count is 1.
67 :
68 : @par Thread Safety
69 : All public member functions are thread-safe.
70 :
71 : @par Shutdown
72 : Sets a shutdown flag, notifies all threads, and joins them.
73 : In-flight blocking calls complete naturally before the thread
74 : exits.
75 : */
76 : class thread_pool final : public capy::execution_context::service
77 : {
78 : std::mutex mutex_;
79 : std::condition_variable cv_;
80 : intrusive_queue<pool_work_item> work_queue_;
81 : std::vector<std::thread> threads_;
82 : bool shutdown_ = false;
83 :
84 : void worker_loop();
85 :
86 : public:
87 : using key_type = thread_pool;
88 :
89 : /** Construct the thread pool service.
90 :
91 : Eagerly creates all worker threads.
92 :
93 : @par Exception Safety
94 : Strong guarantee. If thread creation fails, all
95 : already-created threads are shut down and joined
96 : before the exception propagates.
97 :
98 : @param ctx Reference to the owning execution_context.
99 : @param num_threads Number of worker threads. Must be
100 : at least 1.
101 :
102 : @throws std::logic_error If `num_threads` is 0.
103 : */
104 HIT 468 : explicit thread_pool(capy::execution_context& ctx, unsigned num_threads = 1)
105 468 : {
106 : (void)ctx;
107 468 : if (!num_threads)
108 1 : throw std::logic_error("thread_pool requires at least 1 thread");
109 467 : threads_.reserve(num_threads);
110 : try
111 : {
112 937 : for (unsigned i = 0; i < num_threads; ++i)
113 940 : threads_.emplace_back([this] { worker_loop(); });
114 : }
115 MIS 0 : catch (...)
116 : {
117 0 : shutdown();
118 0 : throw;
119 0 : }
120 HIT 470 : }
121 :
122 933 : ~thread_pool() override = default;
123 :
124 : thread_pool(thread_pool const&) = delete;
125 : thread_pool& operator=(thread_pool const&) = delete;
126 :
127 : /** Enqueue a work item for execution on the thread pool.
128 :
129 : Zero-allocation: the caller owns the work item's storage.
130 :
131 : @param w The work item to execute. Must remain valid until
132 : its `func_` has been called.
133 :
134 : @return `true` if the item was enqueued, `false` if the
135 : pool has already shut down.
136 : */
137 : bool post(pool_work_item* w) noexcept;
138 :
139 : /** Shut down the thread pool.
140 :
141 : Signals all threads to exit after draining any
142 : remaining queued work, then joins them.
143 : */
144 : void shutdown() override;
145 : };
146 :
147 : inline void
148 470 : thread_pool::worker_loop()
149 : {
150 : for (;;)
151 : {
152 : pool_work_item* w;
153 : {
154 511 : std::unique_lock<std::mutex> lock(mutex_);
155 511 : cv_.wait(
156 770 : lock, [this] { return shutdown_ || !work_queue_.empty(); });
157 :
158 511 : w = work_queue_.pop();
159 511 : if (!w)
160 : {
161 470 : if (shutdown_)
162 940 : return;
163 MIS 0 : continue;
164 : }
165 HIT 511 : }
166 41 : w->func_(w);
167 41 : }
168 : }
169 :
170 : inline bool
171 42 : thread_pool::post(pool_work_item* w) noexcept
172 : {
173 : {
174 42 : std::lock_guard<std::mutex> lock(mutex_);
175 42 : if (shutdown_)
176 1 : return false;
177 41 : work_queue_.push(w);
178 42 : }
179 41 : cv_.notify_one();
180 41 : return true;
181 : }
182 :
183 : inline void
184 471 : thread_pool::shutdown()
185 : {
186 : {
187 471 : std::lock_guard<std::mutex> lock(mutex_);
188 471 : shutdown_ = true;
189 471 : }
190 471 : cv_.notify_all();
191 :
192 941 : for (auto& t : threads_)
193 : {
194 470 : if (t.joinable())
195 470 : t.join();
196 : }
197 471 : threads_.clear();
198 :
199 : {
200 471 : std::lock_guard<std::mutex> lock(mutex_);
201 471 : while (work_queue_.pop())
202 : ;
203 471 : }
204 471 : }
205 :
206 : } // namespace boost::corosio::detail
207 :
208 : #endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
|