TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
12 :
13 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
14 : #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
15 :
16 : #include <atomic>
17 : #include <cstdint>
18 : #include <memory>
19 : #include <mutex>
20 :
21 : #include <errno.h>
22 : #include <sys/socket.h>
23 :
24 : namespace boost::corosio::detail {
25 :
/// Shared reactor event constants.
/// These match epoll numeric values (EPOLLIN / EPOLLOUT / EPOLLERR);
/// kqueue backends map their events to the same bit values.
///
/// Declared `inline` (not `static`): these constants are ODR-used by the
/// `inline` invoke_deferred_io() defined later in this header, and an
/// internal-linkage entity referenced from an inline function would be a
/// different object in every translation unit — an ODR violation. An
/// inline variable is a single entity program-wide.
inline constexpr std::uint32_t reactor_event_read = 0x001;
inline constexpr std::uint32_t reactor_event_write = 0x004;
inline constexpr std::uint32_t reactor_event_error = 0x008;
31 :
32 : /** Per-descriptor state shared across reactor backends.
33 :
34 : Tracks pending operations for a file descriptor. The fd is registered
35 : once with the reactor and stays registered until closed. Uses deferred
36 : I/O: the reactor sets ready_events atomically, then enqueues this state.
37 : When popped by the scheduler, invoke_deferred_io() performs I/O under
38 : the mutex and queues completed ops.
39 :
40 : Non-template: uses reactor_op_base pointers so the scheduler and
41 : descriptor_state code exist as a single copy in the binary regardless
42 : of how many backends are compiled in.
43 :
44 : @par Thread Safety
45 : The mutex protects operation pointers and ready flags. ready_events_
46 : and is_enqueued_ are atomic for lock-free reactor access.
47 : */
/** Per-descriptor state shared across reactor backends.

    Tracks pending operations for a file descriptor. The fd is registered
    once with the reactor and stays registered until closed. Uses deferred
    I/O: the reactor sets ready_events atomically, then enqueues this state.
    When popped by the scheduler, invoke_deferred_io() performs I/O under
    the mutex and queues completed ops.

    Non-template: uses reactor_op_base pointers so the scheduler and
    descriptor_state code exist as a single copy in the binary regardless
    of how many backends are compiled in.

    @par Thread Safety
    The mutex protects operation pointers and ready flags. ready_events_
    and is_enqueued_ are atomic for lock-free reactor access.
*/
struct reactor_descriptor_state : scheduler_op
{
    /// Protects operation pointers and ready/cancel flags.
    std::mutex mutex;

    /// Pending read operation (guarded by `mutex`).
    /// At most one per slot; stays parked across EAGAIN wakeups.
    reactor_op_base* read_op = nullptr;

    /// Pending write operation (guarded by `mutex`).
    reactor_op_base* write_op = nullptr;

    /// Pending connect operation (guarded by `mutex`).
    /// Connect readiness is delivered via the write event mask
    /// (see invoke_deferred_io()).
    reactor_op_base* connect_op = nullptr;

    /// True if a read edge event arrived before an op was registered.
    bool read_ready = false;

    /// True if a write edge event arrived before an op was registered.
    bool write_ready = false;

    /// Deferred read cancellation (IOCP-style cancel semantics).
    bool read_cancel_pending = false;

    /// Deferred write cancellation (IOCP-style cancel semantics).
    bool write_cancel_pending = false;

    /// Deferred connect cancellation (IOCP-style cancel semantics).
    bool connect_cancel_pending = false;

    /// Event mask set during registration (no mutex needed).
    std::uint32_t registered_events = 0;

    /// File descriptor this state tracks (-1 until registered).
    int fd = -1;

    /// Accumulated ready events (set by reactor, read by scheduler).
    /// Bits are reactor_event_* values; drained atomically with an
    /// exchange in invoke_deferred_io().
    std::atomic<std::uint32_t> ready_events_{0};

    /// True while this state is queued in the scheduler's completed_ops.
    std::atomic<bool> is_enqueued_{0};

    /// Owning scheduler for posting completions.
    reactor_scheduler_base const* scheduler_ = nullptr;

    /// Prevents impl destruction while queued in the scheduler.
    /// Moved out (and thereby released) each time invoke_deferred_io()
    /// runs; destroy() clears it during shutdown drain.
    std::shared_ptr<void> impl_ref_;

    /// Add ready events atomically.
    /// Release pairs with the consumer's acquire exchange on
    /// ready_events_ so the consumer sees all flags. On x86 (TSO)
    /// this compiles to the same LOCK OR as relaxed.
    void add_ready_events(std::uint32_t ev) noexcept
    {
        ready_events_.fetch_or(ev, std::memory_order_release);
    }

    /// Invoke deferred I/O and dispatch completions.
    /// Runs when the scheduler pops this state from its queue.
    void operator()() override
    {
        invoke_deferred_io();
    }

    /// Destroy without invoking.
    /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
    /// the self-referential cycle set by close_socket().
    void destroy() override
    {
        impl_ref_.reset();
    }

    /** Perform deferred I/O and queue completions.

        Performs I/O under the mutex and queues completed ops. EAGAIN
        ops stay parked in their slot for re-delivery on the next
        edge event.
    */
    void invoke_deferred_io();
};
126 :
inline void
reactor_descriptor_state::invoke_deferred_io()
{
    // Keeps the impl alive for the remainder of this call; populated
    // from impl_ref_ under the mutex below.
    std::shared_ptr<void> prevent_impl_destruction;

    // Ops completed in this pass; dispatched after the mutex is released.
    op_queue local_ops;

    {
        std::lock_guard lock(mutex);

        // Must clear is_enqueued_ and move impl_ref_ under the same
        // lock that processes I/O. close_socket() checks is_enqueued_
        // under this mutex — without atomicity between the flag store
        // and the ref move, close_socket() could see is_enqueued_==false,
        // skip setting impl_ref_, and destroy the impl under us.
        prevent_impl_destruction = std::move(impl_ref_);
        is_enqueued_.store(false, std::memory_order_release);

        // Acquire pairs with the release fetch_or in add_ready_events(),
        // so every flag published by the reactor thread is visible here.
        std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
        if (ev == 0)
        {
            // Nothing to do (events already consumed by an earlier pass).
            // Mutex unlocks here; compensate for work_cleanup's decrement
            scheduler_->compensating_work_started();
            return;
        }

        // On an error event, fetch the pending socket error so it can be
        // delivered to outstanding ops. Fall back to EIO when the socket
        // reports no specific error.
        int err = 0;
        if (ev & reactor_event_error)
        {
            socklen_t len = sizeof(err);
            if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
                err = errno;
            if (err == 0)
                err = EIO;
        }

        if (ev & reactor_event_read)
        {
            if (read_op)
            {
                auto* rd = read_op;
                if (err)
                    rd->complete(err, 0);
                else
                    rd->perform_io();

                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
                {
                    // Spurious readiness: leave the op parked in its slot
                    // for re-delivery on the next edge event.
                    rd->errn = 0;
                }
                else
                {
                    read_op = nullptr;
                    local_ops.push(rd);
                }
            }
            else
            {
                // Edge arrived before an op was registered; remember it
                // so a later registration need not wait for another edge.
                read_ready = true;
            }
        }
        if (ev & reactor_event_write)
        {
            // Connect readiness is reported as writability, so both slots
            // are checked here. Snapshot whether either was present so
            // write_ready is only set for a truly unclaimed event.
            bool had_write_op = (connect_op || write_op);
            if (connect_op)
            {
                auto* cn = connect_op;
                if (err)
                    cn->complete(err, 0);
                else
                    cn->perform_io();
                // Connect completes exactly once — no EAGAIN re-parking.
                connect_op = nullptr;
                local_ops.push(cn);
            }
            if (write_op)
            {
                auto* wr = write_op;
                if (err)
                    wr->complete(err, 0);
                else
                    wr->perform_io();

                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
                {
                    // Spurious writability: park for the next edge event.
                    wr->errn = 0;
                }
                else
                {
                    write_op = nullptr;
                    local_ops.push(wr);
                }
            }
            if (!had_write_op)
                write_ready = true;
        }
        // Deliver the socket error to any ops not already completed above
        // (e.g. an error event with no accompanying read/write readiness).
        if (err)
        {
            if (read_op)
            {
                read_op->complete(err, 0);
                local_ops.push(std::exchange(read_op, nullptr));
            }
            if (write_op)
            {
                write_op->complete(err, 0);
                local_ops.push(std::exchange(write_op, nullptr));
            }
            if (connect_op)
            {
                connect_op->complete(err, 0);
                local_ops.push(std::exchange(connect_op, nullptr));
            }
        }
    }

    // Execute first handler inline — the scheduler's work_cleanup
    // accounts for this as the "consumed" work item
    scheduler_op* first = local_ops.pop();
    if (first)
    {
        // Hand the remainder to the scheduler before running the first
        // handler so other threads can pick them up concurrently.
        scheduler_->post_deferred_completions(local_ops);
        (*first)();
    }
    else
    {
        // Nothing completed (events parked or flagged for later):
        // balance work_cleanup's pending decrement.
        scheduler_->compensating_work_started();
    }
}
254 :
255 : } // namespace boost::corosio::detail
256 :
257 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
|