LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_basic_socket.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 90.4 % 125 113 12
Test Date: 2026-03-17 15:52:49 Functions: 82.4 % 136 112 24

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/intrusive.hpp>
      14                 : #include <boost/corosio/detail/native_handle.hpp>
      15                 : #include <boost/corosio/endpoint.hpp>
      16                 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
      17                 : #include <boost/corosio/native/detail/make_err.hpp>
      18                 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
      19                 : 
      20                 : #include <memory>
      21                 : #include <mutex>
      22                 : #include <utility>
      23                 : 
      24                 : #include <errno.h>
      25                 : #include <netinet/in.h>
      26                 : #include <sys/socket.h>
      27                 : #include <unistd.h>
      28                 : 
      29                 : namespace boost::corosio::detail {
      30                 : 
      31                 : /** CRTP base for reactor-backed socket implementations.
      32                 : 
      33                 :     Holds the shared data members, virtual overrides, and
      34                 :     cancel/close/register logic that is identical across TCP
      35                 :     (reactor_stream_socket) and UDP (reactor_datagram_socket).
      36                 : 
      37                 :     Derived classes provide CRTP callbacks (for_each_op, for_each_desc_entry,
      38                 :     op_to_desc_slot, op_to_cancel_flag) so cancel/close can walk their op slots.
      39                 : 
      40                 :     @tparam Derived   The concrete socket type (CRTP).
      41                 :     @tparam ImplBase  The public vtable base (tcp_socket::implementation
      42                 :                       or udp_socket::implementation).
      43                 :     @tparam Service   The backend's service type.
      44                 :     @tparam DescState The backend's descriptor_state type.
      45                 : */
      46                 : template<class Derived, class ImplBase, class Service, class DescState>
      47                 : class reactor_basic_socket
      48                 :     : public ImplBase
      49                 :     , public std::enable_shared_from_this<Derived>
      50                 :     , public intrusive_list<Derived>::node
      51                 : {
      52                 :     friend Derived;
      53                 : 
      54                 :     template<class, class, class, class, class, class>
      55                 :     friend class reactor_stream_socket;
      56                 : 
      57                 :     template<class, class, class, class, class, class, class, class>
      58                 :     friend class reactor_datagram_socket;
      59                 : 
      60 HIT       13621 :     explicit reactor_basic_socket(Service& svc) noexcept : svc_(svc) {} // private: constructible only by the friends declared above
      61                 : 
      62                 : protected:
      63                 :     Service& svc_; // backend service: used for work tracking, posting ops, and reaching the scheduler
      64                 :     int fd_ = -1; // -1 means no open descriptor (see is_open())
      65                 :     endpoint local_endpoint_; // written by set_local_endpoint()/do_bind(), reset by do_close_socket()
      66                 : 
      67                 : public:
      68                 :     /// Per-descriptor state for persistent reactor registration.
      69                 :     DescState desc_state_;
      70                 : 
      71           13621 :     ~reactor_basic_socket() override = default;
      72                 : 
      73                 :     /// Return the underlying file descriptor (-1 when no descriptor is open).
      74           27936 :     native_handle_type native_handle() const noexcept override
      75                 :     {
      76           27936 :         return fd_;
      77                 :     }
      78                 : 
      79                 :     /// Return the cached local endpoint (no system call is made).
      80              68 :     endpoint local_endpoint() const noexcept override
      81                 :     {
      82              68 :         return local_endpoint_;
      83                 :     }
      84                 : 
      85                 :     /// Return true if the socket has an open file descriptor (fd_ >= 0).
      86                 :     bool is_open() const noexcept
      87                 :     {
      88                 :         return fd_ >= 0;
      89                 :     }
      90                 : 
      91                 :     /// Set a socket option via ::setsockopt(); returns make_err(errno) on failure, empty on success.
      92              80 :     std::error_code set_option(
      93                 :         int level,
      94                 :         int optname,
      95                 :         void const* data,
      96                 :         std::size_t size) noexcept override
      97                 :     {
      98              80 :         if (::setsockopt(
      99              80 :                 fd_, level, optname, data, static_cast<socklen_t>(size)) != 0)
     100 MIS           0 :             return make_err(errno);
     101 HIT          80 :         return {};
     102                 :     }
     103                 : 
     104                 :     /// Get a socket option via ::getsockopt(); *size is the buffer size on entry and the returned length on success.
     105                 :     std::error_code
     106              78 :     get_option(int level, int optname, void* data, std::size_t* size)
     107                 :         const noexcept override
     108                 :     {
     109              78 :         socklen_t len = static_cast<socklen_t>(*size); // size is dereferenced unconditionally: it must not be null
     110              78 :         if (::getsockopt(fd_, level, optname, data, &len) != 0)
     111 MIS           0 :             return make_err(errno); // *size is left untouched on failure
     112 HIT          78 :         *size = static_cast<std::size_t>(len);
     113              78 :         return {};
     114                 :     }
     115                 : 
     116                 :     /// Assign the file descriptor (plain store: nothing is validated, closed, or registered here).
     117            4484 :     void set_socket(int fd) noexcept
     118                 :     {
     119            4484 :         fd_ = fd;
     120            4484 :     }
     121                 : 
     122                 :     /// Cache the local endpoint (plain store, no system call).
     123                 :     void set_local_endpoint(endpoint ep) noexcept
     124                 :     {
     125                 :         local_endpoint_ = ep;
     126                 :     }
     127                 : 
     128                 :     /** Bind the socket to a local endpoint.
     129                 : 
     130                 :         Calls ::bind(), then caches the local endpoint via getsockname();
     131                 :         a getsockname() failure is ignored (cache unchanged, success returned).
     132                 : 
     133                 :         @param ep The endpoint to bind to.
     134                 :         @return make_err(errno) if ::bind() fails, empty otherwise.
     135                 :     */
     136              42 :     std::error_code do_bind(endpoint ep) noexcept
     137                 :     {
     138              42 :         sockaddr_storage storage{};
     139              42 :         socklen_t addrlen = to_sockaddr(ep, socket_family(fd_), storage); // address family is taken from fd_ via socket_family()
     140              42 :         if (::bind(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen) != 0)
     141 MIS           0 :             return make_err(errno);
     142                 : 
     143 HIT          42 :         sockaddr_storage local_storage{};
     144              42 :         socklen_t local_len = sizeof(local_storage);
     145              42 :         if (::getsockname(
     146              42 :                 fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
     147                 :             0)
     148              42 :             local_endpoint_ = from_sockaddr(local_storage); // cache what the kernel reports, not the requested ep
     149                 : 
     150              42 :         return {};
     151                 :     }
     152                 : 
     153                 :     /** Register an op with the reactor.
     154                 : 
     155                 :         Called on the EAGAIN/EINPROGRESS path when speculative I/O failed.
     156                 :         Retries the I/O once if ready_flag is set, applies a deferred
     157                 :         cancel_flag, then posts the op or parks it in desc_slot.
     158                 :     */
     159                 :     template<class Op>
     160                 :     void register_op(
     161                 :         Op& op,
     162                 :         reactor_op_base*& desc_slot,
     163                 :         bool& ready_flag,
     164                 :         bool& cancel_flag) noexcept;
     165                 : 
     166                 :     /** Cancel a single pending operation.
     167                 : 
     168                 :         Claims the op from its descriptor_state slot under the mutex and
     169                 :         posts it; if it is not parked there, sets its cancel flag instead.
     170                 :         Derived must implement:
     171                 :           op_to_desc_slot(Op&) -> reactor_op_base**
     172                 :           op_to_cancel_flag(Op&) -> bool*
     173                 :     */
     174                 :     template<class Op>
     175                 :     void cancel_single_op(Op& op) noexcept;
     176                 : 
     177                 :     /** Cancel all pending operations.
     178                 : 
     179                 :         Invoked by the derived class's cancel() override.
     180                 :         Derived must implement:
     181                 :           for_each_op(auto fn)
     182                 :           for_each_desc_entry(auto fn)
     183                 :     */
     184                 :     void do_cancel() noexcept;
     185                 : 
     186                 :     /** Close the socket and cancel pending operations.
     187                 : 
     188                 :         Invoked by the derived class's close_socket(). The
     189                 :         derived class may add backend-specific cleanup after
     190                 :         calling this method.
     191                 :         Derived must implement:
     192                 :           for_each_op(auto fn)
     193                 :           for_each_desc_entry(auto fn)
     194                 :     */
     195                 :     void do_close_socket() noexcept;
     196                 : };
     197                 : 
     198                 : template<class Derived, class ImplBase, class Service, class DescState>
     199                 : template<class Op>
     200                 : void
     201            4887 : reactor_basic_socket<Derived, ImplBase, Service, DescState>::register_op(
     202                 :     Op& op,
     203                 :     reactor_op_base*& desc_slot,
     204                 :     bool& ready_flag,
     205                 :     bool& cancel_flag) noexcept
     206                 : {
     207            4887 :     svc_.work_started(); // balanced below if the op is posted at once; cancel/close paths balance it after claiming a parked op
     208                 : 
     209            4887 :     std::lock_guard lock(desc_state_.mutex); // held until the function returns
     210            4887 :     bool io_done = false;
     211            4887 :     if (ready_flag) // readiness was cached: consume it and retry the I/O once
     212                 :     {
     213             185 :         ready_flag = false;
     214             185 :         op.perform_io();
     215             185 :         io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK); // finished unless the retry would block again
     216             185 :         if (!io_done)
     217             185 :             op.errn = 0; // drop the would-block error before parking the op
     218                 :     }
     219                 : 
     220            4887 :     if (cancel_flag) // consume a deferred cancellation request
     221                 :     {
     222 MIS           0 :         cancel_flag = false;
     223               0 :         op.cancelled.store(true, std::memory_order_relaxed);
     224                 :     }
     225                 : 
     226 HIT        4887 :     if (io_done || op.cancelled.load(std::memory_order_acquire)) // nothing left to wait for
     227                 :     {
     228 MIS           0 :         svc_.post(&op); // note: posted while desc_state_.mutex is still held
     229               0 :         svc_.work_finished(); // balances work_started() above
     230                 :     }
     231                 :     else
     232                 :     {
     233 HIT        4887 :         desc_slot = &op; // park the op in its slot; the work count stays raised
     234                 :     }
     235            4887 : }
     236                 : 
     237                 : template<class Derived, class ImplBase, class Service, class DescState>
     238                 : template<class Op>
     239                 : void
     240             193 : reactor_basic_socket<Derived, ImplBase, Service, DescState>::cancel_single_op(
     241                 :     Op& op) noexcept
     242                 : {
     243             193 :     auto self = this->weak_from_this().lock(); // strong reference; empty if no shared_ptr owns *this
     244             193 :     if (!self)
     245 MIS           0 :         return; // no shared owner: nothing is cancelled
     246                 : 
     247 HIT         193 :     op.request_cancel();
     248                 : 
     249             193 :     auto* d                       = static_cast<Derived*>(this);
     250             193 :     reactor_op_base** desc_op_ptr = d->op_to_desc_slot(op); // null means there is no slot to inspect
     251                 : 
     252             193 :     if (desc_op_ptr)
     253                 :     {
     254             193 :         reactor_op_base* claimed = nullptr;
     255                 :         {
     256             193 :             std::lock_guard lock(desc_state_.mutex);
     257             193 :             if (*desc_op_ptr == &op) // op is parked in its slot: take it out
     258             193 :                 claimed = std::exchange(*desc_op_ptr, nullptr);
     259                 :             else
     260                 :             {
     261 MIS           0 :                 bool* cflag = d->op_to_cancel_flag(op); // op is not parked: record the cancel in its flag instead
     262               0 :                 if (cflag)
     263               0 :                     *cflag = true;
     264                 :             }
     265 HIT         193 :         }
     266             193 :         if (claimed) // posting happens after the lock is released
     267                 :         {
     268             193 :             op.impl_ptr = self; // presumably keeps the socket alive until the posted op runs — confirm
     269             193 :             svc_.post(&op);
     270             193 :             svc_.work_finished(); // balances the work_started() taken when register_op() parked the op
     271                 :         }
     272                 :     }
     273             193 : }
     274                 : 
     275                 : template<class Derived, class ImplBase, class Service, class DescState>
     276                 : void
     277             189 : reactor_basic_socket<Derived, ImplBase, Service, DescState>::
     278                 :     do_cancel() noexcept
     279                 : {
     280             189 :     auto self = this->weak_from_this().lock(); // strong reference; empty if no shared_ptr owns *this
     281             189 :     if (!self)
     282 MIS           0 :         return; // no shared owner: nothing is cancelled
     283                 : 
     284 HIT         189 :     auto* d = static_cast<Derived*>(this);
     285                 : 
     286             764 :     d->for_each_op([](auto& op) { op.request_cancel(); }); // flag every op before taking the lock
     287                 : 
     288                 :     // Claim ops under a single lock acquisition
     289                 :     struct claimed_entry
     290                 :     {
     291                 :         reactor_op_base* op   = nullptr; // value taken out of the slot
     292                 :         reactor_op_base* base = nullptr; // address of the op object (equal to `op` whenever an entry is claimed)
     293                 :     };
     294                 :     // Max 3 ops (conn, rd, wr). NOTE(review): count is not bounds-checked against this size.
     295             189 :     claimed_entry claimed[3];
     296             189 :     int count = 0;
     297                 : 
     298                 :     {
     299             189 :         std::lock_guard lock(desc_state_.mutex);
     300            1339 :         d->for_each_desc_entry([&](auto& op, reactor_op_base*& desc_slot) {
     301             575 :             if (desc_slot == &op) // only ops actually parked in their slot are claimed
     302                 :             {
     303             101 :                 claimed[count].op   = std::exchange(desc_slot, nullptr);
     304             101 :                 claimed[count].base = &op;
     305             101 :                 ++count;
     306                 :             }
     307                 :         });
     308             189 :     }
     309                 : 
     310             290 :     for (int i = 0; i < count; ++i) // post the claimed ops after the lock is released
     311                 :     {
     312             101 :         claimed[i].base->impl_ptr = self; // presumably keeps the socket alive until the posted op runs — confirm
     313             101 :         svc_.post(claimed[i].base);
     314             101 :         svc_.work_finished(); // balances the work_started() taken when register_op() parked the op
     315                 :     }
     316             189 : }
     317                 : 
     318                 : template<class Derived, class ImplBase, class Service, class DescState>
     319                 : void
     320           40914 : reactor_basic_socket<Derived, ImplBase, Service, DescState>::
     321                 :     do_close_socket() noexcept
     322                 : {
     323           40914 :     auto self = this->weak_from_this().lock(); // strong reference; empty if no shared_ptr owns *this
     324           40914 :     if (self) // without a shared owner the cancel/claim step is skipped, but the fd is still closed below
     325                 :     {
     326           40914 :         auto* d = static_cast<Derived*>(this);
     327                 : 
     328          164264 :         d->for_each_op([](auto& op) { op.request_cancel(); }); // flag every op before taking the lock
     329                 : 
     330                 :         struct claimed_entry
     331                 :         {
     332                 :             reactor_op_base* base = nullptr; // op taken out of a descriptor slot
     333                 :         };
     334           40914 :         claimed_entry claimed[3]; // NOTE(review): assumes at most 3 desc entries; count is not bounds-checked
     335           40914 :         int count = 0;
     336                 : 
     337                 :         {
     338           40914 :             std::lock_guard lock(desc_state_.mutex);
     339           40914 :             d->for_each_desc_entry(
     340          246700 :                 [&](auto& /*op*/, reactor_op_base*& desc_slot) {
     341          123350 :                     auto* c = std::exchange(desc_slot, nullptr); // unlike do_cancel(): every slot is cleared, whatever it holds
     342          123350 :                     if (c)
     343                 :                     {
     344               4 :                         claimed[count].base = c;
     345               4 :                         ++count;
     346                 :                     }
     347                 :                 });
     348           40914 :             desc_state_.read_ready             = false; // reset the readiness and cancel-pending flags
     349           40914 :             desc_state_.write_ready            = false;
     350           40914 :             desc_state_.read_cancel_pending    = false;
     351           40914 :             desc_state_.write_cancel_pending   = false;
     352           40914 :             desc_state_.connect_cancel_pending = false;
     353                 : 
     354           40914 :             if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
     355             217 :                 desc_state_.impl_ref_ = self; // presumably pins this object while desc_state_ is still queued — confirm
     356           40914 :         }
     357                 : 
     358           40918 :         for (int i = 0; i < count; ++i) // post the claimed ops after the lock is released
     359                 :         {
     360               4 :             claimed[i].base->impl_ptr = self; // presumably keeps the socket alive until the posted op runs — confirm
     361               4 :             svc_.post(claimed[i].base);
     362               4 :             svc_.work_finished(); // balances the work_started() taken when register_op() parked the op
     363                 :         }
     364                 :     }
     365                 : 
     366           40914 :     if (fd_ >= 0) // closing an already-closed socket skips the descriptor teardown
     367                 :     {
     368            9078 :         if (desc_state_.registered_events != 0) // only deregister when events were registered
     369            9078 :             svc_.scheduler().deregister_descriptor(fd_);
     370            9078 :         ::close(fd_); // return value is ignored
     371            9078 :         fd_ = -1;
     372                 :     }
     373                 : 
     374           40914 :     desc_state_.fd                = -1; // NOTE(review): these resets run without holding desc_state_.mutex
     375           40914 :     desc_state_.registered_events = 0;
     376                 : 
     377           40914 :     local_endpoint_ = endpoint{}; // forget the cached local endpoint
     378           40914 : }
     379                 : 
     380                 : } // namespace boost::corosio::detail
     381                 : 
     382                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_BASIC_SOCKET_HPP
        

Generated by: LCOV version 2.3