1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
// Copyright (c) 2026 Steve Gerbino
3  
// Copyright (c) 2026 Steve Gerbino
4  
//
4  
//
5  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
6  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7  
//
7  
//
8  
// Official repository: https://github.com/cppalliance/corosio
8  
// Official repository: https://github.com/cppalliance/corosio
9  
//
9  
//
10  

10  

11  
#ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
11  
#ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12  
#define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12  
#define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13  

13  

14  
#include <boost/corosio/timer.hpp>
14  
#include <boost/corosio/timer.hpp>
15  
#include <boost/corosio/io_context.hpp>
15  
#include <boost/corosio/io_context.hpp>
16  
#include <boost/corosio/detail/scheduler_op.hpp>
16  
#include <boost/corosio/detail/scheduler_op.hpp>
17  
#include <boost/corosio/native/native_scheduler.hpp>
17  
#include <boost/corosio/native/native_scheduler.hpp>
18  
#include <boost/corosio/detail/intrusive.hpp>
18  
#include <boost/corosio/detail/intrusive.hpp>
19  
#include <boost/corosio/detail/thread_local_ptr.hpp>
19  
#include <boost/corosio/detail/thread_local_ptr.hpp>
20  
#include <boost/capy/error.hpp>
20  
#include <boost/capy/error.hpp>
21  
#include <boost/capy/ex/execution_context.hpp>
21  
#include <boost/capy/ex/execution_context.hpp>
22  
#include <boost/capy/ex/executor_ref.hpp>
22  
#include <boost/capy/ex/executor_ref.hpp>
23  
#include <system_error>
23  
#include <system_error>
24  

24  

25  
#include <atomic>
25  
#include <atomic>
26  
#include <chrono>
26  
#include <chrono>
27  
#include <coroutine>
27  
#include <coroutine>
28  
#include <cstddef>
28  
#include <cstddef>
29  
#include <limits>
29  
#include <limits>
30  
#include <mutex>
30  
#include <mutex>
31  
#include <optional>
31  
#include <optional>
32  
#include <stop_token>
32  
#include <stop_token>
33  
#include <utility>
33  
#include <utility>
34  
#include <vector>
34  
#include <vector>
35  

35  

36  
namespace boost::corosio::detail {
36  
namespace boost::corosio::detail {
37  

37  

38  
struct scheduler;
38  
struct scheduler;
39  

39  

40  
/*
40  
/*
41  
    Timer Service
41  
    Timer Service
42  
    =============
42  
    =============
43  

43  

44  
    Data Structures
44  
    Data Structures
45  
    ---------------
45  
    ---------------
46  
    waiter_node holds per-waiter state: coroutine handle, executor,
46  
    waiter_node holds per-waiter state: coroutine handle, executor,
47  
    error output, stop_token, embedded completion_op. Each concurrent
47  
    error output, stop_token, embedded completion_op. Each concurrent
48  
    co_await t.wait() allocates one waiter_node.
48  
    co_await t.wait() allocates one waiter_node.
49  

49  

50  
    timer_service::implementation holds per-timer state: expiry,
50  
    timer_service::implementation holds per-timer state: expiry,
51  
    heap index, and an intrusive_list of waiter_nodes. Multiple
51  
    heap index, and an intrusive_list of waiter_nodes. Multiple
52  
    coroutines can wait on the same timer simultaneously.
52  
    coroutines can wait on the same timer simultaneously.
53  

53  

54  
    timer_service owns a min-heap of active timers, a free list
54  
    timer_service owns a min-heap of active timers, a free list
55  
    of recycled impls, and a free list of recycled waiter_nodes. The
55  
    of recycled impls, and a free list of recycled waiter_nodes. The
56  
    heap is ordered by expiry time; the scheduler queries
56  
    heap is ordered by expiry time; the scheduler queries
57  
    nearest_expiry() to set the epoll/timerfd timeout.
57  
    nearest_expiry() to set the epoll/timerfd timeout.
58  

58  

59  
    Optimization Strategy
59  
    Optimization Strategy
60  
    ---------------------
60  
    ---------------------
61  
    1. Deferred heap insertion — expires_after() stores the expiry
61  
    1. Deferred heap insertion — expires_after() stores the expiry
62  
       but does not insert into the heap. Insertion happens in wait().
62  
       but does not insert into the heap. Insertion happens in wait().
63  
    2. Thread-local impl cache — single-slot per-thread cache.
63  
    2. Thread-local impl cache — single-slot per-thread cache.
64  
    3. Embedded completion_op — eliminates heap allocation per fire/cancel.
64  
    3. Embedded completion_op — eliminates heap allocation per fire/cancel.
65  
    4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
65  
    4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
66  
    5. might_have_pending_waits_ flag — skips lock when no wait issued.
66  
    5. might_have_pending_waits_ flag — skips lock when no wait issued.
67  
    6. Thread-local waiter cache — single-slot per-thread cache.
67  
    6. Thread-local waiter cache — single-slot per-thread cache.
68  

68  

69  
    Concurrency
69  
    Concurrency
70  
    -----------
70  
    -----------
71  
    stop_token callbacks can fire from any thread. The impl_
71  
    stop_token callbacks can fire from any thread. The impl_
72  
    pointer on waiter_node is used as a "still in list" marker.
72  
    pointer on waiter_node is used as a "still in list" marker.
73  
*/
73  
*/
74  

74  

75  
struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
75  
struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
76  

76  

77  
inline void timer_service_invalidate_cache() noexcept;
77  
inline void timer_service_invalidate_cache() noexcept;
78  

78  

79  
// timer_service class body — member function definitions are
79  
// timer_service class body — member function definitions are
80  
// out-of-class (after implementation and waiter_node are complete)
80  
// out-of-class (after implementation and waiter_node are complete)
81  
class BOOST_COROSIO_DECL timer_service final
81  
class BOOST_COROSIO_DECL timer_service final
82  
    : public capy::execution_context::service
82  
    : public capy::execution_context::service
83  
    , public io_object::io_service
83  
    , public io_object::io_service
84  
{
84  
{
85  
public:
85  
public:
86  
    using clock_type = std::chrono::steady_clock;
86  
    using clock_type = std::chrono::steady_clock;
87  
    using time_point = clock_type::time_point;
87  
    using time_point = clock_type::time_point;
88  

88  

89  
    /// Type-erased callback for earliest-expiry-changed notifications.
89  
    /// Type-erased callback for earliest-expiry-changed notifications.
90  
    class callback
90  
    class callback
91  
    {
91  
    {
92  
        void* ctx_         = nullptr;
92  
        void* ctx_         = nullptr;
93  
        void (*fn_)(void*) = nullptr;
93  
        void (*fn_)(void*) = nullptr;
94  

94  

95  
    public:
95  
    public:
96  
        /// Construct an empty callback.
96  
        /// Construct an empty callback.
97  
        callback() = default;
97  
        callback() = default;
98  

98  

99  
        /// Construct a callback with the given context and function.
99  
        /// Construct a callback with the given context and function.
100  
        callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
100  
        callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
101  

101  

102  
        /// Return true if the callback is non-empty.
102  
        /// Return true if the callback is non-empty.
103  
        explicit operator bool() const noexcept
103  
        explicit operator bool() const noexcept
104  
        {
104  
        {
105  
            return fn_ != nullptr;
105  
            return fn_ != nullptr;
106  
        }
106  
        }
107  

107  

108  
        /// Invoke the callback.
108  
        /// Invoke the callback.
109  
        void operator()() const
109  
        void operator()() const
110  
        {
110  
        {
111  
            if (fn_)
111  
            if (fn_)
112  
                fn_(ctx_);
112  
                fn_(ctx_);
113  
        }
113  
        }
114  
    };
114  
    };
115  

115  

116  
    struct implementation;
116  
    struct implementation;
117  

117  

118  
private:
118  
private:
119  
    struct heap_entry
119  
    struct heap_entry
120  
    {
120  
    {
121  
        time_point time_;
121  
        time_point time_;
122  
        implementation* timer_;
122  
        implementation* timer_;
123  
    };
123  
    };
124  

124  

125  
    scheduler* sched_ = nullptr;
125  
    scheduler* sched_ = nullptr;
126  
    mutable std::mutex mutex_;
126  
    mutable std::mutex mutex_;
127  
    std::vector<heap_entry> heap_;
127  
    std::vector<heap_entry> heap_;
128  
    implementation* free_list_     = nullptr;
128  
    implementation* free_list_     = nullptr;
129  
    waiter_node* waiter_free_list_ = nullptr;
129  
    waiter_node* waiter_free_list_ = nullptr;
130  
    callback on_earliest_changed_;
130  
    callback on_earliest_changed_;
 
131 +
    bool shutting_down_ = false;
131  
    // Avoids mutex in nearest_expiry() and empty()
132  
    // Avoids mutex in nearest_expiry() and empty()
132  
    mutable std::atomic<std::int64_t> cached_nearest_ns_{
133  
    mutable std::atomic<std::int64_t> cached_nearest_ns_{
133  
        (std::numeric_limits<std::int64_t>::max)()};
134  
        (std::numeric_limits<std::int64_t>::max)()};
134  

135  

135  
public:
136  
public:
136  
    /// Construct the timer service bound to a scheduler.
137  
    /// Construct the timer service bound to a scheduler.
137  
    inline timer_service(capy::execution_context&, scheduler& sched)
138  
    inline timer_service(capy::execution_context&, scheduler& sched)
138  
        : sched_(&sched)
139  
        : sched_(&sched)
139  
    {
140  
    {
140  
    }
141  
    }
141  

142  

142  
    /// Return the associated scheduler.
143  
    /// Return the associated scheduler.
143  
    inline scheduler& get_scheduler() noexcept
144  
    inline scheduler& get_scheduler() noexcept
144  
    {
145  
    {
145  
        return *sched_;
146  
        return *sched_;
146  
    }
147  
    }
147  

148  

148  
    /// Destroy the timer service.
149  
    /// Destroy the timer service.
149  
    ~timer_service() override = default;
150  
    ~timer_service() override = default;
150  

151  

151  
    timer_service(timer_service const&)            = delete;
152  
    timer_service(timer_service const&)            = delete;
152  
    timer_service& operator=(timer_service const&) = delete;
153  
    timer_service& operator=(timer_service const&) = delete;
153  

154  

154  
    /// Register a callback invoked when the earliest expiry changes.
155  
    /// Register a callback invoked when the earliest expiry changes.
155  
    inline void set_on_earliest_changed(callback cb)
156  
    inline void set_on_earliest_changed(callback cb)
156  
    {
157  
    {
157  
        on_earliest_changed_ = cb;
158  
        on_earliest_changed_ = cb;
158  
    }
159  
    }
159  

160  

160  
    /// Return true if no timers are in the heap.
161  
    /// Return true if no timers are in the heap.
161  
    inline bool empty() const noexcept
162  
    inline bool empty() const noexcept
162  
    {
163  
    {
163  
        return cached_nearest_ns_.load(std::memory_order_acquire) ==
164  
        return cached_nearest_ns_.load(std::memory_order_acquire) ==
164  
            (std::numeric_limits<std::int64_t>::max)();
165  
            (std::numeric_limits<std::int64_t>::max)();
165  
    }
166  
    }
166  

167  

167  
    /// Return the nearest timer expiry without acquiring the mutex.
168  
    /// Return the nearest timer expiry without acquiring the mutex.
168  
    inline time_point nearest_expiry() const noexcept
169  
    inline time_point nearest_expiry() const noexcept
169  
    {
170  
    {
170  
        auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
171  
        auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
171  
        return time_point(time_point::duration(ns));
172  
        return time_point(time_point::duration(ns));
172  
    }
173  
    }
173  

174  

174  
    /// Cancel all pending timers and free cached resources.
175  
    /// Cancel all pending timers and free cached resources.
175  
    inline void shutdown() override;
176  
    inline void shutdown() override;
176  

177  

177  
    /// Construct a new timer implementation.
178  
    /// Construct a new timer implementation.
178  
    inline io_object::implementation* construct() override;
179  
    inline io_object::implementation* construct() override;
179  

180  

180  
    /// Destroy a timer implementation, cancelling pending waiters.
181  
    /// Destroy a timer implementation, cancelling pending waiters.
181  
    inline void destroy(io_object::implementation* p) override;
182  
    inline void destroy(io_object::implementation* p) override;
182  

183  

183  
    /// Cancel and recycle a timer implementation.
184  
    /// Cancel and recycle a timer implementation.
184  
    inline void destroy_impl(implementation& impl);
185  
    inline void destroy_impl(implementation& impl);
185  

186  

186  
    /// Create or recycle a waiter node.
187  
    /// Create or recycle a waiter node.
187  
    inline waiter_node* create_waiter();
188  
    inline waiter_node* create_waiter();
188  

189  

189  
    /// Return a waiter node to the cache or free list.
190  
    /// Return a waiter node to the cache or free list.
190  
    inline void destroy_waiter(waiter_node* w);
191  
    inline void destroy_waiter(waiter_node* w);
191  

192  

192  
    /// Update the timer expiry, cancelling existing waiters.
193  
    /// Update the timer expiry, cancelling existing waiters.
193  
    inline std::size_t update_timer(implementation& impl, time_point new_time);
194  
    inline std::size_t update_timer(implementation& impl, time_point new_time);
194  

195  

195  
    /// Insert a waiter into the timer's waiter list and the heap.
196  
    /// Insert a waiter into the timer's waiter list and the heap.
196  
    inline void insert_waiter(implementation& impl, waiter_node* w);
197  
    inline void insert_waiter(implementation& impl, waiter_node* w);
197  

198  

198  
    /// Cancel all waiters on a timer.
199  
    /// Cancel all waiters on a timer.
199  
    inline std::size_t cancel_timer(implementation& impl);
200  
    inline std::size_t cancel_timer(implementation& impl);
200  

201  

201  
    /// Cancel a single waiter ( stop_token callback path ).
202  
    /// Cancel a single waiter ( stop_token callback path ).
202  
    inline void cancel_waiter(waiter_node* w);
203  
    inline void cancel_waiter(waiter_node* w);
203  

204  

204  
    /// Cancel one waiter on a timer.
205  
    /// Cancel one waiter on a timer.
205  
    inline std::size_t cancel_one_waiter(implementation& impl);
206  
    inline std::size_t cancel_one_waiter(implementation& impl);
206  

207  

207  
    /// Complete all waiters whose timers have expired.
208  
    /// Complete all waiters whose timers have expired.
208  
    inline std::size_t process_expired();
209  
    inline std::size_t process_expired();
209  

210  

210  
private:
211  
private:
211  
    inline void refresh_cached_nearest() noexcept
212  
    inline void refresh_cached_nearest() noexcept
212  
    {
213  
    {
213  
        auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
214  
        auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
214  
                                : heap_[0].time_.time_since_epoch().count();
215  
                                : heap_[0].time_.time_since_epoch().count();
215  
        cached_nearest_ns_.store(ns, std::memory_order_release);
216  
        cached_nearest_ns_.store(ns, std::memory_order_release);
216  
    }
217  
    }
217  

218  

218  
    inline void remove_timer_impl(implementation& impl);
219  
    inline void remove_timer_impl(implementation& impl);
219  
    inline void up_heap(std::size_t index);
220  
    inline void up_heap(std::size_t index);
220  
    inline void down_heap(std::size_t index);
221  
    inline void down_heap(std::size_t index);
221  
    inline void swap_heap(std::size_t i1, std::size_t i2);
222  
    inline void swap_heap(std::size_t i1, std::size_t i2);
222  
};
223  
};
223  

224  

224  
struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
225  
struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
225  
    : intrusive_list<waiter_node>::node
226  
    : intrusive_list<waiter_node>::node
226  
{
227  
{
227  
    // Embedded completion op — avoids heap allocation per fire/cancel
228  
    // Embedded completion op — avoids heap allocation per fire/cancel
228  
    struct completion_op final : scheduler_op
229  
    struct completion_op final : scheduler_op
229  
    {
230  
    {
230  
        waiter_node* waiter_ = nullptr;
231  
        waiter_node* waiter_ = nullptr;
231  

232  

232  
        static void do_complete(
233  
        static void do_complete(
233  
            void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
234  
            void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
234  

235  

235  
        completion_op() noexcept : scheduler_op(&do_complete) {}
236  
        completion_op() noexcept : scheduler_op(&do_complete) {}
236  

237  

237  
        void operator()() override;
238  
        void operator()() override;
238  
        void destroy() override;
239  
        void destroy() override;
239  
    };
240  
    };
240  

241  

241  
    // Per-waiter stop_token cancellation
242  
    // Per-waiter stop_token cancellation
242  
    struct canceller
243  
    struct canceller
243  
    {
244  
    {
244  
        waiter_node* waiter_;
245  
        waiter_node* waiter_;
245  
        void operator()() const;
246  
        void operator()() const;
246  
    };
247  
    };
247  

248  

248  
    // nullptr once removed from timer's waiter list (concurrency marker)
249  
    // nullptr once removed from timer's waiter list (concurrency marker)
249  
    timer_service::implementation* impl_ = nullptr;
250  
    timer_service::implementation* impl_ = nullptr;
250  
    timer_service* svc_                  = nullptr;
251  
    timer_service* svc_                  = nullptr;
251  
    std::coroutine_handle<> h_;
252  
    std::coroutine_handle<> h_;
252  
    capy::executor_ref d_;
253  
    capy::executor_ref d_;
253  
    std::error_code* ec_out_ = nullptr;
254  
    std::error_code* ec_out_ = nullptr;
254  
    std::stop_token token_;
255  
    std::stop_token token_;
255  
    std::optional<std::stop_callback<canceller>> stop_cb_;
256  
    std::optional<std::stop_callback<canceller>> stop_cb_;
256  
    completion_op op_;
257  
    completion_op op_;
257  
    std::error_code ec_value_;
258  
    std::error_code ec_value_;
258  
    waiter_node* next_free_ = nullptr;
259  
    waiter_node* next_free_ = nullptr;
259  

260  

260  
    waiter_node() noexcept
261  
    waiter_node() noexcept
261  
    {
262  
    {
262  
        op_.waiter_ = this;
263  
        op_.waiter_ = this;
263  
    }
264  
    }
264  
};
265  
};
265  

266  

266  
struct timer_service::implementation final : timer::implementation
267  
struct timer_service::implementation final : timer::implementation
267  
{
268  
{
268  
    using clock_type = std::chrono::steady_clock;
269  
    using clock_type = std::chrono::steady_clock;
269  
    using time_point = clock_type::time_point;
270  
    using time_point = clock_type::time_point;
270  
    using duration   = clock_type::duration;
271  
    using duration   = clock_type::duration;
271  

272  

272  
    timer_service* svc_ = nullptr;
273  
    timer_service* svc_ = nullptr;
273  
    intrusive_list<waiter_node> waiters_;
274  
    intrusive_list<waiter_node> waiters_;
274  

275  

275  
    // Free list linkage (reused when impl is on free_list)
276  
    // Free list linkage (reused when impl is on free_list)
276  
    implementation* next_free_ = nullptr;
277  
    implementation* next_free_ = nullptr;
277  

278  

278  
    inline explicit implementation(timer_service& svc) noexcept;
279  
    inline explicit implementation(timer_service& svc) noexcept;
279  

280  

280  
    inline std::coroutine_handle<> wait(
281  
    inline std::coroutine_handle<> wait(
281  
        std::coroutine_handle<>,
282  
        std::coroutine_handle<>,
282  
        capy::executor_ref,
283  
        capy::executor_ref,
283  
        std::stop_token,
284  
        std::stop_token,
284  
        std::error_code*) override;
285  
        std::error_code*) override;
285  
};
286  
};
286  

287  

287  
// Thread-local caches avoid hot-path mutex acquisitions:
// 1. Impl cache — single-slot, validated by comparing svc_
// 2. Waiter cache — single-slot, no service affinity
// All caches are cleared by timer_service_invalidate_cache() during shutdown.
// NOTE(review): timer_service_invalidate_cache() only touches the slots
// reachable from the thread that calls it — presumably thread_local_ptr is
// per-thread storage, so entries cached on other threads are not cleared
// by that call; confirm this is intended.

// Single cached timer implementation; try_pop_tl_cache() deletes the
// entry instead of returning it when its svc_ does not match the caller.
inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
// Single cached waiter node; handed out to any service that asks.
inline thread_local_ptr<waiter_node> tl_cached_waiter;
294  

295  

295  
// Take the cached impl out of the thread-local slot.
//
// Returns the cached impl when it belongs to `svc`. When the slot is
// empty, returns nullptr. When the slot holds an impl whose svc_ differs
// from `svc` (stale impl from a destroyed service), the impl is deleted
// and nullptr is returned. The slot is left empty in every non-empty case.
inline timer_service::implementation*
try_pop_tl_cache(timer_service* svc) noexcept
{
    auto* cached = tl_cached_impl.get();
    if (!cached)
        return nullptr;

    tl_cached_impl.set(nullptr);

    if (cached->svc_ != svc)
    {
        // Stale impl from a destroyed service
        delete cached;
        return nullptr;
    }
    return cached;
}
309  

310  

310  
inline bool
311  
inline bool
311  
try_push_tl_cache(timer_service::implementation* impl) noexcept
312  
try_push_tl_cache(timer_service::implementation* impl) noexcept
312  
{
313  
{
313  
    if (!tl_cached_impl.get())
314  
    if (!tl_cached_impl.get())
314  
    {
315  
    {
315  
        tl_cached_impl.set(impl);
316  
        tl_cached_impl.set(impl);
316  
        return true;
317  
        return true;
317  
    }
318  
    }
318  
    return false;
319  
    return false;
319  
}
320  
}
320  

321  

321  
// Take the cached waiter node out of the thread-local slot.
//
// Returns the cached node and empties the slot, or returns nullptr
// when the slot holds nothing.
inline waiter_node*
try_pop_waiter_tl_cache() noexcept
{
    auto* cached = tl_cached_waiter.get();
    if (cached)
        tl_cached_waiter.set(nullptr);
    return cached;
}
332  

333  

333  
inline bool
334  
inline bool
334  
try_push_waiter_tl_cache(waiter_node* w) noexcept
335  
try_push_waiter_tl_cache(waiter_node* w) noexcept
335  
{
336  
{
336  
    if (!tl_cached_waiter.get())
337  
    if (!tl_cached_waiter.get())
337  
    {
338  
    {
338  
        tl_cached_waiter.set(w);
339  
        tl_cached_waiter.set(w);
339  
        return true;
340  
        return true;
340  
    }
341  
    }
341  
    return false;
342  
    return false;
342  
}
343  
}
343  

344  

344  
inline void
345  
inline void
345  
timer_service_invalidate_cache() noexcept
346  
timer_service_invalidate_cache() noexcept
346  
{
347  
{
347  
    delete tl_cached_impl.get();
348  
    delete tl_cached_impl.get();
348  
    tl_cached_impl.set(nullptr);
349  
    tl_cached_impl.set(nullptr);
349  

350  

350  
    delete tl_cached_waiter.get();
351  
    delete tl_cached_waiter.get();
351  
    tl_cached_waiter.set(nullptr);
352  
    tl_cached_waiter.set(nullptr);
352  
}
353  
}
353  

354  

354  
// timer_service out-of-class member function definitions
355  
// timer_service out-of-class member function definitions
355  

356  

356  
inline timer_service::implementation::implementation(
357  
inline timer_service::implementation::implementation(
357  
    timer_service& svc) noexcept
358  
    timer_service& svc) noexcept
358  
    : svc_(&svc)
359  
    : svc_(&svc)
359  
{
360  
{
360  
}
361  
}
361  

362  

362  
// Tear down every timer still known to the service.
//
// Steps, in order:
//   1. Delete the calling thread's cached impl/waiter.
//   2. Raise shutting_down_, which turns destroy_impl() into a no-op.
//   3. Detach all impls from the heap and reset the cached nearest expiry.
//   4. For each detached impl: drop its waiters (destroying their
//      coroutines) and delete the impl.
//   5. Delete everything left on the impl and waiter free lists.
inline void
timer_service::shutdown()
{
    timer_service_invalidate_cache();
    shutting_down_ = true;

    // Snapshot impls and detach them from the heap so that
    // coroutine-owned timer destructors (triggered by h.destroy()
    // below) cannot re-enter remove_timer_impl() and mutate the
    // vector during iteration.
    std::vector<implementation*> impls;
    impls.reserve(heap_.size());
    for (auto& entry : heap_)
    {
        // size_t max is the "not in heap" index value checked by destroy_impl().
        entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)();
        impls.push_back(entry.timer_);
    }
    heap_.clear();
    // int64 max is the value empty() compares against to report "no timers".
    cached_nearest_ns_.store(
        (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);

    // Cancel waiting timers. Each waiter called work_started()
    // in implementation::wait(). On IOCP the scheduler shutdown
    // loop exits when outstanding_work_ reaches zero, so we must
    // call work_finished() here to balance it. On other backends
    // this is harmless.
    for (auto* impl : impls)
    {
        while (auto* w = impl->waiters_.pop_front())
        {
            // Drop the stop callback first so it is gone before the node is freed.
            w->stop_cb_.reset();
            // Take the handle out of the node before destroying the coroutine.
            auto h = std::exchange(w->h_, {});
            sched_->work_finished();
            if (h)
                h.destroy();
            delete w;
        }
        delete impl;
    }

    // Delete free-listed impls
    while (free_list_)
    {
        // Read the link before the node is deleted.
        auto* next = free_list_->next_free_;
        delete free_list_;
        free_list_ = next;
    }

    // Delete free-listed waiters
    while (waiter_free_list_)
    {
        // Read the link before the node is deleted.
        auto* next = waiter_free_list_->next_free_;
        delete waiter_free_list_;
        waiter_free_list_ = next;
    }
}
408  

419  

409  
// Hand out a timer implementation bound to this service.
//
// Sources are tried in order: the thread-local impl cache (no lock),
// then the service free list (under mutex_), and finally a fresh
// allocation. Recycled impls get their service pointer, heap index and
// pending-wait flag reset before being returned.
inline io_object::implementation*
timer_service::construct()
{
    // Heap index value meaning "not in the heap".
    constexpr auto not_in_heap = (std::numeric_limits<std::size_t>::max)();

    if (implementation* cached = try_pop_tl_cache(this))
    {
        cached->svc_                      = this;
        cached->heap_index_               = not_in_heap;
        cached->might_have_pending_waits_ = false;
        return cached;
    }

    std::lock_guard lock(mutex_);

    if (!free_list_)
        return new implementation(*this);

    // Unlink the head of the free list and reset it for reuse.
    implementation* recycled          = free_list_;
    free_list_                        = recycled->next_free_;
    recycled->next_free_              = nullptr;
    recycled->svc_                    = this;
    recycled->heap_index_             = not_in_heap;
    recycled->might_have_pending_waits_ = false;
    return recycled;
}
437  

448  

438  
inline void
449  
inline void
439  
timer_service::destroy(io_object::implementation* p)
450  
timer_service::destroy(io_object::implementation* p)
440  
{
451  
{
441  
    destroy_impl(static_cast<implementation&>(*p));
452  
    destroy_impl(static_cast<implementation&>(*p));
442  
}
453  
}
443  

454  

444  
inline void
455  
inline void
445  
timer_service::destroy_impl(implementation& impl)
456  
timer_service::destroy_impl(implementation& impl)
446  
{
457  
{
 
458 +
    // During shutdown the impl is owned by the shutdown loop.
 
459 +
    // Re-entering here (from a coroutine-owned timer destructor
 
460 +
    // triggered by h.destroy()) must not modify the heap or
 
461 +
    // recycle the impl — shutdown deletes it directly.
 
462 +
    if (shutting_down_)
 
463 +
        return;
 
464 +

447  
    cancel_timer(impl);
465  
    cancel_timer(impl);
448  

466  

449  
    if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
467  
    if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
450  
    {
468  
    {
451  
        std::lock_guard lock(mutex_);
469  
        std::lock_guard lock(mutex_);
452  
        remove_timer_impl(impl);
470  
        remove_timer_impl(impl);
453  
        refresh_cached_nearest();
471  
        refresh_cached_nearest();
454  
    }
472  
    }
455  

473  

456  
    if (try_push_tl_cache(&impl))
474  
    if (try_push_tl_cache(&impl))
457  
        return;
475  
        return;
458  

476  

459  
    std::lock_guard lock(mutex_);
477  
    std::lock_guard lock(mutex_);
460  
    impl.next_free_ = free_list_;
478  
    impl.next_free_ = free_list_;
461  
    free_list_      = &impl;
479  
    free_list_      = &impl;
462  
}
480  
}
463  

481  

464  
// Obtain a waiter_node for a new wait operation.
//
// Sources are tried in this order:
//   1. the thread-local waiter cache,
//   2. the head of the mutex-protected waiter free list,
//   3. a fresh heap allocation.
//
// A node taken from the free list has its next_free_ link cleared
// before it is returned.
inline waiter_node*
timer_service::create_waiter()
{
    if (auto* w = try_pop_waiter_tl_cache())
        return w;

    std::lock_guard lock(mutex_);
    if (waiter_free_list_)
    {
        auto* w           = waiter_free_list_;
        waiter_free_list_ = w->next_free_;
        w->next_free_     = nullptr;
        return w;
    }

    return new waiter_node();
}
481  

499  

482  
inline void
500  
inline void
483  
timer_service::destroy_waiter(waiter_node* w)
501  
timer_service::destroy_waiter(waiter_node* w)
484  
{
502  
{
485  
    if (try_push_waiter_tl_cache(w))
503  
    if (try_push_waiter_tl_cache(w))
486  
        return;
504  
        return;
487  

505  

488  
    std::lock_guard lock(mutex_);
506  
    std::lock_guard lock(mutex_);
489  
    w->next_free_     = waiter_free_list_;
507  
    w->next_free_     = waiter_free_list_;
490  
    waiter_free_list_ = w;
508  
    waiter_free_list_ = w;
491  
}
509  
}
492  

510  

493  
inline std::size_t
511  
inline std::size_t
494  
timer_service::update_timer(implementation& impl, time_point new_time)
512  
timer_service::update_timer(implementation& impl, time_point new_time)
495  
{
513  
{
496  
    bool in_heap =
514  
    bool in_heap =
497  
        (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
515  
        (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
498  
    if (!in_heap && impl.waiters_.empty())
516  
    if (!in_heap && impl.waiters_.empty())
499  
        return 0;
517  
        return 0;
500  

518  

501  
    bool notify = false;
519  
    bool notify = false;
502  
    intrusive_list<waiter_node> canceled;
520  
    intrusive_list<waiter_node> canceled;
503  

521  

504  
    {
522  
    {
505  
        std::lock_guard lock(mutex_);
523  
        std::lock_guard lock(mutex_);
506  

524  

507  
        while (auto* w = impl.waiters_.pop_front())
525  
        while (auto* w = impl.waiters_.pop_front())
508  
        {
526  
        {
509  
            w->impl_ = nullptr;
527  
            w->impl_ = nullptr;
510  
            canceled.push_back(w);
528  
            canceled.push_back(w);
511  
        }
529  
        }
512  

530  

513  
        if (impl.heap_index_ < heap_.size())
531  
        if (impl.heap_index_ < heap_.size())
514  
        {
532  
        {
515  
            time_point old_time           = heap_[impl.heap_index_].time_;
533  
            time_point old_time           = heap_[impl.heap_index_].time_;
516  
            heap_[impl.heap_index_].time_ = new_time;
534  
            heap_[impl.heap_index_].time_ = new_time;
517  

535  

518  
            if (new_time < old_time)
536  
            if (new_time < old_time)
519  
                up_heap(impl.heap_index_);
537  
                up_heap(impl.heap_index_);
520  
            else
538  
            else
521  
                down_heap(impl.heap_index_);
539  
                down_heap(impl.heap_index_);
522  

540  

523  
            notify = (impl.heap_index_ == 0);
541  
            notify = (impl.heap_index_ == 0);
524  
        }
542  
        }
525  

543  

526  
        refresh_cached_nearest();
544  
        refresh_cached_nearest();
527  
    }
545  
    }
528  

546  

529  
    std::size_t count = 0;
547  
    std::size_t count = 0;
530  
    while (auto* w = canceled.pop_front())
548  
    while (auto* w = canceled.pop_front())
531  
    {
549  
    {
532  
        w->ec_value_ = make_error_code(capy::error::canceled);
550  
        w->ec_value_ = make_error_code(capy::error::canceled);
533  
        sched_->post(&w->op_);
551  
        sched_->post(&w->op_);
534  
        ++count;
552  
        ++count;
535  
    }
553  
    }
536  

554  

537  
    if (notify)
555  
    if (notify)
538  
        on_earliest_changed_();
556  
        on_earliest_changed_();
539  

557  

540  
    return count;
558  
    return count;
541  
}
559  
}
542  

560  

543  
inline void
561  
inline void
544  
timer_service::insert_waiter(implementation& impl, waiter_node* w)
562  
timer_service::insert_waiter(implementation& impl, waiter_node* w)
545  
{
563  
{
546  
    bool notify = false;
564  
    bool notify = false;
547  
    {
565  
    {
548  
        std::lock_guard lock(mutex_);
566  
        std::lock_guard lock(mutex_);
549  
        if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
567  
        if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
550  
        {
568  
        {
551  
            impl.heap_index_ = heap_.size();
569  
            impl.heap_index_ = heap_.size();
552  
            heap_.push_back({impl.expiry_, &impl});
570  
            heap_.push_back({impl.expiry_, &impl});
553  
            up_heap(heap_.size() - 1);
571  
            up_heap(heap_.size() - 1);
554  
            notify = (impl.heap_index_ == 0);
572  
            notify = (impl.heap_index_ == 0);
555  
            refresh_cached_nearest();
573  
            refresh_cached_nearest();
556  
        }
574  
        }
557  
        impl.waiters_.push_back(w);
575  
        impl.waiters_.push_back(w);
558  
    }
576  
    }
559  
    if (notify)
577  
    if (notify)
560  
        on_earliest_changed_();
578  
        on_earliest_changed_();
561  
}
579  
}
562  

580  

563  
inline std::size_t
581  
inline std::size_t
564  
timer_service::cancel_timer(implementation& impl)
582  
timer_service::cancel_timer(implementation& impl)
565  
{
583  
{
566  
    if (!impl.might_have_pending_waits_)
584  
    if (!impl.might_have_pending_waits_)
567  
        return 0;
585  
        return 0;
568  

586  

569  
    // Not in heap and no waiters — just clear the flag
587  
    // Not in heap and no waiters — just clear the flag
570  
    if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
588  
    if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
571  
        impl.waiters_.empty())
589  
        impl.waiters_.empty())
572  
    {
590  
    {
573  
        impl.might_have_pending_waits_ = false;
591  
        impl.might_have_pending_waits_ = false;
574  
        return 0;
592  
        return 0;
575  
    }
593  
    }
576  

594  

577  
    intrusive_list<waiter_node> canceled;
595  
    intrusive_list<waiter_node> canceled;
578  

596  

579  
    {
597  
    {
580  
        std::lock_guard lock(mutex_);
598  
        std::lock_guard lock(mutex_);
581  
        remove_timer_impl(impl);
599  
        remove_timer_impl(impl);
582  
        while (auto* w = impl.waiters_.pop_front())
600  
        while (auto* w = impl.waiters_.pop_front())
583  
        {
601  
        {
584  
            w->impl_ = nullptr;
602  
            w->impl_ = nullptr;
585  
            canceled.push_back(w);
603  
            canceled.push_back(w);
586  
        }
604  
        }
587  
        refresh_cached_nearest();
605  
        refresh_cached_nearest();
588  
    }
606  
    }
589  

607  

590  
    impl.might_have_pending_waits_ = false;
608  
    impl.might_have_pending_waits_ = false;
591  

609  

592  
    std::size_t count = 0;
610  
    std::size_t count = 0;
593  
    while (auto* w = canceled.pop_front())
611  
    while (auto* w = canceled.pop_front())
594  
    {
612  
    {
595  
        w->ec_value_ = make_error_code(capy::error::canceled);
613  
        w->ec_value_ = make_error_code(capy::error::canceled);
596  
        sched_->post(&w->op_);
614  
        sched_->post(&w->op_);
597  
        ++count;
615  
        ++count;
598  
    }
616  
    }
599  

617  

600  
    return count;
618  
    return count;
601  
}
619  
}
602  

620  

603  
inline void
621  
inline void
604  
timer_service::cancel_waiter(waiter_node* w)
622  
timer_service::cancel_waiter(waiter_node* w)
605  
{
623  
{
606  
    {
624  
    {
607  
        std::lock_guard lock(mutex_);
625  
        std::lock_guard lock(mutex_);
608  
        // Already removed by cancel_timer or process_expired
626  
        // Already removed by cancel_timer or process_expired
609  
        if (!w->impl_)
627  
        if (!w->impl_)
610  
            return;
628  
            return;
611  
        auto* impl = w->impl_;
629  
        auto* impl = w->impl_;
612  
        w->impl_   = nullptr;
630  
        w->impl_   = nullptr;
613  
        impl->waiters_.remove(w);
631  
        impl->waiters_.remove(w);
614  
        if (impl->waiters_.empty())
632  
        if (impl->waiters_.empty())
615  
        {
633  
        {
616  
            remove_timer_impl(*impl);
634  
            remove_timer_impl(*impl);
617  
            impl->might_have_pending_waits_ = false;
635  
            impl->might_have_pending_waits_ = false;
618  
        }
636  
        }
619  
        refresh_cached_nearest();
637  
        refresh_cached_nearest();
620  
    }
638  
    }
621  

639  

622  
    w->ec_value_ = make_error_code(capy::error::canceled);
640  
    w->ec_value_ = make_error_code(capy::error::canceled);
623  
    sched_->post(&w->op_);
641  
    sched_->post(&w->op_);
624  
}
642  
}
625  

643  

626  
inline std::size_t
644  
inline std::size_t
627  
timer_service::cancel_one_waiter(implementation& impl)
645  
timer_service::cancel_one_waiter(implementation& impl)
628  
{
646  
{
629  
    if (!impl.might_have_pending_waits_)
647  
    if (!impl.might_have_pending_waits_)
630  
        return 0;
648  
        return 0;
631  

649  

632  
    waiter_node* w = nullptr;
650  
    waiter_node* w = nullptr;
633  

651  

634  
    {
652  
    {
635  
        std::lock_guard lock(mutex_);
653  
        std::lock_guard lock(mutex_);
636  
        w = impl.waiters_.pop_front();
654  
        w = impl.waiters_.pop_front();
637  
        if (!w)
655  
        if (!w)
638  
            return 0;
656  
            return 0;
639  
        w->impl_ = nullptr;
657  
        w->impl_ = nullptr;
640  
        if (impl.waiters_.empty())
658  
        if (impl.waiters_.empty())
641  
        {
659  
        {
642  
            remove_timer_impl(impl);
660  
            remove_timer_impl(impl);
643  
            impl.might_have_pending_waits_ = false;
661  
            impl.might_have_pending_waits_ = false;
644  
        }
662  
        }
645  
        refresh_cached_nearest();
663  
        refresh_cached_nearest();
646  
    }
664  
    }
647  

665  

648  
    w->ec_value_ = make_error_code(capy::error::canceled);
666  
    w->ec_value_ = make_error_code(capy::error::canceled);
649  
    sched_->post(&w->op_);
667  
    sched_->post(&w->op_);
650  
    return 1;
668  
    return 1;
651  
}
669  
}
652  

670  

653  
inline std::size_t
671  
inline std::size_t
654  
timer_service::process_expired()
672  
timer_service::process_expired()
655  
{
673  
{
656  
    intrusive_list<waiter_node> expired;
674  
    intrusive_list<waiter_node> expired;
657  

675  

658  
    {
676  
    {
659  
        std::lock_guard lock(mutex_);
677  
        std::lock_guard lock(mutex_);
660  
        auto now = clock_type::now();
678  
        auto now = clock_type::now();
661  

679  

662  
        while (!heap_.empty() && heap_[0].time_ <= now)
680  
        while (!heap_.empty() && heap_[0].time_ <= now)
663  
        {
681  
        {
664  
            implementation* t = heap_[0].timer_;
682  
            implementation* t = heap_[0].timer_;
665  
            remove_timer_impl(*t);
683  
            remove_timer_impl(*t);
666  
            while (auto* w = t->waiters_.pop_front())
684  
            while (auto* w = t->waiters_.pop_front())
667  
            {
685  
            {
668  
                w->impl_     = nullptr;
686  
                w->impl_     = nullptr;
669  
                w->ec_value_ = {};
687  
                w->ec_value_ = {};
670  
                expired.push_back(w);
688  
                expired.push_back(w);
671  
            }
689  
            }
672  
            t->might_have_pending_waits_ = false;
690  
            t->might_have_pending_waits_ = false;
673  
        }
691  
        }
674  

692  

675  
        refresh_cached_nearest();
693  
        refresh_cached_nearest();
676  
    }
694  
    }
677  

695  

678  
    std::size_t count = 0;
696  
    std::size_t count = 0;
679  
    while (auto* w = expired.pop_front())
697  
    while (auto* w = expired.pop_front())
680  
    {
698  
    {
681  
        sched_->post(&w->op_);
699  
        sched_->post(&w->op_);
682  
        ++count;
700  
        ++count;
683  
    }
701  
    }
684  

702  

685  
    return count;
703  
    return count;
686  
}
704  
}
687  

705  

688  
inline void
706  
inline void
689  
timer_service::remove_timer_impl(implementation& impl)
707  
timer_service::remove_timer_impl(implementation& impl)
690  
{
708  
{
691  
    std::size_t index = impl.heap_index_;
709  
    std::size_t index = impl.heap_index_;
692  
    if (index >= heap_.size())
710  
    if (index >= heap_.size())
693  
        return; // Not in heap
711  
        return; // Not in heap
694  

712  

695  
    if (index == heap_.size() - 1)
713  
    if (index == heap_.size() - 1)
696  
    {
714  
    {
697  
        // Last element, just pop
715  
        // Last element, just pop
698  
        impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
716  
        impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
699  
        heap_.pop_back();
717  
        heap_.pop_back();
700  
    }
718  
    }
701  
    else
719  
    else
702  
    {
720  
    {
703  
        // Swap with last and reheapify
721  
        // Swap with last and reheapify
704  
        swap_heap(index, heap_.size() - 1);
722  
        swap_heap(index, heap_.size() - 1);
705  
        impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
723  
        impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
706  
        heap_.pop_back();
724  
        heap_.pop_back();
707  

725  

708  
        if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
726  
        if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
709  
            up_heap(index);
727  
            up_heap(index);
710  
        else
728  
        else
711  
            down_heap(index);
729  
            down_heap(index);
712  
    }
730  
    }
713  
}
731  
}
714  

732  

715  
inline void
733  
inline void
716  
timer_service::up_heap(std::size_t index)
734  
timer_service::up_heap(std::size_t index)
717  
{
735  
{
718  
    while (index > 0)
736  
    while (index > 0)
719  
    {
737  
    {
720  
        std::size_t parent = (index - 1) / 2;
738  
        std::size_t parent = (index - 1) / 2;
721  
        if (!(heap_[index].time_ < heap_[parent].time_))
739  
        if (!(heap_[index].time_ < heap_[parent].time_))
722  
            break;
740  
            break;
723  
        swap_heap(index, parent);
741  
        swap_heap(index, parent);
724  
        index = parent;
742  
        index = parent;
725  
    }
743  
    }
726  
}
744  
}
727  

745  

728  
inline void
746  
inline void
729  
timer_service::down_heap(std::size_t index)
747  
timer_service::down_heap(std::size_t index)
730  
{
748  
{
731  
    std::size_t child = index * 2 + 1;
749  
    std::size_t child = index * 2 + 1;
732  
    while (child < heap_.size())
750  
    while (child < heap_.size())
733  
    {
751  
    {
734  
        std::size_t min_child = (child + 1 == heap_.size() ||
752  
        std::size_t min_child = (child + 1 == heap_.size() ||
735  
                                 heap_[child].time_ < heap_[child + 1].time_)
753  
                                 heap_[child].time_ < heap_[child + 1].time_)
736  
            ? child
754  
            ? child
737  
            : child + 1;
755  
            : child + 1;
738  

756  

739  
        if (heap_[index].time_ < heap_[min_child].time_)
757  
        if (heap_[index].time_ < heap_[min_child].time_)
740  
            break;
758  
            break;
741  

759  

742  
        swap_heap(index, min_child);
760  
        swap_heap(index, min_child);
743  
        index = min_child;
761  
        index = min_child;
744  
        child = index * 2 + 1;
762  
        child = index * 2 + 1;
745  
    }
763  
    }
746  
}
764  
}
747  

765  

748  
inline void
766  
inline void
749  
timer_service::swap_heap(std::size_t i1, std::size_t i2)
767  
timer_service::swap_heap(std::size_t i1, std::size_t i2)
750  
{
768  
{
751  
    heap_entry tmp                = heap_[i1];
769  
    heap_entry tmp                = heap_[i1];
752  
    heap_[i1]                     = heap_[i2];
770  
    heap_[i1]                     = heap_[i2];
753  
    heap_[i2]                     = tmp;
771  
    heap_[i2]                     = tmp;
754  
    heap_[i1].timer_->heap_index_ = i1;
772  
    heap_[i1].timer_->heap_index_ = i1;
755  
    heap_[i2].timer_->heap_index_ = i2;
773  
    heap_[i2].timer_->heap_index_ = i2;
756  
}
774  
}
757  

775  

758  
// waiter_node out-of-class member function definitions
776  
// waiter_node out-of-class member function definitions
759  

777  

760  
inline void
778  
inline void
761  
waiter_node::canceller::operator()() const
779  
waiter_node::canceller::operator()() const
762  
{
780  
{
763  
    waiter_->svc_->cancel_waiter(waiter_);
781  
    waiter_->svc_->cancel_waiter(waiter_);
764  
}
782  
}
765  

783  

766  
inline void
784  
inline void
767  
waiter_node::completion_op::do_complete(
785  
waiter_node::completion_op::do_complete(
768  
    [[maybe_unused]] void* owner,
786  
    [[maybe_unused]] void* owner,
769  
    scheduler_op* base,
787  
    scheduler_op* base,
770  
    std::uint32_t,
788  
    std::uint32_t,
771  
    std::uint32_t)
789  
    std::uint32_t)
772  
{
790  
{
773  
    // owner is always non-null here. The destroy path (owner == nullptr)
791  
    // owner is always non-null here. The destroy path (owner == nullptr)
774  
    // is unreachable because completion_op overrides destroy() directly,
792  
    // is unreachable because completion_op overrides destroy() directly,
775  
    // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
793  
    // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
776  
    BOOST_COROSIO_ASSERT(owner);
794  
    BOOST_COROSIO_ASSERT(owner);
777  
    static_cast<completion_op*>(base)->operator()();
795  
    static_cast<completion_op*>(base)->operator()();
778  
}
796  
}
779  

797  

780  
inline void
798  
inline void
781  
waiter_node::completion_op::operator()()
799  
waiter_node::completion_op::operator()()
782  
{
800  
{
783  
    auto* w = waiter_;
801  
    auto* w = waiter_;
784  
    w->stop_cb_.reset();
802  
    w->stop_cb_.reset();
785  
    if (w->ec_out_)
803  
    if (w->ec_out_)
786  
        *w->ec_out_ = w->ec_value_;
804  
        *w->ec_out_ = w->ec_value_;
787  

805  

788  
    auto h      = w->h_;
806  
    auto h      = w->h_;
789  
    auto d      = w->d_;
807  
    auto d      = w->d_;
790  
    auto* svc   = w->svc_;
808  
    auto* svc   = w->svc_;
791  
    auto& sched = svc->get_scheduler();
809  
    auto& sched = svc->get_scheduler();
792  

810  

793  
    svc->destroy_waiter(w);
811  
    svc->destroy_waiter(w);
794  

812  

795  
    d.post(h);
813  
    d.post(h);
796  
    sched.work_finished();
814  
    sched.work_finished();
797  
}
815  
}
798  

816  

799  
// GCC 14 false-positive: inlining ~optional<stop_callback> through
817  
// GCC 14 false-positive: inlining ~optional<stop_callback> through
800  
// delete loses track that stop_cb_ was already .reset() above.
818  
// delete loses track that stop_cb_ was already .reset() above.
801  
#if defined(__GNUC__) && !defined(__clang__)
819  
#if defined(__GNUC__) && !defined(__clang__)
802  
#pragma GCC diagnostic push
820  
#pragma GCC diagnostic push
803  
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
821  
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
804  
#endif
822  
#endif
805  
inline void
823  
inline void
806  
waiter_node::completion_op::destroy()
824  
waiter_node::completion_op::destroy()
807  
{
825  
{
808  
    // Called during scheduler shutdown drain when this completion_op is
826  
    // Called during scheduler shutdown drain when this completion_op is
809  
    // in the scheduler's ready queue (posted by cancel_timer() or
827  
    // in the scheduler's ready queue (posted by cancel_timer() or
810  
    // process_expired()). Balances the work_started() from
828  
    // process_expired()). Balances the work_started() from
811  
    // implementation::wait(). The scheduler drain loop separately
829  
    // implementation::wait(). The scheduler drain loop separately
812  
    // balances the work_started() from post(). On IOCP both decrements
830  
    // balances the work_started() from post(). On IOCP both decrements
813  
    // are required for outstanding_work_ to reach zero; on other
831  
    // are required for outstanding_work_ to reach zero; on other
814  
    // backends this is harmless.
832  
    // backends this is harmless.
815  
    //
833  
    //
816  
    // This override also prevents scheduler_op::destroy() from calling
834  
    // This override also prevents scheduler_op::destroy() from calling
817  
    // do_complete(nullptr, ...). See also: timer_service::shutdown()
835  
    // do_complete(nullptr, ...). See also: timer_service::shutdown()
818  
    // which drains waiters still in the timer heap (the other path).
836  
    // which drains waiters still in the timer heap (the other path).
819  
    auto* w = waiter_;
837  
    auto* w = waiter_;
820  
    w->stop_cb_.reset();
838  
    w->stop_cb_.reset();
821  
    auto h      = std::exchange(w->h_, {});
839  
    auto h      = std::exchange(w->h_, {});
822  
    auto& sched = w->svc_->get_scheduler();
840  
    auto& sched = w->svc_->get_scheduler();
823  
    delete w;
841  
    delete w;
824  
    sched.work_finished();
842  
    sched.work_finished();
825  
    if (h)
843  
    if (h)
826  
        h.destroy();
844  
        h.destroy();
827  
}
845  
}
828  
#if defined(__GNUC__) && !defined(__clang__)
846  
#if defined(__GNUC__) && !defined(__clang__)
829  
#pragma GCC diagnostic pop
847  
#pragma GCC diagnostic pop
830  
#endif
848  
#endif
831  

849  

832  
inline std::coroutine_handle<>
850  
inline std::coroutine_handle<>
833  
timer_service::implementation::wait(
851  
timer_service::implementation::wait(
834  
    std::coroutine_handle<> h,
852  
    std::coroutine_handle<> h,
835  
    capy::executor_ref d,
853  
    capy::executor_ref d,
836  
    std::stop_token token,
854  
    std::stop_token token,
837  
    std::error_code* ec)
855  
    std::error_code* ec)
838  
{
856  
{
839  
    // Already-expired fast path — no waiter_node, no mutex.
857  
    // Already-expired fast path — no waiter_node, no mutex.
840  
    // Post instead of dispatch so the coroutine yields to the
858  
    // Post instead of dispatch so the coroutine yields to the
841  
    // scheduler, allowing other queued work to run.
859  
    // scheduler, allowing other queued work to run.
842  
    if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
860  
    if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
843  
    {
861  
    {
844  
        if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
862  
        if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
845  
        {
863  
        {
846  
            if (ec)
864  
            if (ec)
847  
                *ec = {};
865  
                *ec = {};
848  
            d.post(h);
866  
            d.post(h);
849  
            return std::noop_coroutine();
867  
            return std::noop_coroutine();
850  
        }
868  
        }
851  
    }
869  
    }
852  

870  

853  
    auto* w    = svc_->create_waiter();
871  
    auto* w    = svc_->create_waiter();
854  
    w->impl_   = this;
872  
    w->impl_   = this;
855  
    w->svc_    = svc_;
873  
    w->svc_    = svc_;
856  
    w->h_      = h;
874  
    w->h_      = h;
857  
    w->d_      = d;
875  
    w->d_      = d;
858  
    w->token_  = std::move(token);
876  
    w->token_  = std::move(token);
859  
    w->ec_out_ = ec;
877  
    w->ec_out_ = ec;
860  

878  

861  
    svc_->insert_waiter(*this, w);
879  
    svc_->insert_waiter(*this, w);
862  
    might_have_pending_waits_ = true;
880  
    might_have_pending_waits_ = true;
863  
    svc_->get_scheduler().work_started();
881  
    svc_->get_scheduler().work_started();
864  

882  

865  
    if (w->token_.stop_possible())
883  
    if (w->token_.stop_possible())
866  
        w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
884  
        w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
867  

885  

868  
    return std::noop_coroutine();
886  
    return std::noop_coroutine();
869  
}
887  
}
870  

888  

871  
// Free functions
889  
// Free functions
872  

890  

873  
struct timer_service_access
891  
struct timer_service_access
874  
{
892  
{
875  
    static native_scheduler& get_scheduler(io_context& ctx) noexcept
893  
    static native_scheduler& get_scheduler(io_context& ctx) noexcept
876  
    {
894  
    {
877  
        return static_cast<native_scheduler&>(*ctx.sched_);
895  
        return static_cast<native_scheduler&>(*ctx.sched_);
878  
    }
896  
    }
879  
};
897  
};
880  

898  

881  
// Bypass find_service() mutex by reading the scheduler's cached pointer
899  
// Bypass find_service() mutex by reading the scheduler's cached pointer
882  
inline io_object::io_service&
900  
inline io_object::io_service&
883  
timer_service_direct(capy::execution_context& ctx) noexcept
901  
timer_service_direct(capy::execution_context& ctx) noexcept
884  
{
902  
{
885  
    return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
903  
    return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
886  
                .timer_svc_;
904  
                .timer_svc_;
887  
}
905  
}
888  

906  

889  
inline std::size_t
907  
inline std::size_t
890  
timer_service_update_expiry(timer::implementation& base)
908  
timer_service_update_expiry(timer::implementation& base)
891  
{
909  
{
892  
    auto& impl = static_cast<timer_service::implementation&>(base);
910  
    auto& impl = static_cast<timer_service::implementation&>(base);
893  
    return impl.svc_->update_timer(impl, impl.expiry_);
911  
    return impl.svc_->update_timer(impl, impl.expiry_);
894  
}
912  
}
895  

913  

896  
inline std::size_t
914  
inline std::size_t
897  
timer_service_cancel(timer::implementation& base) noexcept
915  
timer_service_cancel(timer::implementation& base) noexcept
898  
{
916  
{
899  
    auto& impl = static_cast<timer_service::implementation&>(base);
917  
    auto& impl = static_cast<timer_service::implementation&>(base);
900  
    return impl.svc_->cancel_timer(impl);
918  
    return impl.svc_->cancel_timer(impl);
901  
}
919  
}
902  

920  

903  
inline std::size_t
921  
inline std::size_t
904  
timer_service_cancel_one(timer::implementation& base) noexcept
922  
timer_service_cancel_one(timer::implementation& base) noexcept
905  
{
923  
{
906  
    auto& impl = static_cast<timer_service::implementation&>(base);
924  
    auto& impl = static_cast<timer_service::implementation&>(base);
907  
    return impl.svc_->cancel_one_waiter(impl);
925  
    return impl.svc_->cancel_one_waiter(impl);
908  
}
926  
}
909  

927  

910  
// Create the timer_service for `ctx` via make_service(), passing
// `sched` to the service constructor, and return a reference to it.
inline timer_service&
get_timer_service(capy::execution_context& ctx, scheduler& sched)
{
    return ctx.make_service<timer_service>(sched);
}
915  

933  

916  
} // namespace boost::corosio::detail
934  
} // namespace boost::corosio::detail
917  

935  

918  
#endif
936  
#endif