include/boost/corosio/native/detail/reactor/reactor_stream_socket.hpp

64.8% Lines (274/423) 72.2% List of functions (26/36)
f(x) Functions (36)
Function Calls Lines Blocks
boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::reactor_stream_socket(boost::corosio::detail::epoll_tcp_service&) :61 7453x 100.0% 100.0% boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::reactor_stream_socket(boost::corosio::detail::select_tcp_service&) :61 6090x 100.0% 100.0% boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::~reactor_stream_socket() :76 7453x 100.0% 100.0% boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::~reactor_stream_socket() :76 6090x 100.0% 100.0% boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::remote_endpoint() const :79 21x 100.0% 100.0% boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::remote_endpoint() const :79 21x 100.0% 100.0% boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::shutdown(boost::corosio::tcp_socket::shutdown_type) :85 3x 81.2% 83.0% boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::shutdown(boost::corosio::tcp_socket::shutdown_type) :85 3x 81.2% 83.0% boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::set_endpoints(boost::corosio::endpoint, boost::corosio::endpoint) :108 4926x 100.0% 100.0% boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::set_endpoints(boost::corosio::endpoint, boost::corosio::endpoint) :108 4042x 100.0% 100.0% boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::do_close_socket() :160 22331x 100.0% 100.0% boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::do_close_socket() :160 18279x 100.0% 100.0% boost::corosio::detail::reactor_op_base** boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::op_to_desc_slot<boost::corosio::detail::epoll_connect_op>(boost::corosio::detail::epoll_connect_op&) :170 0 0.0% 0.0% boost::corosio::detail::reactor_op_base** boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::op_to_desc_slot<boost::corosio::detail::epoll_read_op>(boost::corosio::detail::epoll_read_op&) :170 98x 50.0% 50.0% boost::corosio::detail::reactor_op_base** boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::op_to_desc_slot<boost::corosio::detail::epoll_write_op>(boost::corosio::detail::epoll_write_op&) :170 0 0.0% 0.0% boost::corosio::detail::reactor_op_base** boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::op_to_desc_slot<boost::corosio::detail::select_connect_op>(boost::corosio::detail::select_connect_op&) :170 0 0.0% 0.0% boost::corosio::detail::reactor_op_base** boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::op_to_desc_slot<boost::corosio::detail::select_read_op>(boost::corosio::detail::select_read_op&) :170 93x 50.0% 50.0% boost::corosio::detail::reactor_op_base** boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::op_to_desc_slot<boost::corosio::detail::select_write_op>(boost::corosio::detail::select_write_op&) :170 0 0.0% 0.0% bool* boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::op_to_cancel_flag<boost::corosio::detail::epoll_connect_op>(boost::corosio::detail::epoll_connect_op&) :182 0 0.0% 0.0% bool* boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::op_to_cancel_flag<boost::corosio::detail::epoll_read_op>(boost::corosio::detail::epoll_read_op&) :182 0 0.0% 0.0% bool* boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::op_to_cancel_flag<boost::corosio::detail::epoll_write_op>(boost::corosio::detail::epoll_write_op&) :182 0 0.0% 0.0% bool* boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::op_to_cancel_flag<boost::corosio::detail::select_connect_op>(boost::corosio::detail::select_connect_op&) :182 0 0.0% 0.0% bool* boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::op_to_cancel_flag<boost::corosio::detail::select_read_op>(boost::corosio::detail::select_read_op&) :182 0 0.0% 0.0% bool* boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::op_to_cancel_flag<boost::corosio::detail::select_write_op>(boost::corosio::detail::select_write_op&) :182 0 0.0% 0.0% void boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::for_each_op<boost::corosio::detail::reactor_basic_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::tcp_socket::implementation, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::descriptor_state>::do_cancel()::{lambda(auto:1&)#1}>(boost::corosio::detail::reactor_basic_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::tcp_socket::implementation, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::descriptor_state>::do_cancel()::{lambda(auto:1&)#1}) :194 95x 100.0% 100.0% void boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::for_each_op<boost::corosio::detail::reactor_basic_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::tcp_socket::implementation, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::descriptor_state>::do_close_socket()::{lambda(auto:1&)#1}>(boost::corosio::detail::reactor_basic_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::tcp_socket::implementation, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::descriptor_state>::do_close_socket()::{lambda(auto:1&)#1}) :194 22331x 100.0% 100.0% void boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::for_each_op<boost::corosio::detail::reactor_basic_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::tcp_socket::implementation, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_descriptor_state>::do_cancel()::{lambda(auto:1&)#1}>(boost::corosio::detail::reactor_basic_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::tcp_socket::implementation, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_descriptor_state>::do_cancel()::{lambda(auto:1&)#1}) :194 90x 100.0% 100.0% void boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::for_each_op<boost::corosio::detail::reactor_basic_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::tcp_socket::implementation, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_descriptor_state>::do_close_socket()::{lambda(auto:1&)#1}>(boost::corosio::detail::reactor_basic_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::tcp_socket::implementation, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_descriptor_state>::do_close_socket()::{lambda(auto:1&)#1}) :194 18279x 100.0% 100.0% void boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::for_each_desc_entry<boost::corosio::detail::reactor_basic_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::tcp_socket::implementation, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::descriptor_state>::do_cancel()::{lambda(auto:1&, boost::corosio::detail::reactor_op_base*&)#1}>(boost::corosio::detail::reactor_basic_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::tcp_socket::implementation, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::descriptor_state>::do_cancel()::{lambda(auto:1&, boost::corosio::detail::reactor_op_base*&)#1}) :202 95x 100.0% 100.0% void boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::for_each_desc_entry<boost::corosio::detail::reactor_basic_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::tcp_socket::implementation, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::descriptor_state>::do_close_socket()::{lambda(auto:1&, boost::corosio::detail::reactor_op_base*&)#1}>(boost::corosio::detail::reactor_basic_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::tcp_socket::implementation, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::descriptor_state>::do_close_socket()::{lambda(auto:1&, boost::corosio::detail::reactor_op_base*&)#1}) :202 22331x 100.0% 100.0% void boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::for_each_desc_entry<boost::corosio::detail::reactor_basic_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::tcp_socket::implementation, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_descriptor_state>::do_cancel()::{lambda(auto:1&, boost::corosio::detail::reactor_op_base*&)#1}>(boost::corosio::detail::reactor_basic_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::tcp_socket::implementation, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_descriptor_state>::do_cancel()::{lambda(auto:1&, boost::corosio::detail::reactor_op_base*&)#1}) :202 90x 100.0% 100.0% void boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::for_each_desc_entry<boost::corosio::detail::reactor_basic_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::tcp_socket::implementation, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_descriptor_state>::do_close_socket()::{lambda(auto:1&, boost::corosio::detail::reactor_op_base*&)#1}>(boost::corosio::detail::reactor_basic_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::tcp_socket::implementation, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_descriptor_state>::do_close_socket()::{lambda(auto:1&, boost::corosio::detail::reactor_op_base*&)#1}) :202 18279x 100.0% 100.0% boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::do_connect(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::endpoint, std::stop_token const&, std::error_code*) :218 2466x 47.5% 39.0% boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::do_connect(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::endpoint, std::stop_token const&, std::error_code*) :218 2024x 47.5% 39.0% boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::epoll_tcp_socket, boost::corosio::detail::epoll_tcp_service, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::do_read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token const&, std::error_code*, unsigned long*) :289 112099x 98.1% 91.0% boost::corosio::detail::reactor_stream_socket<boost::corosio::detail::select_tcp_socket, boost::corosio::detail::select_tcp_service, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::do_read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token const&, std::error_code*, unsigned long*) :289 109511x 98.1% 91.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
12
13 #include <boost/corosio/tcp_socket.hpp>
14 #include <boost/corosio/native/detail/reactor/reactor_basic_socket.hpp>
15 #include <boost/corosio/detail/dispatch_coro.hpp>
16 #include <boost/capy/buffers.hpp>
17
18 #include <coroutine>
19
20 #include <errno.h>
21 #include <sys/socket.h>
22 #include <sys/uio.h>
23
24 namespace boost::corosio::detail {
25
26 /** CRTP base for reactor-backed stream socket implementations.
27
28 Inherits shared data members and cancel/close/register logic
29 from reactor_basic_socket. Adds the TCP-specific remote
30 endpoint, shutdown, and I/O dispatch (connect, read, write).
31
32 @tparam Derived The concrete socket type (CRTP).
33 @tparam Service The backend's socket service type.
34 @tparam ConnOp The backend's connect op type.
35 @tparam ReadOp The backend's read op type.
36 @tparam WriteOp The backend's write op type.
37 @tparam DescState The backend's descriptor_state type.
38 */
39 template<
40 class Derived,
41 class Service,
42 class ConnOp,
43 class ReadOp,
44 class WriteOp,
45 class DescState>
46 class reactor_stream_socket
47 : public reactor_basic_socket<
48 Derived,
49 tcp_socket::implementation,
50 Service,
51 DescState>
52 {
53 using base_type = reactor_basic_socket<
54 Derived,
55 tcp_socket::implementation,
56 Service,
57 DescState>;
58 friend base_type;
59 friend Derived;
60
61 13543x explicit reactor_stream_socket(Service& svc) noexcept : base_type(svc) {}
62
63 protected:
64 endpoint remote_endpoint_;
65
66 public:
67 /// Pending connect operation slot.
68 ConnOp conn_;
69
70 /// Pending read operation slot.
71 ReadOp rd_;
72
73 /// Pending write operation slot.
74 WriteOp wr_;
75
76 13543x ~reactor_stream_socket() override = default;
77
78 /// Return the cached remote endpoint.
79 42x endpoint remote_endpoint() const noexcept override
80 {
81 42x return remote_endpoint_;
82 }
83
84 /// Shut down part or all of the full-duplex connection.
85 6x std::error_code shutdown(tcp_socket::shutdown_type what) noexcept override
86 {
87 int how;
88 6x switch (what)
89 {
90 2x case tcp_socket::shutdown_receive:
91 2x how = SHUT_RD;
92 2x break;
93 2x case tcp_socket::shutdown_send:
94 2x how = SHUT_WR;
95 2x break;
96 2x case tcp_socket::shutdown_both:
97 2x how = SHUT_RDWR;
98 2x break;
99 default:
100 return make_err(EINVAL);
101 }
102 6x if (::shutdown(this->fd_, how) != 0)
103 return make_err(errno);
104 6x return {};
105 }
106
107 /// Cache local and remote endpoints.
108 8968x void set_endpoints(endpoint local, endpoint remote) noexcept
109 {
110 8968x this->local_endpoint_ = local;
111 8968x remote_endpoint_ = remote;
112 8968x }
113
114 /** Shared connect dispatch.
115
116 Tries the connect syscall speculatively. On synchronous
117 completion, returns via inline budget or posts through queue.
118 On EINPROGRESS, registers with the reactor.
119 */
120 std::coroutine_handle<> do_connect(
121 std::coroutine_handle<>,
122 capy::executor_ref,
123 endpoint,
124 std::stop_token const&,
125 std::error_code*);
126
127 /** Shared scatter-read dispatch.
128
129 Tries readv() speculatively. On success or hard error,
130 returns via inline budget or posts through queue.
131 On EAGAIN, registers with the reactor.
132 */
133 std::coroutine_handle<> do_read_some(
134 std::coroutine_handle<>,
135 capy::executor_ref,
136 buffer_param,
137 std::stop_token const&,
138 std::error_code*,
139 std::size_t*);
140
141 /** Shared gather-write dispatch.
142
143 Tries the write via WriteOp::write_policy speculatively.
144 On success or hard error, returns via inline budget or
145 posts through queue. On EAGAIN, registers with the reactor.
146 */
147 std::coroutine_handle<> do_write_some(
148 std::coroutine_handle<>,
149 capy::executor_ref,
150 buffer_param,
151 std::stop_token const&,
152 std::error_code*,
153 std::size_t*);
154
155 /** Close the socket and cancel pending operations.
156
157 Extends the base do_close_socket() to also reset
158 the remote endpoint.
159 */
160 40610x void do_close_socket() noexcept
161 {
162 40610x base_type::do_close_socket();
163 40610x remote_endpoint_ = endpoint{};
164 40610x }
165
166 private:
167 // CRTP callbacks for reactor_basic_socket cancel/close
168
169 template<class Op>
170 191x reactor_op_base** op_to_desc_slot(Op& op) noexcept
171 {
172 191x if (&op == static_cast<void*>(&conn_))
173 return &this->desc_state_.connect_op;
174 191x if (&op == static_cast<void*>(&rd_))
175 191x return &this->desc_state_.read_op;
176 if (&op == static_cast<void*>(&wr_))
177 return &this->desc_state_.write_op;
178 return nullptr;
179 }
180
181 template<class Op>
182 bool* op_to_cancel_flag(Op& op) noexcept
183 {
184 if (&op == static_cast<void*>(&conn_))
185 return &this->desc_state_.connect_cancel_pending;
186 if (&op == static_cast<void*>(&rd_))
187 return &this->desc_state_.read_cancel_pending;
188 if (&op == static_cast<void*>(&wr_))
189 return &this->desc_state_.write_cancel_pending;
190 return nullptr;
191 }
192
193 template<class Fn>
194 40795x void for_each_op(Fn fn) noexcept
195 {
196 40795x fn(conn_);
197 40795x fn(rd_);
198 40795x fn(wr_);
199 40795x }
200
201 template<class Fn>
202 40795x void for_each_desc_entry(Fn fn) noexcept
203 {
204 40795x fn(conn_, this->desc_state_.connect_op);
205 40795x fn(rd_, this->desc_state_.read_op);
206 40795x fn(wr_, this->desc_state_.write_op);
207 40795x }
208 };
209
210 template<
211 class Derived,
212 class Service,
213 class ConnOp,
214 class ReadOp,
215 class WriteOp,
216 class DescState>
217 std::coroutine_handle<>
218 4490x reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState>::
219 do_connect(
220 std::coroutine_handle<> h,
221 capy::executor_ref ex,
222 endpoint ep,
223 std::stop_token const& token,
224 std::error_code* ec)
225 {
226 4490x auto& op = conn_;
227
228 4490x sockaddr_storage storage{};
229 4490x socklen_t addrlen = to_sockaddr(ep, socket_family(this->fd_), storage);
230 int result =
231 4490x ::connect(this->fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
232
233 4490x if (result == 0)
234 {
235 sockaddr_storage local_storage{};
236 socklen_t local_len = sizeof(local_storage);
237 if (::getsockname(
238 this->fd_, reinterpret_cast<sockaddr*>(&local_storage),
239 &local_len) == 0)
240 this->local_endpoint_ = from_sockaddr(local_storage);
241 remote_endpoint_ = ep;
242 }
243
244 4490x if (result == 0 || errno != EINPROGRESS)
245 {
246 int err = (result < 0) ? errno : 0;
247 if (this->svc_.scheduler().try_consume_inline_budget())
248 {
249 *ec = err ? make_err(err) : std::error_code{};
250 return dispatch_coro(ex, h);
251 }
252 op.reset();
253 op.h = h;
254 op.ex = ex;
255 op.ec_out = ec;
256 op.fd = this->fd_;
257 op.target_endpoint = ep;
258 op.start(token, static_cast<Derived*>(this));
259 op.impl_ptr = this->shared_from_this();
260 op.complete(err, 0);
261 this->svc_.post(&op);
262 return std::noop_coroutine();
263 }
264
265 // EINPROGRESS — register with reactor
266 4490x op.reset();
267 4490x op.h = h;
268 4490x op.ex = ex;
269 4490x op.ec_out = ec;
270 4490x op.fd = this->fd_;
271 4490x op.target_endpoint = ep;
272 4490x op.start(token, static_cast<Derived*>(this));
273 4490x op.impl_ptr = this->shared_from_this();
274
275 4490x this->register_op(
276 4490x op, this->desc_state_.connect_op, this->desc_state_.write_ready,
277 4490x this->desc_state_.connect_cancel_pending);
278 4490x return std::noop_coroutine();
279 }
280
281 template<
282 class Derived,
283 class Service,
284 class ConnOp,
285 class ReadOp,
286 class WriteOp,
287 class DescState>
288 std::coroutine_handle<>
289 221610x reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState>::
290 do_read_some(
291 std::coroutine_handle<> h,
292 capy::executor_ref ex,
293 buffer_param param,
294 std::stop_token const& token,
295 std::error_code* ec,
296 std::size_t* bytes_out)
297 {
298 221610x auto& op = rd_;
299 221610x op.reset();
300
301 221610x capy::mutable_buffer bufs[ReadOp::max_buffers];
302 221610x op.iovec_count = static_cast<int>(param.copy_to(bufs, ReadOp::max_buffers));
303
304 221610x if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
305 {
306 2x op.empty_buffer_read = true;
307 2x op.h = h;
308 2x op.ex = ex;
309 2x op.ec_out = ec;
310 2x op.bytes_out = bytes_out;
311 2x op.start(token, static_cast<Derived*>(this));
312 2x op.impl_ptr = this->shared_from_this();
313 2x op.complete(0, 0);
314 2x this->svc_.post(&op);
315 2x return std::noop_coroutine();
316 }
317
318 443216x for (int i = 0; i < op.iovec_count; ++i)
319 {
320 221608x op.iovecs[i].iov_base = bufs[i].data();
321 221608x op.iovecs[i].iov_len = bufs[i].size();
322 }
323
324 // Speculative read
325 ssize_t n;
326 do
327 {
328 221608x n = ::readv(this->fd_, op.iovecs, op.iovec_count);
329 }
330 221608x while (n < 0 && errno == EINTR);
331
332 221608x if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
333 {
334 221221x int err = (n < 0) ? errno : 0;
335 221221x auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
336
337 221221x if (this->svc_.scheduler().try_consume_inline_budget())
338 {
339 177011x if (err)
340 *ec = make_err(err);
341 177011x else if (n == 0)
342 10x *ec = capy::error::eof;
343 else
344 177001x *ec = {};
345 177011x *bytes_out = bytes;
346 177011x return dispatch_coro(ex, h);
347 }
348 44210x op.h = h;
349 44210x op.ex = ex;
350 44210x op.ec_out = ec;
351 44210x op.bytes_out = bytes_out;
352 44210x op.start(token, static_cast<Derived*>(this));
353 44210x op.impl_ptr = this->shared_from_this();
354 44210x op.complete(err, bytes);
355 44210x this->svc_.post(&op);
356 44210x return std::noop_coroutine();
357 }
358
359 // EAGAIN — register with reactor
360 387x op.h = h;
361 387x op.ex = ex;
362 387x op.ec_out = ec;
363 387x op.bytes_out = bytes_out;
364 387x op.fd = this->fd_;
365 387x op.start(token, static_cast<Derived*>(this));
366 387x op.impl_ptr = this->shared_from_this();
367
368 387x this->register_op(
369 387x op, this->desc_state_.read_op, this->desc_state_.read_ready,
370 387x this->desc_state_.read_cancel_pending);
371 387x return std::noop_coroutine();
372 }
373
374 template<
375 class Derived,
376 class Service,
377 class ConnOp,
378 class ReadOp,
379 class WriteOp,
380 class DescState>
381 std::coroutine_handle<>
382 221314x reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState>::
383 do_write_some(
384 std::coroutine_handle<> h,
385 capy::executor_ref ex,
386 buffer_param param,
387 std::stop_token const& token,
388 std::error_code* ec,
389 std::size_t* bytes_out)
390 {
391 221314x auto& op = wr_;
392 221314x op.reset();
393
394 221314x capy::mutable_buffer bufs[WriteOp::max_buffers];
395 221314x op.iovec_count =
396 221314x static_cast<int>(param.copy_to(bufs, WriteOp::max_buffers));
397
398 221314x if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
399 {
400 2x op.h = h;
401 2x op.ex = ex;
402 2x op.ec_out = ec;
403 2x op.bytes_out = bytes_out;
404 2x op.start(token, static_cast<Derived*>(this));
405 2x op.impl_ptr = this->shared_from_this();
406 2x op.complete(0, 0);
407 2x this->svc_.post(&op);
408 2x return std::noop_coroutine();
409 }
410
411 442624x for (int i = 0; i < op.iovec_count; ++i)
412 {
413 221312x op.iovecs[i].iov_base = bufs[i].data();
414 221312x op.iovecs[i].iov_len = bufs[i].size();
415 }
416
417 // Speculative write via backend-specific write policy
418 ssize_t n =
419 221312x WriteOp::write_policy::write(this->fd_, op.iovecs, op.iovec_count);
420
421 221312x if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
422 {
423 221312x int err = (n < 0) ? errno : 0;
424 221312x auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
425
426 221312x if (this->svc_.scheduler().try_consume_inline_budget())
427 {
428 177067x *ec = err ? make_err(err) : std::error_code{};
429 177067x *bytes_out = bytes;
430 177067x return dispatch_coro(ex, h);
431 }
432 44245x op.h = h;
433 44245x op.ex = ex;
434 44245x op.ec_out = ec;
435 44245x op.bytes_out = bytes_out;
436 44245x op.start(token, static_cast<Derived*>(this));
437 44245x op.impl_ptr = this->shared_from_this();
438 44245x op.complete(err, bytes);
439 44245x this->svc_.post(&op);
440 44245x return std::noop_coroutine();
441 }
442
443 // EAGAIN — register with reactor
444 op.h = h;
445 op.ex = ex;
446 op.ec_out = ec;
447 op.bytes_out = bytes_out;
448 op.fd = this->fd_;
449 op.start(token, static_cast<Derived*>(this));
450 op.impl_ptr = this->shared_from_this();
451
452 this->register_op(
453 op, this->desc_state_.write_op, this->desc_state_.write_ready,
454 this->desc_state_.write_cancel_pending);
455 return std::noop_coroutine();
456 }
457
458 } // namespace boost::corosio::detail
459
460 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
461