include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp

83.9% Lines (115/137) 100.0% List of functions (9/9)
f(x) Functions (9)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19
20 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21
22 #include <boost/corosio/native/detail/epoll/epoll_op.hpp>
23 #include <boost/corosio/detail/timer_service.hpp>
24 #include <boost/corosio/native/detail/make_err.hpp>
25 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27
28 #include <boost/corosio/detail/except.hpp>
29
30 #include <atomic>
31 #include <chrono>
32 #include <cstdint>
33 #include <mutex>
34
35 #include <errno.h>
36 #include <sys/epoll.h>
37 #include <sys/eventfd.h>
38 #include <sys/timerfd.h>
39 #include <unistd.h>
40
41 namespace boost::corosio::detail {
42
43 struct epoll_op;
44 struct descriptor_state;
45
46 /** Linux scheduler using epoll for I/O multiplexing.
47
48 This scheduler implements the scheduler interface using Linux epoll
49 for efficient I/O event notification. It uses a single reactor model
50 where one thread runs epoll_wait while other threads
51 wait on a condition variable for handler work. This design provides:
52
53 - Handler parallelism: N posted handlers can execute on N threads
54 - No thundering herd: condition_variable wakes exactly one thread
55 - IOCP parity: Behavior matches Windows I/O completion port semantics
56
57 When threads call run(), they first try to execute queued handlers.
58 If the queue is empty and no reactor is running, one thread becomes
59 the reactor and runs epoll_wait. Other threads wait on a condition
60 variable until handlers are available.
61
62 @par Thread Safety
63 All public member functions are thread-safe.
64 */
class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
{
public:
    /** Construct the scheduler.

        Creates an epoll instance, eventfd for reactor interruption,
        and timerfd for kernel-managed timer expiry.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /// Destroy the scheduler.
    ~epoll_scheduler() override;

    epoll_scheduler(epoll_scheduler const&) = delete;
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;

    /// Shut down the scheduler, draining pending operations.
    void shutdown() override;

    /** Return the epoll file descriptor.

        Used by socket services to register file descriptors
        for I/O event notification.

        @return The epoll file descriptor.
    */
    int epoll_fd() const noexcept
    {
        return epoll_fd_;
    }

    /** Register a descriptor for persistent monitoring.

        The fd is registered once and stays registered until explicitly
        deregistered. Events are dispatched via descriptor_state which
        tracks pending read/write/connect operations.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
    */
    void register_descriptor(int fd, descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

private:
    // Becomes the reactor: runs one epoll_wait pass and splices any
    // newly ready operations into the scheduler's completion queue.
    void
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
    // Wakes the thread blocked in epoll_wait by signalling the eventfd.
    void interrupt_reactor() const override;
    // Reprograms the timerfd from the timer service's nearest expiry.
    void update_timerfd() const;

    int epoll_fd_;  // epoll instance; -1 until construction succeeds
    int event_fd_;  // eventfd used to interrupt the reactor; -1 if unopened
    int timer_fd_;  // CLOCK_MONOTONIC timerfd for timer expiry; -1 if unopened

    // Edge-triggered eventfd state: true while a wakeup write is pending,
    // so repeated interrupts collapse into a single eventfd write.
    mutable std::atomic<bool> eventfd_armed_{false};

    // Set when the earliest timer changes; flushed before epoll_wait
    mutable std::atomic<bool> timerfd_stale_{false};
};
132
133 271x inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
134 271x : epoll_fd_(-1)
135 271x , event_fd_(-1)
136 271x , timer_fd_(-1)
137 {
138 271x epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
139 271x if (epoll_fd_ < 0)
140 detail::throw_system_error(make_err(errno), "epoll_create1");
141
142 271x event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
143 271x if (event_fd_ < 0)
144 {
145 int errn = errno;
146 ::close(epoll_fd_);
147 detail::throw_system_error(make_err(errn), "eventfd");
148 }
149
150 271x timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
151 271x if (timer_fd_ < 0)
152 {
153 int errn = errno;
154 ::close(event_fd_);
155 ::close(epoll_fd_);
156 detail::throw_system_error(make_err(errn), "timerfd_create");
157 }
158
159 271x epoll_event ev{};
160 271x ev.events = EPOLLIN | EPOLLET;
161 271x ev.data.ptr = nullptr;
162 271x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
163 {
164 int errn = errno;
165 ::close(timer_fd_);
166 ::close(event_fd_);
167 ::close(epoll_fd_);
168 detail::throw_system_error(make_err(errn), "epoll_ctl");
169 }
170
171 271x epoll_event timer_ev{};
172 271x timer_ev.events = EPOLLIN | EPOLLERR;
173 271x timer_ev.data.ptr = &timer_fd_;
174 271x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
175 {
176 int errn = errno;
177 ::close(timer_fd_);
178 ::close(event_fd_);
179 ::close(epoll_fd_);
180 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
181 }
182
183 271x timer_svc_ = &get_timer_service(ctx, *this);
184 271x timer_svc_->set_on_earliest_changed(
185 2968x timer_service::callback(this, [](void* p) {
186 2697x auto* self = static_cast<epoll_scheduler*>(p);
187 2697x self->timerfd_stale_.store(true, std::memory_order_release);
188 2697x self->interrupt_reactor();
189 2697x }));
190
191 271x get_resolver_service(ctx, *this);
192 271x get_signal_service(ctx, *this);
193
194 271x completed_ops_.push(&task_op_);
195 271x }
196
197 542x inline epoll_scheduler::~epoll_scheduler()
198 {
199 271x if (timer_fd_ >= 0)
200 271x ::close(timer_fd_);
201 271x if (event_fd_ >= 0)
202 271x ::close(event_fd_);
203 271x if (epoll_fd_ >= 0)
204 271x ::close(epoll_fd_);
205 542x }
206
207 inline void
208 271x epoll_scheduler::shutdown()
209 {
210 271x shutdown_drain();
211
212 271x if (event_fd_ >= 0)
213 271x interrupt_reactor();
214 271x }
215
216 inline void
217 5056x epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
218 {
219 5056x epoll_event ev{};
220 5056x ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
221 5056x ev.data.ptr = desc;
222
223 5056x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
224 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
225
226 5056x desc->registered_events = ev.events;
227 5056x desc->fd = fd;
228 5056x desc->scheduler_ = this;
229 5056x desc->ready_events_.store(0, std::memory_order_relaxed);
230
231 5056x std::lock_guard lock(desc->mutex);
232 5056x desc->impl_ref_.reset();
233 5056x desc->read_ready = false;
234 5056x desc->write_ready = false;
235 5056x }
236
237 inline void
238 5056x epoll_scheduler::deregister_descriptor(int fd) const
239 {
240 5056x ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
241 5056x }
242
243 inline void
244 3202x epoll_scheduler::interrupt_reactor() const
245 {
246 3202x bool expected = false;
247 3202x if (eventfd_armed_.compare_exchange_strong(
248 expected, true, std::memory_order_release,
249 std::memory_order_relaxed))
250 {
251 3017x std::uint64_t val = 1;
252 3017x [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
253 }
254 3202x }
255
256 inline void
257 5352x epoll_scheduler::update_timerfd() const
258 {
259 5352x auto nearest = timer_svc_->nearest_expiry();
260
261 5352x itimerspec ts{};
262 5352x int flags = 0;
263
264 5352x if (nearest == timer_service::time_point::max())
265 {
266 // No timers — disarm by setting to 0 (relative)
267 }
268 else
269 {
270 5298x auto now = std::chrono::steady_clock::now();
271 5298x if (nearest <= now)
272 {
273 // Use 1ns instead of 0 — zero disarms the timerfd
274 43x ts.it_value.tv_nsec = 1;
275 }
276 else
277 {
278 5255x auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
279 5255x nearest - now)
280 5255x .count();
281 5255x ts.it_value.tv_sec = nsec / 1000000000;
282 5255x ts.it_value.tv_nsec = nsec % 1000000000;
283 5255x if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
284 ts.it_value.tv_nsec = 1;
285 }
286 }
287
288 5352x if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
289 detail::throw_system_error(make_err(errno), "timerfd_settime");
290 5352x }
291
/** Run one reactor pass: wait for epoll events and enqueue ready work.

    Called by the thread that has become the reactor. Releases the
    scheduler lock around the blocking epoll_wait, classifies each event
    (wakeup eventfd / timerfd / socket descriptor), then re-acquires the
    lock and splices newly ready descriptors into completed_ops_.

    @param lock The scheduler lock; may be held on entry, is held on exit.
    @param ctx  The per-thread context (forwarded to task_cleanup).
*/
inline void
epoll_scheduler::run_task(std::unique_lock<std::mutex>& lock, context_type* ctx)
{
    // If the reactor was interrupted, poll without blocking so queued
    // handlers are not delayed; otherwise block until an event arrives.
    int timeout_ms = task_interrupted_ ? 0 : -1;

    // Never hold the scheduler lock across the blocking syscall.
    if (lock.owns_lock())
        lock.unlock();

    // NOTE(review): task_cleanup is declared elsewhere; presumably it
    // restores scheduler state (e.g. re-queues the task op) on scope
    // exit, including on exception — confirm against its definition.
    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    // EINTR is benign (signal delivery); any other failure is fatal.
    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;

    for (int i = 0; i < nfds; ++i)
    {
        // data.ptr == nullptr marks the interrupt eventfd (see ctor).
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            // Drain the counter, then disarm so the next interrupt
            // performs a fresh eventfd write (edge-triggered).
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        // data.ptr == &timer_fd_ marks timerfd expiry (see ctor).
        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            // Must read to clear the expiration count; the actual timer
            // processing happens once, after the event loop.
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r =
                ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Otherwise it is a registered socket descriptor: record the
        // ready events and enqueue it exactly once — the CAS guards
        // against double-enqueue if it is already queued.
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(
                expected, true, std::memory_order_release,
                std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    if (check_timers)
    {
        // Fire expired timers, then reprogram the timerfd for the next
        // nearest expiry.
        timer_svc_->process_expired();
        update_timerfd();
    }

    // Re-acquire before touching shared queues; task_cleanup relies on
    // the lock being held at scope exit.
    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
359
360 } // namespace boost::corosio::detail
361
362 #endif // BOOST_COROSIO_HAS_EPOLL
363
364 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
365