TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Steve Gerbino
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/cppalliance/corosio
9 : //
10 :
11 : #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12 : #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13 :
14 : #include <boost/corosio/timer.hpp>
15 : #include <boost/corosio/io_context.hpp>
16 : #include <boost/corosio/detail/scheduler_op.hpp>
17 : #include <boost/corosio/native/native_scheduler.hpp>
18 : #include <boost/corosio/detail/intrusive.hpp>
19 : #include <boost/corosio/detail/thread_local_ptr.hpp>
20 : #include <boost/capy/error.hpp>
21 : #include <boost/capy/ex/execution_context.hpp>
22 : #include <boost/capy/ex/executor_ref.hpp>
23 : #include <system_error>
24 :
25 : #include <atomic>
26 : #include <chrono>
27 : #include <coroutine>
28 : #include <cstddef>
29 : #include <limits>
30 : #include <mutex>
31 : #include <optional>
32 : #include <stop_token>
33 : #include <utility>
34 : #include <vector>
35 :
36 : namespace boost::corosio::detail {
37 :
38 : struct scheduler;
39 :
40 : /*
41 : Timer Service
42 : =============
43 :
44 : Data Structures
45 : ---------------
46 : waiter_node holds per-waiter state: coroutine handle, executor,
47 : error output, stop_token, embedded completion_op. Each concurrent
48 : co_await t.wait() allocates one waiter_node.
49 :
50 : timer_service::implementation holds per-timer state: expiry,
51 : heap index, and an intrusive_list of waiter_nodes. Multiple
52 : coroutines can wait on the same timer simultaneously.
53 :
54 : timer_service owns a min-heap of active timers, a free list
55 : of recycled impls, and a free list of recycled waiter_nodes. The
56 : heap is ordered by expiry time; the scheduler queries
57 : nearest_expiry() to set the epoll/timerfd timeout.
58 :
59 : Optimization Strategy
60 : ---------------------
61 : 1. Deferred heap insertion — expires_after() stores the expiry
62 : but does not insert into the heap. Insertion happens in wait().
63 : 2. Thread-local impl cache — single-slot per-thread cache.
64 : 3. Embedded completion_op — eliminates heap allocation per fire/cancel.
65 : 4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
66 : 5. might_have_pending_waits_ flag — skips lock when no wait issued.
67 : 6. Thread-local waiter cache — single-slot per-thread cache.
68 :
69 : Concurrency
70 : -----------
71 : stop_token callbacks can fire from any thread. The impl_
72 : pointer on waiter_node is used as a "still in list" marker.
73 : */
74 :
75 : struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
76 :
77 : inline void timer_service_invalidate_cache() noexcept;
78 :
79 : // timer_service class body — member function definitions are
80 : // out-of-class (after implementation and waiter_node are complete)
class BOOST_COROSIO_DECL timer_service final
    : public capy::execution_context::service
    , public io_object::io_service
{
public:
    using clock_type = std::chrono::steady_clock;
    using time_point = clock_type::time_point;

    /// Type-erased callback for earliest-expiry-changed notifications.
    class callback
    {
        void* ctx_ = nullptr;
        void (*fn_)(void*) = nullptr;

    public:
        /// Construct an empty callback.
        callback() = default;

        /// Construct a callback with the given context and function.
        callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}

        /// Return true if the callback is non-empty.
        explicit operator bool() const noexcept
        {
            return fn_ != nullptr;
        }

        /// Invoke the callback. Safe to call when empty (no-op).
        void operator()() const
        {
            if (fn_)
                fn_(ctx_);
        }
    };

    struct implementation;

private:
    // One heap slot: expiry time plus the timer that occupies it.
    // timer_->heap_index_ always names this slot (maintained by swap_heap).
    struct heap_entry
    {
        time_point time_;
        implementation* timer_;
    };

    scheduler* sched_ = nullptr;              // owning scheduler; set in ctor, never null afterwards
    mutable std::mutex mutex_;                // guards heap_, both free lists, and per-timer waiter lists
    std::vector<heap_entry> heap_;            // min-heap ordered by time_; heap_[0] is the nearest expiry
    implementation* free_list_ = nullptr;     // recycled impls, linked via implementation::next_free_
    waiter_node* waiter_free_list_ = nullptr; // recycled waiters, linked via waiter_node::next_free_
    callback on_earliest_changed_;            // fired (outside the lock) when heap_[0] changes
    bool shutting_down_ = false;              // set by shutdown(); makes destroy_impl() a no-op
    // Avoids mutex in nearest_expiry() and empty()
    mutable std::atomic<std::int64_t> cached_nearest_ns_{
        (std::numeric_limits<std::int64_t>::max)()};

public:
    /// Construct the timer service bound to a scheduler.
    inline timer_service(capy::execution_context&, scheduler& sched)
        : sched_(&sched)
    {
    }

    /// Return the associated scheduler.
    inline scheduler& get_scheduler() noexcept
    {
        return *sched_;
    }

    /// Destroy the timer service.
    ~timer_service() override = default;

    timer_service(timer_service const&) = delete;
    timer_service& operator=(timer_service const&) = delete;

    /// Register a callback invoked when the earliest expiry changes.
    inline void set_on_earliest_changed(callback cb)
    {
        on_earliest_changed_ = cb;
    }

    /// Return true if no timers are in the heap.
    /// Lock-free: reads the cached nearest-expiry atomic; the sentinel
    /// int64 max means "heap empty" (see refresh_cached_nearest()).
    inline bool empty() const noexcept
    {
        return cached_nearest_ns_.load(std::memory_order_acquire) ==
            (std::numeric_limits<std::int64_t>::max)();
    }

    /// Return the nearest timer expiry without acquiring the mutex.
    /// When the heap is empty this yields the int64-max sentinel
    /// converted to a time_point (i.e. "infinitely far away").
    inline time_point nearest_expiry() const noexcept
    {
        auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
        return time_point(time_point::duration(ns));
    }

    /// Cancel all pending timers and free cached resources.
    inline void shutdown() override;

    /// Construct a new timer implementation.
    inline io_object::implementation* construct() override;

    /// Destroy a timer implementation, cancelling pending waiters.
    inline void destroy(io_object::implementation* p) override;

    /// Cancel and recycle a timer implementation.
    inline void destroy_impl(implementation& impl);

    /// Create or recycle a waiter node.
    inline waiter_node* create_waiter();

    /// Return a waiter node to the cache or free list.
    inline void destroy_waiter(waiter_node* w);

    /// Update the timer expiry, cancelling existing waiters.
    inline std::size_t update_timer(implementation& impl, time_point new_time);

    /// Insert a waiter into the timer's waiter list and the heap.
    inline void insert_waiter(implementation& impl, waiter_node* w);

    /// Cancel all waiters on a timer.
    inline std::size_t cancel_timer(implementation& impl);

    /// Cancel a single waiter ( stop_token callback path ).
    inline void cancel_waiter(waiter_node* w);

    /// Cancel one waiter on a timer.
    inline std::size_t cancel_one_waiter(implementation& impl);

    /// Complete all waiters whose timers have expired.
    inline std::size_t process_expired();

private:
    // Recompute cached_nearest_ns_ from heap_[0].
    // Caller must hold mutex_ (every call site in this file locks first).
    inline void refresh_cached_nearest() noexcept
    {
        auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
                                : heap_[0].time_.time_since_epoch().count();
        cached_nearest_ns_.store(ns, std::memory_order_release);
    }

    // Heap maintenance; all require mutex_ held by the caller.
    inline void remove_timer_impl(implementation& impl);
    inline void up_heap(std::size_t index);
    inline void down_heap(std::size_t index);
    inline void swap_heap(std::size_t i1, std::size_t i2);
};
224 :
// Per-waiter state for one co_await t.wait(). Linked into the timer's
// intrusive waiter list while pending, and into the service free list
// (or thread-local cache) while recycled.
struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
    : intrusive_list<waiter_node>::node
{
    // Embedded completion op — avoids heap allocation per fire/cancel
    struct completion_op final : scheduler_op
    {
        waiter_node* waiter_ = nullptr; // back-pointer, set in waiter_node ctor

        static void do_complete(
            void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);

        completion_op() noexcept : scheduler_op(&do_complete) {}

        void operator()() override;
        void destroy() override;
    };

    // Per-waiter stop_token cancellation
    struct canceller
    {
        waiter_node* waiter_;
        void operator()() const;
    };

    // nullptr once removed from timer's waiter list (concurrency marker)
    timer_service::implementation* impl_ = nullptr;
    timer_service* svc_ = nullptr;      // owning service (set in wait())
    std::coroutine_handle<> h_;         // coroutine to resume on completion
    capy::executor_ref d_;              // executor used to post the resumption
    std::error_code* ec_out_ = nullptr; // caller's error output; may be null
    std::stop_token token_;
    std::optional<std::stop_callback<canceller>> stop_cb_;
    completion_op op_;                  // posted to the scheduler on fire/cancel
    std::error_code ec_value_;          // result copied to *ec_out_ on completion
    waiter_node* next_free_ = nullptr;  // free-list linkage while recycled

    // Wire the embedded op back to this node exactly once.
    waiter_node() noexcept
    {
        op_.waiter_ = this;
    }
};
266 :
// Per-timer state: expiry plus the list of coroutines waiting on it.
// Lives in the service heap while armed; on the free list or the
// thread-local cache while recycled.
struct timer_service::implementation final : timer::implementation
{
    using clock_type = std::chrono::steady_clock;
    using time_point = clock_type::time_point;
    using duration = clock_type::duration;

    timer_service* svc_ = nullptr;        // owning service; compared in try_pop_tl_cache to detect staleness
    intrusive_list<waiter_node> waiters_; // pending waiters (guarded by svc_->mutex_)

    // Free list linkage (reused when impl is on free_list)
    implementation* next_free_ = nullptr;

    inline explicit implementation(timer_service& svc) noexcept;

    // Suspend the awaiting coroutine until the timer fires, is
    // cancelled, or the stop_token requests stop.
    inline std::coroutine_handle<> wait(
        std::coroutine_handle<>,
        capy::executor_ref,
        std::stop_token,
        std::error_code*) override;
};
287 :
288 : // Thread-local caches avoid hot-path mutex acquisitions:
289 : // 1. Impl cache — single-slot, validated by comparing svc_
290 : // 2. Waiter cache — single-slot, no service affinity
291 : // All caches are cleared by timer_service_invalidate_cache() during shutdown.
292 :
// Single-slot per-thread cache of a recycled timer implementation.
inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
// Single-slot per-thread cache of a recycled waiter_node.
inline thread_local_ptr<waiter_node> tl_cached_waiter;
295 :
296 : inline timer_service::implementation*
297 5192 : try_pop_tl_cache(timer_service* svc) noexcept
298 : {
299 5192 : auto* impl = tl_cached_impl.get();
300 5192 : if (impl)
301 : {
302 4943 : tl_cached_impl.set(nullptr);
303 4943 : if (impl->svc_ == svc)
304 4943 : return impl;
305 : // Stale impl from a destroyed service
306 MIS 0 : delete impl;
307 : }
308 HIT 249 : return nullptr;
309 : }
310 :
311 : inline bool
312 5184 : try_push_tl_cache(timer_service::implementation* impl) noexcept
313 : {
314 5184 : if (!tl_cached_impl.get())
315 : {
316 5104 : tl_cached_impl.set(impl);
317 5104 : return true;
318 : }
319 80 : return false;
320 : }
321 :
322 : inline waiter_node*
323 4992 : try_pop_waiter_tl_cache() noexcept
324 : {
325 4992 : auto* w = tl_cached_waiter.get();
326 4992 : if (w)
327 : {
328 4780 : tl_cached_waiter.set(nullptr);
329 4780 : return w;
330 : }
331 212 : return nullptr;
332 : }
333 :
334 : inline bool
335 4976 : try_push_waiter_tl_cache(waiter_node* w) noexcept
336 : {
337 4976 : if (!tl_cached_waiter.get())
338 : {
339 4896 : tl_cached_waiter.set(w);
340 4896 : return true;
341 : }
342 80 : return false;
343 : }
344 :
345 : inline void
346 466 : timer_service_invalidate_cache() noexcept
347 : {
348 466 : delete tl_cached_impl.get();
349 466 : tl_cached_impl.set(nullptr);
350 :
351 466 : delete tl_cached_waiter.get();
352 466 : tl_cached_waiter.set(nullptr);
353 466 : }
354 :
355 : // timer_service out-of-class member function definitions
356 :
// Bind a fresh implementation to its owning service.
inline timer_service::implementation::implementation(
    timer_service& svc) noexcept
    : svc_(&svc)
{
}
362 :
inline void
timer_service::shutdown()
{
    timer_service_invalidate_cache();
    shutting_down_ = true;

    // NOTE(review): mutex_ is not taken anywhere in this function —
    // assumes shutdown runs with no concurrent timer activity; confirm
    // against the scheduler's shutdown contract.

    // Snapshot impls and detach them from the heap so that
    // coroutine-owned timer destructors (triggered by h.destroy()
    // below) cannot re-enter remove_timer_impl() and mutate the
    // vector during iteration.
    std::vector<implementation*> impls;
    impls.reserve(heap_.size());
    for (auto& entry : heap_)
    {
        entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)();
        impls.push_back(entry.timer_);
    }
    heap_.clear();
    cached_nearest_ns_.store(
        (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);

    // Cancel waiting timers. Each waiter called work_started()
    // in implementation::wait(). On IOCP the scheduler shutdown
    // loop exits when outstanding_work_ reaches zero, so we must
    // call work_finished() here to balance it. On other backends
    // this is harmless.
    for (auto* impl : impls)
    {
        while (auto* w = impl->waiters_.pop_front())
        {
            w->stop_cb_.reset();
            // Detach h before deleting anything; h.destroy() may
            // re-enter destroy_impl(), which shutting_down_ guards.
            auto h = std::exchange(w->h_, {});
            sched_->work_finished();
            if (h)
                h.destroy();
            delete w;
        }
        delete impl;
    }

    // Delete free-listed impls
    while (free_list_)
    {
        auto* next = free_list_->next_free_;
        delete free_list_;
        free_list_ = next;
    }

    // Delete free-listed waiters
    while (waiter_free_list_)
    {
        auto* next = waiter_free_list_->next_free_;
        delete waiter_free_list_;
        waiter_free_list_ = next;
    }
}
419 :
420 : inline io_object::implementation*
421 5192 : timer_service::construct()
422 : {
423 5192 : implementation* impl = try_pop_tl_cache(this);
424 5192 : if (impl)
425 : {
426 4943 : impl->svc_ = this;
427 4943 : impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
428 4943 : impl->might_have_pending_waits_ = false;
429 4943 : return impl;
430 : }
431 :
432 249 : std::lock_guard lock(mutex_);
433 249 : if (free_list_)
434 : {
435 MIS 0 : impl = free_list_;
436 0 : free_list_ = impl->next_free_;
437 0 : impl->next_free_ = nullptr;
438 0 : impl->svc_ = this;
439 0 : impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
440 0 : impl->might_have_pending_waits_ = false;
441 : }
442 : else
443 : {
444 HIT 249 : impl = new implementation(*this);
445 : }
446 249 : return impl;
447 249 : }
448 :
449 : inline void
450 5190 : timer_service::destroy(io_object::implementation* p)
451 : {
452 5190 : destroy_impl(static_cast<implementation&>(*p));
453 5190 : }
454 :
inline void
timer_service::destroy_impl(implementation& impl)
{
    // During shutdown the impl is owned by the shutdown loop.
    // Re-entering here (from a coroutine-owned timer destructor
    // triggered by h.destroy()) must not modify the heap or
    // recycle the impl — shutdown deletes it directly.
    if (shutting_down_)
        return;

    cancel_timer(impl);

    // Safety net: cancel_timer() normally removed the impl from the
    // heap already (remove_timer_impl resets heap_index_ under the lock).
    if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
    {
        std::lock_guard lock(mutex_);
        remove_timer_impl(impl);
        refresh_cached_nearest();
    }

    // Recycle: prefer the thread-local slot (no lock), else free list.
    if (try_push_tl_cache(&impl))
        return;

    std::lock_guard lock(mutex_);
    impl.next_free_ = free_list_;
    free_list_ = &impl;
}
481 :
482 : inline waiter_node*
483 4992 : timer_service::create_waiter()
484 : {
485 4992 : if (auto* w = try_pop_waiter_tl_cache())
486 4780 : return w;
487 :
488 212 : std::lock_guard lock(mutex_);
489 212 : if (waiter_free_list_)
490 : {
491 2 : auto* w = waiter_free_list_;
492 2 : waiter_free_list_ = w->next_free_;
493 2 : w->next_free_ = nullptr;
494 2 : return w;
495 : }
496 :
497 210 : return new waiter_node();
498 212 : }
499 :
500 : inline void
501 4976 : timer_service::destroy_waiter(waiter_node* w)
502 : {
503 4976 : if (try_push_waiter_tl_cache(w))
504 4896 : return;
505 :
506 80 : std::lock_guard lock(mutex_);
507 80 : w->next_free_ = waiter_free_list_;
508 80 : waiter_free_list_ = w;
509 80 : }
510 :
inline std::size_t
timer_service::update_timer(implementation& impl, time_point new_time)
{
    // Unlocked fast path: timer never armed and nobody waiting.
    // NOTE(review): heap_index_ / waiters_ are read without mutex_
    // here — relies on update_timer being called from the timer's
    // owning context; confirm.
    bool in_heap =
        (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
    if (!in_heap && impl.waiters_.empty())
        return 0;

    bool notify = false;
    intrusive_list<waiter_node> canceled;

    {
        std::lock_guard lock(mutex_);

        // Changing the expiry cancels every current waiter.
        while (auto* w = impl.waiters_.pop_front())
        {
            w->impl_ = nullptr; // marker: detached from the waiter list
            canceled.push_back(w);
        }

        // Re-key the heap entry in place instead of remove+insert.
        if (impl.heap_index_ < heap_.size())
        {
            time_point old_time = heap_[impl.heap_index_].time_;
            heap_[impl.heap_index_].time_ = new_time;

            if (new_time < old_time)
                up_heap(impl.heap_index_);
            else
                down_heap(impl.heap_index_);

            // Root changed means the scheduler's timeout must change.
            notify = (impl.heap_index_ == 0);
        }

        refresh_cached_nearest();
    }

    // Post cancellations outside the lock; completions run on the
    // scheduler, never inline here.
    std::size_t count = 0;
    while (auto* w = canceled.pop_front())
    {
        w->ec_value_ = make_error_code(capy::error::canceled);
        sched_->post(&w->op_);
        ++count;
    }

    if (notify)
        on_earliest_changed_();

    return count;
}
560 :
inline void
timer_service::insert_waiter(implementation& impl, waiter_node* w)
{
    bool notify = false;
    {
        std::lock_guard lock(mutex_);
        // Deferred heap insertion: the first waiter arms the timer
        // using the expiry stored by expires_after().
        if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
        {
            impl.heap_index_ = heap_.size();
            heap_.push_back({impl.expiry_, &impl});
            up_heap(heap_.size() - 1);
            // Reached the root: the nearest expiry changed.
            notify = (impl.heap_index_ == 0);
            refresh_cached_nearest();
        }
        impl.waiters_.push_back(w);
    }
    // Invoke the wake-up callback outside the lock.
    if (notify)
        on_earliest_changed_();
}
580 :
inline std::size_t
timer_service::cancel_timer(implementation& impl)
{
    // Unlocked fast path: no wait() issued since the last cancel,
    // so there is nothing to do (optimization 5 in the header notes).
    if (!impl.might_have_pending_waits_)
        return 0;

    // Not in heap and no waiters — just clear the flag
    if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
        impl.waiters_.empty())
    {
        impl.might_have_pending_waits_ = false;
        return 0;
    }

    intrusive_list<waiter_node> canceled;

    {
        std::lock_guard lock(mutex_);
        remove_timer_impl(impl);
        while (auto* w = impl.waiters_.pop_front())
        {
            w->impl_ = nullptr; // marker: detached from the waiter list
            canceled.push_back(w);
        }
        refresh_cached_nearest();
    }

    impl.might_have_pending_waits_ = false;

    // Post cancellations outside the lock.
    std::size_t count = 0;
    while (auto* w = canceled.pop_front())
    {
        w->ec_value_ = make_error_code(capy::error::canceled);
        sched_->post(&w->op_);
        ++count;
    }

    return count;
}
620 :
// Cancel a single waiter. Runs from a stop_token callback and may
// therefore execute on any thread (see "Concurrency" notes above).
inline void
timer_service::cancel_waiter(waiter_node* w)
{
    {
        std::lock_guard lock(mutex_);
        // Already removed by cancel_timer or process_expired
        if (!w->impl_)
            return;
        auto* impl = w->impl_;
        w->impl_ = nullptr;
        impl->waiters_.remove(w);
        // Last waiter gone: disarm the timer entirely.
        if (impl->waiters_.empty())
        {
            remove_timer_impl(*impl);
            impl->might_have_pending_waits_ = false;
        }
        refresh_cached_nearest();
    }

    // Post the completion outside the lock.
    w->ec_value_ = make_error_code(capy::error::canceled);
    sched_->post(&w->op_);
}
643 :
inline std::size_t
timer_service::cancel_one_waiter(implementation& impl)
{
    // Unlocked fast path: no wait() issued since the last cancel.
    if (!impl.might_have_pending_waits_)
        return 0;

    waiter_node* w = nullptr;

    {
        std::lock_guard lock(mutex_);
        // Oldest waiter is cancelled first (FIFO).
        w = impl.waiters_.pop_front();
        if (!w)
            return 0;
        w->impl_ = nullptr; // marker: detached from the waiter list
        // Last waiter gone: disarm the timer entirely.
        if (impl.waiters_.empty())
        {
            remove_timer_impl(impl);
            impl.might_have_pending_waits_ = false;
        }
        refresh_cached_nearest();
    }

    // Post the completion outside the lock.
    w->ec_value_ = make_error_code(capy::error::canceled);
    sched_->post(&w->op_);
    return 1;
}
670 :
inline std::size_t
timer_service::process_expired()
{
    intrusive_list<waiter_node> expired;

    {
        std::lock_guard lock(mutex_);
        auto now = clock_type::now();

        // Repeatedly pop the heap root while its expiry has passed.
        while (!heap_.empty() && heap_[0].time_ <= now)
        {
            implementation* t = heap_[0].timer_;
            remove_timer_impl(*t);
            while (auto* w = t->waiters_.pop_front())
            {
                w->impl_ = nullptr; // marker: detached from the waiter list
                w->ec_value_ = {};  // success — the timer fired
                expired.push_back(w);
            }
            t->might_have_pending_waits_ = false;
        }

        refresh_cached_nearest();
    }

    // Post completions outside the lock.
    std::size_t count = 0;
    while (auto* w = expired.pop_front())
    {
        sched_->post(&w->op_);
        ++count;
    }

    return count;
}
705 :
// Remove impl's entry from the heap. Caller must hold mutex_.
// Leaves heap_index_ set to the "not in heap" sentinel. Does not
// touch cached_nearest_ns_ — callers call refresh_cached_nearest().
inline void
timer_service::remove_timer_impl(implementation& impl)
{
    std::size_t index = impl.heap_index_;
    if (index >= heap_.size())
        return; // Not in heap

    if (index == heap_.size() - 1)
    {
        // Last element, just pop
        impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
        heap_.pop_back();
    }
    else
    {
        // Swap with last and reheapify
        swap_heap(index, heap_.size() - 1);
        impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
        heap_.pop_back();

        // The moved-in entry may need to travel either direction.
        if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
            up_heap(index);
        else
            down_heap(index);
    }
}
732 :
733 : inline void
734 4976 : timer_service::up_heap(std::size_t index)
735 : {
736 9765 : while (index > 0)
737 : {
738 4819 : std::size_t parent = (index - 1) / 2;
739 4819 : if (!(heap_[index].time_ < heap_[parent].time_))
740 30 : break;
741 4789 : swap_heap(index, parent);
742 4789 : index = parent;
743 : }
744 4976 : }
745 :
746 : inline void
747 4809 : timer_service::down_heap(std::size_t index)
748 : {
749 4809 : std::size_t child = index * 2 + 1;
750 4809 : while (child < heap_.size())
751 : {
752 6 : std::size_t min_child = (child + 1 == heap_.size() ||
753 MIS 0 : heap_[child].time_ < heap_[child + 1].time_)
754 HIT 6 : ? child
755 6 : : child + 1;
756 :
757 6 : if (heap_[index].time_ < heap_[min_child].time_)
758 6 : break;
759 :
760 MIS 0 : swap_heap(index, min_child);
761 0 : index = min_child;
762 0 : child = index * 2 + 1;
763 : }
764 HIT 4809 : }
765 :
766 : inline void
767 9598 : timer_service::swap_heap(std::size_t i1, std::size_t i2)
768 : {
769 9598 : heap_entry tmp = heap_[i1];
770 9598 : heap_[i1] = heap_[i2];
771 9598 : heap_[i2] = tmp;
772 9598 : heap_[i1].timer_->heap_index_ = i1;
773 9598 : heap_[i2].timer_->heap_index_ = i2;
774 9598 : }
775 :
776 : // waiter_node out-of-class member function definitions
777 :
778 : inline void
779 30 : waiter_node::canceller::operator()() const
780 : {
781 30 : waiter_->svc_->cancel_waiter(waiter_);
782 30 : }
783 :
// Scheduler trampoline: downcast and forward to operator().
inline void
waiter_node::completion_op::do_complete(
    [[maybe_unused]] void* owner,
    scheduler_op* base,
    std::uint32_t,
    std::uint32_t)
{
    // owner is always non-null here. The destroy path (owner == nullptr)
    // is unreachable because completion_op overrides destroy() directly,
    // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
    BOOST_COROSIO_ASSERT(owner);
    static_cast<completion_op*>(base)->operator()();
}
797 :
// Deliver completion on the scheduler thread: publish the error code,
// recycle the waiter, then resume the coroutine via its executor.
inline void
waiter_node::completion_op::operator()()
{
    auto* w = waiter_;
    w->stop_cb_.reset(); // after this, no cancellation can touch w
    if (w->ec_out_)
        *w->ec_out_ = w->ec_value_;

    // Copy everything we still need out of w before recycling it.
    auto h = w->h_;
    auto d = w->d_;
    auto* svc = w->svc_;
    auto& sched = svc->get_scheduler();

    svc->destroy_waiter(w);

    d.post(h);
    sched.work_finished(); // balances work_started() in wait()
}
816 :
817 : // GCC 14 false-positive: inlining ~optional<stop_callback> through
818 : // delete loses track that stop_cb_ was already .reset() above.
819 : #if defined(__GNUC__) && !defined(__clang__)
820 : #pragma GCC diagnostic push
821 : #pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
822 : #endif
inline void
waiter_node::completion_op::destroy()
{
    // Called during scheduler shutdown drain when this completion_op is
    // in the scheduler's ready queue (posted by cancel_timer() or
    // process_expired()). Balances the work_started() from
    // implementation::wait(). The scheduler drain loop separately
    // balances the work_started() from post(). On IOCP both decrements
    // are required for outstanding_work_ to reach zero; on other
    // backends this is harmless.
    //
    // This override also prevents scheduler_op::destroy() from calling
    // do_complete(nullptr, ...). See also: timer_service::shutdown()
    // which drains waiters still in the timer heap (the other path).
    auto* w = waiter_;
    w->stop_cb_.reset();
    // Detach the handle before deleting w, so h.destroy() below
    // cannot touch freed memory.
    auto h = std::exchange(w->h_, {});
    auto& sched = w->svc_->get_scheduler();
    delete w;
    sched.work_finished();
    if (h)
        h.destroy();
}
846 : #if defined(__GNUC__) && !defined(__clang__)
847 : #pragma GCC diagnostic pop
848 : #endif
849 :
// Awaiter hook: suspend the caller until expiry, cancellation, or a
// stop request. Always returns noop_coroutine(); resumption is posted
// through the executor.
inline std::coroutine_handle<>
timer_service::implementation::wait(
    std::coroutine_handle<> h,
    capy::executor_ref d,
    std::stop_token token,
    std::error_code* ec)
{
    // Already-expired fast path — no waiter_node, no mutex.
    // Post instead of dispatch so the coroutine yields to the
    // scheduler, allowing other queued work to run.
    if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
    {
        if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
        {
            if (ec)
                *ec = {};
            d.post(h);
            return std::noop_coroutine();
        }
    }

    // Slow path: build a waiter and arm the timer.
    auto* w = svc_->create_waiter();
    w->impl_ = this;
    w->svc_ = svc_;
    w->h_ = h;
    w->d_ = d;
    w->token_ = std::move(token);
    w->ec_out_ = ec;

    svc_->insert_waiter(*this, w);
    might_have_pending_waits_ = true;
    // Balanced by work_finished() in completion_op::operator(),
    // completion_op::destroy(), or timer_service::shutdown().
    svc_->get_scheduler().work_started();

    // Register the stop callback only after the waiter is fully
    // linked — it may fire immediately from another thread.
    if (w->token_.stop_possible())
        w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});

    return std::noop_coroutine();
}
888 :
889 : // Free functions
890 :
// Access shim that reads io_context's scheduler pointer directly.
struct timer_service_access
{
    static native_scheduler& get_scheduler(io_context& ctx) noexcept
    {
        // Unchecked static_cast: assumes ctx.sched_ is always a
        // native_scheduler in this configuration — TODO confirm.
        return static_cast<native_scheduler&>(*ctx.sched_);
    }
};
898 :
899 : // Bypass find_service() mutex by reading the scheduler's cached pointer
900 : inline io_object::io_service&
901 5192 : timer_service_direct(capy::execution_context& ctx) noexcept
902 : {
903 5192 : return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
904 5192 : .timer_svc_;
905 : }
906 :
907 : inline std::size_t
908 6 : timer_service_update_expiry(timer::implementation& base)
909 : {
910 6 : auto& impl = static_cast<timer_service::implementation&>(base);
911 6 : return impl.svc_->update_timer(impl, impl.expiry_);
912 : }
913 :
914 : inline std::size_t
915 8 : timer_service_cancel(timer::implementation& base) noexcept
916 : {
917 8 : auto& impl = static_cast<timer_service::implementation&>(base);
918 8 : return impl.svc_->cancel_timer(impl);
919 : }
920 :
921 : inline std::size_t
922 2 : timer_service_cancel_one(timer::implementation& base) noexcept
923 : {
924 2 : auto& impl = static_cast<timer_service::implementation&>(base);
925 2 : return impl.svc_->cancel_one_waiter(impl);
926 : }
927 :
928 : inline timer_service&
929 466 : get_timer_service(capy::execution_context& ctx, scheduler& sched)
930 : {
931 466 : return ctx.make_service<timer_service>(sched);
932 : }
933 :
934 : } // namespace boost::corosio::detail
935 :
936 : #endif
|