//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_EPOLL
15  
#if BOOST_COROSIO_HAS_EPOLL
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/corosio/detail/tcp_service.hpp>
18  
#include <boost/corosio/detail/tcp_service.hpp>
19  

19  

20  
#include <boost/corosio/native/detail/epoll/epoll_tcp_socket.hpp>
20  
#include <boost/corosio/native/detail/epoll/epoll_tcp_socket.hpp>
21  
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
21  
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
22  
#include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
22  
#include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
23  

23  

24  
#include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
24  
#include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
25  

25  

26  
#include <coroutine>
26  
#include <coroutine>
27  

27  

28  
#include <errno.h>
28  
#include <errno.h>
29  
#include <netinet/in.h>
29  
#include <netinet/in.h>
30  
#include <netinet/tcp.h>
30  
#include <netinet/tcp.h>
31  
#include <sys/epoll.h>
31  
#include <sys/epoll.h>
32  
#include <sys/socket.h>
32  
#include <sys/socket.h>
33  
#include <unistd.h>
33  
#include <unistd.h>
34  

34  

/*
    epoll Socket Implementation
    ===========================

    Each I/O operation follows the same pattern:
      1. Try the syscall immediately (non-blocking socket).
      2. If it succeeds or fails with a real error, post it to the
         completion queue.
      3. If it fails with EAGAIN/EWOULDBLOCK, register with epoll and wait.

    This "try first" approach avoids unnecessary epoll round-trips for
    operations that can complete immediately (common for small reads/writes
    on fast local connections).

    One-Shot Registration
    ---------------------
    We use one-shot epoll registration: each operation registers, waits for
    one event, then unregisters. This simplifies the state machine, since we
    don't need to track whether an fd is currently registered or handle
    re-arming. The tradeoff is slightly more epoll_ctl calls, but the
    simplicity is worth it.

    NOTE(review): epoll_tcp_service::open_socket() registers each descriptor
    once, at open time, and its comment describes that registration as
    edge-triggered. Pending operations are parked in the descriptor state's
    read_op / write_op / connect_op slots. This section may predate that
    design; confirm and update.

    Cancellation
    ------------
    See op.hpp for the completion/cancellation race handling via the
    `registered` atomic. cancel() must complete pending operations (post
    them with the cancelled flag set) so that coroutines waiting on them can
    resume. close_socket() calls cancel() first to ensure this.

    Impl Lifetime with shared_ptr
    -----------------------------
    Socket impls use enable_shared_from_this. The service owns impls via
    shared_ptr maps (impl_ptrs_) keyed by raw pointer for O(1) lookup and
    removal. When a user calls close(), we call cancel(), which posts pending
    ops to the scheduler.

    CRITICAL: The posted ops must keep the impl alive until they complete.
    Otherwise the scheduler would process a freed op (use-after-free). The
    cancel() method captures shared_from_this() into op.impl_ptr before
    posting. When the op completes, impl_ptr is cleared, allowing the impl
    to be destroyed if no other references exist.

    Service Ownership
    -----------------
    epoll_tcp_service owns all socket impls. destroy_impl() removes the
    shared_ptr from the map, but the impl may survive if ops still hold
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
    in-flight ops will complete and release their refs.
*/

84  
namespace boost::corosio::detail {
84  
namespace boost::corosio::detail {
85  

85  

86  
/** epoll TCP service implementation.
86  
/** epoll TCP service implementation.
87  

87  

88  
    Inherits from tcp_service to enable runtime polymorphism.
88  
    Inherits from tcp_service to enable runtime polymorphism.
89  
    Uses key_type = tcp_service for service lookup.
89  
    Uses key_type = tcp_service for service lookup.
90  
*/
90  
*/
91  
class BOOST_COROSIO_DECL epoll_tcp_service final
91  
class BOOST_COROSIO_DECL epoll_tcp_service final
92  
    : public reactor_socket_service<
92  
    : public reactor_socket_service<
93  
          epoll_tcp_service,
93  
          epoll_tcp_service,
94  
          tcp_service,
94  
          tcp_service,
95  
          epoll_scheduler,
95  
          epoll_scheduler,
96  
          epoll_tcp_socket>
96  
          epoll_tcp_socket>
97  
{
97  
{
98  
public:
98  
public:
99  
    explicit epoll_tcp_service(capy::execution_context& ctx)
99  
    explicit epoll_tcp_service(capy::execution_context& ctx)
100  
        : reactor_socket_service(ctx)
100  
        : reactor_socket_service(ctx)
101  
    {
101  
    {
102  
    }
102  
    }
103  

103  

104  
    std::error_code open_socket(
104  
    std::error_code open_socket(
105  
        tcp_socket::implementation& impl,
105  
        tcp_socket::implementation& impl,
106  
        int family,
106  
        int family,
107  
        int type,
107  
        int type,
108  
        int protocol) override;
108  
        int protocol) override;
109  
};
109  
};
110  

110  

111  
inline void
111  
inline void
112  
epoll_connect_op::cancel() noexcept
112  
epoll_connect_op::cancel() noexcept
113  
{
113  
{
114  
    if (socket_impl_)
114  
    if (socket_impl_)
115  
        socket_impl_->cancel_single_op(*this);
115  
        socket_impl_->cancel_single_op(*this);
116  
    else
116  
    else
117  
        request_cancel();
117  
        request_cancel();
118  
}
118  
}
119  

119  

120  
inline void
120  
inline void
121  
epoll_read_op::cancel() noexcept
121  
epoll_read_op::cancel() noexcept
122  
{
122  
{
123  
    if (socket_impl_)
123  
    if (socket_impl_)
124  
        socket_impl_->cancel_single_op(*this);
124  
        socket_impl_->cancel_single_op(*this);
125  
    else
125  
    else
126  
        request_cancel();
126  
        request_cancel();
127  
}
127  
}
128  

128  

129  
inline void
129  
inline void
130  
epoll_write_op::cancel() noexcept
130  
epoll_write_op::cancel() noexcept
131  
{
131  
{
132  
    if (socket_impl_)
132  
    if (socket_impl_)
133  
        socket_impl_->cancel_single_op(*this);
133  
        socket_impl_->cancel_single_op(*this);
134  
    else
134  
    else
135  
        request_cancel();
135  
        request_cancel();
136  
}
136  
}
137  

137  

138  
inline void
138  
inline void
139  
epoll_op::operator()()
139  
epoll_op::operator()()
140  
{
140  
{
141  
    complete_io_op(*this);
141  
    complete_io_op(*this);
142  
}
142  
}
143  

143  

144  
inline void
144  
inline void
145  
epoll_connect_op::operator()()
145  
epoll_connect_op::operator()()
146  
{
146  
{
147  
    complete_connect_op(*this);
147  
    complete_connect_op(*this);
148  
}
148  
}
149  

149  

150  
inline epoll_tcp_socket::epoll_tcp_socket(epoll_tcp_service& svc) noexcept
150  
inline epoll_tcp_socket::epoll_tcp_socket(epoll_tcp_service& svc) noexcept
151  
    : reactor_stream_socket(svc)
151  
    : reactor_stream_socket(svc)
152  
{
152  
{
153  
}
153  
}
154  

154  

155  
inline epoll_tcp_socket::~epoll_tcp_socket() = default;
155  
inline epoll_tcp_socket::~epoll_tcp_socket() = default;
156  

156  

157  
inline std::coroutine_handle<>
157  
inline std::coroutine_handle<>
158  
epoll_tcp_socket::connect(
158  
epoll_tcp_socket::connect(
159  
    std::coroutine_handle<> h,
159  
    std::coroutine_handle<> h,
160  
    capy::executor_ref ex,
160  
    capy::executor_ref ex,
161  
    endpoint ep,
161  
    endpoint ep,
162  
    std::stop_token token,
162  
    std::stop_token token,
163  
    std::error_code* ec)
163  
    std::error_code* ec)
164  
{
164  
{
165  
    return do_connect(h, ex, ep, token, ec);
165  
    return do_connect(h, ex, ep, token, ec);
166  
}
166  
}
167  

167  

168  
inline std::coroutine_handle<>
168  
inline std::coroutine_handle<>
169  
epoll_tcp_socket::read_some(
169  
epoll_tcp_socket::read_some(
170  
    std::coroutine_handle<> h,
170  
    std::coroutine_handle<> h,
171  
    capy::executor_ref ex,
171  
    capy::executor_ref ex,
172  
    buffer_param param,
172  
    buffer_param param,
173  
    std::stop_token token,
173  
    std::stop_token token,
174  
    std::error_code* ec,
174  
    std::error_code* ec,
175  
    std::size_t* bytes_out)
175  
    std::size_t* bytes_out)
176  
{
176  
{
177  
    return do_read_some(h, ex, param, token, ec, bytes_out);
177  
    return do_read_some(h, ex, param, token, ec, bytes_out);
178  
}
178  
}
179  

179  

180  
inline std::coroutine_handle<>
180  
inline std::coroutine_handle<>
181  
epoll_tcp_socket::write_some(
181  
epoll_tcp_socket::write_some(
182  
    std::coroutine_handle<> h,
182  
    std::coroutine_handle<> h,
183  
    capy::executor_ref ex,
183  
    capy::executor_ref ex,
184  
    buffer_param param,
184  
    buffer_param param,
185  
    std::stop_token token,
185  
    std::stop_token token,
186  
    std::error_code* ec,
186  
    std::error_code* ec,
187  
    std::size_t* bytes_out)
187  
    std::size_t* bytes_out)
188  
{
188  
{
189  
    return do_write_some(h, ex, param, token, ec, bytes_out);
189  
    return do_write_some(h, ex, param, token, ec, bytes_out);
190  
}
190  
}
191  

191  

192  
inline void
192  
inline void
193  
epoll_tcp_socket::cancel() noexcept
193  
epoll_tcp_socket::cancel() noexcept
194  
{
194  
{
195  
    do_cancel();
195  
    do_cancel();
196  
}
196  
}
197  

197  

198  
inline void
198  
inline void
199  
epoll_tcp_socket::close_socket() noexcept
199  
epoll_tcp_socket::close_socket() noexcept
200  
{
200  
{
201  
    do_close_socket();
201  
    do_close_socket();
202  
}
202  
}
203  

203  

204  
inline std::error_code
204  
inline std::error_code
205  
epoll_tcp_service::open_socket(
205  
epoll_tcp_service::open_socket(
206  
    tcp_socket::implementation& impl, int family, int type, int protocol)
206  
    tcp_socket::implementation& impl, int family, int type, int protocol)
207  
{
207  
{
208  
    auto* epoll_impl = static_cast<epoll_tcp_socket*>(&impl);
208  
    auto* epoll_impl = static_cast<epoll_tcp_socket*>(&impl);
209  
    epoll_impl->close_socket();
209  
    epoll_impl->close_socket();
210  

210  

211  
    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
211  
    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
212  
    if (fd < 0)
212  
    if (fd < 0)
213  
        return make_err(errno);
213  
        return make_err(errno);
214  

214  

215  
    if (family == AF_INET6)
215  
    if (family == AF_INET6)
216  
    {
216  
    {
217  
        int one = 1;
217  
        int one = 1;
218  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
218  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
219  
    }
219  
    }
220  

220  

221  
    epoll_impl->fd_ = fd;
221  
    epoll_impl->fd_ = fd;
222  

222  

223  
    // Register fd with epoll (edge-triggered mode)
223  
    // Register fd with epoll (edge-triggered mode)
224  
    epoll_impl->desc_state_.fd = fd;
224  
    epoll_impl->desc_state_.fd = fd;
225  
    {
225  
    {
226  
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
226  
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
227  
        epoll_impl->desc_state_.read_op    = nullptr;
227  
        epoll_impl->desc_state_.read_op    = nullptr;
228  
        epoll_impl->desc_state_.write_op   = nullptr;
228  
        epoll_impl->desc_state_.write_op   = nullptr;
229  
        epoll_impl->desc_state_.connect_op = nullptr;
229  
        epoll_impl->desc_state_.connect_op = nullptr;
230  
    }
230  
    }
231  
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
231  
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
232  

232  

233  
    return {};
233  
    return {};
234  
}
234  
}
235  

235  

236  
} // namespace boost::corosio::detail
236  
} // namespace boost::corosio::detail
237  

237  

238  
#endif // BOOST_COROSIO_HAS_EPOLL
238  
#endif // BOOST_COROSIO_HAS_EPOLL
239  

239  

240  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP
240  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_TCP_SERVICE_HPP