TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/tcp_acceptor_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/epoll/epoll_tcp_acceptor.hpp>
22 : #include <boost/corosio/native/detail/epoll/epoll_tcp_service.hpp>
23 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
24 : #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
25 :
26 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
27 :
28 : #include <memory>
29 : #include <mutex>
30 : #include <utility>
31 :
32 : #include <errno.h>
33 : #include <netinet/in.h>
34 : #include <sys/epoll.h>
35 : #include <sys/socket.h>
36 : #include <unistd.h>
37 :
38 : namespace boost::corosio::detail {
39 :
40 : /// State for epoll acceptor service.
41 : using epoll_tcp_acceptor_state =
42 : reactor_service_state<epoll_scheduler, epoll_tcp_acceptor>;
43 :
44 : /** epoll acceptor service implementation.
45 :
46 : Inherits from tcp_acceptor_service to enable runtime polymorphism.
47 : Uses key_type = tcp_acceptor_service for service lookup.
48 : */
49 : class BOOST_COROSIO_DECL epoll_tcp_acceptor_service final
50 : : public tcp_acceptor_service
51 : {
52 : public:
53 : explicit epoll_tcp_acceptor_service(capy::execution_context& ctx);
54 : ~epoll_tcp_acceptor_service() override;
55 :
56 : epoll_tcp_acceptor_service(epoll_tcp_acceptor_service const&) = delete;
57 : epoll_tcp_acceptor_service&
58 : operator=(epoll_tcp_acceptor_service const&) = delete;
59 :
60 : void shutdown() override;
61 :
62 : io_object::implementation* construct() override;
63 : void destroy(io_object::implementation*) override;
64 : void close(io_object::handle&) override;
65 : std::error_code open_acceptor_socket(
66 : tcp_acceptor::implementation& impl,
67 : int family,
68 : int type,
69 : int protocol) override;
70 : std::error_code
71 : bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
72 : std::error_code
73 : listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
74 :
75 HIT 152 : epoll_scheduler& scheduler() const noexcept
76 : {
77 152 : return state_->sched_;
78 : }
79 : void post(scheduler_op* op);
80 : void work_started() noexcept;
81 : void work_finished() noexcept;
82 :
83 : /** Get the TCP service for creating peer sockets during accept. */
84 : epoll_tcp_service* tcp_service() const noexcept;
85 :
86 : private:
87 : capy::execution_context& ctx_;
88 : std::unique_ptr<epoll_tcp_acceptor_state> state_;
89 : };
90 :
91 : inline void
92 6 : epoll_accept_op::cancel() noexcept
93 : {
94 6 : if (acceptor_impl_)
95 6 : acceptor_impl_->cancel_single_op(*this);
96 : else
97 MIS 0 : request_cancel();
98 HIT 6 : }
99 :
100 : inline void
101 2472 : epoll_accept_op::operator()()
102 : {
103 2472 : complete_accept_op<epoll_tcp_socket>(*this);
104 2472 : }
105 :
106 80 : inline epoll_tcp_acceptor::epoll_tcp_acceptor(
107 80 : epoll_tcp_acceptor_service& svc) noexcept
108 80 : : reactor_acceptor(svc)
109 : {
110 80 : }
111 :
112 : inline std::coroutine_handle<>
113 2472 : epoll_tcp_acceptor::accept(
114 : std::coroutine_handle<> h,
115 : capy::executor_ref ex,
116 : std::stop_token token,
117 : std::error_code* ec,
118 : io_object::implementation** impl_out)
119 : {
120 2472 : auto& op = acc_;
121 2472 : op.reset();
122 2472 : op.h = h;
123 2472 : op.ex = ex;
124 2472 : op.ec_out = ec;
125 2472 : op.impl_out = impl_out;
126 2472 : op.fd = fd_;
127 2472 : op.start(token, this);
128 :
129 2472 : sockaddr_storage peer_storage{};
130 2472 : socklen_t addrlen = sizeof(peer_storage);
131 : int accepted;
132 : do
133 : {
134 2472 : accepted = ::accept4(
135 : fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
136 : SOCK_NONBLOCK | SOCK_CLOEXEC);
137 : }
138 2472 : while (accepted < 0 && errno == EINTR);
139 :
140 2472 : if (accepted >= 0)
141 : {
142 : {
143 2 : std::lock_guard lock(desc_state_.mutex);
144 2 : desc_state_.read_ready = false;
145 2 : }
146 :
147 2 : if (svc_.scheduler().try_consume_inline_budget())
148 : {
149 MIS 0 : auto* socket_svc = svc_.tcp_service();
150 0 : if (socket_svc)
151 : {
152 : auto& impl =
153 0 : static_cast<epoll_tcp_socket&>(*socket_svc->construct());
154 0 : impl.set_socket(accepted);
155 :
156 0 : impl.desc_state_.fd = accepted;
157 : {
158 0 : std::lock_guard lock(impl.desc_state_.mutex);
159 0 : impl.desc_state_.read_op = nullptr;
160 0 : impl.desc_state_.write_op = nullptr;
161 0 : impl.desc_state_.connect_op = nullptr;
162 0 : }
163 0 : socket_svc->scheduler().register_descriptor(
164 : accepted, &impl.desc_state_);
165 :
166 0 : impl.set_endpoints(
167 : local_endpoint_, from_sockaddr(peer_storage));
168 :
169 0 : *ec = {};
170 0 : if (impl_out)
171 0 : *impl_out = &impl;
172 : }
173 : else
174 : {
175 0 : ::close(accepted);
176 0 : *ec = make_err(ENOENT);
177 0 : if (impl_out)
178 0 : *impl_out = nullptr;
179 : }
180 0 : return dispatch_coro(ex, h);
181 : }
182 :
183 HIT 2 : op.accepted_fd = accepted;
184 2 : op.peer_storage = peer_storage;
185 2 : op.complete(0, 0);
186 2 : op.impl_ptr = shared_from_this();
187 2 : svc_.post(&op);
188 2 : return std::noop_coroutine();
189 : }
190 :
191 2470 : if (errno == EAGAIN || errno == EWOULDBLOCK)
192 : {
193 2470 : op.impl_ptr = shared_from_this();
194 2470 : svc_.work_started();
195 :
196 2470 : std::lock_guard lock(desc_state_.mutex);
197 2470 : bool io_done = false;
198 2470 : if (desc_state_.read_ready)
199 : {
200 MIS 0 : desc_state_.read_ready = false;
201 0 : op.perform_io();
202 0 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
203 0 : if (!io_done)
204 0 : op.errn = 0;
205 : }
206 :
207 HIT 2470 : if (io_done || op.cancelled.load(std::memory_order_acquire))
208 : {
209 MIS 0 : svc_.post(&op);
210 0 : svc_.work_finished();
211 : }
212 : else
213 : {
214 HIT 2470 : desc_state_.read_op = &op;
215 : }
216 2470 : return std::noop_coroutine();
217 2470 : }
218 :
219 MIS 0 : op.complete(errno, 0);
220 0 : op.impl_ptr = shared_from_this();
221 0 : svc_.post(&op);
222 : // completion is always posted to scheduler queue, never inline.
223 0 : return std::noop_coroutine();
224 : }
225 :
226 : inline void
227 HIT 2 : epoll_tcp_acceptor::cancel() noexcept
228 : {
229 2 : do_cancel();
230 2 : }
231 :
232 : inline void
233 318 : epoll_tcp_acceptor::close_socket() noexcept
234 : {
235 318 : do_close_socket();
236 318 : }
237 :
238 271 : inline epoll_tcp_acceptor_service::epoll_tcp_acceptor_service(
239 271 : capy::execution_context& ctx)
240 271 : : ctx_(ctx)
241 271 : , state_(
242 : std::make_unique<epoll_tcp_acceptor_state>(
243 271 : ctx.use_service<epoll_scheduler>()))
244 : {
245 271 : }
246 :
247 542 : inline epoll_tcp_acceptor_service::~epoll_tcp_acceptor_service() {}
248 :
249 : inline void
250 271 : epoll_tcp_acceptor_service::shutdown()
251 : {
252 271 : std::lock_guard lock(state_->mutex_);
253 :
254 271 : while (auto* impl = state_->impl_list_.pop_front())
255 MIS 0 : impl->close_socket();
256 :
257 : // Don't clear impl_ptrs_ here — same rationale as
258 : // epoll_tcp_service::shutdown(). Let ~state_ release ptrs
259 : // after scheduler shutdown has drained all queued ops.
260 HIT 271 : }
261 :
262 : inline io_object::implementation*
263 80 : epoll_tcp_acceptor_service::construct()
264 : {
265 80 : auto impl = std::make_shared<epoll_tcp_acceptor>(*this);
266 80 : auto* raw = impl.get();
267 :
268 80 : std::lock_guard lock(state_->mutex_);
269 80 : state_->impl_ptrs_.emplace(raw, std::move(impl));
270 80 : state_->impl_list_.push_back(raw);
271 :
272 80 : return raw;
273 80 : }
274 :
275 : inline void
276 80 : epoll_tcp_acceptor_service::destroy(io_object::implementation* impl)
277 : {
278 80 : auto* epoll_impl = static_cast<epoll_tcp_acceptor*>(impl);
279 80 : epoll_impl->close_socket();
280 80 : std::lock_guard lock(state_->mutex_);
281 80 : state_->impl_list_.remove(epoll_impl);
282 80 : state_->impl_ptrs_.erase(epoll_impl);
283 80 : }
284 :
285 : inline void
286 159 : epoll_tcp_acceptor_service::close(io_object::handle& h)
287 : {
288 159 : static_cast<epoll_tcp_acceptor*>(h.get())->close_socket();
289 159 : }
290 :
291 : inline std::error_code
292 79 : epoll_tcp_acceptor_service::open_acceptor_socket(
293 : tcp_acceptor::implementation& impl, int family, int type, int protocol)
294 : {
295 79 : auto* epoll_impl = static_cast<epoll_tcp_acceptor*>(&impl);
296 79 : epoll_impl->close_socket();
297 :
298 79 : int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
299 79 : if (fd < 0)
300 MIS 0 : return make_err(errno);
301 :
302 HIT 79 : if (family == AF_INET6)
303 : {
304 8 : int val = 0; // dual-stack default
305 8 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
306 : }
307 :
308 79 : epoll_impl->fd_ = fd;
309 :
310 : // Set up descriptor state but do NOT register with epoll yet
311 79 : epoll_impl->desc_state_.fd = fd;
312 : {
313 79 : std::lock_guard lock(epoll_impl->desc_state_.mutex);
314 79 : epoll_impl->desc_state_.read_op = nullptr;
315 79 : }
316 :
317 79 : return {};
318 : }
319 :
320 : inline std::error_code
321 78 : epoll_tcp_acceptor_service::bind_acceptor(
322 : tcp_acceptor::implementation& impl, endpoint ep)
323 : {
324 78 : return static_cast<epoll_tcp_acceptor*>(&impl)->do_bind(ep);
325 : }
326 :
327 : inline std::error_code
328 75 : epoll_tcp_acceptor_service::listen_acceptor(
329 : tcp_acceptor::implementation& impl, int backlog)
330 : {
331 75 : return static_cast<epoll_tcp_acceptor*>(&impl)->do_listen(backlog);
332 : }
333 :
334 : inline void
335 11 : epoll_tcp_acceptor_service::post(scheduler_op* op)
336 : {
337 11 : state_->sched_.post(op);
338 11 : }
339 :
340 : inline void
341 2470 : epoll_tcp_acceptor_service::work_started() noexcept
342 : {
343 2470 : state_->sched_.work_started();
344 2470 : }
345 :
346 : inline void
347 9 : epoll_tcp_acceptor_service::work_finished() noexcept
348 : {
349 9 : state_->sched_.work_finished();
350 9 : }
351 :
352 : inline epoll_tcp_service*
353 2463 : epoll_tcp_acceptor_service::tcp_service() const noexcept
354 : {
355 2463 : auto* svc = ctx_.find_service<detail::tcp_service>();
356 2463 : return svc ? dynamic_cast<epoll_tcp_service*>(svc) : nullptr;
357 : }
358 :
359 : } // namespace boost::corosio::detail
360 :
361 : #endif // BOOST_COROSIO_HAS_EPOLL
362 :
363 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_ACCEPTOR_SERVICE_HPP
|