TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
12 :
13 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
14 : #include <boost/corosio/io/io_object.hpp>
15 : #include <boost/corosio/endpoint.hpp>
16 : #include <boost/capy/ex/executor_ref.hpp>
17 :
18 : #include <atomic>
19 : #include <coroutine>
20 : #include <cstddef>
21 : #include <memory>
22 : #include <optional>
23 : #include <stop_token>
24 : #include <system_error>
25 :
26 : #include <errno.h>
27 :
28 : #include <netinet/in.h>
29 : #include <sys/socket.h>
30 : #include <sys/uio.h>
31 :
32 : namespace boost::corosio::detail {
33 :
34 : /** Base operation for reactor-based backends.
35 :
36 : Holds per-operation state that depends on the concrete backend
37 : socket/acceptor types: coroutine handle, executor, output
38 : pointers, file descriptor, stop_callback, and type-specific
39 : impl pointers.
40 :
41 : Fields shared across all backends (errn, bytes_transferred,
42 : cancelled, impl_ptr, perform_io, complete) live in
43 : reactor_op_base so the scheduler and descriptor_state can
44 : access them without template instantiation.
45 :
46 : @tparam Socket The backend socket impl type (forward-declared).
47 : @tparam Acceptor The backend acceptor impl type (forward-declared).
48 : */
49 : template<class Socket, class Acceptor>
50 : struct reactor_op : reactor_op_base
51 : {
52 : /// Stop-token callback that invokes cancel() on the target op.
53 : struct canceller
54 : {
55 : reactor_op* op;
56 HIT 199 : void operator()() const noexcept
57 : {
58 199 : op->cancel();
59 199 : }
60 : };
61 :
62 : /// Caller's coroutine handle to resume on completion.
63 : std::coroutine_handle<> h;
64 :
65 : /// Executor for dispatching the completion.
66 : capy::executor_ref ex;
67 :
68 : /// Output pointer for the error code.
69 : std::error_code* ec_out = nullptr;
70 :
71 : /// Output pointer for bytes transferred.
72 : std::size_t* bytes_out = nullptr;
73 :
74 : /// File descriptor this operation targets.
75 : int fd = -1;
76 :
77 : /// Stop-token callback registration.
78 : std::optional<std::stop_callback<canceller>> stop_cb;
79 :
80 : /// Owning socket impl (for stop_token cancellation).
81 : Socket* socket_impl_ = nullptr;
82 :
83 : /// Owning acceptor impl (for stop_token cancellation).
84 : Acceptor* acceptor_impl_ = nullptr;
85 :
86 41160 : reactor_op() = default;
87 :
88 : /// Reset operation state for reuse.
89 451984 : void reset() noexcept
90 : {
91 451984 : fd = -1;
92 451984 : errn = 0;
93 451984 : bytes_transferred = 0;
94 451984 : cancelled.store(false, std::memory_order_relaxed);
95 451984 : impl_ptr.reset();
96 451984 : socket_impl_ = nullptr;
97 451984 : acceptor_impl_ = nullptr;
98 451984 : }
99 :
100 : /// Return true if this is a read-direction operation.
101 44255 : virtual bool is_read_operation() const noexcept
102 : {
103 44255 : return false;
104 : }
105 :
106 : /// Cancel this operation via the owning impl.
107 : virtual void cancel() noexcept = 0;
108 :
109 : /// Destroy without invoking.
110 MIS 0 : void destroy() override
111 : {
112 0 : stop_cb.reset();
113 0 : reactor_op_base::destroy();
114 0 : }
115 :
116 : /// Arm the stop-token callback for a socket operation.
117 HIT 93380 : void start(std::stop_token const& token, Socket* impl)
118 : {
119 93380 : cancelled.store(false, std::memory_order_release);
120 93380 : stop_cb.reset();
121 93380 : socket_impl_ = impl;
122 93380 : acceptor_impl_ = nullptr;
123 :
124 93380 : if (token.stop_possible())
125 197 : stop_cb.emplace(token, canceller{this});
126 93380 : }
127 :
128 : /// Arm the stop-token callback for an acceptor operation.
129 4496 : void start(std::stop_token const& token, Acceptor* impl)
130 : {
131 4496 : cancelled.store(false, std::memory_order_release);
132 4496 : stop_cb.reset();
133 4496 : socket_impl_ = nullptr;
134 4496 : acceptor_impl_ = impl;
135 :
136 4496 : if (token.stop_possible())
137 9 : stop_cb.emplace(token, canceller{this});
138 4496 : }
139 : };
140 :
141 : /** Shared connect operation.
142 :
143 : Checks SO_ERROR for connect completion status. The operator()()
144 : and cancel() are provided by the concrete backend type.
145 :
146 : @tparam Base The backend's base op type.
147 : */
148 : template<class Base>
149 : struct reactor_connect_op : Base
150 : {
151 : /// Endpoint to connect to.
152 : endpoint target_endpoint;
153 :
154 : /// Reset operation state for reuse.
155 4500 : void reset() noexcept
156 : {
157 4500 : Base::reset();
158 4500 : target_endpoint = endpoint{};
159 4500 : }
160 :
161 4488 : void perform_io() noexcept override
162 : {
163 4488 : int err = 0;
164 4488 : socklen_t len = sizeof(err);
165 4488 : if (::getsockopt(this->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
166 MIS 0 : err = errno;
167 HIT 4488 : this->complete(err, 0);
168 4488 : }
169 : };
170 :
171 : /** Shared scatter-read operation.
172 :
173 : Uses readv() with an EINTR retry loop.
174 :
175 : @tparam Base The backend's base op type.
176 : */
177 : template<class Base>
178 : struct reactor_read_op : Base
179 : {
180 : /// Maximum scatter-gather buffer count.
181 : static constexpr std::size_t max_buffers = 16;
182 :
183 : /// Scatter-gather I/O vectors.
184 : iovec iovecs[max_buffers];
185 :
186 : /// Number of active I/O vectors.
187 : int iovec_count = 0;
188 :
189 : /// True for zero-length reads (completed immediately).
190 : bool empty_buffer_read = false;
191 :
192 : /// Return true (this is a read-direction operation).
193 44301 : bool is_read_operation() const noexcept override
194 : {
195 44301 : return !empty_buffer_read;
196 : }
197 :
198 221610 : void reset() noexcept
199 : {
200 221610 : Base::reset();
201 221610 : iovec_count = 0;
202 221610 : empty_buffer_read = false;
203 221610 : }
204 :
205 325 : void perform_io() noexcept override
206 : {
207 : ssize_t n;
208 : do
209 : {
210 325 : n = ::readv(this->fd, iovecs, iovec_count);
211 : }
212 325 : while (n < 0 && errno == EINTR);
213 :
214 325 : if (n >= 0)
215 97 : this->complete(0, static_cast<std::size_t>(n));
216 : else
217 228 : this->complete(errno, 0);
218 325 : }
219 : };
220 :
221 : /** Shared gather-write operation.
222 :
223 : Delegates the actual syscall to WritePolicy::write(fd, iovecs, count),
224 : which returns ssize_t (bytes written or -1 with errno set).
225 :
226 : @tparam Base The backend's base op type.
227 : @tparam WritePolicy Provides `static ssize_t write(int, iovec*, int)`.
228 : */
229 : template<class Base, class WritePolicy>
230 : struct reactor_write_op : Base
231 : {
232 : /// The write syscall policy type.
233 : using write_policy = WritePolicy;
234 :
235 : /// Maximum scatter-gather buffer count.
236 : static constexpr std::size_t max_buffers = 16;
237 :
238 : /// Scatter-gather I/O vectors.
239 : iovec iovecs[max_buffers];
240 :
241 : /// Number of active I/O vectors.
242 : int iovec_count = 0;
243 :
244 221314 : void reset() noexcept
245 : {
246 221314 : Base::reset();
247 221314 : iovec_count = 0;
248 221314 : }
249 :
250 MIS 0 : void perform_io() noexcept override
251 : {
252 0 : ssize_t n = WritePolicy::write(this->fd, iovecs, iovec_count);
253 0 : if (n >= 0)
254 0 : this->complete(0, static_cast<std::size_t>(n));
255 : else
256 0 : this->complete(errno, 0);
257 0 : }
258 : };
259 :
260 : /** Shared accept operation.
261 :
262 : Delegates the actual syscall to AcceptPolicy::do_accept(fd, peer_storage),
263 : which returns the accepted fd or -1 with errno set.
264 :
265 : @tparam Base The backend's base op type.
266 : @tparam AcceptPolicy Provides `static int do_accept(int, sockaddr_storage&)`.
267 : */
268 : template<class Base, class AcceptPolicy>
269 : struct reactor_accept_op : Base
270 : {
271 : /// File descriptor of the accepted connection.
272 : int accepted_fd = -1;
273 :
274 : /// Pointer to the peer socket implementation.
275 : io_object::implementation* peer_impl = nullptr;
276 :
277 : /// Output pointer for the accepted implementation.
278 : io_object::implementation** impl_out = nullptr;
279 :
280 : /// Peer address storage filled by accept.
281 : sockaddr_storage peer_storage{};
282 :
283 HIT 4496 : void reset() noexcept
284 : {
285 4496 : Base::reset();
286 4496 : accepted_fd = -1;
287 4496 : peer_impl = nullptr;
288 4496 : impl_out = nullptr;
289 4496 : peer_storage = {};
290 4496 : }
291 :
292 4480 : void perform_io() noexcept override
293 : {
294 4480 : int new_fd = AcceptPolicy::do_accept(this->fd, peer_storage);
295 4480 : if (new_fd >= 0)
296 : {
297 4480 : accepted_fd = new_fd;
298 4480 : this->complete(0, 0);
299 : }
300 : else
301 : {
302 MIS 0 : this->complete(errno, 0);
303 : }
304 HIT 4480 : }
305 : };
306 :
307 : /** Shared connected send operation for datagram sockets.
308 :
309 : Uses sendmsg() with msg_name=nullptr (connected mode).
310 :
311 : @tparam Base The backend's base op type.
312 : */
313 : template<class Base>
314 : struct reactor_send_op : Base
315 : {
316 : /// Maximum scatter-gather buffer count.
317 : static constexpr std::size_t max_buffers = 16;
318 :
319 : /// Scatter-gather I/O vectors.
320 : iovec iovecs[max_buffers];
321 :
322 : /// Number of active I/O vectors.
323 : int iovec_count = 0;
324 :
325 6 : void reset() noexcept
326 : {
327 6 : Base::reset();
328 6 : iovec_count = 0;
329 6 : }
330 :
331 MIS 0 : void perform_io() noexcept override
332 : {
333 0 : msghdr msg{};
334 0 : msg.msg_iov = iovecs;
335 0 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
336 :
337 : #ifdef MSG_NOSIGNAL
338 0 : constexpr int send_flags = MSG_NOSIGNAL;
339 : #else
340 : constexpr int send_flags = 0;
341 : #endif
342 :
343 : ssize_t n;
344 : do
345 : {
346 0 : n = ::sendmsg(this->fd, &msg, send_flags);
347 : }
348 0 : while (n < 0 && errno == EINTR);
349 :
350 0 : if (n >= 0)
351 0 : this->complete(0, static_cast<std::size_t>(n));
352 : else
353 0 : this->complete(errno, 0);
354 0 : }
355 : };
356 :
357 : /** Shared connected recv operation for datagram sockets.
358 :
359 : Uses recvmsg() with msg_name=nullptr (connected mode).
360 : Unlike reactor_read_op, does not map n==0 to EOF
361 : (zero-length datagrams are valid).
362 :
363 : @tparam Base The backend's base op type.
364 : */
365 : template<class Base>
366 : struct reactor_recv_op : Base
367 : {
368 : /// Maximum scatter-gather buffer count.
369 : static constexpr std::size_t max_buffers = 16;
370 :
371 : /// Scatter-gather I/O vectors.
372 : iovec iovecs[max_buffers];
373 :
374 : /// Number of active I/O vectors.
375 : int iovec_count = 0;
376 :
377 : /// Return true (this is a read-direction operation).
378 HIT 2 : bool is_read_operation() const noexcept override
379 : {
380 2 : return true;
381 : }
382 :
383 4 : void reset() noexcept
384 : {
385 4 : Base::reset();
386 4 : iovec_count = 0;
387 4 : }
388 :
389 MIS 0 : void perform_io() noexcept override
390 : {
391 0 : msghdr msg{};
392 0 : msg.msg_iov = iovecs;
393 0 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
394 :
395 : ssize_t n;
396 : do
397 : {
398 0 : n = ::recvmsg(this->fd, &msg, 0);
399 : }
400 0 : while (n < 0 && errno == EINTR);
401 :
402 0 : if (n >= 0)
403 0 : this->complete(0, static_cast<std::size_t>(n));
404 : else
405 0 : this->complete(errno, 0);
406 0 : }
407 : };
408 :
409 : /** Shared send_to operation for datagram sockets.
410 :
411 : Uses sendmsg() with the destination endpoint in msg_name.
412 :
413 : @tparam Base The backend's base op type.
414 : */
415 : template<class Base>
416 : struct reactor_send_to_op : Base
417 : {
418 : /// Maximum scatter-gather buffer count.
419 : static constexpr std::size_t max_buffers = 16;
420 :
421 : /// Scatter-gather I/O vectors.
422 : iovec iovecs[max_buffers];
423 :
424 : /// Number of active I/O vectors.
425 : int iovec_count = 0;
426 :
427 : /// Destination address storage.
428 : sockaddr_storage dest_storage{};
429 :
430 : /// Destination address length.
431 : socklen_t dest_len = 0;
432 :
433 HIT 22 : void reset() noexcept
434 : {
435 22 : Base::reset();
436 22 : iovec_count = 0;
437 22 : dest_storage = {};
438 22 : dest_len = 0;
439 22 : }
440 :
441 MIS 0 : void perform_io() noexcept override
442 : {
443 0 : msghdr msg{};
444 0 : msg.msg_name = &dest_storage;
445 0 : msg.msg_namelen = dest_len;
446 0 : msg.msg_iov = iovecs;
447 0 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
448 :
449 : #ifdef MSG_NOSIGNAL
450 0 : constexpr int send_flags = MSG_NOSIGNAL;
451 : #else
452 : constexpr int send_flags = 0;
453 : #endif
454 :
455 : ssize_t n;
456 : do
457 : {
458 0 : n = ::sendmsg(this->fd, &msg, send_flags);
459 : }
460 0 : while (n < 0 && errno == EINTR);
461 :
462 0 : if (n >= 0)
463 0 : this->complete(0, static_cast<std::size_t>(n));
464 : else
465 0 : this->complete(errno, 0);
466 0 : }
467 : };
468 :
469 : /** Shared recv_from operation for datagram sockets.
470 :
471 : Uses recvmsg() with msg_name to capture the source endpoint.
472 :
473 : @tparam Base The backend's base op type.
474 : */
475 : template<class Base>
476 : struct reactor_recv_from_op : Base
477 : {
478 : /// Maximum scatter-gather buffer count.
479 : static constexpr std::size_t max_buffers = 16;
480 :
481 : /// Scatter-gather I/O vectors.
482 : iovec iovecs[max_buffers];
483 :
484 : /// Number of active I/O vectors.
485 : int iovec_count = 0;
486 :
487 : /// Source address storage filled by recvmsg.
488 : sockaddr_storage source_storage{};
489 :
490 : /// Output pointer for the source endpoint (set by do_recv_from).
491 : endpoint* source_out = nullptr;
492 :
493 : /// Return true (this is a read-direction operation).
494 0 : bool is_read_operation() const noexcept override
495 : {
496 0 : return true;
497 : }
498 :
499 HIT 32 : void reset() noexcept
500 : {
501 32 : Base::reset();
502 32 : iovec_count = 0;
503 32 : source_storage = {};
504 32 : source_out = nullptr;
505 32 : }
506 :
507 2 : void perform_io() noexcept override
508 : {
509 2 : msghdr msg{};
510 2 : msg.msg_name = &source_storage;
511 2 : msg.msg_namelen = sizeof(source_storage);
512 2 : msg.msg_iov = iovecs;
513 2 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
514 :
515 : ssize_t n;
516 : do
517 : {
518 2 : n = ::recvmsg(this->fd, &msg, 0);
519 : }
520 2 : while (n < 0 && errno == EINTR);
521 :
522 2 : if (n >= 0)
523 2 : this->complete(0, static_cast<std::size_t>(n));
524 : else
525 MIS 0 : this->complete(errno, 0);
526 HIT 2 : }
527 : };
528 :
529 : } // namespace boost::corosio::detail
530 :
531 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
|