include/boost/corosio/native/detail/epoll/epoll_tcp_service.hpp

79.3% Lines (46/58) 85.7% List of functions (12/14)
f(x) Functions (14)
Function Calls Lines Blocks
boost::corosio::detail::epoll_tcp_service::epoll_tcp_service(boost::capy::execution_context&) :99 271x 100.0% 100.0% boost::corosio::detail::epoll_connect_op::cancel() :112 0 0.0% 0.0% boost::corosio::detail::epoll_read_op::cancel() :121 98x 80.0% 75.0% boost::corosio::detail::epoll_write_op::cancel() :130 0 0.0% 0.0% boost::corosio::detail::epoll_op::operator()() :139 44943x 100.0% 100.0% boost::corosio::detail::epoll_connect_op::operator()() :145 2466x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::epoll_tcp_socket(boost::corosio::detail::epoll_tcp_service&) :150 7453x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::~epoll_tcp_socket() :155 7453x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::connect(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::endpoint, std::stop_token, std::error_code*) :158 2466x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :169 112099x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::write_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :181 111947x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::cancel() :193 95x 100.0% 100.0% boost::corosio::detail::epoll_tcp_socket::close_socket() :199 22331x 100.0% 100.0% boost::corosio::detail::epoll_tcp_service::open_socket(boost::corosio::tcp_socket::implementation&, int, int, int) :205 2481x 94.4% 94.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/detail/tcp_service.hpp>
19
20 #include <boost/corosio/native/detail/epoll/epoll_tcp_socket.hpp>
21 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
22 #include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
23
24 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
25
26 #include <coroutine>
27
28 #include <errno.h>
29 #include <netinet/in.h>
30 #include <netinet/tcp.h>
31 #include <sys/epoll.h>
32 #include <sys/socket.h>
33 #include <unistd.h>
34
35 /*
36 epoll Socket Implementation
37 ===========================
38
39 Each I/O operation follows the same pattern:
40 1. Try the syscall immediately (non-blocking socket)
41 2. If it succeeds or fails with a real error, post to completion queue
42 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
43
44 This "try first" approach avoids unnecessary epoll round-trips for
45 operations that can complete immediately (common for small reads/writes
46 on fast local connections).
47
48 One-Shot Registration
49 ---------------------
50 We use one-shot epoll registration: each operation registers, waits for
51 one event, then unregisters. This simplifies the state machine since we
52 don't need to track whether an fd is currently registered or handle
53 re-arming. The tradeoff is slightly more epoll_ctl calls, but the
54 simplicity is worth it.
55
56 Cancellation
57 ------------
58 See op.hpp for the completion/cancellation race handling via the
59 `registered` atomic. cancel() must complete pending operations (post
60 them with cancelled flag) so coroutines waiting on them can resume.
61 close_socket() calls cancel() first to ensure this.
62
63 Impl Lifetime with shared_ptr
64 -----------------------------
65 Socket impls use enable_shared_from_this. The service owns impls via
66 shared_ptr maps (impl_ptrs_) keyed by raw pointer for O(1) lookup and
67 removal. When a user calls close(), we call cancel() which posts pending
68 ops to the scheduler.
69
70 CRITICAL: The posted ops must keep the impl alive until they complete.
71 Otherwise the scheduler would process a freed op (use-after-free). The
72 cancel() method captures shared_from_this() into op.impl_ptr before
73 posting. When the op completes, impl_ptr is cleared, allowing the impl
74 to be destroyed if no other references exist.
75
76 Service Ownership
77 -----------------
78 epoll_tcp_service owns all socket impls. destroy_impl() removes the
79 shared_ptr from the map, but the impl may survive if ops still hold
80 impl_ptr refs. shutdown() closes all sockets and clears the map; any
81 in-flight ops will complete and release their refs.
82 */
83
84 namespace boost::corosio::detail {
85
86 /** epoll TCP service implementation.
87
88 Inherits from tcp_service to enable runtime polymorphism.
89 Uses key_type = tcp_service for service lookup.
90 */
91 class BOOST_COROSIO_DECL epoll_tcp_service final
92 : public reactor_socket_service<
93 epoll_tcp_service,
94 tcp_service,
95 epoll_scheduler,
96 epoll_tcp_socket>
97 {
98 public:
99 271x explicit epoll_tcp_service(capy::execution_context& ctx)
100 271x : reactor_socket_service(ctx)
101 {
102 271x }
103
104 std::error_code open_socket(
105 tcp_socket::implementation& impl,
106 int family,
107 int type,
108 int protocol) override;
109 };
110
111 inline void
112 epoll_connect_op::cancel() noexcept
113 {
114 if (socket_impl_)
115 socket_impl_->cancel_single_op(*this);
116 else
117 request_cancel();
118 }
119
120 inline void
121 98x epoll_read_op::cancel() noexcept
122 {
123 98x if (socket_impl_)
124 98x socket_impl_->cancel_single_op(*this);
125 else
126 request_cancel();
127 98x }
128
129 inline void
130 epoll_write_op::cancel() noexcept
131 {
132 if (socket_impl_)
133 socket_impl_->cancel_single_op(*this);
134 else
135 request_cancel();
136 }
137
138 inline void
139 44943x epoll_op::operator()()
140 {
141 44943x complete_io_op(*this);
142 44943x }
143
144 inline void
145 2466x epoll_connect_op::operator()()
146 {
147 2466x complete_connect_op(*this);
148 2466x }
149
150 7453x inline epoll_tcp_socket::epoll_tcp_socket(epoll_tcp_service& svc) noexcept
151 7453x : reactor_stream_socket(svc)
152 {
153 7453x }
154
155 7453x inline epoll_tcp_socket::~epoll_tcp_socket() = default;
156
157 inline std::coroutine_handle<>
158 2466x epoll_tcp_socket::connect(
159 std::coroutine_handle<> h,
160 capy::executor_ref ex,
161 endpoint ep,
162 std::stop_token token,
163 std::error_code* ec)
164 {
165 2466x return do_connect(h, ex, ep, token, ec);
166 }
167
168 inline std::coroutine_handle<>
169 112099x epoll_tcp_socket::read_some(
170 std::coroutine_handle<> h,
171 capy::executor_ref ex,
172 buffer_param param,
173 std::stop_token token,
174 std::error_code* ec,
175 std::size_t* bytes_out)
176 {
177 112099x return do_read_some(h, ex, param, token, ec, bytes_out);
178 }
179
180 inline std::coroutine_handle<>
181 111947x epoll_tcp_socket::write_some(
182 std::coroutine_handle<> h,
183 capy::executor_ref ex,
184 buffer_param param,
185 std::stop_token token,
186 std::error_code* ec,
187 std::size_t* bytes_out)
188 {
189 111947x return do_write_some(h, ex, param, token, ec, bytes_out);
190 }
191
192 inline void
193 95x epoll_tcp_socket::cancel() noexcept
194 {
195 95x do_cancel();
196 95x }
197
198 inline void
199 22331x epoll_tcp_socket::close_socket() noexcept
200 {
201 22331x do_close_socket();
202 22331x }
203
204 inline std::error_code
205 2481x epoll_tcp_service::open_socket(
206 tcp_socket::implementation& impl, int family, int type, int protocol)
207 {
208 2481x auto* epoll_impl = static_cast<epoll_tcp_socket*>(&impl);
209 2481x epoll_impl->close_socket();
210
211 2481x int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
212 2481x if (fd < 0)
213 return make_err(errno);
214
215 2481x if (family == AF_INET6)
216 {
217 5x int one = 1;
218 5x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
219 }
220
221 2481x epoll_impl->fd_ = fd;
222
223 // Register fd with epoll (edge-triggered mode)
224 2481x epoll_impl->desc_state_.fd = fd;
225 {
226 2481x std::lock_guard lock(epoll_impl->desc_state_.mutex);
227 2481x epoll_impl->desc_state_.read_op = nullptr;
228 2481x epoll_impl->desc_state_.write_op = nullptr;
229 2481x epoll_impl->desc_state_.connect_op = nullptr;
230 2481x }
231 2481x scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
232
233 2481x return {};
234 }
235
236 } // namespace boost::corosio::detail
237
238 #endif // BOOST_COROSIO_HAS_EPOLL
239
240 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
241