LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_stream_socket.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 75.1 % 193 145 48
Test Date: 2026-03-17 15:52:49 Functions: 70.0 % 40 28 12

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
      12                 : 
      13                 : #include <boost/corosio/tcp_socket.hpp>
      14                 : #include <boost/corosio/native/detail/reactor/reactor_basic_socket.hpp>
      15                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      16                 : #include <boost/capy/buffers.hpp>
      17                 : 
      18                 : #include <coroutine>
      19                 : 
      20                 : #include <errno.h>
      21                 : #include <sys/socket.h>
      22                 : #include <sys/uio.h>
      23                 : 
      24                 : namespace boost::corosio::detail {
      25                 : 
      26                 : /** CRTP base for reactor-backed stream socket implementations.
      27                 : 
      28                 :     Inherits shared data members and cancel/close/register logic
      29                 :     from reactor_basic_socket. Adds the TCP-specific remote
      30                 :     endpoint, shutdown, and I/O dispatch (connect, read, write).
      31                 : 
      32                 :     @tparam Derived   The concrete socket type (CRTP).
      33                 :     @tparam Service   The backend's socket service type.
      34                 :     @tparam ConnOp    The backend's connect op type.
      35                 :     @tparam ReadOp    The backend's read op type.
      36                 :     @tparam WriteOp   The backend's write op type.
      37                 :     @tparam DescState The backend's descriptor_state type.
      38                 : */
      39                 : template<
      40                 :     class Derived,
      41                 :     class Service,
      42                 :     class ConnOp,
      43                 :     class ReadOp,
      44                 :     class WriteOp,
      45                 :     class DescState>
      46                 : class reactor_stream_socket
      47                 :     : public reactor_basic_socket<
      48                 :           Derived,
      49                 :           tcp_socket::implementation,
      50                 :           Service,
      51                 :           DescState>
      52                 : {
      53                 :     using base_type = reactor_basic_socket<
      54                 :         Derived,
      55                 :         tcp_socket::implementation,
      56                 :         Service,
      57                 :         DescState>;
      58                 :     friend base_type;
      59                 :     friend Derived;
      60                 : 
      61 HIT       13543 :     explicit reactor_stream_socket(Service& svc) noexcept : base_type(svc) {}
      62                 : 
      63                 : protected:
      64                 :     endpoint remote_endpoint_;
      65                 : 
      66                 : public:
      67                 :     /// Pending connect operation slot.
      68                 :     ConnOp conn_;
      69                 : 
      70                 :     /// Pending read operation slot.
      71                 :     ReadOp rd_;
      72                 : 
      73                 :     /// Pending write operation slot.
      74                 :     WriteOp wr_;
      75                 : 
      76           13543 :     ~reactor_stream_socket() override = default;
      77                 : 
      78                 :     /// Return the cached remote endpoint.
      79              42 :     endpoint remote_endpoint() const noexcept override
      80                 :     {
      81              42 :         return remote_endpoint_;
      82                 :     }
      83                 : 
      84                 :     /// Shut down part or all of the full-duplex connection.
      85               6 :     std::error_code shutdown(tcp_socket::shutdown_type what) noexcept override
      86                 :     {
      87                 :         int how;
      88               6 :         switch (what)
      89                 :         {
      90               2 :         case tcp_socket::shutdown_receive:
      91               2 :             how = SHUT_RD;
      92               2 :             break;
      93               2 :         case tcp_socket::shutdown_send:
      94               2 :             how = SHUT_WR;
      95               2 :             break;
      96               2 :         case tcp_socket::shutdown_both:
      97               2 :             how = SHUT_RDWR;
      98               2 :             break;
      99 MIS           0 :         default:
     100               0 :             return make_err(EINVAL);
     101                 :         }
     102 HIT           6 :         if (::shutdown(this->fd_, how) != 0)
     103 MIS           0 :             return make_err(errno);
     104 HIT           6 :         return {};
     105                 :     }
     106                 : 
     107                 :     /// Cache local and remote endpoints.
     108            8968 :     void set_endpoints(endpoint local, endpoint remote) noexcept
     109                 :     {
     110            8968 :         this->local_endpoint_ = local;
     111            8968 :         remote_endpoint_      = remote;
     112            8968 :     }
     113                 : 
     114                 :     /** Shared connect dispatch.
     115                 : 
     116                 :         Tries the connect syscall speculatively. On synchronous
     117                 :         completion, returns via inline budget or posts through queue.
     118                 :         On EINPROGRESS, registers with the reactor.
     119                 :     */
     120                 :     std::coroutine_handle<> do_connect(
     121                 :         std::coroutine_handle<>,
     122                 :         capy::executor_ref,
     123                 :         endpoint,
     124                 :         std::stop_token const&,
     125                 :         std::error_code*);
     126                 : 
     127                 :     /** Shared scatter-read dispatch.
     128                 : 
     129                 :         Tries readv() speculatively. On success or hard error,
     130                 :         returns via inline budget or posts through queue.
     131                 :         On EAGAIN, registers with the reactor.
     132                 :     */
     133                 :     std::coroutine_handle<> do_read_some(
     134                 :         std::coroutine_handle<>,
     135                 :         capy::executor_ref,
     136                 :         buffer_param,
     137                 :         std::stop_token const&,
     138                 :         std::error_code*,
     139                 :         std::size_t*);
     140                 : 
     141                 :     /** Shared gather-write dispatch.
     142                 : 
     143                 :         Tries the write via WriteOp::write_policy speculatively.
     144                 :         On success or hard error, returns via inline budget or
     145                 :         posts through queue. On EAGAIN, registers with the reactor.
     146                 :     */
     147                 :     std::coroutine_handle<> do_write_some(
     148                 :         std::coroutine_handle<>,
     149                 :         capy::executor_ref,
     150                 :         buffer_param,
     151                 :         std::stop_token const&,
     152                 :         std::error_code*,
     153                 :         std::size_t*);
     154                 : 
     155                 :     /** Close the socket and cancel pending operations.
     156                 : 
     157                 :         Extends the base do_close_socket() to also reset
     158                 :         the remote endpoint.
     159                 :     */
     160           40610 :     void do_close_socket() noexcept
     161                 :     {
     162           40610 :         base_type::do_close_socket();
     163           40610 :         remote_endpoint_ = endpoint{};
     164           40610 :     }
     165                 : 
     166                 : private:
     167                 :     // CRTP callbacks for reactor_basic_socket cancel/close
     168                 : 
     169                 :     template<class Op>
     170             191 :     reactor_op_base** op_to_desc_slot(Op& op) noexcept
     171                 :     {
     172             191 :         if (&op == static_cast<void*>(&conn_))
     173 MIS           0 :             return &this->desc_state_.connect_op;
     174 HIT         191 :         if (&op == static_cast<void*>(&rd_))
     175             191 :             return &this->desc_state_.read_op;
     176 MIS           0 :         if (&op == static_cast<void*>(&wr_))
     177               0 :             return &this->desc_state_.write_op;
     178               0 :         return nullptr;
     179                 :     }
     180                 : 
     181                 :     template<class Op>
     182               0 :     bool* op_to_cancel_flag(Op& op) noexcept
     183                 :     {
     184               0 :         if (&op == static_cast<void*>(&conn_))
     185               0 :             return &this->desc_state_.connect_cancel_pending;
     186               0 :         if (&op == static_cast<void*>(&rd_))
     187               0 :             return &this->desc_state_.read_cancel_pending;
     188               0 :         if (&op == static_cast<void*>(&wr_))
     189               0 :             return &this->desc_state_.write_cancel_pending;
     190               0 :         return nullptr;
     191                 :     }
     192                 : 
     193                 :     template<class Fn>
     194 HIT       40795 :     void for_each_op(Fn fn) noexcept
     195                 :     {
     196           40795 :         fn(conn_);
     197           40795 :         fn(rd_);
     198           40795 :         fn(wr_);
     199           40795 :     }
     200                 : 
     201                 :     template<class Fn>
     202           40795 :     void for_each_desc_entry(Fn fn) noexcept
     203                 :     {
     204           40795 :         fn(conn_, this->desc_state_.connect_op);
     205           40795 :         fn(rd_, this->desc_state_.read_op);
     206           40795 :         fn(wr_, this->desc_state_.write_op);
     207           40795 :     }
     208                 : };
     209                 : 
     210                 : template<
     211                 :     class Derived,
     212                 :     class Service,
     213                 :     class ConnOp,
     214                 :     class ReadOp,
     215                 :     class WriteOp,
     216                 :     class DescState>
     217                 : std::coroutine_handle<>
     218            4490 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState>::
     219                 :     do_connect(
     220                 :         std::coroutine_handle<> h,
     221                 :         capy::executor_ref ex,
     222                 :         endpoint ep,
     223                 :         std::stop_token const& token,
     224                 :         std::error_code* ec)
     225                 : {
     226            4490 :     auto& op = conn_;
     227                 : 
     228            4490 :     sockaddr_storage storage{};
     229            4490 :     socklen_t addrlen = to_sockaddr(ep, socket_family(this->fd_), storage);
     230                 :     int result =
     231            4490 :         ::connect(this->fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
     232                 : 
     233            4490 :     if (result == 0)
     234                 :     {
     235 MIS           0 :         sockaddr_storage local_storage{};
     236               0 :         socklen_t local_len = sizeof(local_storage);
     237               0 :         if (::getsockname(
     238                 :                 this->fd_, reinterpret_cast<sockaddr*>(&local_storage),
     239               0 :                 &local_len) == 0)
     240               0 :             this->local_endpoint_ = from_sockaddr(local_storage);
     241               0 :         remote_endpoint_ = ep;
     242                 :     }
     243                 : 
     244 HIT        4490 :     if (result == 0 || errno != EINPROGRESS)
     245                 :     {
     246 MIS           0 :         int err = (result < 0) ? errno : 0;
     247               0 :         if (this->svc_.scheduler().try_consume_inline_budget())
     248                 :         {
     249               0 :             *ec = err ? make_err(err) : std::error_code{};
     250               0 :             return dispatch_coro(ex, h);
     251                 :         }
     252               0 :         op.reset();
     253               0 :         op.h               = h;
     254               0 :         op.ex              = ex;
     255               0 :         op.ec_out          = ec;
     256               0 :         op.fd              = this->fd_;
     257               0 :         op.target_endpoint = ep;
     258               0 :         op.start(token, static_cast<Derived*>(this));
     259               0 :         op.impl_ptr = this->shared_from_this();
     260               0 :         op.complete(err, 0);
     261               0 :         this->svc_.post(&op);
     262               0 :         return std::noop_coroutine();
     263                 :     }
     264                 : 
     265                 :     // EINPROGRESS — register with reactor
     266 HIT        4490 :     op.reset();
     267            4490 :     op.h               = h;
     268            4490 :     op.ex              = ex;
     269            4490 :     op.ec_out          = ec;
     270            4490 :     op.fd              = this->fd_;
     271            4490 :     op.target_endpoint = ep;
     272            4490 :     op.start(token, static_cast<Derived*>(this));
     273            4490 :     op.impl_ptr = this->shared_from_this();
     274                 : 
     275            4490 :     this->register_op(
     276            4490 :         op, this->desc_state_.connect_op, this->desc_state_.write_ready,
     277            4490 :         this->desc_state_.connect_cancel_pending);
     278            4490 :     return std::noop_coroutine();
     279                 : }
     280                 : 
     281                 : template<
     282                 :     class Derived,
     283                 :     class Service,
     284                 :     class ConnOp,
     285                 :     class ReadOp,
     286                 :     class WriteOp,
     287                 :     class DescState>
     288                 : std::coroutine_handle<>
     289          221610 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState>::
     290                 :     do_read_some(
     291                 :         std::coroutine_handle<> h,
     292                 :         capy::executor_ref ex,
     293                 :         buffer_param param,
     294                 :         std::stop_token const& token,
     295                 :         std::error_code* ec,
     296                 :         std::size_t* bytes_out)
     297                 : {
     298          221610 :     auto& op = rd_;
     299          221610 :     op.reset();
     300                 : 
     301          221610 :     capy::mutable_buffer bufs[ReadOp::max_buffers];
     302          221610 :     op.iovec_count = static_cast<int>(param.copy_to(bufs, ReadOp::max_buffers));
     303                 : 
     304          221610 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     305                 :     {
     306               2 :         op.empty_buffer_read = true;
     307               2 :         op.h                 = h;
     308               2 :         op.ex                = ex;
     309               2 :         op.ec_out            = ec;
     310               2 :         op.bytes_out         = bytes_out;
     311               2 :         op.start(token, static_cast<Derived*>(this));
     312               2 :         op.impl_ptr = this->shared_from_this();
     313               2 :         op.complete(0, 0);
     314               2 :         this->svc_.post(&op);
     315               2 :         return std::noop_coroutine();
     316                 :     }
     317                 : 
     318          443216 :     for (int i = 0; i < op.iovec_count; ++i)
     319                 :     {
     320          221608 :         op.iovecs[i].iov_base = bufs[i].data();
     321          221608 :         op.iovecs[i].iov_len  = bufs[i].size();
     322                 :     }
     323                 : 
     324                 :     // Speculative read
     325                 :     ssize_t n;
     326                 :     do
     327                 :     {
     328          221608 :         n = ::readv(this->fd_, op.iovecs, op.iovec_count);
     329                 :     }
     330          221608 :     while (n < 0 && errno == EINTR);
     331                 : 
     332          221608 :     if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
     333                 :     {
     334          221221 :         int err    = (n < 0) ? errno : 0;
     335          221221 :         auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
     336                 : 
     337          221221 :         if (this->svc_.scheduler().try_consume_inline_budget())
     338                 :         {
     339          177011 :             if (err)
     340 MIS           0 :                 *ec = make_err(err);
     341 HIT      177011 :             else if (n == 0)
     342              10 :                 *ec = capy::error::eof;
     343                 :             else
     344          177001 :                 *ec = {};
     345          177011 :             *bytes_out = bytes;
     346          177011 :             return dispatch_coro(ex, h);
     347                 :         }
     348           44210 :         op.h         = h;
     349           44210 :         op.ex        = ex;
     350           44210 :         op.ec_out    = ec;
     351           44210 :         op.bytes_out = bytes_out;
     352           44210 :         op.start(token, static_cast<Derived*>(this));
     353           44210 :         op.impl_ptr = this->shared_from_this();
     354           44210 :         op.complete(err, bytes);
     355           44210 :         this->svc_.post(&op);
     356           44210 :         return std::noop_coroutine();
     357                 :     }
     358                 : 
     359                 :     // EAGAIN — register with reactor
     360             387 :     op.h         = h;
     361             387 :     op.ex        = ex;
     362             387 :     op.ec_out    = ec;
     363             387 :     op.bytes_out = bytes_out;
     364             387 :     op.fd        = this->fd_;
     365             387 :     op.start(token, static_cast<Derived*>(this));
     366             387 :     op.impl_ptr = this->shared_from_this();
     367                 : 
     368             387 :     this->register_op(
     369             387 :         op, this->desc_state_.read_op, this->desc_state_.read_ready,
     370             387 :         this->desc_state_.read_cancel_pending);
     371             387 :     return std::noop_coroutine();
     372                 : }
     373                 : 
     374                 : template<
     375                 :     class Derived,
     376                 :     class Service,
     377                 :     class ConnOp,
     378                 :     class ReadOp,
     379                 :     class WriteOp,
     380                 :     class DescState>
     381                 : std::coroutine_handle<>
     382          221314 : reactor_stream_socket<Derived, Service, ConnOp, ReadOp, WriteOp, DescState>::
     383                 :     do_write_some(
     384                 :         std::coroutine_handle<> h,
     385                 :         capy::executor_ref ex,
     386                 :         buffer_param param,
     387                 :         std::stop_token const& token,
     388                 :         std::error_code* ec,
     389                 :         std::size_t* bytes_out)
     390                 : {
     391          221314 :     auto& op = wr_;
     392          221314 :     op.reset();
     393                 : 
     394          221314 :     capy::mutable_buffer bufs[WriteOp::max_buffers];
     395          221314 :     op.iovec_count =
     396          221314 :         static_cast<int>(param.copy_to(bufs, WriteOp::max_buffers));
     397                 : 
     398          221314 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     399                 :     {
     400               2 :         op.h         = h;
     401               2 :         op.ex        = ex;
     402               2 :         op.ec_out    = ec;
     403               2 :         op.bytes_out = bytes_out;
     404               2 :         op.start(token, static_cast<Derived*>(this));
     405               2 :         op.impl_ptr = this->shared_from_this();
     406               2 :         op.complete(0, 0);
     407               2 :         this->svc_.post(&op);
     408               2 :         return std::noop_coroutine();
     409                 :     }
     410                 : 
     411          442624 :     for (int i = 0; i < op.iovec_count; ++i)
     412                 :     {
     413          221312 :         op.iovecs[i].iov_base = bufs[i].data();
     414          221312 :         op.iovecs[i].iov_len  = bufs[i].size();
     415                 :     }
     416                 : 
     417                 :     // Speculative write via backend-specific write policy
     418                 :     ssize_t n =
     419          221312 :         WriteOp::write_policy::write(this->fd_, op.iovecs, op.iovec_count);
     420                 : 
     421          221312 :     if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
     422                 :     {
     423          221312 :         int err    = (n < 0) ? errno : 0;
     424          221312 :         auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
     425                 : 
     426          221312 :         if (this->svc_.scheduler().try_consume_inline_budget())
     427                 :         {
     428          177067 :             *ec        = err ? make_err(err) : std::error_code{};
     429          177067 :             *bytes_out = bytes;
     430          177067 :             return dispatch_coro(ex, h);
     431                 :         }
     432           44245 :         op.h         = h;
     433           44245 :         op.ex        = ex;
     434           44245 :         op.ec_out    = ec;
     435           44245 :         op.bytes_out = bytes_out;
     436           44245 :         op.start(token, static_cast<Derived*>(this));
     437           44245 :         op.impl_ptr = this->shared_from_this();
     438           44245 :         op.complete(err, bytes);
     439           44245 :         this->svc_.post(&op);
     440           44245 :         return std::noop_coroutine();
     441                 :     }
     442                 : 
     443                 :     // EAGAIN — register with reactor
     444 MIS           0 :     op.h         = h;
     445               0 :     op.ex        = ex;
     446               0 :     op.ec_out    = ec;
     447               0 :     op.bytes_out = bytes_out;
     448               0 :     op.fd        = this->fd_;
     449               0 :     op.start(token, static_cast<Derived*>(this));
     450               0 :     op.impl_ptr = this->shared_from_this();
     451                 : 
     452               0 :     this->register_op(
     453               0 :         op, this->desc_state_.write_op, this->desc_state_.write_ready,
     454               0 :         this->desc_state_.write_cancel_pending);
     455               0 :     return std::noop_coroutine();
     456                 : }
     457                 : 
     458                 : } // namespace boost::corosio::detail
     459                 : 
     460                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP
        

Generated by: LCOV version 2.3