TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/corosio/detail/tcp_service.hpp>
19 :
20 : #include <boost/corosio/native/detail/epoll/epoll_tcp_socket.hpp>
21 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
22 : #include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
23 :
24 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
25 :
26 : #include <coroutine>
27 :
28 : #include <errno.h>
29 : #include <netinet/in.h>
30 : #include <netinet/tcp.h>
31 : #include <sys/epoll.h>
32 : #include <sys/socket.h>
33 : #include <unistd.h>
34 :
35 : /*
36 : epoll Socket Implementation
37 : ===========================
38 :
39 : Each I/O operation follows the same pattern:
40 : 1. Try the syscall immediately (non-blocking socket)
41 : 2. If it succeeds or fails with a real error, post to completion queue
42 : 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
43 :
44 : This "try first" approach avoids unnecessary epoll round-trips for
45 : operations that can complete immediately (common for small reads/writes
46 : on fast local connections).
47 :
48 : One-Shot Registration
49 : ---------------------
50 : We use one-shot epoll registration: each operation registers, waits for
51 : one event, then unregisters. This simplifies the state machine since we
52 : don't need to track whether an fd is currently registered or handle
53 : re-arming. The tradeoff is slightly more epoll_ctl calls, but the
54 : simplicity is worth it.
55 :
56 : Cancellation
57 : ------------
58 : See op.hpp for the completion/cancellation race handling via the
59 : `registered` atomic. cancel() must complete pending operations (post
60 : them with cancelled flag) so coroutines waiting on them can resume.
61 : close_socket() calls cancel() first to ensure this.
62 :
63 : Impl Lifetime with shared_ptr
64 : -----------------------------
65 : Socket impls use enable_shared_from_this. The service owns impls via
66 : shared_ptr maps (impl_ptrs_) keyed by raw pointer for O(1) lookup and
67 : removal. When a user calls close(), we call cancel() which posts pending
68 : ops to the scheduler.
69 :
70 : CRITICAL: The posted ops must keep the impl alive until they complete.
71 : Otherwise the scheduler would process a freed op (use-after-free). The
72 : cancel() method captures shared_from_this() into op.impl_ptr before
73 : posting. When the op completes, impl_ptr is cleared, allowing the impl
74 : to be destroyed if no other references exist.
75 :
76 : Service Ownership
77 : -----------------
78 : epoll_tcp_service owns all socket impls. destroy_impl() removes the
79 : shared_ptr from the map, but the impl may survive if ops still hold
80 : impl_ptr refs. shutdown() closes all sockets and clears the map; any
81 : in-flight ops will complete and release their refs.
82 : */
83 :
84 : namespace boost::corosio::detail {
85 :
86 : /** epoll TCP service implementation.
87 :
88 : Inherits from tcp_service to enable runtime polymorphism.
89 : Uses key_type = tcp_service for service lookup.
90 : */
91 : class BOOST_COROSIO_DECL epoll_tcp_service final
92 : : public reactor_socket_service<
93 : epoll_tcp_service,
94 : tcp_service,
95 : epoll_scheduler,
96 : epoll_tcp_socket>
97 : {
98 : public:
99 HIT 271 : explicit epoll_tcp_service(capy::execution_context& ctx)
100 271 : : reactor_socket_service(ctx)
101 : {
102 271 : }
103 :
104 : std::error_code open_socket(
105 : tcp_socket::implementation& impl,
106 : int family,
107 : int type,
108 : int protocol) override;
109 : };
110 :
111 : inline void
112 MIS 0 : epoll_connect_op::cancel() noexcept
113 : {
114 0 : if (socket_impl_)
115 0 : socket_impl_->cancel_single_op(*this);
116 : else
117 0 : request_cancel();
118 0 : }
119 :
120 : inline void
121 HIT 98 : epoll_read_op::cancel() noexcept
122 : {
123 98 : if (socket_impl_)
124 98 : socket_impl_->cancel_single_op(*this);
125 : else
126 MIS 0 : request_cancel();
127 HIT 98 : }
128 :
129 : inline void
130 MIS 0 : epoll_write_op::cancel() noexcept
131 : {
132 0 : if (socket_impl_)
133 0 : socket_impl_->cancel_single_op(*this);
134 : else
135 0 : request_cancel();
136 0 : }
137 :
138 : inline void
139 HIT 44943 : epoll_op::operator()()
140 : {
141 44943 : complete_io_op(*this);
142 44943 : }
143 :
144 : inline void
145 2466 : epoll_connect_op::operator()()
146 : {
147 2466 : complete_connect_op(*this);
148 2466 : }
149 :
150 7453 : inline epoll_tcp_socket::epoll_tcp_socket(epoll_tcp_service& svc) noexcept
151 7453 : : reactor_stream_socket(svc)
152 : {
153 7453 : }
154 :
155 7453 : inline epoll_tcp_socket::~epoll_tcp_socket() = default;
156 :
157 : inline std::coroutine_handle<>
158 2466 : epoll_tcp_socket::connect(
159 : std::coroutine_handle<> h,
160 : capy::executor_ref ex,
161 : endpoint ep,
162 : std::stop_token token,
163 : std::error_code* ec)
164 : {
165 2466 : return do_connect(h, ex, ep, token, ec);
166 : }
167 :
168 : inline std::coroutine_handle<>
169 112099 : epoll_tcp_socket::read_some(
170 : std::coroutine_handle<> h,
171 : capy::executor_ref ex,
172 : buffer_param param,
173 : std::stop_token token,
174 : std::error_code* ec,
175 : std::size_t* bytes_out)
176 : {
177 112099 : return do_read_some(h, ex, param, token, ec, bytes_out);
178 : }
179 :
180 : inline std::coroutine_handle<>
181 111947 : epoll_tcp_socket::write_some(
182 : std::coroutine_handle<> h,
183 : capy::executor_ref ex,
184 : buffer_param param,
185 : std::stop_token token,
186 : std::error_code* ec,
187 : std::size_t* bytes_out)
188 : {
189 111947 : return do_write_some(h, ex, param, token, ec, bytes_out);
190 : }
191 :
192 : inline void
193 95 : epoll_tcp_socket::cancel() noexcept
194 : {
195 95 : do_cancel();
196 95 : }
197 :
198 : inline void
199 22331 : epoll_tcp_socket::close_socket() noexcept
200 : {
201 22331 : do_close_socket();
202 22331 : }
203 :
204 : inline std::error_code
205 2481 : epoll_tcp_service::open_socket(
206 : tcp_socket::implementation& impl, int family, int type, int protocol)
207 : {
208 2481 : auto* epoll_impl = static_cast<epoll_tcp_socket*>(&impl);
209 2481 : epoll_impl->close_socket();
210 :
211 2481 : int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
212 2481 : if (fd < 0)
213 MIS 0 : return make_err(errno);
214 :
215 HIT 2481 : if (family == AF_INET6)
216 : {
217 5 : int one = 1;
218 5 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
219 : }
220 :
221 2481 : epoll_impl->fd_ = fd;
222 :
223 : // Register fd with epoll (edge-triggered mode)
224 2481 : epoll_impl->desc_state_.fd = fd;
225 : {
226 2481 : std::lock_guard lock(epoll_impl->desc_state_.mutex);
227 2481 : epoll_impl->desc_state_.read_op = nullptr;
228 2481 : epoll_impl->desc_state_.write_op = nullptr;
229 2481 : epoll_impl->desc_state_.connect_op = nullptr;
230 2481 : }
231 2481 : scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
232 :
233 2481 : return {};
234 : }
235 :
236 : } // namespace boost::corosio::detail
237 :
238 : #endif // BOOST_COROSIO_HAS_EPOLL
239 :
240 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
|