TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
12 :
13 : #include <boost/corosio/detail/intrusive.hpp>
14 : #include <boost/corosio/detail/native_handle.hpp>
15 : #include <boost/corosio/endpoint.hpp>
16 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
17 : #include <boost/corosio/native/detail/make_err.hpp>
18 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
19 :
20 : #include <memory>
21 : #include <mutex>
22 : #include <utility>
23 :
24 : #include <errno.h>
25 : #include <netinet/in.h>
26 : #include <sys/socket.h>
27 : #include <unistd.h>
28 :
29 : namespace boost::corosio::detail {
30 :
31 : /** CRTP base for reactor-backed socket implementations.
32 :
33 : Extracts the shared data members, virtual overrides, and
34 : cancel/close/register logic that is identical across TCP
35 : (reactor_stream_socket) and UDP (reactor_datagram_socket).
36 :
37 : Derived classes provide CRTP callbacks that enumerate their
38 : specific op slots so cancel/close can iterate them generically.
39 :
40 : @tparam Derived The concrete socket type (CRTP).
41 : @tparam ImplBase The public vtable base (tcp_socket::implementation
42 : or udp_socket::implementation).
43 : @tparam Service The backend's service type.
44 : @tparam DescState The backend's descriptor_state type.
45 : */
46 : template<class Derived, class ImplBase, class Service, class DescState>
47 : class reactor_basic_socket
48 : : public ImplBase
49 : , public std::enable_shared_from_this<Derived>
50 : , public intrusive_list<Derived>::node
51 : {
52 : friend Derived;
53 :
54 : template<class, class, class, class, class, class>
55 : friend class reactor_stream_socket;
56 :
57 : template<class, class, class, class, class, class, class, class>
58 : friend class reactor_datagram_socket;
59 :
60 HIT 13621 : explicit reactor_basic_socket(Service& svc) noexcept : svc_(svc) {}
61 :
62 : protected:
63 : Service& svc_;
64 : int fd_ = -1;
65 : endpoint local_endpoint_;
66 :
67 : public:
68 : /// Per-descriptor state for persistent reactor registration.
69 : DescState desc_state_;
70 :
71 13621 : ~reactor_basic_socket() override = default;
72 :
73 : /// Return the underlying file descriptor.
74 27936 : native_handle_type native_handle() const noexcept override
75 : {
76 27936 : return fd_;
77 : }
78 :
79 : /// Return the cached local endpoint.
80 68 : endpoint local_endpoint() const noexcept override
81 : {
82 68 : return local_endpoint_;
83 : }
84 :
85 : /// Return true if the socket has an open file descriptor.
86 : bool is_open() const noexcept
87 : {
88 : return fd_ >= 0;
89 : }
90 :
91 : /// Set a socket option.
92 80 : std::error_code set_option(
93 : int level,
94 : int optname,
95 : void const* data,
96 : std::size_t size) noexcept override
97 : {
98 80 : if (::setsockopt(
99 80 : fd_, level, optname, data, static_cast<socklen_t>(size)) != 0)
100 MIS 0 : return make_err(errno);
101 HIT 80 : return {};
102 : }
103 :
104 : /// Get a socket option.
105 : std::error_code
106 78 : get_option(int level, int optname, void* data, std::size_t* size)
107 : const noexcept override
108 : {
109 78 : socklen_t len = static_cast<socklen_t>(*size);
110 78 : if (::getsockopt(fd_, level, optname, data, &len) != 0)
111 MIS 0 : return make_err(errno);
112 HIT 78 : *size = static_cast<std::size_t>(len);
113 78 : return {};
114 : }
115 :
116 : /// Assign the file descriptor.
117 4484 : void set_socket(int fd) noexcept
118 : {
119 4484 : fd_ = fd;
120 4484 : }
121 :
122 : /// Cache the local endpoint.
123 : void set_local_endpoint(endpoint ep) noexcept
124 : {
125 : local_endpoint_ = ep;
126 : }
127 :
128 : /** Bind the socket to a local endpoint.
129 :
130 : Calls ::bind() and caches the resulting local endpoint
131 : via getsockname().
132 :
133 : @param ep The endpoint to bind to.
134 : @return Error code on failure, empty on success.
135 : */
136 42 : std::error_code do_bind(endpoint ep) noexcept
137 : {
138 42 : sockaddr_storage storage{};
139 42 : socklen_t addrlen = to_sockaddr(ep, socket_family(fd_), storage);
140 42 : if (::bind(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen) != 0)
141 MIS 0 : return make_err(errno);
142 :
143 HIT 42 : sockaddr_storage local_storage{};
144 42 : socklen_t local_len = sizeof(local_storage);
145 42 : if (::getsockname(
146 42 : fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
147 : 0)
148 42 : local_endpoint_ = from_sockaddr(local_storage);
149 :
150 42 : return {};
151 : }
152 :
153 : /** Register an op with the reactor.
154 :
155 : Handles cached edge events and deferred cancellation.
156 : Called on the EAGAIN/EINPROGRESS path when speculative
157 : I/O failed.
158 : */
159 : template<class Op>
160 : void register_op(
161 : Op& op,
162 : reactor_op_base*& desc_slot,
163 : bool& ready_flag,
164 : bool& cancel_flag) noexcept;
165 :
166 : /** Cancel a single pending operation.
167 :
168 : Claims the operation from its descriptor_state slot under
169 : the mutex and posts it to the scheduler as cancelled.
170 : Derived must implement:
171 : op_to_desc_slot(Op&) -> reactor_op_base**
172 : op_to_cancel_flag(Op&) -> bool*
173 : */
174 : template<class Op>
175 : void cancel_single_op(Op& op) noexcept;
176 :
177 : /** Cancel all pending operations.
178 :
179 : Invoked by the derived class's cancel() override.
180 : Derived must implement:
181 : for_each_op(auto fn)
182 : for_each_desc_entry(auto fn)
183 : */
184 : void do_cancel() noexcept;
185 :
186 : /** Close the socket and cancel pending operations.
187 :
188 : Invoked by the derived class's close_socket(). The
189 : derived class may add backend-specific cleanup after
190 : calling this method.
191 : Derived must implement:
192 : for_each_op(auto fn)
193 : for_each_desc_entry(auto fn)
194 : */
195 : void do_close_socket() noexcept;
196 : };
197 :
198 : template<class Derived, class ImplBase, class Service, class DescState>
199 : template<class Op>
200 : void
201 4887 : reactor_basic_socket<Derived, ImplBase, Service, DescState>::register_op(
202 : Op& op,
203 : reactor_op_base*& desc_slot,
204 : bool& ready_flag,
205 : bool& cancel_flag) noexcept
206 : {
207 4887 : svc_.work_started();
208 :
209 4887 : std::lock_guard lock(desc_state_.mutex);
210 4887 : bool io_done = false;
211 4887 : if (ready_flag)
212 : {
213 185 : ready_flag = false;
214 185 : op.perform_io();
215 185 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
216 185 : if (!io_done)
217 185 : op.errn = 0;
218 : }
219 :
220 4887 : if (cancel_flag)
221 : {
222 MIS 0 : cancel_flag = false;
223 0 : op.cancelled.store(true, std::memory_order_relaxed);
224 : }
225 :
226 HIT 4887 : if (io_done || op.cancelled.load(std::memory_order_acquire))
227 : {
228 MIS 0 : svc_.post(&op);
229 0 : svc_.work_finished();
230 : }
231 : else
232 : {
233 HIT 4887 : desc_slot = &op;
234 : }
235 4887 : }
236 :
237 : template<class Derived, class ImplBase, class Service, class DescState>
238 : template<class Op>
239 : void
240 193 : reactor_basic_socket<Derived, ImplBase, Service, DescState>::cancel_single_op(
241 : Op& op) noexcept
242 : {
243 193 : auto self = this->weak_from_this().lock();
244 193 : if (!self)
245 MIS 0 : return;
246 :
247 HIT 193 : op.request_cancel();
248 :
249 193 : auto* d = static_cast<Derived*>(this);
250 193 : reactor_op_base** desc_op_ptr = d->op_to_desc_slot(op);
251 :
252 193 : if (desc_op_ptr)
253 : {
254 193 : reactor_op_base* claimed = nullptr;
255 : {
256 193 : std::lock_guard lock(desc_state_.mutex);
257 193 : if (*desc_op_ptr == &op)
258 193 : claimed = std::exchange(*desc_op_ptr, nullptr);
259 : else
260 : {
261 MIS 0 : bool* cflag = d->op_to_cancel_flag(op);
262 0 : if (cflag)
263 0 : *cflag = true;
264 : }
265 HIT 193 : }
266 193 : if (claimed)
267 : {
268 193 : op.impl_ptr = self;
269 193 : svc_.post(&op);
270 193 : svc_.work_finished();
271 : }
272 : }
273 193 : }
274 :
275 : template<class Derived, class ImplBase, class Service, class DescState>
276 : void
277 189 : reactor_basic_socket<Derived, ImplBase, Service, DescState>::
278 : do_cancel() noexcept
279 : {
280 189 : auto self = this->weak_from_this().lock();
281 189 : if (!self)
282 MIS 0 : return;
283 :
284 HIT 189 : auto* d = static_cast<Derived*>(this);
285 :
286 764 : d->for_each_op([](auto& op) { op.request_cancel(); });
287 :
288 : // Claim ops under a single lock acquisition
289 : struct claimed_entry
290 : {
291 : reactor_op_base* op = nullptr;
292 : reactor_op_base* base = nullptr;
293 : };
294 : // Max 3 ops (conn, rd, wr)
295 189 : claimed_entry claimed[3];
296 189 : int count = 0;
297 :
298 : {
299 189 : std::lock_guard lock(desc_state_.mutex);
300 1339 : d->for_each_desc_entry([&](auto& op, reactor_op_base*& desc_slot) {
301 575 : if (desc_slot == &op)
302 : {
303 101 : claimed[count].op = std::exchange(desc_slot, nullptr);
304 101 : claimed[count].base = &op;
305 101 : ++count;
306 : }
307 : });
308 189 : }
309 :
310 290 : for (int i = 0; i < count; ++i)
311 : {
312 101 : claimed[i].base->impl_ptr = self;
313 101 : svc_.post(claimed[i].base);
314 101 : svc_.work_finished();
315 : }
316 189 : }
317 :
318 : template<class Derived, class ImplBase, class Service, class DescState>
319 : void
320 40914 : reactor_basic_socket<Derived, ImplBase, Service, DescState>::
321 : do_close_socket() noexcept
322 : {
323 40914 : auto self = this->weak_from_this().lock();
324 40914 : if (self)
325 : {
326 40914 : auto* d = static_cast<Derived*>(this);
327 :
328 164264 : d->for_each_op([](auto& op) { op.request_cancel(); });
329 :
330 : struct claimed_entry
331 : {
332 : reactor_op_base* base = nullptr;
333 : };
334 40914 : claimed_entry claimed[3];
335 40914 : int count = 0;
336 :
337 : {
338 40914 : std::lock_guard lock(desc_state_.mutex);
339 40914 : d->for_each_desc_entry(
340 246700 : [&](auto& /*op*/, reactor_op_base*& desc_slot) {
341 123350 : auto* c = std::exchange(desc_slot, nullptr);
342 123350 : if (c)
343 : {
344 4 : claimed[count].base = c;
345 4 : ++count;
346 : }
347 : });
348 40914 : desc_state_.read_ready = false;
349 40914 : desc_state_.write_ready = false;
350 40914 : desc_state_.read_cancel_pending = false;
351 40914 : desc_state_.write_cancel_pending = false;
352 40914 : desc_state_.connect_cancel_pending = false;
353 :
354 40914 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
355 217 : desc_state_.impl_ref_ = self;
356 40914 : }
357 :
358 40918 : for (int i = 0; i < count; ++i)
359 : {
360 4 : claimed[i].base->impl_ptr = self;
361 4 : svc_.post(claimed[i].base);
362 4 : svc_.work_finished();
363 : }
364 : }
365 :
366 40914 : if (fd_ >= 0)
367 : {
368 9078 : if (desc_state_.registered_events != 0)
369 9078 : svc_.scheduler().deregister_descriptor(fd_);
370 9078 : ::close(fd_);
371 9078 : fd_ = -1;
372 : }
373 :
374 40914 : desc_state_.fd = -1;
375 40914 : desc_state_.registered_events = 0;
376 :
377 40914 : local_endpoint_ = endpoint{};
378 40914 : }
379 :
380 : } // namespace boost::corosio::detail
381 :
382 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
|