LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_descriptor_state.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 74.3 % 74 55 19
Test Date: 2026-03-17 15:52:49 Functions: 100.0 % 4 4

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
      12                 : 
      13                 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
      14                 : #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
      15                 : 
      16                 : #include <atomic>
      17                 : #include <cstdint>
      18                 : #include <memory>
      19                 : #include <mutex>
      20                 : 
      21                 : #include <errno.h>
      22                 : #include <sys/socket.h>
      23                 : 
      24                 : namespace boost::corosio::detail {
      25                 : 
      26                 : /// Shared reactor event bit flags, OR-ed into ready_events_ and tested in invoke_deferred_io().
      27                 : /// These match epoll numeric values; kqueue maps its events to the same.
      28                 : static constexpr std::uint32_t reactor_event_read  = 0x001; // same value as EPOLLIN
      29                 : static constexpr std::uint32_t reactor_event_write = 0x004; // same value as EPOLLOUT
      30                 : static constexpr std::uint32_t reactor_event_error = 0x008; // same value as EPOLLERR. NOTE(review): namespace-scope `static` in a header gives each TU its own copy; consider `inline constexpr`
      31                 : 
      32                 : /** Per-descriptor state shared across reactor backends.
      33                 : 
      34                 :     Tracks pending operations for a file descriptor. The fd is registered
      35                 :     once with the reactor and stays registered until closed. Uses deferred
      36                 :     I/O: the reactor sets ready_events_ atomically, then enqueues this state.
      37                 :     When popped by the scheduler, operator()() calls invoke_deferred_io(),
      38                 :     which performs I/O under the mutex and queues completed ops.
      39                 : 
      40                 :     Non-template: uses reactor_op_base pointers so the scheduler and
      41                 :     descriptor_state code exist as a single copy in the binary regardless
      42                 :     of how many backends are compiled in.
      43                 : 
      44                 :     @par Thread Safety
      45                 :     The mutex protects the operation pointers and the ready/cancel flags.
      46                 :     ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
      47                 : */
      48                 : struct reactor_descriptor_state : scheduler_op
      49                 : {
      50                 :     /// Protects operation pointers and ready/cancel flags.
      51                 :     std::mutex mutex;
      52                 : 
      53                 :     /// Pending read operation (guarded by `mutex`).
      54                 :     reactor_op_base* read_op = nullptr;
      55                 : 
      56                 :     /// Pending write operation (guarded by `mutex`).
      57                 :     reactor_op_base* write_op = nullptr;
      58                 : 
      59                 :     /// Pending connect operation (guarded by `mutex`).
      60                 :     reactor_op_base* connect_op = nullptr;
      61                 : 
      62                 :     /// True if a read edge event arrived before an op was registered.
      63                 :     bool read_ready = false;
      64                 : 
      65                 :     /// True if a write edge event arrived before an op was registered.
      66                 :     bool write_ready = false;
      67                 : 
      68                 :     /// Deferred read cancellation (IOCP-style cancel semantics).
      69                 :     bool read_cancel_pending = false;
      70                 : 
      71                 :     /// Deferred write cancellation (IOCP-style cancel semantics).
      72                 :     bool write_cancel_pending = false;
      73                 : 
      74                 :     /// Deferred connect cancellation (IOCP-style cancel semantics).
      75                 :     bool connect_cancel_pending = false;
      76                 : 
      77                 :     /// Event mask set during registration (no mutex needed).
      78                 :     std::uint32_t registered_events = 0;
      79                 : 
      80                 :     /// File descriptor this state tracks.
      81                 :     int fd = -1; // -1 until assigned; passed to getsockopt(SO_ERROR) when an error event is seen
      82                 : 
      83                 :     /// Accumulated ready events (set by reactor, read by scheduler).
      84                 :     std::atomic<std::uint32_t> ready_events_{0};
      85                 : 
      86                 :     /// True while this state is queued in the scheduler's completed_ops.
      87                 :     std::atomic<bool> is_enqueued_{false};
      88                 : 
      89                 :     /// Owning scheduler for posting completions.
      90                 :     reactor_scheduler_base const* scheduler_ = nullptr; // dereferenced without a null check in invoke_deferred_io(); must be set before that runs
      91                 : 
      92                 :     /// Prevents impl destruction while queued in the scheduler.
      93                 :     std::shared_ptr<void> impl_ref_; // moved into a local by invoke_deferred_io(); reset by destroy()
      94                 : 
      95                 :     /// Add ready events atomically.
      96                 :     /// Release pairs with the consumer's acquire exchange on
      97                 :     /// ready_events_ so the consumer sees all flags. On x86 (TSO)
      98                 :     /// this compiles to the same LOCK OR as relaxed.
      99 HIT      160360 :     void add_ready_events(std::uint32_t ev) noexcept
     100                 :     {
     101          160360 :         ready_events_.fetch_or(ev, std::memory_order_release); // OR in rather than overwrite: bits accumulate until the consumer's exchange(0)
     102          160360 :     }
     103                 : 
     104                 :     /// Invoke deferred I/O and dispatch completions.
     105          160290 :     void operator()() override
     106                 :     {
     107          160290 :         invoke_deferred_io();
     108          160290 :     }
     109                 : 
     110                 :     /// Destroy without invoking.
     111                 :     /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
     112                 :     /// the self-referential cycle set by close_socket().
     113              70 :     void destroy() override
     114                 :     {
     115              70 :         impl_ref_.reset(); // only drops the reference; pending ops and flags are left untouched
     116              70 :     }
     117                 : 
     118                 :     /** Perform deferred I/O and queue completions.
     119                 : 
     120                 :         Performs I/O under the mutex and queues completed ops. EAGAIN
     121                 :         ops stay parked in their slot for re-delivery on the next
     122                 :         edge event.
     123                 :     */
     124                 :     void invoke_deferred_io();
     125                 : };
     126                 : 
     127                 : inline void // drains ready_events_ and runs or fails the ops parked on this descriptor
     128          160290 : reactor_descriptor_state::invoke_deferred_io()
     129                 : { // phase 1 (under mutex): perform I/O and collect finished ops; phase 2 (unlocked): dispatch them
     130          160290 :     std::shared_ptr<void> prevent_impl_destruction; // takes over impl_ref_ and holds it until this function returns
     131          160290 :     op_queue local_ops; // finished ops, dispatched only after the lock scope below ends
     132                 : 
     133                 :     {
     134          160290 :         std::lock_guard lock(mutex);
     135                 : 
     136                 :         // Must clear is_enqueued_ and move impl_ref_ under the same
     137                 :         // lock that processes I/O. close_socket() checks is_enqueued_
     138                 :         // under this mutex — without atomicity between the flag store
     139                 :         // and the ref move, close_socket() could see is_enqueued_==false,
     140                 :         // skip setting impl_ref_, and destroy the impl under us.
     141          160290 :         prevent_impl_destruction = std::move(impl_ref_);
     142          160290 :         is_enqueued_.store(false, std::memory_order_release);
     143                 : 
     144          160290 :         std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire); // take all accumulated bits and reset the mask in one step
     145          160290 :         if (ev == 0)
     146                 :         {
     147                 :             // Mutex unlocks here; compensate for work_cleanup's decrement
     148 MIS           0 :             scheduler_->compensating_work_started();
     149               0 :             return;
     150                 :         }
     151                 : 
     152 HIT      160290 :         int err = 0; // stays 0 unless the error bit is set in ev
     153          160290 :         if (ev & reactor_event_error)
     154                 :         {
     155               2 :             socklen_t len = sizeof(err);
     156               2 :             if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0) // fetch the socket's pending error code
     157 MIS           0 :                 err = errno; // the query itself failed: report that failure instead
     158 HIT           2 :             if (err == 0)
     159 MIS           0 :                 err = EIO; // error event but no error code reported: fall back to a generic I/O error
     160                 :         }
     161                 : 
     162 HIT      160290 :         if (ev & reactor_event_read)
     163                 :         {
     164          133345 :             if (read_op)
     165                 :             {
     166            4622 :                 auto* rd = read_op;
     167            4622 :                 if (err)
     168 MIS           0 :                     rd->complete(err, 0); // error event: complete with the socket error instead of attempting I/O
     169                 :                 else
     170 HIT        4622 :                     rd->perform_io();
     171                 : 
     172            4622 :                 if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK) // errn is presumably set by perform_io()/complete(); verify in reactor_op_base
     173                 :                 {
     174              43 :                     rd->errn = 0; // would-block: clear the code and leave the op parked in read_op
     175                 :                 }
     176                 :                 else
     177                 :                 {
     178            4579 :                     read_op = nullptr; // finished (success or hard error): free the slot and queue the op
     179            4579 :                     local_ops.push(rd);
     180                 :                 }
     181                 :             }
     182                 :             else
     183                 :             {
     184          128723 :                 read_ready = true; // no read op parked: remember that the read edge arrived
     185                 :             }
     186                 :         }
     187          160290 :         if (ev & reactor_event_write)
     188                 :         {
     189           35861 :             bool had_write_op = (connect_op || write_op); // sampled first because both slots may be cleared below
     190           35861 :             if (connect_op)
     191                 :             {
     192            4490 :                 auto* cn = connect_op;
     193            4490 :                 if (err)
     194               2 :                     cn->complete(err, 0);
     195                 :                 else
     196            4488 :                     cn->perform_io();
     197            4490 :                 connect_op = nullptr; // unlike read/write, connect is always dequeued (no EAGAIN re-park)
     198            4490 :                 local_ops.push(cn);
     199                 :             }
     200           35861 :             if (write_op)
     201                 :             {
     202 MIS           0 :                 auto* wr = write_op;
     203               0 :                 if (err)
     204               0 :                     wr->complete(err, 0);
     205                 :                 else
     206               0 :                     wr->perform_io();
     207                 : 
     208               0 :                 if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
     209                 :                 {
     210               0 :                     wr->errn = 0; // would-block: clear the code and leave the op parked in write_op
     211                 :                 }
     212                 :                 else
     213                 :                 {
     214               0 :                     write_op = nullptr;
     215               0 :                     local_ops.push(wr);
     216                 :                 }
     217                 :             }
     218 HIT       35861 :             if (!had_write_op)
     219           31371 :                 write_ready = true; // neither a connect nor a write op was parked: remember the write edge
     220                 :         }
     221          160290 :         if (err) // error event: fail every op still parked, including ones whose event bit was not set
     222                 :         {
     223               2 :             if (read_op)
     224                 :             {
     225 MIS           0 :                 read_op->complete(err, 0);
     226               0 :                 local_ops.push(std::exchange(read_op, nullptr)); // NOTE(review): std::exchange lives in <utility>, which this header does not include directly
     227                 :             }
     228 HIT           2 :             if (write_op)
     229                 :             {
     230 MIS           0 :                 write_op->complete(err, 0);
     231               0 :                 local_ops.push(std::exchange(write_op, nullptr));
     232                 :             }
     233 HIT           2 :             if (connect_op)
     234                 :             {
     235 MIS           0 :                 connect_op->complete(err, 0);
     236               0 :                 local_ops.push(std::exchange(connect_op, nullptr));
     237                 :             }
     238                 :         }
     239 HIT      160290 :     } // lock_guard goes out of scope: mutex released before any handler runs
     240                 : 
     241                 :     // Execute first handler inline — the scheduler's work_cleanup
     242                 :     // accounts for this as the "consumed" work item
     243          160290 :     scheduler_op* first = local_ops.pop();
     244          160290 :     if (first)
     245                 :     {
     246            9069 :         scheduler_->post_deferred_completions(local_ops); // hand the remaining ops to the scheduler first
     247            9069 :         (*first)(); // invoked only after the rest were posted
     248                 :     }
     249                 :     else
     250                 :     {
     251          151221 :         scheduler_->compensating_work_started(); // no handler ran inline: same compensation as the ev == 0 path
     252                 :     }
     253          160290 : }
     254                 : 
     255                 : } // namespace boost::corosio::detail
     256                 : 
     257                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
        

Generated by: LCOV version 2.3