include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp

75.9% Lines (233/307) 81.0% List of functions (34/42)
f(x) Functions (42)
Function Calls Lines Blocks
boost::corosio::detail::reactor_scheduler_context::reactor_scheduler_context(boost::corosio::detail::reactor_scheduler_base const*, boost::corosio::detail::reactor_scheduler_context*) :65 394x 100.0% 100.0% boost::corosio::detail::reactor_find_context(boost::corosio::detail::reactor_scheduler_base const*) :82 793883x 100.0% 86.0% boost::corosio::detail::reactor_flush_private_work(boost::corosio::detail::reactor_scheduler_context*, std::atomic<long>&) :94 0 0.0% 0.0% boost::corosio::detail::reactor_drain_private_queue(boost::corosio::detail::reactor_scheduler_context*, std::atomic<long>&, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :111 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler_base::reactor_scheduler_base() :237 466x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::task_op::operator()() :275 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler_base::task_op::destroy() :276 0 0.0% 0.0% boost::corosio::detail::reactor_thread_context_guard::reactor_thread_context_guard(boost::corosio::detail::reactor_scheduler_base const*) :321 394x 100.0% 100.0% boost::corosio::detail::reactor_thread_context_guard::~reactor_thread_context_guard() :329 394x 66.7% 80.0% boost::corosio::detail::reactor_scheduler_base::reset_inline_budget() const :341 97876x 54.5% 45.0% boost::corosio::detail::reactor_scheduler_base::try_consume_inline_budget() const :362 442601x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::post(std::__n4861::coroutine_handle<void>) const :376 7187x 100.0% 84.0% boost::corosio::detail::reactor_scheduler_base::post(std::__n4861::coroutine_handle<void>) const::post_handler::post_handler(std::__n4861::coroutine_handle<void>) :382 7187x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::post(std::__n4861::coroutine_handle<void>) const::post_handler::~post_handler() :383 14374x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::post(std::__n4861::coroutine_handle<void>) const::post_handler::operator()() :385 7178x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::post(std::__n4861::coroutine_handle<void>) const::post_handler::destroy() :394 9x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::post(boost::corosio::detail::scheduler_op*) const :419 93839x 100.0% 87.0% boost::corosio::detail::reactor_scheduler_base::running_in_this_thread() const :436 1159x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::stop() :442 392x 100.0% 78.0% boost::corosio::detail::reactor_scheduler_base::stopped() const :454 21x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::restart() :461 91x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::run() :468 365x 100.0% 76.0% boost::corosio::detail::reactor_scheduler_base::run_one() :493 2x 75.0% 64.0% boost::corosio::detail::reactor_scheduler_base::wait_one(long) :507 61x 100.0% 70.0% boost::corosio::detail::reactor_scheduler_base::poll() :521 6x 100.0% 76.0% boost::corosio::detail::reactor_scheduler_base::poll_one() :546 4x 100.0% 70.0% boost::corosio::detail::reactor_scheduler_base::work_started() :560 15551x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::work_finished() :566 22483x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::compensating_work_started() const :573 151221x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::drain_thread_queue(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&, long) const :581 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler_base::post_deferred_completions(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) const :594 9069x 30.0% 35.0% boost::corosio::detail::reactor_scheduler_base::shutdown_drain() :611 466x 100.0% 88.0% boost::corosio::detail::reactor_scheduler_base::signal_all(std::unique_lock<std::mutex>&) const :628 817x 100.0% 100.0% boost::corosio::detail::reactor_scheduler_base::maybe_unlock_and_signal_one(std::unique_lock<std::mutex>&) const :635 2016x 57.1% 50.0% boost::corosio::detail::reactor_scheduler_base::unlock_and_signal_one(std::unique_lock<std::mutex>&) const :649 307195x 85.7% 80.0% boost::corosio::detail::reactor_scheduler_base::clear_signal() const :661 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler_base::wait_for_signal(std::unique_lock<std::mutex>&) const :667 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler_base::wait_for_signal_for(std::unique_lock<std::mutex>&, long) const :679 0 0.0% 0.0% boost::corosio::detail::reactor_scheduler_base::wake_one_thread_and_unlock(std::unique_lock<std::mutex>&) const :691 2016x 87.5% 92.0% boost::corosio::detail::reactor_scheduler_base::work_cleanup::~work_cleanup() :709 261299x 92.3% 92.0% boost::corosio::detail::reactor_scheduler_base::task_cleanup::~task_cleanup() :733 171443x 83.3% 86.0% boost::corosio::detail::reactor_scheduler_base::do_one(std::unique_lock<std::mutex>&, long, boost::corosio::detail::reactor_scheduler_context*) :754 261638x 66.7% 54.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/capy/ex/execution_context.hpp>
15
16 #include <boost/corosio/native/native_scheduler.hpp>
17 #include <boost/corosio/detail/scheduler_op.hpp>
18 #include <boost/corosio/detail/thread_local_ptr.hpp>
19
20 #include <atomic>
21 #include <chrono>
22 #include <condition_variable>
23 #include <coroutine>
24 #include <cstddef>
25 #include <cstdint>
26 #include <limits>
27 #include <memory>
28 #include <mutex>
29
30 namespace boost::corosio::detail {
31
32 // Forward declaration
33 class reactor_scheduler_base;
34
35 /** Per-thread state for a reactor scheduler.
36
37 Each thread running a scheduler's event loop has one of these
38 on a thread-local stack. It holds a private work queue and
39 inline completion budget for speculative I/O fast paths.
40 */
41 struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
42 {
43 /// Scheduler this context belongs to.
44 reactor_scheduler_base const* key;
45
46 /// Next context frame on this thread's stack.
47 reactor_scheduler_context* next;
48
49 /// Private work queue for reduced contention.
50 op_queue private_queue;
51
52 /// Unflushed work count for the private queue.
53 std::int64_t private_outstanding_work;
54
55 /// Remaining inline completions allowed this cycle.
56 int inline_budget;
57
58 /// Maximum inline budget (adaptive, 2-16).
59 int inline_budget_max;
60
61 /// True if no other thread absorbed queued work last cycle.
62 bool unassisted;
63
64 /// Construct a context frame linked to @a n.
65 394x reactor_scheduler_context(
66 reactor_scheduler_base const* k, reactor_scheduler_context* n)
67 394x : key(k)
68 394x , next(n)
69 394x , private_outstanding_work(0)
70 394x , inline_budget(0)
71 394x , inline_budget_max(2)
72 394x , unassisted(false)
73 {
74 394x }
75 };
76
77 /// Thread-local context stack for reactor schedulers.
78 inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
79
80 /// Find the context frame for a scheduler on this thread.
81 inline reactor_scheduler_context*
82 793883x reactor_find_context(reactor_scheduler_base const* self) noexcept
83 {
84 793883x for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
85 {
86 791307x if (c->key == self)
87 791307x return c;
88 }
89 2576x return nullptr;
90 }
91
92 /// Flush private work count to global counter.
93 inline void
94 reactor_flush_private_work(
95 reactor_scheduler_context* ctx,
96 std::atomic<std::int64_t>& outstanding_work) noexcept
97 {
98 if (ctx && ctx->private_outstanding_work > 0)
99 {
100 outstanding_work.fetch_add(
101 ctx->private_outstanding_work, std::memory_order_relaxed);
102 ctx->private_outstanding_work = 0;
103 }
104 }
105
106 /** Drain private queue to global queue, flushing work count first.
107
108 @return True if any ops were drained.
109 */
110 inline bool
111 reactor_drain_private_queue(
112 reactor_scheduler_context* ctx,
113 std::atomic<std::int64_t>& outstanding_work,
114 op_queue& completed_ops) noexcept
115 {
116 if (!ctx || ctx->private_queue.empty())
117 return false;
118
119 reactor_flush_private_work(ctx, outstanding_work);
120 completed_ops.splice(ctx->private_queue);
121 return true;
122 }
123
124 /** Non-template base for reactor-backed scheduler implementations.
125
126 Provides the complete threading model shared by epoll, kqueue,
127 and select schedulers: signal state machine, inline completion
128 budget, work counting, run/poll methods, and the do_one event
129 loop.
130
131 Derived classes provide platform-specific hooks by overriding:
132 - `run_task(lock, ctx)` to run the reactor poll
133 - `interrupt_reactor()` to wake a blocked reactor
134
135 De-templated from the original CRTP design to eliminate
136 duplicate instantiations when multiple backends are compiled
137 into the same binary. Virtual dispatch for run_task (called
138 once per reactor cycle, before a blocking syscall) has
139 negligible overhead.
140
141 @par Thread Safety
142 All public member functions are thread-safe.
143 */
144 class reactor_scheduler_base
145 : public native_scheduler
146 , public capy::execution_context::service
147 {
148 public:
149 using key_type = scheduler;
150 using context_type = reactor_scheduler_context;
151
152 /// Post a coroutine for deferred execution.
153 void post(std::coroutine_handle<> h) const override;
154
155 /// Post a scheduler operation for deferred execution.
156 void post(scheduler_op* h) const override;
157
158 /// Return true if called from a thread running this scheduler.
159 bool running_in_this_thread() const noexcept override;
160
161 /// Request the scheduler to stop dispatching handlers.
162 void stop() override;
163
164 /// Return true if the scheduler has been stopped.
165 bool stopped() const noexcept override;
166
167 /// Reset the stopped state so `run()` can resume.
168 void restart() override;
169
170 /// Run the event loop until no work remains.
171 std::size_t run() override;
172
173 /// Run until one handler completes or no work remains.
174 std::size_t run_one() override;
175
176 /// Run until one handler completes or @a usec elapses.
177 std::size_t wait_one(long usec) override;
178
179 /// Run ready handlers without blocking.
180 std::size_t poll() override;
181
182 /// Run at most one ready handler without blocking.
183 std::size_t poll_one() override;
184
185 /// Increment the outstanding work count.
186 void work_started() noexcept override;
187
188 /// Decrement the outstanding work count, stopping on zero.
189 void work_finished() noexcept override;
190
191 /** Reset the thread's inline completion budget.
192
193 Called at the start of each posted completion handler to
194 grant a fresh budget for speculative inline completions.
195 */
196 void reset_inline_budget() const noexcept;
197
198 /** Consume one unit of inline budget if available.
199
200 @return True if budget was available and consumed.
201 */
202 bool try_consume_inline_budget() const noexcept;
203
204 /** Offset a forthcoming work_finished from work_cleanup.
205
206 Called by descriptor_state when all I/O returned EAGAIN and
207 no handler will be executed. Must be called from a scheduler
208 thread.
209 */
210 void compensating_work_started() const noexcept;
211
212 /** Drain work from thread context's private queue to global queue.
213
214 Flushes private work count to the global counter, then
215 transfers the queue under mutex protection.
216
217 @param queue The private queue to drain.
218 @param count Private work count to flush before draining.
219 */
220 void drain_thread_queue(op_queue& queue, std::int64_t count) const;
221
222 /** Post completed operations for deferred invocation.
223
224 If called from a thread running this scheduler, operations
225 go to the thread's private queue (fast path). Otherwise,
226 operations are added to the global queue under mutex and a
227 waiter is signaled.
228
229 @par Preconditions
230 work_started() must have been called for each operation.
231
232 @param ops Queue of operations to post.
233 */
234 void post_deferred_completions(op_queue& ops) const;
235
236 protected:
237 466x reactor_scheduler_base() = default;
238
239 /** Drain completed_ops during shutdown.
240
241 Pops all operations from the global queue and destroys them,
242 skipping the task sentinel. Signals all waiting threads.
243 Derived classes call this from their shutdown() override
244 before performing platform-specific cleanup.
245 */
246 void shutdown_drain();
247
248 /// RAII guard that re-inserts the task sentinel after `run_task`.
249 struct task_cleanup
250 {
251 reactor_scheduler_base const* sched;
252 std::unique_lock<std::mutex>* lock;
253 context_type* ctx;
254 ~task_cleanup();
255 };
256
257 mutable std::mutex mutex_;
258 mutable std::condition_variable cond_;
259 mutable op_queue completed_ops_;
260 mutable std::atomic<std::int64_t> outstanding_work_{0};
261 bool stopped_ = false;
262 mutable std::atomic<bool> task_running_{false};
263 mutable bool task_interrupted_ = false;
264
265 /// Bit 0 of `state_`: set when the condvar should be signaled.
266 static constexpr std::size_t signaled_bit = 1;
267
268 /// Increment per waiting thread in `state_`.
269 static constexpr std::size_t waiter_increment = 2;
270 mutable std::size_t state_ = 0;
271
272 /// Sentinel op that triggers a reactor poll when dequeued.
273 struct task_op final : scheduler_op
274 {
275 void operator()() override {}
276 void destroy() override {}
277 };
278 task_op task_op_;
279
280 /// Run the platform-specific reactor poll.
281 virtual void
282 run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) = 0;
283
284 /// Wake a blocked reactor (e.g. write to eventfd or pipe).
285 virtual void interrupt_reactor() const = 0;
286
287 private:
288 struct work_cleanup
289 {
290 reactor_scheduler_base* sched;
291 std::unique_lock<std::mutex>* lock;
292 context_type* ctx;
293 ~work_cleanup();
294 };
295
296 std::size_t do_one(
297 std::unique_lock<std::mutex>& lock, long timeout_us, context_type* ctx);
298
299 void signal_all(std::unique_lock<std::mutex>& lock) const;
300 bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
301 bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
302 void clear_signal() const;
303 void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
304 void wait_for_signal_for(
305 std::unique_lock<std::mutex>& lock, long timeout_us) const;
306 void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
307 };
308
309 /** RAII guard that pushes/pops a scheduler context frame.
310
311 On construction, pushes a new context frame onto the
312 thread-local stack. On destruction, drains any remaining
313 private queue items to the global queue and pops the frame.
314 */
315 struct reactor_thread_context_guard
316 {
317 /// The context frame managed by this guard.
318 reactor_scheduler_context frame_;
319
320 /// Construct the guard, pushing a frame for @a sched.
321 394x explicit reactor_thread_context_guard(
322 reactor_scheduler_base const* sched) noexcept
323 394x : frame_(sched, reactor_context_stack.get())
324 {
325 394x reactor_context_stack.set(&frame_);
326 394x }
327
328 /// Destroy the guard, draining private work and popping the frame.
329 394x ~reactor_thread_context_guard() noexcept
330 {
331 394x if (!frame_.private_queue.empty())
332 frame_.key->drain_thread_queue(
333 frame_.private_queue, frame_.private_outstanding_work);
334 394x reactor_context_stack.set(frame_.next);
335 394x }
336 };
337
338 // ---- Inline implementations ------------------------------------------------
339
340 inline void
341 97876x reactor_scheduler_base::reset_inline_budget() const noexcept
342 {
343 97876x if (auto* ctx = reactor_find_context(this))
344 {
345 // Cap when no other thread absorbed queued work
346 97876x if (ctx->unassisted)
347 {
348 97876x ctx->inline_budget_max = 4;
349 97876x ctx->inline_budget = 4;
350 97876x return;
351 }
352 // Ramp up when previous cycle fully consumed budget
353 if (ctx->inline_budget == 0)
354 ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
355 else if (ctx->inline_budget < ctx->inline_budget_max)
356 ctx->inline_budget_max = 2;
357 ctx->inline_budget = ctx->inline_budget_max;
358 }
359 }
360
361 inline bool
362 442601x reactor_scheduler_base::try_consume_inline_budget() const noexcept
363 {
364 442601x if (auto* ctx = reactor_find_context(this))
365 {
366 442601x if (ctx->inline_budget > 0)
367 {
368 354110x --ctx->inline_budget;
369 354110x return true;
370 }
371 }
372 88491x return false;
373 }
374
375 inline void
376 7187x reactor_scheduler_base::post(std::coroutine_handle<> h) const
377 {
378 struct post_handler final : scheduler_op
379 {
380 std::coroutine_handle<> h_;
381
382 7187x explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
383 14374x ~post_handler() override = default;
384
385 7178x void operator()() override
386 {
387 7178x auto saved = h_;
388 7178x delete this;
389 // Ensure stores from the posting thread are visible
390 std::atomic_thread_fence(std::memory_order_acquire);
391 7178x saved.resume();
392 7178x }
393
394 9x void destroy() override
395 {
396 9x auto saved = h_;
397 9x delete this;
398 9x saved.destroy();
399 9x }
400 };
401
402 7187x auto ph = std::make_unique<post_handler>(h);
403
404 7187x if (auto* ctx = reactor_find_context(this))
405 {
406 5205x ++ctx->private_outstanding_work;
407 5205x ctx->private_queue.push(ph.release());
408 5205x return;
409 }
410
411 1982x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
412
413 1982x std::unique_lock lock(mutex_);
414 1982x completed_ops_.push(ph.release());
415 1982x wake_one_thread_and_unlock(lock);
416 7187x }
417
418 inline void
419 93839x reactor_scheduler_base::post(scheduler_op* h) const
420 {
421 93839x if (auto* ctx = reactor_find_context(this))
422 {
423 93805x ++ctx->private_outstanding_work;
424 93805x ctx->private_queue.push(h);
425 93805x return;
426 }
427
428 34x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
429
430 34x std::unique_lock lock(mutex_);
431 34x completed_ops_.push(h);
432 34x wake_one_thread_and_unlock(lock);
433 34x }
434
435 inline bool
436 1159x reactor_scheduler_base::running_in_this_thread() const noexcept
437 {
438 1159x return reactor_find_context(this) != nullptr;
439 }
440
441 inline void
442 392x reactor_scheduler_base::stop()
443 {
444 392x std::unique_lock lock(mutex_);
445 392x if (!stopped_)
446 {
447 351x stopped_ = true;
448 351x signal_all(lock);
449 351x interrupt_reactor();
450 }
451 392x }
452
453 inline bool
454 21x reactor_scheduler_base::stopped() const noexcept
455 {
456 21x std::unique_lock lock(mutex_);
457 42x return stopped_;
458 21x }
459
460 inline void
461 91x reactor_scheduler_base::restart()
462 {
463 91x std::unique_lock lock(mutex_);
464 91x stopped_ = false;
465 91x }
466
467 inline std::size_t
468 365x reactor_scheduler_base::run()
469 {
470 730x if (outstanding_work_.load(std::memory_order_acquire) == 0)
471 {
472 31x stop();
473 31x return 0;
474 }
475
476 334x reactor_thread_context_guard ctx(this);
477 334x std::unique_lock lock(mutex_);
478
479 334x std::size_t n = 0;
480 for (;;)
481 {
482 261572x if (!do_one(lock, -1, &ctx.frame_))
483 334x break;
484 261238x if (n != (std::numeric_limits<std::size_t>::max)())
485 261238x ++n;
486 261238x if (!lock.owns_lock())
487 167183x lock.lock();
488 }
489 334x return n;
490 334x }
491
492 inline std::size_t
493 2x reactor_scheduler_base::run_one()
494 {
495 4x if (outstanding_work_.load(std::memory_order_acquire) == 0)
496 {
497 stop();
498 return 0;
499 }
500
501 2x reactor_thread_context_guard ctx(this);
502 2x std::unique_lock lock(mutex_);
503 2x return do_one(lock, -1, &ctx.frame_);
504 2x }
505
506 inline std::size_t
507 61x reactor_scheduler_base::wait_one(long usec)
508 {
509 122x if (outstanding_work_.load(std::memory_order_acquire) == 0)
510 {
511 10x stop();
512 10x return 0;
513 }
514
515 51x reactor_thread_context_guard ctx(this);
516 51x std::unique_lock lock(mutex_);
517 51x return do_one(lock, usec, &ctx.frame_);
518 51x }
519
520 inline std::size_t
521 6x reactor_scheduler_base::poll()
522 {
523 12x if (outstanding_work_.load(std::memory_order_acquire) == 0)
524 {
525 1x stop();
526 1x return 0;
527 }
528
529 5x reactor_thread_context_guard ctx(this);
530 5x std::unique_lock lock(mutex_);
531
532 5x std::size_t n = 0;
533 for (;;)
534 {
535 11x if (!do_one(lock, 0, &ctx.frame_))
536 5x break;
537 6x if (n != (std::numeric_limits<std::size_t>::max)())
538 6x ++n;
539 6x if (!lock.owns_lock())
540 6x lock.lock();
541 }
542 5x return n;
543 5x }
544
545 inline std::size_t
546 4x reactor_scheduler_base::poll_one()
547 {
548 8x if (outstanding_work_.load(std::memory_order_acquire) == 0)
549 {
550 2x stop();
551 2x return 0;
552 }
553
554 2x reactor_thread_context_guard ctx(this);
555 2x std::unique_lock lock(mutex_);
556 2x return do_one(lock, 0, &ctx.frame_);
557 2x }
558
559 inline void
560 15551x reactor_scheduler_base::work_started() noexcept
561 {
562 15551x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
563 15551x }
564
565 inline void
566 22483x reactor_scheduler_base::work_finished() noexcept
567 {
568 44966x if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
569 343x stop();
570 22483x }
571
572 inline void
573 151221x reactor_scheduler_base::compensating_work_started() const noexcept
574 {
575 151221x auto* ctx = reactor_find_context(this);
576 151221x if (ctx)
577 151221x ++ctx->private_outstanding_work;
578 151221x }
579
580 inline void
581 reactor_scheduler_base::drain_thread_queue(
582 op_queue& queue, std::int64_t count) const
583 {
584 if (count > 0)
585 outstanding_work_.fetch_add(count, std::memory_order_relaxed);
586
587 std::unique_lock lock(mutex_);
588 completed_ops_.splice(queue);
589 if (count > 0)
590 maybe_unlock_and_signal_one(lock);
591 }
592
593 inline void
594 9069x reactor_scheduler_base::post_deferred_completions(op_queue& ops) const
595 {
596 9069x if (ops.empty())
597 9069x return;
598
599 if (auto* ctx = reactor_find_context(this))
600 {
601 ctx->private_queue.splice(ops);
602 return;
603 }
604
605 std::unique_lock lock(mutex_);
606 completed_ops_.splice(ops);
607 wake_one_thread_and_unlock(lock);
608 }
609
610 inline void
611 466x reactor_scheduler_base::shutdown_drain()
612 {
613 466x std::unique_lock lock(mutex_);
614
615 1019x while (auto* h = completed_ops_.pop())
616 {
617 553x if (h == &task_op_)
618 466x continue;
619 87x lock.unlock();
620 87x h->destroy();
621 87x lock.lock();
622 553x }
623
624 466x signal_all(lock);
625 466x }
626
627 inline void
628 817x reactor_scheduler_base::signal_all(std::unique_lock<std::mutex>&) const
629 {
630 817x state_ |= signaled_bit;
631 817x cond_.notify_all();
632 817x }
633
634 inline bool
635 2016x reactor_scheduler_base::maybe_unlock_and_signal_one(
636 std::unique_lock<std::mutex>& lock) const
637 {
638 2016x state_ |= signaled_bit;
639 2016x if (state_ > signaled_bit)
640 {
641 lock.unlock();
642 cond_.notify_one();
643 return true;
644 }
645 2016x return false;
646 }
647
648 inline bool
649 307195x reactor_scheduler_base::unlock_and_signal_one(
650 std::unique_lock<std::mutex>& lock) const
651 {
652 307195x state_ |= signaled_bit;
653 307195x bool have_waiters = state_ > signaled_bit;
654 307195x lock.unlock();
655 307195x if (have_waiters)
656 cond_.notify_one();
657 307195x return have_waiters;
658 }
659
660 inline void
661 reactor_scheduler_base::clear_signal() const
662 {
663 state_ &= ~signaled_bit;
664 }
665
666 inline void
667 reactor_scheduler_base::wait_for_signal(
668 std::unique_lock<std::mutex>& lock) const
669 {
670 while ((state_ & signaled_bit) == 0)
671 {
672 state_ += waiter_increment;
673 cond_.wait(lock);
674 state_ -= waiter_increment;
675 }
676 }
677
678 inline void
679 reactor_scheduler_base::wait_for_signal_for(
680 std::unique_lock<std::mutex>& lock, long timeout_us) const
681 {
682 if ((state_ & signaled_bit) == 0)
683 {
684 state_ += waiter_increment;
685 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
686 state_ -= waiter_increment;
687 }
688 }
689
690 inline void
691 2016x reactor_scheduler_base::wake_one_thread_and_unlock(
692 std::unique_lock<std::mutex>& lock) const
693 {
694 2016x if (maybe_unlock_and_signal_one(lock))
695 return;
696
697 2016x if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
698 {
699 26x task_interrupted_ = true;
700 26x lock.unlock();
701 26x interrupt_reactor();
702 }
703 else
704 {
705 1990x lock.unlock();
706 }
707 }
708
709 261299x inline reactor_scheduler_base::work_cleanup::~work_cleanup()
710 {
711 261299x if (ctx)
712 {
713 261299x std::int64_t produced = ctx->private_outstanding_work;
714 261299x if (produced > 1)
715 15x sched->outstanding_work_.fetch_add(
716 produced - 1, std::memory_order_relaxed);
717 261284x else if (produced < 1)
718 16001x sched->work_finished();
719 261299x ctx->private_outstanding_work = 0;
720
721 261299x if (!ctx->private_queue.empty())
722 {
723 94077x lock->lock();
724 94077x sched->completed_ops_.splice(ctx->private_queue);
725 }
726 }
727 else
728 {
729 sched->work_finished();
730 }
731 261299x }
732
733 342886x inline reactor_scheduler_base::task_cleanup::~task_cleanup()
734 {
735 171443x if (!ctx)
736 return;
737
738 171443x if (ctx->private_outstanding_work > 0)
739 {
740 4905x sched->outstanding_work_.fetch_add(
741 4905x ctx->private_outstanding_work, std::memory_order_relaxed);
742 4905x ctx->private_outstanding_work = 0;
743 }
744
745 171443x if (!ctx->private_queue.empty())
746 {
747 4905x if (!lock->owns_lock())
748 lock->lock();
749 4905x sched->completed_ops_.splice(ctx->private_queue);
750 }
751 171443x }
752
753 inline std::size_t
754 261638x reactor_scheduler_base::do_one(
755 std::unique_lock<std::mutex>& lock, long timeout_us, context_type* ctx)
756 {
757 for (;;)
758 {
759 433081x if (stopped_)
760 335x return 0;
761
762 432746x scheduler_op* op = completed_ops_.pop();
763
764 // Handle reactor sentinel — time to poll for I/O
765 432746x if (op == &task_op_)
766 {
767 bool more_handlers =
768 171447x !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());
769
770 296998x if (!more_handlers &&
771 251102x (outstanding_work_.load(std::memory_order_acquire) == 0 ||
772 timeout_us == 0))
773 {
774 4x completed_ops_.push(&task_op_);
775 4x return 0;
776 }
777
778 171443x task_interrupted_ = more_handlers || timeout_us == 0;
779 171443x task_running_.store(true, std::memory_order_release);
780
781 171443x if (more_handlers)
782 45896x unlock_and_signal_one(lock);
783
784 try
785 {
786 171443x run_task(lock, ctx);
787 }
788 catch (...)
789 {
790 task_running_.store(false, std::memory_order_relaxed);
791 throw;
792 }
793
794 171443x task_running_.store(false, std::memory_order_relaxed);
795 171443x completed_ops_.push(&task_op_);
796 171443x continue;
797 171443x }
798
799 // Handle operation
800 261299x if (op != nullptr)
801 {
802 261299x bool more = !completed_ops_.empty();
803
804 261299x if (more)
805 261299x ctx->unassisted = !unlock_and_signal_one(lock);
806 else
807 {
808 ctx->unassisted = false;
809 lock.unlock();
810 }
811
812 261299x work_cleanup on_exit{this, &lock, ctx};
813 (void)on_exit;
814
815 261299x (*op)();
816 261299x return 1;
817 261299x }
818
819 // Try private queue before blocking
820 if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
821 continue;
822
823 if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
824 timeout_us == 0)
825 return 0;
826
827 clear_signal();
828 if (timeout_us < 0)
829 wait_for_signal(lock);
830 else
831 wait_for_signal_for(lock, timeout_us);
832 171443x }
833 }
834
835 } // namespace boost::corosio::detail
836
837 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
838