TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/corosio/detail/tcp_service.hpp>
19 :
20 : #include <boost/corosio/native/detail/select/select_tcp_socket.hpp>
21 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
22 : #include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
23 :
24 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
25 :
26 : #include <coroutine>
27 : #include <mutex>
28 :
29 : #include <errno.h>
30 : #include <fcntl.h>
31 : #include <netinet/in.h>
32 : #include <netinet/tcp.h>
33 : #include <sys/select.h>
34 : #include <sys/socket.h>
35 : #include <unistd.h>
36 :
37 : /*
38 : Each I/O op tries the syscall speculatively; only registers with
39 : the reactor on EAGAIN. Fd is registered once at open time and
40 : stays registered until close. The reactor only marks ready_events_;
41 : actual I/O happens in invoke_deferred_io(). cancel() captures
42 : shared_from_this() into op.impl_ptr to keep the impl alive.
43 : */
44 :
45 : namespace boost::corosio::detail {
46 :
47 : /** select TCP service implementation.
48 :
49 : Inherits from tcp_service to enable runtime polymorphism.
50 : Uses key_type = tcp_service for service lookup.
51 : */
52 : class BOOST_COROSIO_DECL select_tcp_service final
53 : : public reactor_socket_service<
54 : select_tcp_service,
55 : tcp_service,
56 : select_scheduler,
57 : select_tcp_socket>
58 : {
59 : public:
60 HIT 195 : explicit select_tcp_service(capy::execution_context& ctx)
61 195 : : reactor_socket_service(ctx)
62 : {
63 195 : }
64 :
65 : std::error_code open_socket(
66 : tcp_socket::implementation& impl,
67 : int family,
68 : int type,
69 : int protocol) override;
70 : };
71 :
72 : inline void
73 MIS 0 : select_connect_op::cancel() noexcept
74 : {
75 0 : if (socket_impl_)
76 0 : socket_impl_->cancel_single_op(*this);
77 : else
78 0 : request_cancel();
79 0 : }
80 :
81 : inline void
82 HIT 93 : select_read_op::cancel() noexcept
83 : {
84 93 : if (socket_impl_)
85 93 : socket_impl_->cancel_single_op(*this);
86 : else
87 MIS 0 : request_cancel();
88 HIT 93 : }
89 :
90 : inline void
91 MIS 0 : select_write_op::cancel() noexcept
92 : {
93 0 : if (socket_impl_)
94 0 : socket_impl_->cancel_single_op(*this);
95 : else
96 0 : request_cancel();
97 0 : }
98 :
99 : inline void
100 HIT 43903 : select_op::operator()()
101 : {
102 43903 : complete_io_op(*this);
103 43903 : }
104 :
105 : inline void
106 2024 : select_connect_op::operator()()
107 : {
108 2024 : complete_connect_op(*this);
109 2024 : }
110 :
111 6090 : inline select_tcp_socket::select_tcp_socket(select_tcp_service& svc) noexcept
112 6090 : : reactor_stream_socket(svc)
113 : {
114 6090 : }
115 :
116 6090 : inline select_tcp_socket::~select_tcp_socket() = default;
117 :
118 : inline std::coroutine_handle<>
119 2024 : select_tcp_socket::connect(
120 : std::coroutine_handle<> h,
121 : capy::executor_ref ex,
122 : endpoint ep,
123 : std::stop_token token,
124 : std::error_code* ec)
125 : {
126 2024 : auto result = do_connect(h, ex, ep, token, ec);
127 : // Rebuild fd_sets so select() watches for writability
128 2024 : if (result == std::noop_coroutine())
129 2024 : svc_.scheduler().notify_reactor();
130 2024 : return result;
131 : }
132 :
133 : inline std::coroutine_handle<>
134 109511 : select_tcp_socket::read_some(
135 : std::coroutine_handle<> h,
136 : capy::executor_ref ex,
137 : buffer_param param,
138 : std::stop_token token,
139 : std::error_code* ec,
140 : std::size_t* bytes_out)
141 : {
142 109511 : return do_read_some(h, ex, param, token, ec, bytes_out);
143 : }
144 :
145 : inline std::coroutine_handle<>
146 109367 : select_tcp_socket::write_some(
147 : std::coroutine_handle<> h,
148 : capy::executor_ref ex,
149 : buffer_param param,
150 : std::stop_token token,
151 : std::error_code* ec,
152 : std::size_t* bytes_out)
153 : {
154 109367 : auto result = do_write_some(h, ex, param, token, ec, bytes_out);
155 : // Rebuild fd_sets so select() watches for writability
156 109367 : if (result == std::noop_coroutine())
157 21866 : svc_.scheduler().notify_reactor();
158 109367 : return result;
159 : }
160 :
161 : inline void
162 90 : select_tcp_socket::cancel() noexcept
163 : {
164 90 : do_cancel();
165 90 : }
166 :
167 : inline void
168 18279 : select_tcp_socket::close_socket() noexcept
169 : {
170 18279 : do_close_socket();
171 18279 : }
172 :
173 : inline std::error_code
174 2039 : select_tcp_service::open_socket(
175 : tcp_socket::implementation& impl, int family, int type, int protocol)
176 : {
177 2039 : auto* select_impl = static_cast<select_tcp_socket*>(&impl);
178 2039 : select_impl->close_socket();
179 :
180 2039 : int fd = ::socket(family, type, protocol);
181 2039 : if (fd < 0)
182 MIS 0 : return make_err(errno);
183 :
184 HIT 2039 : if (family == AF_INET6)
185 : {
186 5 : int one = 1;
187 5 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
188 : }
189 :
190 2039 : int flags = ::fcntl(fd, F_GETFL, 0);
191 2039 : if (flags == -1)
192 : {
193 MIS 0 : int errn = errno;
194 0 : ::close(fd);
195 0 : return make_err(errn);
196 : }
197 HIT 2039 : if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
198 : {
199 MIS 0 : int errn = errno;
200 0 : ::close(fd);
201 0 : return make_err(errn);
202 : }
203 HIT 2039 : if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
204 : {
205 MIS 0 : int errn = errno;
206 0 : ::close(fd);
207 0 : return make_err(errn);
208 : }
209 :
210 HIT 2039 : if (fd >= FD_SETSIZE)
211 : {
212 MIS 0 : ::close(fd);
213 0 : return make_err(EMFILE);
214 : }
215 :
216 : #ifdef SO_NOSIGPIPE
217 : {
218 : int one = 1;
219 : ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
220 : }
221 : #endif
222 :
223 HIT 2039 : select_impl->fd_ = fd;
224 :
225 2039 : select_impl->desc_state_.fd = fd;
226 : {
227 2039 : std::lock_guard lock(select_impl->desc_state_.mutex);
228 2039 : select_impl->desc_state_.read_op = nullptr;
229 2039 : select_impl->desc_state_.write_op = nullptr;
230 2039 : select_impl->desc_state_.connect_op = nullptr;
231 2039 : }
232 2039 : scheduler().register_descriptor(fd, &select_impl->desc_state_);
233 :
234 2039 : return {};
235 : }
236 :
237 : } // namespace boost::corosio::detail
238 :
239 : #endif // BOOST_COROSIO_HAS_SELECT
240 :
241 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_SERVICE_HPP
|