1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_SERVICE_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_SERVICE_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_SELECT
15  
#if BOOST_COROSIO_HAS_SELECT
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/corosio/detail/tcp_service.hpp>
18  
#include <boost/corosio/detail/tcp_service.hpp>
19  

19  

20  
#include <boost/corosio/native/detail/select/select_tcp_socket.hpp>
20  
#include <boost/corosio/native/detail/select/select_tcp_socket.hpp>
21  
#include <boost/corosio/native/detail/select/select_scheduler.hpp>
21  
#include <boost/corosio/native/detail/select/select_scheduler.hpp>
22  
#include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
22  
#include <boost/corosio/native/detail/reactor/reactor_socket_service.hpp>
23  

23  

24  
#include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
24  
#include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
25  

25  

26  
#include <coroutine>
26  
#include <coroutine>
27  
#include <mutex>
27  
#include <mutex>
28  

28  

29  
#include <errno.h>
29  
#include <errno.h>
30  
#include <fcntl.h>
30  
#include <fcntl.h>
31  
#include <netinet/in.h>
31  
#include <netinet/in.h>
32  
#include <netinet/tcp.h>
32  
#include <netinet/tcp.h>
33  
#include <sys/select.h>
33  
#include <sys/select.h>
34  
#include <sys/socket.h>
34  
#include <sys/socket.h>
35  
#include <unistd.h>
35  
#include <unistd.h>
36  

36  

37  
/*
37  
/*
38  
    Each I/O op tries the syscall speculatively; only registers with
38  
    Each I/O op tries the syscall speculatively; only registers with
39  
    the reactor on EAGAIN. Fd is registered once at open time and
39  
    the reactor on EAGAIN. Fd is registered once at open time and
40  
    stays registered until close. The reactor only marks ready_events_;
40  
    stays registered until close. The reactor only marks ready_events_;
41  
    actual I/O happens in invoke_deferred_io(). cancel() captures
41  
    actual I/O happens in invoke_deferred_io(). cancel() captures
42  
    shared_from_this() into op.impl_ptr to keep the impl alive.
42  
    shared_from_this() into op.impl_ptr to keep the impl alive.
43  
*/
43  
*/
44  

44  

45  
namespace boost::corosio::detail {
45  
namespace boost::corosio::detail {
46  

46  

47  
/** select TCP service implementation.
47  
/** select TCP service implementation.
48  

48  

49  
    Inherits from tcp_service to enable runtime polymorphism.
49  
    Inherits from tcp_service to enable runtime polymorphism.
50  
    Uses key_type = tcp_service for service lookup.
50  
    Uses key_type = tcp_service for service lookup.
51  
*/
51  
*/
52  
class BOOST_COROSIO_DECL select_tcp_service final
52  
class BOOST_COROSIO_DECL select_tcp_service final
53  
    : public reactor_socket_service<
53  
    : public reactor_socket_service<
54  
          select_tcp_service,
54  
          select_tcp_service,
55  
          tcp_service,
55  
          tcp_service,
56  
          select_scheduler,
56  
          select_scheduler,
57  
          select_tcp_socket>
57  
          select_tcp_socket>
58  
{
58  
{
59  
public:
59  
public:
60  
    explicit select_tcp_service(capy::execution_context& ctx)
60  
    explicit select_tcp_service(capy::execution_context& ctx)
61  
        : reactor_socket_service(ctx)
61  
        : reactor_socket_service(ctx)
62  
    {
62  
    {
63  
    }
63  
    }
64  

64  

65  
    std::error_code open_socket(
65  
    std::error_code open_socket(
66  
        tcp_socket::implementation& impl,
66  
        tcp_socket::implementation& impl,
67  
        int family,
67  
        int family,
68  
        int type,
68  
        int type,
69  
        int protocol) override;
69  
        int protocol) override;
70  
};
70  
};
71  

71  

72  
inline void
72  
inline void
73  
select_connect_op::cancel() noexcept
73  
select_connect_op::cancel() noexcept
74  
{
74  
{
75  
    if (socket_impl_)
75  
    if (socket_impl_)
76  
        socket_impl_->cancel_single_op(*this);
76  
        socket_impl_->cancel_single_op(*this);
77  
    else
77  
    else
78  
        request_cancel();
78  
        request_cancel();
79  
}
79  
}
80  

80  

81  
inline void
81  
inline void
82  
select_read_op::cancel() noexcept
82  
select_read_op::cancel() noexcept
83  
{
83  
{
84  
    if (socket_impl_)
84  
    if (socket_impl_)
85  
        socket_impl_->cancel_single_op(*this);
85  
        socket_impl_->cancel_single_op(*this);
86  
    else
86  
    else
87  
        request_cancel();
87  
        request_cancel();
88  
}
88  
}
89  

89  

90  
inline void
90  
inline void
91  
select_write_op::cancel() noexcept
91  
select_write_op::cancel() noexcept
92  
{
92  
{
93  
    if (socket_impl_)
93  
    if (socket_impl_)
94  
        socket_impl_->cancel_single_op(*this);
94  
        socket_impl_->cancel_single_op(*this);
95  
    else
95  
    else
96  
        request_cancel();
96  
        request_cancel();
97  
}
97  
}
98  

98  

99  
inline void
99  
inline void
100  
select_op::operator()()
100  
select_op::operator()()
101  
{
101  
{
102  
    complete_io_op(*this);
102  
    complete_io_op(*this);
103  
}
103  
}
104  

104  

105  
inline void
105  
inline void
106  
select_connect_op::operator()()
106  
select_connect_op::operator()()
107  
{
107  
{
108  
    complete_connect_op(*this);
108  
    complete_connect_op(*this);
109  
}
109  
}
110  

110  

111  
inline select_tcp_socket::select_tcp_socket(select_tcp_service& svc) noexcept
111  
inline select_tcp_socket::select_tcp_socket(select_tcp_service& svc) noexcept
112  
    : reactor_stream_socket(svc)
112  
    : reactor_stream_socket(svc)
113  
{
113  
{
114  
}
114  
}
115  

115  

116  
inline select_tcp_socket::~select_tcp_socket() = default;
116  
inline select_tcp_socket::~select_tcp_socket() = default;
117  

117  

118  
inline std::coroutine_handle<>
118  
inline std::coroutine_handle<>
119  
select_tcp_socket::connect(
119  
select_tcp_socket::connect(
120  
    std::coroutine_handle<> h,
120  
    std::coroutine_handle<> h,
121  
    capy::executor_ref ex,
121  
    capy::executor_ref ex,
122  
    endpoint ep,
122  
    endpoint ep,
123  
    std::stop_token token,
123  
    std::stop_token token,
124  
    std::error_code* ec)
124  
    std::error_code* ec)
125  
{
125  
{
126  
    auto result = do_connect(h, ex, ep, token, ec);
126  
    auto result = do_connect(h, ex, ep, token, ec);
127  
    // Rebuild fd_sets so select() watches for writability
127  
    // Rebuild fd_sets so select() watches for writability
128  
    if (result == std::noop_coroutine())
128  
    if (result == std::noop_coroutine())
129  
        svc_.scheduler().notify_reactor();
129  
        svc_.scheduler().notify_reactor();
130  
    return result;
130  
    return result;
131  
}
131  
}
132  

132  

133  
inline std::coroutine_handle<>
133  
inline std::coroutine_handle<>
134  
select_tcp_socket::read_some(
134  
select_tcp_socket::read_some(
135  
    std::coroutine_handle<> h,
135  
    std::coroutine_handle<> h,
136  
    capy::executor_ref ex,
136  
    capy::executor_ref ex,
137  
    buffer_param param,
137  
    buffer_param param,
138  
    std::stop_token token,
138  
    std::stop_token token,
139  
    std::error_code* ec,
139  
    std::error_code* ec,
140  
    std::size_t* bytes_out)
140  
    std::size_t* bytes_out)
141  
{
141  
{
142  
    return do_read_some(h, ex, param, token, ec, bytes_out);
142  
    return do_read_some(h, ex, param, token, ec, bytes_out);
143  
}
143  
}
144  

144  

145  
inline std::coroutine_handle<>
145  
inline std::coroutine_handle<>
146  
select_tcp_socket::write_some(
146  
select_tcp_socket::write_some(
147  
    std::coroutine_handle<> h,
147  
    std::coroutine_handle<> h,
148  
    capy::executor_ref ex,
148  
    capy::executor_ref ex,
149  
    buffer_param param,
149  
    buffer_param param,
150  
    std::stop_token token,
150  
    std::stop_token token,
151  
    std::error_code* ec,
151  
    std::error_code* ec,
152  
    std::size_t* bytes_out)
152  
    std::size_t* bytes_out)
153  
{
153  
{
154  
    auto result = do_write_some(h, ex, param, token, ec, bytes_out);
154  
    auto result = do_write_some(h, ex, param, token, ec, bytes_out);
155  
    // Rebuild fd_sets so select() watches for writability
155  
    // Rebuild fd_sets so select() watches for writability
156  
    if (result == std::noop_coroutine())
156  
    if (result == std::noop_coroutine())
157  
        svc_.scheduler().notify_reactor();
157  
        svc_.scheduler().notify_reactor();
158  
    return result;
158  
    return result;
159  
}
159  
}
160  

160  

161  
inline void
161  
inline void
162  
select_tcp_socket::cancel() noexcept
162  
select_tcp_socket::cancel() noexcept
163  
{
163  
{
164  
    do_cancel();
164  
    do_cancel();
165  
}
165  
}
166  

166  

167  
inline void
167  
inline void
168  
select_tcp_socket::close_socket() noexcept
168  
select_tcp_socket::close_socket() noexcept
169  
{
169  
{
170  
    do_close_socket();
170  
    do_close_socket();
171  
}
171  
}
172  

172  

173  
inline std::error_code
173  
inline std::error_code
174  
select_tcp_service::open_socket(
174  
select_tcp_service::open_socket(
175  
    tcp_socket::implementation& impl, int family, int type, int protocol)
175  
    tcp_socket::implementation& impl, int family, int type, int protocol)
176  
{
176  
{
177  
    auto* select_impl = static_cast<select_tcp_socket*>(&impl);
177  
    auto* select_impl = static_cast<select_tcp_socket*>(&impl);
178  
    select_impl->close_socket();
178  
    select_impl->close_socket();
179  

179  

180  
    int fd = ::socket(family, type, protocol);
180  
    int fd = ::socket(family, type, protocol);
181  
    if (fd < 0)
181  
    if (fd < 0)
182  
        return make_err(errno);
182  
        return make_err(errno);
183  

183  

184  
    if (family == AF_INET6)
184  
    if (family == AF_INET6)
185  
    {
185  
    {
186  
        int one = 1;
186  
        int one = 1;
187  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
187  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
188  
    }
188  
    }
189  

189  

190  
    int flags = ::fcntl(fd, F_GETFL, 0);
190  
    int flags = ::fcntl(fd, F_GETFL, 0);
191  
    if (flags == -1)
191  
    if (flags == -1)
192  
    {
192  
    {
193  
        int errn = errno;
193  
        int errn = errno;
194  
        ::close(fd);
194  
        ::close(fd);
195  
        return make_err(errn);
195  
        return make_err(errn);
196  
    }
196  
    }
197  
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
197  
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
198  
    {
198  
    {
199  
        int errn = errno;
199  
        int errn = errno;
200  
        ::close(fd);
200  
        ::close(fd);
201  
        return make_err(errn);
201  
        return make_err(errn);
202  
    }
202  
    }
203  
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
203  
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
204  
    {
204  
    {
205  
        int errn = errno;
205  
        int errn = errno;
206  
        ::close(fd);
206  
        ::close(fd);
207  
        return make_err(errn);
207  
        return make_err(errn);
208  
    }
208  
    }
209  

209  

210  
    if (fd >= FD_SETSIZE)
210  
    if (fd >= FD_SETSIZE)
211  
    {
211  
    {
212  
        ::close(fd);
212  
        ::close(fd);
213  
        return make_err(EMFILE);
213  
        return make_err(EMFILE);
214  
    }
214  
    }
215  

215  

216  
#ifdef SO_NOSIGPIPE
216  
#ifdef SO_NOSIGPIPE
217  
    {
217  
    {
218  
        int one = 1;
218  
        int one = 1;
219  
        ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
219  
        ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
220  
    }
220  
    }
221  
#endif
221  
#endif
222  

222  

223  
    select_impl->fd_ = fd;
223  
    select_impl->fd_ = fd;
224  

224  

225  
    select_impl->desc_state_.fd = fd;
225  
    select_impl->desc_state_.fd = fd;
226  
    {
226  
    {
227  
        std::lock_guard lock(select_impl->desc_state_.mutex);
227  
        std::lock_guard lock(select_impl->desc_state_.mutex);
228  
        select_impl->desc_state_.read_op    = nullptr;
228  
        select_impl->desc_state_.read_op    = nullptr;
229  
        select_impl->desc_state_.write_op   = nullptr;
229  
        select_impl->desc_state_.write_op   = nullptr;
230  
        select_impl->desc_state_.connect_op = nullptr;
230  
        select_impl->desc_state_.connect_op = nullptr;
231  
    }
231  
    }
232  
    scheduler().register_descriptor(fd, &select_impl->desc_state_);
232  
    scheduler().register_descriptor(fd, &select_impl->desc_state_);
233  

233  

234  
    return {};
234  
    return {};
235  
}
235  
}
236  

236  

237  
} // namespace boost::corosio::detail
237  
} // namespace boost::corosio::detail
238  

238  

239  
#endif // BOOST_COROSIO_HAS_SELECT
239  
#endif // BOOST_COROSIO_HAS_SELECT
240  

240  

241  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_SERVICE_HPP
241  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_TCP_SERVICE_HPP