LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_op.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 68.2 % 154 105 49
Test Date: 2026-03-17 15:52:49 Functions: 76.5 % 68 52 16

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
      12                 : 
      13                 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
      14                 : #include <boost/corosio/io/io_object.hpp>
      15                 : #include <boost/corosio/endpoint.hpp>
      16                 : #include <boost/capy/ex/executor_ref.hpp>
      17                 : 
      18                 : #include <atomic>
      19                 : #include <coroutine>
      20                 : #include <cstddef>
      21                 : #include <memory>
      22                 : #include <optional>
      23                 : #include <stop_token>
      24                 : #include <system_error>
      25                 : 
      26                 : #include <errno.h>
      27                 : 
      28                 : #include <netinet/in.h>
      29                 : #include <sys/socket.h>
      30                 : #include <sys/uio.h>
      31                 : 
      32                 : namespace boost::corosio::detail {
      33                 : 
      34                 : /** Base operation for reactor-based backends.
      35                 : 
      36                 :     Holds per-operation state that depends on the concrete backend
      37                 :     socket/acceptor types: coroutine handle, executor, output
      38                 :     pointers, file descriptor, stop_callback, and type-specific
      39                 :     impl pointers. cancel() is declared pure virtual in this struct.
      40                 : 
      41                 :     Fields shared across all backends (errn, bytes_transferred,
      42                 :     cancelled, impl_ptr, perform_io, complete) live in
      43                 :     reactor_op_base so the scheduler and descriptor_state can
      44                 :     access them without template instantiation.
      45                 : 
      46                 :     @tparam Socket The backend socket impl type (forward-declared).
      47                 :     @tparam Acceptor The backend acceptor impl type (forward-declared).
      48                 : */
      49                 : template<class Socket, class Acceptor>
      50                 : struct reactor_op : reactor_op_base
      51                 : {
      52                 :     /// Stop-token callback that invokes cancel() on the target op.
      53                 :     struct canceller
      54                 :     {
      55                 :         reactor_op* op; // op whose cancel() gets called
      56 HIT         199 :         void operator()() const noexcept
      57                 :         {
      58             199 :             op->cancel();
      59             199 :         }
      60                 :     };
      61                 : 
      62                 :     /// Caller's coroutine handle to resume on completion.
      63                 :     std::coroutine_handle<> h;
      64                 : 
      65                 :     /// Executor for dispatching the completion.
      66                 :     capy::executor_ref ex;
      67                 : 
      68                 :     /// Output pointer for the error code.
      69                 :     std::error_code* ec_out = nullptr;
      70                 : 
      71                 :     /// Output pointer for bytes transferred.
      72                 :     std::size_t* bytes_out = nullptr;
      73                 : 
      74                 :     /// File descriptor this operation targets (-1 when unset).
      75                 :     int fd = -1;
      76                 : 
      77                 :     /// Stop-token callback registration; emplaced by start() only when the token can be stopped.
      78                 :     std::optional<std::stop_callback<canceller>> stop_cb;
      79                 : 
      80                 :     /// Owning socket impl (for stop_token cancellation); set by start(token, Socket*), nulled by reset().
      81                 :     Socket* socket_impl_ = nullptr;
      82                 : 
      83                 :     /// Owning acceptor impl (for stop_token cancellation); set by start(token, Acceptor*), nulled by reset().
      84                 :     Acceptor* acceptor_impl_ = nullptr;
      85                 : 
      86           41160 :     reactor_op() = default;
      87                 : 
      88                 :     /// Reset fd, errn, bytes_transferred, cancelled, impl_ptr and both impl pointers for reuse; h, ex, ec_out, bytes_out and stop_cb are not touched.
      89          451984 :     void reset() noexcept
      90                 :     {
      91          451984 :         fd                = -1;
      92          451984 :         errn              = 0;
      93          451984 :         bytes_transferred = 0;
      94          451984 :         cancelled.store(false, std::memory_order_relaxed); // relaxed here; start() clears it again with release ordering
      95          451984 :         impl_ptr.reset();
      96          451984 :         socket_impl_   = nullptr;
      97          451984 :         acceptor_impl_ = nullptr;
      98          451984 :     }
      99                 : 
     100                 :     /// Return true if this is a read-direction operation (this default returns false).
     101           44255 :     virtual bool is_read_operation() const noexcept
     102                 :     {
     103           44255 :         return false;
     104                 :     }
     105                 : 
     106                 :     /// Cancel this operation via the owning impl; invoked by canceller when a stop is requested.
     107                 :     virtual void cancel() noexcept = 0;
     108                 : 
     109                 :     /// Destroy without invoking: drop the stop-token registration, then defer to reactor_op_base::destroy().
     110 MIS           0 :     void destroy() override
     111                 :     {
     112               0 :         stop_cb.reset();
     113               0 :         reactor_op_base::destroy();
     114               0 :     }
     115                 : 
     116                 :     /// Arm the stop-token callback for a socket operation: clear cancelled, drop any old registration, record impl as the owner.
     117 HIT       93380 :     void start(std::stop_token const& token, Socket* impl)
     118                 :     {
     119           93380 :         cancelled.store(false, std::memory_order_release);
     120           93380 :         stop_cb.reset(); // release any registration left from a previous start()
     121           93380 :         socket_impl_   = impl;
     122           93380 :         acceptor_impl_ = nullptr;
     123                 : 
     124           93380 :         if (token.stop_possible()) // no registration when the token can never request stop
     125             197 :             stop_cb.emplace(token, canceller{this}); // std::stop_callback runs the callback right away if stop was already requested
     126           93380 :     }
     127                 : 
     128                 :     /// Arm the stop-token callback for an acceptor operation: same steps as the socket overload, but records impl as the owning acceptor.
     129            4496 :     void start(std::stop_token const& token, Acceptor* impl)
     130                 :     {
     131            4496 :         cancelled.store(false, std::memory_order_release);
     132            4496 :         stop_cb.reset(); // release any registration left from a previous start()
     133            4496 :         socket_impl_   = nullptr;
     134            4496 :         acceptor_impl_ = impl;
     135                 : 
     136            4496 :         if (token.stop_possible()) // no registration when the token can never request stop
     137               9 :             stop_cb.emplace(token, canceller{this}); // std::stop_callback runs the callback right away if stop was already requested
     138            4496 :     }
     139                 : };
     140                 : 
     141                 : /** Shared connect operation.
     142                 : 
     143                 :     Checks SO_ERROR for connect completion status. The operator()()
     144                 :     and cancel() are provided by the concrete backend type.
     145                 : 
     146                 :     @tparam Base The backend's base op type; this struct uses its fd, reset() and complete(), and overrides its perform_io().
     147                 : */
     148                 : template<class Base>
     149                 : struct reactor_connect_op : Base
     150                 : {
     151                 :     /// Endpoint to connect to.
     152                 :     endpoint target_endpoint;
     153                 : 
     154                 :     /// Reset operation state for reuse: base state first, then target_endpoint back to a default endpoint.
     155            4500 :     void reset() noexcept
     156                 :     {
     157            4500 :         Base::reset();
     158            4500 :         target_endpoint = endpoint{};
     159            4500 :     }
     160                 : 
     161            4488 :     void perform_io() noexcept override // query the socket's pending error and complete with it and 0 bytes
     162                 :     {
     163            4488 :         int err       = 0;
     164            4488 :         socklen_t len = sizeof(err);
     165            4488 :         if (::getsockopt(this->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
     166 MIS           0 :             err = errno; // the query itself failed; report that errno instead
     167 HIT        4488 :         this->complete(err, 0); // err == 0 means the connect succeeded
     168            4488 :     }
     169                 : };
     170                 : 
     171                 : /** Shared scatter-read operation.
     172                 : 
     173                 :     Uses readv() with an EINTR retry loop; completes with (0, n) when readv returns n >= 0, otherwise with (errno, 0).
     174                 : 
     175                 :     @tparam Base The backend's base op type.
     176                 : */
     177                 : template<class Base>
     178                 : struct reactor_read_op : Base
     179                 : {
     180                 :     /// Maximum scatter-gather buffer count (capacity of iovecs).
     181                 :     static constexpr std::size_t max_buffers = 16;
     182                 : 
     183                 :     /// Scatter-gather I/O vectors.
     184                 :     iovec iovecs[max_buffers];
     185                 : 
     186                 :     /// Number of active I/O vectors (leading entries of iovecs passed to readv).
     187                 :     int iovec_count = 0;
     188                 : 
     189                 :     /// True for zero-length reads (completed immediately).
     190                 :     bool empty_buffer_read = false;
     191                 : 
     192                 :     /// Return true unless this is a zero-length read (i.e. returns !empty_buffer_read).
     193           44301 :     bool is_read_operation() const noexcept override
     194                 :     {
     195           44301 :         return !empty_buffer_read;
     196                 :     }
     197                 : 
     198          221610 :     void reset() noexcept // reset base state, then clear iovec_count and empty_buffer_read; iovecs contents are left as-is
     199                 :     {
     200          221610 :         Base::reset();
     201          221610 :         iovec_count       = 0;
     202          221610 :         empty_buffer_read = false;
     203          221610 :     }
     204                 : 
     205             325 :     void perform_io() noexcept override
     206                 :     {
     207                 :         ssize_t n;
     208                 :         do
     209                 :         {
     210             325 :             n = ::readv(this->fd, iovecs, iovec_count);
     211                 :         }
     212             325 :         while (n < 0 && errno == EINTR); // retry only when interrupted by a signal
     213                 : 
     214             325 :         if (n >= 0)
     215              97 :             this->complete(0, static_cast<std::size_t>(n)); // n == 0 is forwarded unchanged here
     216                 :         else
     217             228 :             this->complete(errno, 0);
     218             325 :     }
     219                 : };
     220                 : 
     221                 : /** Shared gather-write operation.
     222                 : 
     223                 :     Delegates the actual syscall to WritePolicy::write(fd, iovecs, count),
     224                 :     which returns ssize_t (bytes written or -1 with errno set).
     225                 : 
     226                 :     @tparam Base The backend's base op type.
     227                 :     @tparam WritePolicy Provides `static ssize_t write(int, iovec*, int)`.
     228                 : */
     229                 : template<class Base, class WritePolicy>
     230                 : struct reactor_write_op : Base
     231                 : {
     232                 :     /// The write syscall policy type.
     233                 :     using write_policy = WritePolicy;
     234                 : 
     235                 :     /// Maximum scatter-gather buffer count (capacity of iovecs).
     236                 :     static constexpr std::size_t max_buffers = 16;
     237                 : 
     238                 :     /// Scatter-gather I/O vectors.
     239                 :     iovec iovecs[max_buffers];
     240                 : 
     241                 :     /// Number of active I/O vectors (leading entries of iovecs handed to the policy).
     242                 :     int iovec_count = 0;
     243                 : 
     244          221314 :     void reset() noexcept // reset base state, then clear iovec_count; iovecs contents are left as-is
     245                 :     {
     246          221314 :         Base::reset();
     247          221314 :         iovec_count = 0;
     248          221314 :     }
     249                 : 
     250 MIS           0 :     void perform_io() noexcept override
     251                 :     {
     252               0 :         ssize_t n = WritePolicy::write(this->fd, iovecs, iovec_count); // NOTE(review): no EINTR retry here, unlike the sibling ops; presumably the policy retries - confirm
     253               0 :         if (n >= 0)
     254               0 :             this->complete(0, static_cast<std::size_t>(n));
     255                 :         else
     256               0 :             this->complete(errno, 0); // relies on the policy leaving errno set on failure
     257               0 :     }
     258                 : };
     259                 : 
     260                 : /** Shared accept operation.
     261                 : 
     262                 :     Delegates the actual syscall to AcceptPolicy::do_accept(fd, peer_storage),
     263                 :     which returns the accepted fd or -1 with errno set.
     264                 : 
     265                 :     @tparam Base The backend's base op type.
     266                 :     @tparam AcceptPolicy Provides `static int do_accept(int, sockaddr_storage&)`.
     267                 : */
     268                 : template<class Base, class AcceptPolicy>
     269                 : struct reactor_accept_op : Base
     270                 : {
     271                 :     /// File descriptor of the accepted connection (-1 until perform_io() succeeds).
     272                 :     int accepted_fd = -1;
     273                 : 
     274                 :     /// Pointer to the peer socket implementation (not written by perform_io()).
     275                 :     io_object::implementation* peer_impl = nullptr;
     276                 : 
     277                 :     /// Output pointer for the accepted implementation (not written by perform_io()).
     278                 :     io_object::implementation** impl_out = nullptr;
     279                 : 
     280                 :     /// Peer address storage filled by accept.
     281                 :     sockaddr_storage peer_storage{};
     282                 : 
     283 HIT        4496 :     void reset() noexcept // reset base state, then return every accept-specific field to its initial value
     284                 :     {
     285            4496 :         Base::reset();
     286            4496 :         accepted_fd  = -1;
     287            4496 :         peer_impl    = nullptr;
     288            4496 :         impl_out     = nullptr;
     289            4496 :         peer_storage = {};
     290            4496 :     }
     291                 : 
     292            4480 :     void perform_io() noexcept override
     293                 :     {
     294            4480 :         int new_fd = AcceptPolicy::do_accept(this->fd, peer_storage);
     295            4480 :         if (new_fd >= 0)
     296                 :         {
     297            4480 :             accepted_fd = new_fd; // stored before complete() is called
     298            4480 :             this->complete(0, 0);
     299                 :         }
     300                 :         else
     301                 :         {
     302 MIS           0 :             this->complete(errno, 0); // accepted_fd is left unchanged on failure
     303                 :         }
     304 HIT        4480 :     }
     305                 : };
     306                 : 
     307                 : /** Shared connected send operation for datagram sockets.
     308                 : 
     309                 :     Uses sendmsg() with msg_name=nullptr (connected mode), retrying on EINTR; completes with (0, n) or (errno, 0).
     310                 : 
     311                 :     @tparam Base The backend's base op type.
     312                 : */
     313                 : template<class Base>
     314                 : struct reactor_send_op : Base
     315                 : {
     316                 :     /// Maximum scatter-gather buffer count (capacity of iovecs).
     317                 :     static constexpr std::size_t max_buffers = 16;
     318                 : 
     319                 :     /// Scatter-gather I/O vectors.
     320                 :     iovec iovecs[max_buffers];
     321                 : 
     322                 :     /// Number of active I/O vectors (becomes msg_iovlen).
     323                 :     int iovec_count = 0;
     324                 : 
     325               6 :     void reset() noexcept // reset base state, then clear iovec_count; iovecs contents are left as-is
     326                 :     {
     327               6 :         Base::reset();
     328               6 :         iovec_count = 0;
     329               6 :     }
     330                 : 
     331 MIS           0 :     void perform_io() noexcept override
     332                 :     {
     333               0 :         msghdr msg{}; // value-initialized, so msg_name stays null and no control data is attached
     334               0 :         msg.msg_iov    = iovecs;
     335               0 :         msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
     336                 : 
     337                 : #ifdef MSG_NOSIGNAL
     338               0 :         constexpr int send_flags = MSG_NOSIGNAL; // ask for an EPIPE error instead of a SIGPIPE signal
     339                 : #else
     340                 :         constexpr int send_flags = 0; // flag not available on this platform
     341                 : #endif
     342                 : 
     343                 :         ssize_t n;
     344                 :         do
     345                 :         {
     346               0 :             n = ::sendmsg(this->fd, &msg, send_flags);
     347                 :         }
     348               0 :         while (n < 0 && errno == EINTR); // retry only when interrupted by a signal
     349                 : 
     350               0 :         if (n >= 0)
     351               0 :             this->complete(0, static_cast<std::size_t>(n));
     352                 :         else
     353               0 :             this->complete(errno, 0);
     354               0 :     }
     355                 : };
     356                 : 
     357                 : /** Shared connected recv operation for datagram sockets.
     358                 : 
     359                 :     Uses recvmsg() with msg_name=nullptr (connected mode).
     360                 :     Does not map n==0 to EOF (zero-length datagrams are valid).
     361                 :     NOTE(review): reactor_read_op::perform_io also forwards n==0 unchanged, so any stream EOF mapping presumably happens outside that function - confirm.
     362                 : 
     363                 :     @tparam Base The backend's base op type.
     364                 : */
     365                 : template<class Base>
     366                 : struct reactor_recv_op : Base
     367                 : {
     368                 :     /// Maximum scatter-gather buffer count (capacity of iovecs).
     369                 :     static constexpr std::size_t max_buffers = 16;
     370                 : 
     371                 :     /// Scatter-gather I/O vectors.
     372                 :     iovec iovecs[max_buffers];
     373                 : 
     374                 :     /// Number of active I/O vectors (becomes msg_iovlen).
     375                 :     int iovec_count = 0;
     376                 : 
     377                 :     /// Return true (this is a read-direction operation).
     378 HIT           2 :     bool is_read_operation() const noexcept override
     379                 :     {
     380               2 :         return true;
     381                 :     }
     382                 : 
     383               4 :     void reset() noexcept // reset base state, then clear iovec_count; iovecs contents are left as-is
     384                 :     {
     385               4 :         Base::reset();
     386               4 :         iovec_count = 0;
     387               4 :     }
     388                 : 
     389 MIS           0 :     void perform_io() noexcept override
     390                 :     {
     391               0 :         msghdr msg{}; // value-initialized, so msg_name stays null and no control data is requested
     392               0 :         msg.msg_iov    = iovecs;
     393               0 :         msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
     394                 : 
     395                 :         ssize_t n;
     396                 :         do
     397                 :         {
     398               0 :             n = ::recvmsg(this->fd, &msg, 0);
     399                 :         }
     400               0 :         while (n < 0 && errno == EINTR); // retry only when interrupted by a signal
     401                 : 
     402               0 :         if (n >= 0)
     403               0 :             this->complete(0, static_cast<std::size_t>(n)); // n == 0 is reported as a successful 0-byte receive
     404                 :         else
     405               0 :             this->complete(errno, 0);
     406               0 :     }
     407                 : };
     408                 : 
     409                 : /** Shared send_to operation for datagram sockets.
     410                 : 
     411                 :     Uses sendmsg() with the destination endpoint in msg_name, retrying on EINTR; completes with (0, n) or (errno, 0).
     412                 : 
     413                 :     @tparam Base The backend's base op type.
     414                 : */
     415                 : template<class Base>
     416                 : struct reactor_send_to_op : Base
     417                 : {
     418                 :     /// Maximum scatter-gather buffer count (capacity of iovecs).
     419                 :     static constexpr std::size_t max_buffers = 16;
     420                 : 
     421                 :     /// Scatter-gather I/O vectors.
     422                 :     iovec iovecs[max_buffers];
     423                 : 
     424                 :     /// Number of active I/O vectors (becomes msg_iovlen).
     425                 :     int iovec_count = 0;
     426                 : 
     427                 :     /// Destination address storage (not filled in by this struct; presumably set by the initiating call - confirm).
     428                 :     sockaddr_storage dest_storage{};
     429                 : 
     430                 :     /// Destination address length; 0 after construction and after reset().
     431                 :     socklen_t dest_len = 0;
     432                 : 
     433 HIT          22 :     void reset() noexcept // reset base state, then clear iovec_count and the destination address/length
     434                 :     {
     435              22 :         Base::reset();
     436              22 :         iovec_count  = 0;
     437              22 :         dest_storage = {};
     438              22 :         dest_len     = 0;
     439              22 :     }
     440                 : 
     441 MIS           0 :     void perform_io() noexcept override
     442                 :     {
     443               0 :         msghdr msg{};
     444               0 :         msg.msg_name    = &dest_storage;
     445               0 :         msg.msg_namelen = dest_len; // only the first dest_len bytes of dest_storage are the address
     446               0 :         msg.msg_iov     = iovecs;
     447               0 :         msg.msg_iovlen  = static_cast<std::size_t>(iovec_count);
     448                 : 
     449                 : #ifdef MSG_NOSIGNAL
     450               0 :         constexpr int send_flags = MSG_NOSIGNAL; // ask for an EPIPE error instead of a SIGPIPE signal
     451                 : #else
     452                 :         constexpr int send_flags = 0; // flag not available on this platform
     453                 : #endif
     454                 : 
     455                 :         ssize_t n;
     456                 :         do
     457                 :         {
     458               0 :             n = ::sendmsg(this->fd, &msg, send_flags);
     459                 :         }
     460               0 :         while (n < 0 && errno == EINTR); // retry only when interrupted by a signal
     461                 : 
     462               0 :         if (n >= 0)
     463               0 :             this->complete(0, static_cast<std::size_t>(n));
     464                 :         else
     465               0 :             this->complete(errno, 0);
     466               0 :     }
     467                 : };
     468                 : 
     469                 : /** Shared recv_from operation for datagram sockets.
     470                 : 
     471                 :     Uses recvmsg() with msg_name to capture the source endpoint into source_storage, retrying on EINTR.
     472                 : 
     473                 :     @tparam Base The backend's base op type.
     474                 : */
     475                 : template<class Base>
     476                 : struct reactor_recv_from_op : Base
     477                 : {
     478                 :     /// Maximum scatter-gather buffer count (capacity of iovecs).
     479                 :     static constexpr std::size_t max_buffers = 16;
     480                 : 
     481                 :     /// Scatter-gather I/O vectors.
     482                 :     iovec iovecs[max_buffers];
     483                 : 
     484                 :     /// Number of active I/O vectors (becomes msg_iovlen).
     485                 :     int iovec_count = 0;
     486                 : 
     487                 :     /// Source address storage filled by recvmsg.
     488                 :     sockaddr_storage source_storage{};
     489                 : 
     490                 :     /// Output pointer for the source endpoint (set by do_recv_from); perform_io() never writes through it.
     491                 :     endpoint* source_out = nullptr;
     492                 : 
     493                 :     /// Return true (this is a read-direction operation).
     494               0 :     bool is_read_operation() const noexcept override
     495                 :     {
     496               0 :         return true;
     497                 :     }
     498                 : 
     499 HIT          32 :     void reset() noexcept // reset base state, then clear iovec_count, the source address and the output pointer
     500                 :     {
     501              32 :         Base::reset();
     502              32 :         iovec_count    = 0;
     503              32 :         source_storage = {};
     504              32 :         source_out     = nullptr;
     505              32 :     }
     506                 : 
     507               2 :     void perform_io() noexcept override
     508                 :     {
     509               2 :         msghdr msg{};
     510               2 :         msg.msg_name    = &source_storage;
     511               2 :         msg.msg_namelen = sizeof(source_storage); // capacity on input; the length recvmsg writes back is not kept
     512               2 :         msg.msg_iov     = iovecs;
     513               2 :         msg.msg_iovlen  = static_cast<std::size_t>(iovec_count);
     514                 : 
     515                 :         ssize_t n;
     516                 :         do
     517                 :         {
     518               2 :             n = ::recvmsg(this->fd, &msg, 0);
     519                 :         }
     520               2 :         while (n < 0 && errno == EINTR); // retry only when interrupted by a signal
     521                 : 
     522               2 :         if (n >= 0)
     523               2 :             this->complete(0, static_cast<std::size_t>(n)); // n == 0 is reported as a successful 0-byte receive
     524                 :         else
     525 MIS           0 :             this->complete(errno, 0);
     526 HIT           2 :     }
     527                 : };
     528                 : 
     529                 : } // namespace boost::corosio::detail
     530                 : 
     531                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
        

Generated by: LCOV version 2.3