LCOV - code coverage report
Current view: top level - corosio/detail - timer_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 90.6 % 371 336 35
Test Date: 2026-03-17 15:52:49 Functions: 97.8 % 45 44 1

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : // Copyright (c) 2026 Steve Gerbino
       4                 : //
       5                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7                 : //
       8                 : // Official repository: https://github.com/cppalliance/corosio
       9                 : //
      10                 : 
      11                 : #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
      12                 : #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
      13                 : 
      14                 : #include <boost/corosio/timer.hpp>
      15                 : #include <boost/corosio/io_context.hpp>
      16                 : #include <boost/corosio/detail/scheduler_op.hpp>
      17                 : #include <boost/corosio/native/native_scheduler.hpp>
      18                 : #include <boost/corosio/detail/intrusive.hpp>
      19                 : #include <boost/corosio/detail/thread_local_ptr.hpp>
      20                 : #include <boost/capy/error.hpp>
      21                 : #include <boost/capy/ex/execution_context.hpp>
      22                 : #include <boost/capy/ex/executor_ref.hpp>
      23                 : #include <system_error>
      24                 : 
      25                 : #include <atomic>
      26                 : #include <chrono>
      27                 : #include <coroutine>
      28                 : #include <cstddef>
      29                 : #include <limits>
      30                 : #include <mutex>
      31                 : #include <optional>
      32                 : #include <stop_token>
      33                 : #include <utility>
      34                 : #include <vector>
      35                 : 
      36                 : namespace boost::corosio::detail {
      37                 : 
      38                 : struct scheduler;
      39                 : 
      40                 : /*
      41                 :     Timer Service
      42                 :     =============
      43                 : 
      44                 :     Data Structures
      45                 :     ---------------
      46                 :     waiter_node holds per-waiter state: coroutine handle, executor,
      47                 :     error output, stop_token, embedded completion_op. Each concurrent
      48                 :     co_await t.wait() allocates one waiter_node.
      49                 : 
      50                 :     timer_service::implementation holds per-timer state: expiry,
      51                 :     heap index, and an intrusive_list of waiter_nodes. Multiple
      52                 :     coroutines can wait on the same timer simultaneously.
      53                 : 
      54                 :     timer_service owns a min-heap of active timers, a free list
      55                 :     of recycled impls, and a free list of recycled waiter_nodes. The
      56                 :     heap is ordered by expiry time; the scheduler queries
      57                 :     nearest_expiry() to set the epoll/timerfd timeout.
      58                 : 
      59                 :     Optimization Strategy
      60                 :     ---------------------
      61                 :     1. Deferred heap insertion — expires_after() stores the expiry
      62                 :        but does not insert into the heap. Insertion happens in wait().
      63                 :     2. Thread-local impl cache — single-slot per-thread cache.
      64                 :     3. Embedded completion_op — eliminates heap allocation per fire/cancel.
      65                 :     4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
      66                 :     5. might_have_pending_waits_ flag — skips lock when no wait issued.
      67                 :     6. Thread-local waiter cache — single-slot per-thread cache.
      68                 : 
      69                 :     Concurrency
      70                 :     -----------
      71                 :     stop_token callbacks can fire from any thread. The impl_
      72                 :     pointer on waiter_node is used as a "still in list" marker.
      73                 : */
      74                 : 
      75                 : struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
      76                 : 
      77                 : inline void timer_service_invalidate_cache() noexcept;
      78                 : 
      79                 : // timer_service class body — member function definitions are
      80                 : // out-of-class (after implementation and waiter_node are complete)
      81                 : class BOOST_COROSIO_DECL timer_service final // owns the timer min-heap and the impl/waiter free lists
      82                 :     : public capy::execution_context::service
      83                 :     , public io_object::io_service
      84                 : {
      85                 : public:
      86                 :     using clock_type = std::chrono::steady_clock; // monotonic clock, so expiries ignore wall-clock adjustments
      87                 :     using time_point = clock_type::time_point;
      88                 : 
      89                 :     /// Type-erased callback (context pointer + function pointer) for earliest-expiry-changed notifications.
      90                 :     class callback
      91                 :     {
      92                 :         void* ctx_         = nullptr; // opaque argument handed to fn_
      93                 :         void (*fn_)(void*) = nullptr; // target function; nullptr means the callback is empty
      94                 : 
      95                 :     public:
      96                 :         /// Construct an empty callback.
      97 HIT         466 :         callback() = default;
      98                 : 
      99                 :         /// Construct a callback that will call fn(ctx) when invoked.
     100             466 :         callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
     101                 : 
     102                 :         /// Return true if the callback is non-empty.
     103                 :         explicit operator bool() const noexcept
     104                 :         {
     105                 :             return fn_ != nullptr;
     106                 :         }
     107                 : 
     108                 :         /// Invoke the callback with the stored context; a no-op when empty.
     109            4946 :         void operator()() const
     110                 :         {
     111            4946 :             if (fn_)
     112            4946 :                 fn_(ctx_);
     113            4946 :         }
     114                 :     };
     115                 : 
     116                 :     struct implementation; // per-timer state; defined after waiter_node below
     117                 : 
     118                 : private:
     119                 :     struct heap_entry
     120                 :     {
     121                 :         time_point time_; // expiry used as the heap ordering key
     122                 :         implementation* timer_; // timer this entry refers to
     123                 :     };
     124                 : 
     125                 :     scheduler* sched_ = nullptr; // scheduler supplied at construction; completions are posted to it
     126                 :     mutable std::mutex mutex_; // locked around heap_, free-list and waiter-list updates (shutdown() does not take it)
     127                 :     std::vector<heap_entry> heap_; // min-heap of active timers ordered by expiry
     128                 :     implementation* free_list_     = nullptr; // recycled impls, linked through next_free_
     129                 :     waiter_node* waiter_free_list_ = nullptr; // recycled waiter_nodes, linked through next_free_
     130                 :     callback on_earliest_changed_; // invoked (outside the lock) when a timer becomes the heap front
     131                 :     bool shutting_down_ = false; // set by shutdown(); makes destroy_impl() return immediately
     132                 :     // Heap-front expiry as a tick count (int64 max = heap empty). Avoids mutex in nearest_expiry() and empty()
     133                 :     mutable std::atomic<std::int64_t> cached_nearest_ns_{
     134                 :         (std::numeric_limits<std::int64_t>::max)()};
     135                 : 
     136                 : public:
     137                 :     /// Construct the timer service bound to a scheduler; the execution_context argument is unused.
     138             466 :     inline timer_service(capy::execution_context&, scheduler& sched)
     139             466 :         : sched_(&sched)
     140                 :     {
     141             466 :     }
     142                 : 
     143                 :     /// Return the associated scheduler.
     144            9976 :     inline scheduler& get_scheduler() noexcept
     145                 :     {
     146            9976 :         return *sched_;
     147                 :     }
     148                 : 
     149                 :     /// Destroy the timer service (defaulted; timers, waiters and free lists are released in shutdown()).
     150             932 :     ~timer_service() override = default;
     151                 : 
     152                 :     timer_service(timer_service const&)            = delete;
     153                 :     timer_service& operator=(timer_service const&) = delete;
     154                 : 
     155                 :     /// Register the callback invoked when the earliest expiry changes, replacing any previous one.
     156             466 :     inline void set_on_earliest_changed(callback cb)
     157                 :     {
     158             466 :         on_earliest_changed_ = cb;
     159             466 :     }
     160                 : 
     161                 :     /// Return true if no timers are in the heap (reads the cached value; does not take the mutex).
     162                 :     inline bool empty() const noexcept
     163                 :     {
     164                 :         return cached_nearest_ns_.load(std::memory_order_acquire) ==
     165                 :             (std::numeric_limits<std::int64_t>::max)();
     166                 :     }
     167                 : 
     168                 :     /// Return the nearest timer expiry without acquiring the mutex (the int64-max time_point when the heap is empty).
     169          123361 :     inline time_point nearest_expiry() const noexcept
     170                 :     {
     171          123361 :         auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
     172          123361 :         return time_point(time_point::duration(ns));
     173                 :     }
     174                 : 
     175                 :     /// Cancel all pending timers and free cached resources.
     176                 :     inline void shutdown() override;
     177                 : 
     178                 :     /// Construct a new timer implementation (recycling a cached or free-listed one when available).
     179                 :     inline io_object::implementation* construct() override;
     180                 : 
     181                 :     /// Destroy a timer implementation, cancelling pending waiters.
     182                 :     inline void destroy(io_object::implementation* p) override;
     183                 : 
     184                 :     /// Cancel and recycle a timer implementation (does nothing once shutdown has begun).
     185                 :     inline void destroy_impl(implementation& impl);
     186                 : 
     187                 :     /// Create or recycle a waiter node.
     188                 :     inline waiter_node* create_waiter();
     189                 : 
     190                 :     /// Return a waiter node to the cache or free list.
     191                 :     inline void destroy_waiter(waiter_node* w);
     192                 : 
     193                 :     /// Update the timer expiry, cancelling existing waiters; returns the number of waiters cancelled.
     194                 :     inline std::size_t update_timer(implementation& impl, time_point new_time);
     195                 : 
     196                 :     /// Insert a waiter into the timer's waiter list, adding the timer to the heap if it is not there yet.
     197                 :     inline void insert_waiter(implementation& impl, waiter_node* w);
     198                 : 
     199                 :     /// Cancel all waiters on a timer.
     200                 :     inline std::size_t cancel_timer(implementation& impl);
     201                 : 
     202                 :     /// Cancel a single waiter (stop_token callback path).
     203                 :     inline void cancel_waiter(waiter_node* w);
     204                 : 
     205                 :     /// Cancel one waiter on a timer.
     206                 :     inline std::size_t cancel_one_waiter(implementation& impl);
     207                 : 
     208                 :     /// Complete all waiters whose timers have expired.
     209                 :     inline std::size_t process_expired();
     210                 : 
     211                 : private:
     212          147442 :     inline void refresh_cached_nearest() noexcept // republish the heap front to cached_nearest_ns_; visible callers hold mutex_
     213                 :     {
     214          147442 :         auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
     215          146953 :                                 : heap_[0].time_.time_since_epoch().count();
     216          147442 :         cached_nearest_ns_.store(ns, std::memory_order_release); // release pairs with the acquire loads in empty()/nearest_expiry()
     217          147442 :     }
     218                 : 
     219                 :     inline void remove_timer_impl(implementation& impl);
     220                 :     inline void up_heap(std::size_t index);
     221                 :     inline void down_heap(std::size_t index);
     222                 :     inline void swap_heap(std::size_t i1, std::size_t i2);
     223                 : };
     224                 : 
     225                 : struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node // state for one pending wait on a timer
     226                 :     : intrusive_list<waiter_node>::node
     227                 : {
     228                 :     // Embedded completion op — avoids heap allocation per fire/cancel
     229                 :     struct completion_op final : scheduler_op
     230                 :     {
     231                 :         waiter_node* waiter_ = nullptr; // back-pointer to the enclosing node; set in waiter_node's constructor
     232                 : 
     233                 :         static void do_complete(
     234                 :             void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
     235                 : 
     236             210 :         completion_op() noexcept : scheduler_op(&do_complete) {} // hands do_complete to the scheduler_op base
     237                 : 
     238                 :         void operator()() override;
     239                 :         void destroy() override;
     240                 :     };
     241                 : 
     242                 :     // Per-waiter stop_token cancellation; used as the callable of stop_cb_ below
     243                 :     struct canceller
     244                 :     {
     245                 :         waiter_node* waiter_; // waiter the stop callback acts on
     246                 :         void operator()() const; // defined elsewhere; presumably routes to timer_service::cancel_waiter — confirm
     247                 :     };
     248                 : 
     249                 :     // nullptr once removed from timer's waiter list (concurrency marker)
     250                 :     timer_service::implementation* impl_ = nullptr;
     251                 :     timer_service* svc_                  = nullptr; // service this waiter is associated with
     252                 :     std::coroutine_handle<> h_; // coroutine waiting on the timer
     253                 :     capy::executor_ref d_; // executor captured for the waiter
     254                 :     std::error_code* ec_out_ = nullptr; // caller-provided error output
     255                 :     std::stop_token token_; // stop token supplied with the wait
     256                 :     std::optional<std::stop_callback<canceller>> stop_cb_; // engaged while a stop callback is registered; reset() disarms it
     257                 :     completion_op op_; // embedded op handed to scheduler::post() on fire/cancel
     258                 :     std::error_code ec_value_; // result code, assigned before op_ is posted
     259                 :     waiter_node* next_free_ = nullptr; // link used while the node sits on timer_service::waiter_free_list_
     260                 : 
     261             210 :     waiter_node() noexcept
     262             210 :     {
     263             210 :         op_.waiter_ = this; // let the embedded op find its owning node
     264             210 :     }
     265                 : };
     266                 : 
     267                 : struct timer_service::implementation final : timer::implementation // per-timer state; heap_index_, expiry_ and might_have_pending_waits_ are not declared here — presumably inherited from timer::implementation
     268                 : {
     269                 :     using clock_type = std::chrono::steady_clock;
     270                 :     using time_point = clock_type::time_point;
     271                 :     using duration   = clock_type::duration;
     272                 : 
     273                 :     timer_service* svc_ = nullptr; // owning service; also compared when validating the thread-local impl cache
     274                 :     intrusive_list<waiter_node> waiters_; // waiter_nodes currently waiting on this timer
     275                 : 
     276                 :     // Free list linkage (used while the impl sits on timer_service::free_list_)
     277                 :     implementation* next_free_ = nullptr;
     278                 : 
     279                 :     inline explicit implementation(timer_service& svc) noexcept;
     280                 : 
     281                 :     inline std::coroutine_handle<> wait( // begins a wait; per the header notes, heap insertion is deferred to this call
     282                 :         std::coroutine_handle<>,
     283                 :         capy::executor_ref,
     284                 :         std::stop_token,
     285                 :         std::error_code*) override;
     286                 : };
     287                 : 
     288                 : // Thread-local caches avoid hot-path mutex acquisitions:
     289                 : // 1. Impl cache — single-slot, validated by comparing svc_
     290                 : // 2. Waiter cache — single-slot, no service affinity
     291                 : // timer_service_invalidate_cache() runs during shutdown and clears the caches of the thread that calls it.
     292                 : 
     293                 : inline thread_local_ptr<timer_service::implementation> tl_cached_impl; // single-slot per-thread cache of a recycled impl
     294                 : inline thread_local_ptr<waiter_node> tl_cached_waiter; // single-slot per-thread cache of a recycled waiter_node
     295                 : 
     296                 : inline timer_service::implementation* // take the thread-local cached impl if it belongs to svc; otherwise nullptr
     297            5192 : try_pop_tl_cache(timer_service* svc) noexcept
     298                 : {
     299            5192 :     auto* impl = tl_cached_impl.get();
     300            5192 :     if (impl)
     301                 :     {
     302            4943 :         tl_cached_impl.set(nullptr); // the slot is emptied whether or not the impl matches svc
     303            4943 :         if (impl->svc_ == svc)
     304            4943 :             return impl;
     305                 :         // Cached impl belongs to another (presumably destroyed) service: free it instead of handing it out
     306 MIS           0 :         delete impl;
     307                 :     }
     308 HIT         249 :     return nullptr;
     309                 : }
     310                 : 
     311                 : inline bool // park impl in the thread-local slot; returns false (impl untouched) when the slot is already occupied
     312            5184 : try_push_tl_cache(timer_service::implementation* impl) noexcept
     313                 : {
     314            5184 :     if (!tl_cached_impl.get())
     315                 :     {
     316            5104 :         tl_cached_impl.set(impl);
     317            5104 :         return true;
     318                 :     }
     319              80 :     return false; // caller keeps responsibility for impl
     320                 : }
     321                 : 
     322                 : inline waiter_node* // take the thread-local cached waiter_node, or nullptr if the slot is empty
     323            4992 : try_pop_waiter_tl_cache() noexcept
     324                 : {
     325            4992 :     auto* w = tl_cached_waiter.get();
     326            4992 :     if (w)
     327                 :     {
     328            4780 :         tl_cached_waiter.set(nullptr); // hand the node over to the caller and empty the slot
     329            4780 :         return w;
     330                 :     }
     331             212 :     return nullptr;
     332                 : }
     333                 : 
     334                 : inline bool // park w in the thread-local slot; returns false (w untouched) when the slot is already occupied
     335            4976 : try_push_waiter_tl_cache(waiter_node* w) noexcept
     336                 : {
     337            4976 :     if (!tl_cached_waiter.get())
     338                 :     {
     339            4896 :         tl_cached_waiter.set(w);
     340            4896 :         return true;
     341                 :     }
     342              80 :     return false; // caller keeps responsibility for w
     343                 : }
     344                 : 
     345                 : inline void // delete whatever the calling thread has cached (impl and waiter) and empty both slots
     346             466 : timer_service_invalidate_cache() noexcept
     347                 : {
     348             466 :     delete tl_cached_impl.get(); // no owning-service check; deleting a null slot is a no-op
     349             466 :     tl_cached_impl.set(nullptr);
     350                 : 
     351             466 :     delete tl_cached_waiter.get();
     352             466 :     tl_cached_waiter.set(nullptr);
     353             466 : }
     354                 : 
     355                 : // timer_service out-of-class member function definitions
     356                 : 
     357             249 : inline timer_service::implementation::implementation( // bind a freshly allocated impl to its owning service
     358             249 :     timer_service& svc) noexcept
     359             249 :     : svc_(&svc)
     360                 : {
     361             249 : }
     362                 : 
     363                 : inline void // tear down: drop this thread's caches, detach the heap, destroy pending waiters, free recycled nodes
     364             466 : timer_service::shutdown()
     365                 : {
     366             466 :     timer_service_invalidate_cache(); // frees the calling thread's cached impl and waiter
     367             466 :     shutting_down_ = true; // from here on destroy_impl() returns without touching the impl
     368                 : 
     369                 :     // Snapshot impls and detach them from the heap so that
     370                 :     // coroutine-owned timer destructors (triggered by h.destroy()
     371                 :     // below) cannot re-enter remove_timer_impl() and mutate the
     372                 :     // vector during iteration.
     373                 :     // NOTE(review): mutex_ is not taken anywhere in this function — presumably no other thread uses the service during shutdown; confirm.
     373             466 :     std::vector<implementation*> impls;
     374             466 :     impls.reserve(heap_.size());
     375             474 :     for (auto& entry : heap_)
     376                 :     {
     377               8 :         entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)(); // max() is the "not in heap" marker
     378               8 :         impls.push_back(entry.timer_);
     379                 :     }
     380             466 :     heap_.clear();
     381             466 :     cached_nearest_ns_.store( // publish "no timers" to nearest_expiry()/empty()
     382                 :         (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
     383                 : 
     384                 :     // Cancel waiting timers. Each waiter called work_started()
     385                 :     // in implementation::wait(). On IOCP the scheduler shutdown
     386                 :     // loop exits when outstanding_work_ reaches zero, so we must
     387                 :     // call work_finished() here to balance it. On other backends
     388                 :     // this is harmless.
     389             474 :     for (auto* impl : impls)
     390                 :     {
     391              16 :         while (auto* w = impl->waiters_.pop_front())
     392                 :         {
     393               8 :             w->stop_cb_.reset(); // disarm the stop callback before the node is deleted
     394               8 :             auto h = std::exchange(w->h_, {}); // take the handle so the node no longer refers to the coroutine
     395               8 :             sched_->work_finished();
     396               8 :             if (h)
     397               8 :                 h.destroy(); // the suspended coroutine is destroyed, not resumed
     398               8 :             delete w;
     399               8 :         }
     400               8 :         delete impl;
     401                 :     }
     402                 : 
     403                 :     // Delete free-listed impls
     404             546 :     while (free_list_)
     405                 :     {
     406              80 :         auto* next = free_list_->next_free_; // read the link before the node is deleted
     407              80 :         delete free_list_;
     408              80 :         free_list_ = next;
     409                 :     }
     410                 : 
     411                 :     // Delete free-listed waiters
     412             544 :     while (waiter_free_list_)
     413                 :     {
     414              78 :         auto* next = waiter_free_list_->next_free_; // read the link before the node is deleted
     415              78 :         delete waiter_free_list_;
     416              78 :         waiter_free_list_ = next;
     417                 :     }
     418             466 : }
     419                 : 
     420                 : inline io_object::implementation* // produce an impl: thread-local cache first, then the locked free list, else a new allocation
     421            5192 : timer_service::construct()
     422                 : {
     423            5192 :     implementation* impl = try_pop_tl_cache(this); // lock-free fast path; only returns impls whose svc_ is this service
     424            5192 :     if (impl)
     425                 :     {
     426            4943 :         impl->svc_        = this;
     427            4943 :         impl->heap_index_ = (std::numeric_limits<std::size_t>::max)(); // mark as not in the heap
     428            4943 :         impl->might_have_pending_waits_ = false;
     429            4943 :         return impl;
     430                 :     }
     431                 : 
     432             249 :     std::lock_guard lock(mutex_); // free_list_ is only touched under mutex_ outside shutdown()
     433             249 :     if (free_list_)
     434                 :     {
     435 MIS           0 :         impl              = free_list_; // pop the head of the free list
     436               0 :         free_list_        = impl->next_free_;
     437               0 :         impl->next_free_  = nullptr;
     438               0 :         impl->svc_        = this;
     439               0 :         impl->heap_index_ = (std::numeric_limits<std::size_t>::max)(); // same reset as the cached-impl path
     440               0 :         impl->might_have_pending_waits_ = false;
     441                 :     }
     442                 :     else
     443                 :     {
     444 HIT         249 :         impl = new implementation(*this); // nothing to recycle
     445                 :     }
     446             249 :     return impl;
     447             249 : }
     448                 : 
     449                 : inline void // io_service entry point: downcast p and forward to destroy_impl()
     450            5190 : timer_service::destroy(io_object::implementation* p)
     451                 : {
     452            5190 :     destroy_impl(static_cast<implementation&>(*p)); // assumes p is a timer_service::implementation (e.g. from construct()) and non-null
     453            5190 : }
     454                 : 
     455                 : inline void // cancel impl's waiters, make sure it is out of the heap, then recycle it (cache slot or free list)
     456            5190 : timer_service::destroy_impl(implementation& impl)
     457                 : {
     458                 :     // During shutdown the impl is owned by the shutdown loop.
     459                 :     // Re-entering here (from a coroutine-owned timer destructor
     460                 :     // triggered by h.destroy()) must not modify the heap or
     461                 :     // recycle the impl — shutdown deletes it directly.
     462            5190 :     if (shutting_down_)
     463            5110 :         return;
     464                 : 
     465            5184 :     cancel_timer(impl); // returns early when might_have_pending_waits_ is false, leaving the heap untouched
     466                 : 
     467            5184 :     if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)()) // still in the heap after cancel_timer()
     468                 :     {
     469 MIS           0 :         std::lock_guard lock(mutex_);
     470               0 :         remove_timer_impl(impl);
     471               0 :         refresh_cached_nearest(); // the heap front may have changed
     472               0 :     }
     473                 : 
     474 HIT        5184 :     if (try_push_tl_cache(&impl)) // fast path: park the impl in this thread's cache slot
     475            5104 :         return;
     476                 : 
     477              80 :     std::lock_guard lock(mutex_); // slot occupied: push onto the shared free list
     478              80 :     impl.next_free_ = free_list_;
     479              80 :     free_list_      = &impl;
     480              80 : }
     481                 : 
     482                 : inline waiter_node* // obtain a waiter_node: thread-local cache, then the locked free list, else allocate
     483            4992 : timer_service::create_waiter()
     484                 : {
     485            4992 :     if (auto* w = try_pop_waiter_tl_cache()) // recycled nodes are returned as-is; fields are not reset here
     486            4780 :         return w;
     487                 : 
     488             212 :     std::lock_guard lock(mutex_);
     489             212 :     if (waiter_free_list_)
     490                 :     {
     491               2 :         auto* w           = waiter_free_list_; // pop the head of the free list
     492               2 :         waiter_free_list_ = w->next_free_;
     493               2 :         w->next_free_     = nullptr;
     494               2 :         return w;
     495                 :     }
     496                 : 
     497             210 :     return new waiter_node(); // allocated while mutex_ is still held
     498             212 : }
     499                 : 
     500                 : inline void // recycle w: thread-local slot if empty, otherwise the locked free list; the node is never deleted here
     501            4976 : timer_service::destroy_waiter(waiter_node* w)
     502                 : {
     503            4976 :     if (try_push_waiter_tl_cache(w))
     504            4896 :         return;
     505                 : 
     506              80 :     std::lock_guard lock(mutex_);
     507              80 :     w->next_free_     = waiter_free_list_; // push onto the head of the free list
     508              80 :     waiter_free_list_ = w;
     509              80 : }
     510                 : 
     511                 : inline std::size_t // re-key impl's heap entry to new_time and cancel its current waiters; returns the number cancelled
     512               6 : timer_service::update_timer(implementation& impl, time_point new_time)
     513                 : {
     514                 :     bool in_heap =
     515               6 :         (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
     516               6 :     if (!in_heap && impl.waiters_.empty()) // nothing scheduled and nobody waiting; checked without the lock
     517 MIS           0 :         return 0;
     518                 : 
     519 HIT           6 :     bool notify = false;
     520               6 :     intrusive_list<waiter_node> canceled; // waiters detached under the lock, completed after it is released
     521                 : 
     522                 :     {
     523               6 :         std::lock_guard lock(mutex_);
     524                 : 
     525              16 :         while (auto* w = impl.waiters_.pop_front())
     526                 :         {
     527              10 :             w->impl_ = nullptr; // nullptr marks the waiter as no longer in the timer's list
     528              10 :             canceled.push_back(w);
     529              10 :         }
     530                 : 
     531               6 :         if (impl.heap_index_ < heap_.size()) // false for the max() "not in heap" marker
     532                 :         {
     533               6 :             time_point old_time           = heap_[impl.heap_index_].time_;
     534               6 :             heap_[impl.heap_index_].time_ = new_time;
     535                 : 
     536               6 :             if (new_time < old_time) // earlier expiry: the entry may need to move toward the front
     537               6 :                 up_heap(impl.heap_index_);
     538                 :             else
     539 MIS           0 :                 down_heap(impl.heap_index_);
     540                 : 
     541 HIT           6 :             notify = (impl.heap_index_ == 0); // notify only when this timer ends up at the heap front
     542                 :         }
     543                 : 
     544               6 :         refresh_cached_nearest();
     545               6 :     }
     546                 : 
     547               6 :     std::size_t count = 0;
     548              16 :     while (auto* w = canceled.pop_front())
     549                 :     {
     550              10 :         w->ec_value_ = make_error_code(capy::error::canceled);
     551              10 :         sched_->post(&w->op_); // the embedded op is posted, so no allocation happens per cancellation
     552              10 :         ++count;
     553              10 :     }
     554                 : 
     555               6 :     if (notify)
     556               6 :         on_earliest_changed_(); // invoked with mutex_ released
     557                 : 
     558               6 :     return count;
     559                 : }
     560                 : 
     561                 : inline void // add w to impl's waiter list, inserting impl into the heap first if it is not already there
     562            4992 : timer_service::insert_waiter(implementation& impl, waiter_node* w)
     563                 : {
     564            4992 :     bool notify = false;
     565                 :     {
     566            4992 :         std::lock_guard lock(mutex_);
     567            4992 :         if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)()) // max() marks "not in heap"
     568                 :         {
     569            4970 :             impl.heap_index_ = heap_.size(); // index the new entry will occupy
     570            4970 :             heap_.push_back({impl.expiry_, &impl});
     571            4970 :             up_heap(heap_.size() - 1);
     572            4970 :             notify = (impl.heap_index_ == 0); // the new timer became the heap front
     573            4970 :             refresh_cached_nearest();
     574                 :         }
     575            4992 :         impl.waiters_.push_back(w);
     576            4992 :     }
     577            4992 :     if (notify)
     578            4940 :         on_earliest_changed_(); // invoked with mutex_ released
     579            4992 : }
     580                 : 
     581                 : inline std::size_t
     582            5192 : timer_service::cancel_timer(implementation& impl)
     583                 : {
     584            5192 :     if (!impl.might_have_pending_waits_)
     585            5168 :         return 0;
     586                 : 
     587                 :     // Not in heap and no waiters — just clear the flag
     588              24 :     if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
     589 MIS           0 :         impl.waiters_.empty())
     590                 :     {
     591               0 :         impl.might_have_pending_waits_ = false;
     592               0 :         return 0;
     593                 :     }
     594                 : 
     595 HIT          24 :     intrusive_list<waiter_node> canceled;
     596                 : 
     597                 :     {
     598              24 :         std::lock_guard lock(mutex_);
     599              24 :         remove_timer_impl(impl);
     600              52 :         while (auto* w = impl.waiters_.pop_front())
     601                 :         {
     602              28 :             w->impl_ = nullptr;
     603              28 :             canceled.push_back(w);
     604              28 :         }
     605              24 :         refresh_cached_nearest();
     606              24 :     }
     607                 : 
     608              24 :     impl.might_have_pending_waits_ = false;
     609                 : 
     610              24 :     std::size_t count = 0;
     611              52 :     while (auto* w = canceled.pop_front())
     612                 :     {
     613              28 :         w->ec_value_ = make_error_code(capy::error::canceled);
     614              28 :         sched_->post(&w->op_);
     615              28 :         ++count;
     616              28 :     }
     617                 : 
     618              24 :     return count;
     619                 : }
     620                 : 
     621                 : inline void
     622              30 : timer_service::cancel_waiter(waiter_node* w)
     623                 : {
     624                 :     {
     625              30 :         std::lock_guard lock(mutex_);
     626                 :         // Already removed by cancel_timer or process_expired
     627              30 :         if (!w->impl_)
     628 MIS           0 :             return;
     629 HIT          30 :         auto* impl = w->impl_;
     630              30 :         w->impl_   = nullptr;
     631              30 :         impl->waiters_.remove(w);
     632              30 :         if (impl->waiters_.empty())
     633                 :         {
     634              28 :             remove_timer_impl(*impl);
     635              28 :             impl->might_have_pending_waits_ = false;
     636                 :         }
     637              30 :         refresh_cached_nearest();
     638              30 :     }
     639                 : 
     640              30 :     w->ec_value_ = make_error_code(capy::error::canceled);
     641              30 :     sched_->post(&w->op_);
     642                 : }
     643                 : 
     644                 : inline std::size_t
     645               2 : timer_service::cancel_one_waiter(implementation& impl)
     646                 : {
     647               2 :     if (!impl.might_have_pending_waits_)
     648 MIS           0 :         return 0;
     649                 : 
     650 HIT           2 :     waiter_node* w = nullptr;
     651                 : 
     652                 :     {
     653               2 :         std::lock_guard lock(mutex_);
     654               2 :         w = impl.waiters_.pop_front();
     655               2 :         if (!w)
     656 MIS           0 :             return 0;
     657 HIT           2 :         w->impl_ = nullptr;
     658               2 :         if (impl.waiters_.empty())
     659                 :         {
     660 MIS           0 :             remove_timer_impl(impl);
     661               0 :             impl.might_have_pending_waits_ = false;
     662                 :         }
     663 HIT           2 :         refresh_cached_nearest();
     664               2 :     }
     665                 : 
     666               2 :     w->ec_value_ = make_error_code(capy::error::canceled);
     667               2 :     sched_->post(&w->op_);
     668               2 :     return 1;
     669                 : }
     670                 : 
     671                 : inline std::size_t
     672          142410 : timer_service::process_expired()
     673                 : {
     674          142410 :     intrusive_list<waiter_node> expired;
     675                 : 
     676                 :     {
     677          142410 :         std::lock_guard lock(mutex_);
     678          142410 :         auto now = clock_type::now();
     679                 : 
     680          147320 :         while (!heap_.empty() && heap_[0].time_ <= now)
     681                 :         {
     682            4910 :             implementation* t = heap_[0].timer_;
     683            4910 :             remove_timer_impl(*t);
     684            9824 :             while (auto* w = t->waiters_.pop_front())
     685                 :             {
     686            4914 :                 w->impl_     = nullptr;
     687            4914 :                 w->ec_value_ = {};
     688            4914 :                 expired.push_back(w);
     689            4914 :             }
     690            4910 :             t->might_have_pending_waits_ = false;
     691                 :         }
     692                 : 
     693          142410 :         refresh_cached_nearest();
     694          142410 :     }
     695                 : 
     696          142410 :     std::size_t count = 0;
     697          147324 :     while (auto* w = expired.pop_front())
     698                 :     {
     699            4914 :         sched_->post(&w->op_);
     700            4914 :         ++count;
     701            4914 :     }
     702                 : 
     703          142410 :     return count;
     704                 : }
     705                 : 
     706                 : inline void
     707            4962 : timer_service::remove_timer_impl(implementation& impl)
     708                 : {
     709            4962 :     std::size_t index = impl.heap_index_;
     710            4962 :     if (index >= heap_.size())
     711 MIS           0 :         return; // Not in heap
     712                 : 
     713 HIT        4962 :     if (index == heap_.size() - 1)
     714                 :     {
     715                 :         // Last element, just pop
     716             153 :         impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
     717             153 :         heap_.pop_back();
     718                 :     }
     719                 :     else
     720                 :     {
     721                 :         // Swap with last and reheapify
     722            4809 :         swap_heap(index, heap_.size() - 1);
     723            4809 :         impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
     724            4809 :         heap_.pop_back();
     725                 : 
     726            4809 :         if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
     727 MIS           0 :             up_heap(index);
     728                 :         else
     729 HIT        4809 :             down_heap(index);
     730                 :     }
     731                 : }
     732                 : 
     733                 : inline void
     734            4976 : timer_service::up_heap(std::size_t index)
     735                 : {
     736            9765 :     while (index > 0)
     737                 :     {
     738            4819 :         std::size_t parent = (index - 1) / 2;
     739            4819 :         if (!(heap_[index].time_ < heap_[parent].time_))
     740              30 :             break;
     741            4789 :         swap_heap(index, parent);
     742            4789 :         index = parent;
     743                 :     }
     744            4976 : }
     745                 : 
     746                 : inline void
     747            4809 : timer_service::down_heap(std::size_t index)
     748                 : {
     749            4809 :     std::size_t child = index * 2 + 1;
     750            4809 :     while (child < heap_.size())
     751                 :     {
     752               6 :         std::size_t min_child = (child + 1 == heap_.size() ||
     753 MIS           0 :                                  heap_[child].time_ < heap_[child + 1].time_)
     754 HIT           6 :             ? child
     755               6 :             : child + 1;
     756                 : 
     757               6 :         if (heap_[index].time_ < heap_[min_child].time_)
     758               6 :             break;
     759                 : 
     760 MIS           0 :         swap_heap(index, min_child);
     761               0 :         index = min_child;
     762               0 :         child = index * 2 + 1;
     763                 :     }
     764 HIT        4809 : }
     765                 : 
     766                 : inline void
     767            9598 : timer_service::swap_heap(std::size_t i1, std::size_t i2)
     768                 : {
     769            9598 :     heap_entry tmp                = heap_[i1];
     770            9598 :     heap_[i1]                     = heap_[i2];
     771            9598 :     heap_[i2]                     = tmp;
     772            9598 :     heap_[i1].timer_->heap_index_ = i1;
     773            9598 :     heap_[i2].timer_->heap_index_ = i2;
     774            9598 : }
     775                 : 
     776                 : // waiter_node out-of-class member function definitions
     777                 : 
     778                 : inline void
     779              30 : waiter_node::canceller::operator()() const
     780                 : {
     781              30 :     waiter_->svc_->cancel_waiter(waiter_);
     782              30 : }
     783                 : 
     784                 : inline void
     785 MIS           0 : waiter_node::completion_op::do_complete(
     786                 :     [[maybe_unused]] void* owner,
     787                 :     scheduler_op* base,
     788                 :     std::uint32_t,
     789                 :     std::uint32_t)
     790                 : {
     791                 :     // owner is always non-null here. The destroy path (owner == nullptr)
     792                 :     // is unreachable because completion_op overrides destroy() directly,
     793                 :     // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
     794               0 :     BOOST_COROSIO_ASSERT(owner);
     795               0 :     static_cast<completion_op*>(base)->operator()();
     796               0 : }
     797                 : 
     798                 : inline void
     799 HIT        4976 : waiter_node::completion_op::operator()()
     800                 : {
     801            4976 :     auto* w = waiter_;
     802            4976 :     w->stop_cb_.reset();
     803            4976 :     if (w->ec_out_)
     804            4976 :         *w->ec_out_ = w->ec_value_;
     805                 : 
     806            4976 :     auto h      = w->h_;
     807            4976 :     auto d      = w->d_;
     808            4976 :     auto* svc   = w->svc_;
     809            4976 :     auto& sched = svc->get_scheduler();
     810                 : 
     811            4976 :     svc->destroy_waiter(w);
     812                 : 
     813            4976 :     d.post(h);
     814            4976 :     sched.work_finished();
     815            4976 : }
     816                 : 
     817                 : // GCC 14 false-positive: inlining ~optional<stop_callback> through
     818                 : // delete loses track that stop_cb_ was already .reset() above.
     819                 : #if defined(__GNUC__) && !defined(__clang__)
     820                 : #pragma GCC diagnostic push
     821                 : #pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
     822                 : #endif
     823                 : inline void
     824               8 : waiter_node::completion_op::destroy()
     825                 : {
     826                 :     // Called during scheduler shutdown drain when this completion_op is
     827                 :     // in the scheduler's ready queue (posted by cancel_timer() or
     828                 :     // process_expired()). Balances the work_started() from
     829                 :     // implementation::wait(). The scheduler drain loop separately
     830                 :     // balances the work_started() from post(). On IOCP both decrements
     831                 :     // are required for outstanding_work_ to reach zero; on other
     832                 :     // backends this is harmless.
     833                 :     //
     834                 :     // This override also prevents scheduler_op::destroy() from calling
     835                 :     // do_complete(nullptr, ...). See also: timer_service::shutdown()
     836                 :     // which drains waiters still in the timer heap (the other path).
     837               8 :     auto* w = waiter_;
     838               8 :     w->stop_cb_.reset();
     839               8 :     auto h      = std::exchange(w->h_, {});
     840               8 :     auto& sched = w->svc_->get_scheduler();
     841               8 :     delete w;
     842               8 :     sched.work_finished();
     843               8 :     if (h)
     844               8 :         h.destroy();
     845               8 : }
     846                 : #if defined(__GNUC__) && !defined(__clang__)
     847                 : #pragma GCC diagnostic pop
     848                 : #endif
     849                 : 
     850                 : inline std::coroutine_handle<>
     851            4992 : timer_service::implementation::wait(
     852                 :     std::coroutine_handle<> h,
     853                 :     capy::executor_ref d,
     854                 :     std::stop_token token,
     855                 :     std::error_code* ec)
     856                 : {
     857                 :     // Already-expired fast path — no waiter_node, no mutex.
     858                 :     // Post instead of dispatch so the coroutine yields to the
     859                 :     // scheduler, allowing other queued work to run.
     860            4992 :     if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
     861                 :     {
     862            4970 :         if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
     863                 :         {
     864 MIS           0 :             if (ec)
     865               0 :                 *ec = {};
     866               0 :             d.post(h);
     867               0 :             return std::noop_coroutine();
     868                 :         }
     869                 :     }
     870                 : 
     871 HIT        4992 :     auto* w    = svc_->create_waiter();
     872            4992 :     w->impl_   = this;
     873            4992 :     w->svc_    = svc_;
     874            4992 :     w->h_      = h;
     875            4992 :     w->d_      = d;
     876            4992 :     w->token_  = std::move(token);
     877            4992 :     w->ec_out_ = ec;
     878                 : 
     879            4992 :     svc_->insert_waiter(*this, w);
     880            4992 :     might_have_pending_waits_ = true;
     881            4992 :     svc_->get_scheduler().work_started();
     882                 : 
     883            4992 :     if (w->token_.stop_possible())
     884              48 :         w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
     885                 : 
     886            4992 :     return std::noop_coroutine();
     887                 : }
     888                 : 
     889                 : // Free functions
     890                 : 
     891                 : struct timer_service_access
     892                 : {
     893            5192 :     static native_scheduler& get_scheduler(io_context& ctx) noexcept
     894                 :     {
     895            5192 :         return static_cast<native_scheduler&>(*ctx.sched_);
     896                 :     }
     897                 : };
     898                 : 
     899                 : // Bypass find_service() mutex by reading the scheduler's cached pointer
     900                 : inline io_object::io_service&
     901            5192 : timer_service_direct(capy::execution_context& ctx) noexcept
     902                 : {
     903            5192 :     return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
     904            5192 :                 .timer_svc_;
     905                 : }
     906                 : 
     907                 : inline std::size_t
     908               6 : timer_service_update_expiry(timer::implementation& base)
     909                 : {
     910               6 :     auto& impl = static_cast<timer_service::implementation&>(base);
     911               6 :     return impl.svc_->update_timer(impl, impl.expiry_);
     912                 : }
     913                 : 
     914                 : inline std::size_t
     915               8 : timer_service_cancel(timer::implementation& base) noexcept
     916                 : {
     917               8 :     auto& impl = static_cast<timer_service::implementation&>(base);
     918               8 :     return impl.svc_->cancel_timer(impl);
     919                 : }
     920                 : 
     921                 : inline std::size_t
     922               2 : timer_service_cancel_one(timer::implementation& base) noexcept
     923                 : {
     924               2 :     auto& impl = static_cast<timer_service::implementation&>(base);
     925               2 :     return impl.svc_->cancel_one_waiter(impl);
     926                 : }
     927                 : 
     928                 : inline timer_service&
     929             466 : get_timer_service(capy::execution_context& ctx, scheduler& sched)
     930                 : {
     931             466 :     return ctx.make_service<timer_service>(sched);
     932                 : }
     933                 : 
     934                 : } // namespace boost::corosio::detail
     935                 : 
     936                 : #endif
        

Generated by: LCOV version 2.3