Class: Phronomy::Runtime::DeterministicScheduler Private
- Defined in:
- lib/phronomy/runtime/deterministic_scheduler.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Tick-based deterministic cooperative scheduler for testing.
Unlike FakeScheduler (which runs every task synchronously to completion before +spawn+ returns), +DeterministicScheduler+ pushes each task to a ready queue and only advances execution one step at a time via #tick. This makes it possible to test:
- Task interleaving (two tasks yielding control back and forth)
- Virtual-time timer firing order
- +await+ suspension and resumption
- Cancellation while a task is suspended
EXPERIMENTAL Fiber-based cooperative scheduler.
Uses Task::FiberBackend to run tasks cooperatively without OS threads. Intended for deterministic testing and, in future, as a production cooperative scheduler. Not recommended for production use.
Activated via +runtime_backend: :fiber+ in Phronomy.configure.
Defined Under Namespace
Classes: CoopSignal
Constant Summary
Constants inherited from Scheduler
Instance Attribute Summary collapse
-
#virtual_time ⇒ Float
readonly
private
Current virtual clock time (seconds since scheduler creation).
Instance Method Summary collapse
-
#advance(seconds) ⇒ self
private
Advances the virtual clock by +seconds+ and enqueues any timer callbacks that are now due.
-
#autorun? ⇒ Boolean
private
Returns +true+ when this scheduler is in autorun mode.
-
#complete_blocking_await ⇒ self
private
Marks one pending cooperative blocking-I/O await as complete.
-
#enqueue_fiber(callable) ⇒ self
private
Enqueues a callable (Fiber step or arbitrary block) onto the ready queue.
-
#fire_real_timers ⇒ self
private
Fires all wall-clock timer callbacks whose deadline has passed.
-
#idle? ⇒ Boolean
private
Returns +true+ when there are no ready entries to dispatch.
-
#initialize(autorun: false) ⇒ DeterministicScheduler
constructor
private
A new instance of DeterministicScheduler.
-
#new_signal ⇒ CoopSignal
private
Creates a new cooperative signal backed by CoopSignal.
-
#pending_real_timer_count ⇒ Integer
private
Returns the number of pending wall-clock timer entries (not yet fired).
-
#pending_timers ⇒ Array<Hash>
private
Returns a list of pending timer entries (not yet fired).
-
#raise_signal(signal) ⇒ void
private
Wakes up one Fiber waiting on +signal+.
-
#raise_signal_all(signal) ⇒ void
private
Wakes up all Fibers waiting on +signal+.
-
#ready_count ⇒ Integer
private
Returns the number of entries currently in the ready queue.
-
#run_until_idle ⇒ self
private
Drains the ready queue by calling #tick until it is empty.
-
#schedule_after(delay) { ... } ⇒ self
private
Schedules +callback+ to fire +delay+ seconds from now (virtual time).
-
#schedule_at(absolute_time) { ... } ⇒ self
private
Schedules +callback+ to fire at the given absolute virtual time.
-
#schedule_real_after(seconds) { ... } ⇒ self
private
Schedules +callback+ to fire +seconds+ from now (wall-clock time).
-
#spawn(name:, parent:, &block) ⇒ Task
private
Spawns a new Task backed by Task::FiberBackend and enqueues it.
-
#tick ⇒ self
private
Executes one ready entry (a fiber step or a timer callback).
-
#track_blocking_await ⇒ self
private
Registers one pending cooperative blocking-I/O await.
-
#wait_for_signal(signal) ⇒ void
private
Suspends the current Fiber until +signal+ is notified.
Methods inherited from Scheduler
Constructor Details
#initialize(autorun: false) ⇒ DeterministicScheduler
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of DeterministicScheduler.
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 88 def initialize(autorun: false) @autorun = autorun @ready = [] # Array of callables ({ fiber.resume } or timer callbacks) @mutex = Mutex.new @virtual_time = 0.0 @timer_heap = [] # Array of { fire_at:, callback: } @real_timer_heap = [] # Array of [fire_at_monotonic, callback] for wall-clock timers @clock = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) } # Tracks Fibers suspended in BlockingAdapterPool#await so that # run_until_idle knows to keep looping until worker threads complete. # Protected by @await_mutex (separate from @mutex to avoid contention). @pending_awaits = 0 @await_mutex = Mutex.new @await_cond = ConditionVariable.new end |
Instance Attribute Details
#virtual_time ⇒ Float (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns current virtual clock time (seconds since scheduler creation).
80 81 82 |
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 80 def virtual_time @virtual_time end |
Instance Method Details
#advance(seconds) ⇒ self
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Advances the virtual clock by +seconds+ and enqueues any timer callbacks that are now due.
241 242 243 244 245 |
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 241 def advance(seconds) @virtual_time += seconds fire_due_timers self end |
#autorun? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns +true+ when this scheduler is in autorun mode.
107 108 109 |
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 107 def autorun? @autorun end |
#complete_blocking_await ⇒ self
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Marks one pending cooperative blocking-I/O await as complete. Called from the BlockingAdapterPool::PendingOperation#on_complete callback (on the pool worker thread) after the result is ready. Decrements the counter and broadcasts to wake #run_until_idle.
374 375 376 377 378 379 380 |
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 374 def complete_blocking_await @await_mutex.synchronize do @pending_awaits -= 1 @await_cond.broadcast end self end |
#enqueue_fiber(callable) ⇒ self
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Enqueues a callable (Fiber step or arbitrary block) onto the ready queue. Called by Task::FiberBackend#await to resume a waiting Fiber. Also wakes any thread blocked in #run_until_idle waiting for external completion signals (e.g. from BlockingAdapterPool worker threads).
279 280 281 282 283 284 285 286 |
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 279 def enqueue_fiber(callable) @mutex.synchronize { @ready << callable } # Broadcast to wake run_until_idle if it is sleeping on @await_cond. # @await_mutex is always acquired AFTER releasing @mutex (never nested) # to guarantee consistent lock ordering and avoid deadlocks. @await_mutex.synchronize { @await_cond.broadcast } self end |
#fire_real_timers ⇒ self
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Fires all wall-clock timer callbacks whose deadline has passed. Enqueues each fired callback onto the ready queue for scheduler dispatch.
339 340 341 342 343 344 345 346 347 348 |
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 339 def fire_real_timers now = @clock.call due = @mutex.synchronize do ready, pending = @real_timer_heap.partition { |(t, _)| t <= now } @real_timer_heap.replace(pending) ready end due.each { |(_, cb)| enqueue_fiber(cb) } self end |
#idle? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns +true+ when there are no ready entries to dispatch.
291 292 293 |
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 291 def idle? @mutex.synchronize { @ready.empty? } end |
#new_signal ⇒ CoopSignal
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Creates a new cooperative signal backed by CoopSignal.
140 141 142 |
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 140 def new_signal CoopSignal.new(self) end |
#pending_real_timer_count ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the number of pending wall-clock timer entries (not yet fired).
353 354 355 |
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 353 def pending_real_timer_count @mutex.synchronize { @real_timer_heap.size } end |
#pending_timers ⇒ Array<Hash>
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a list of pending timer entries (not yet fired). Each entry has +:fire_at+ and +:description+ (if set) keys.
306 307 308 |
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 306 def pending_timers @mutex.synchronize { @timer_heap.dup } end |
#raise_signal(signal) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Wakes up one Fiber waiting on +signal+.
156 157 158 |
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 156 def raise_signal(signal) signal.notify_one end |
#raise_signal_all(signal) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Wakes up all Fibers waiting on +signal+.
164 165 166 |
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 164 def raise_signal_all(signal) signal.notify_all end |
#ready_count ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the number of entries currently in the ready queue.
298 299 300 |
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 298 def ready_count @mutex.synchronize { @ready.size } end |
#run_until_idle ⇒ self
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Drains the ready queue by calling #tick until it is empty.
In autorun mode (#autorun? is +true+), also handles wall-clock timers and cooperative blocking-I/O awaits:
- Fires any timers whose deadline has already passed on each iteration.
- When all ready tasks are done but future timers remain pending, sleeps until the next deadline and fires them.
- When Fibers are suspended in BlockingAdapterPool::PendingOperation#await (tracked via #track_blocking_await), waits on a condition variable that is broadcast by #enqueue_fiber when the worker thread completes (Issue #338). This ensures run_until_idle does not exit while blocking I/O operations are still in flight.
Does not fire pending virtual timers — call #advance for those.
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 204 def run_until_idle if @autorun loop do fire_real_timers tick until idle? # Atomically check all exit conditions. should_break = @await_mutex.synchronize do idle? && pending_real_timer_count.zero? && @pending_awaits.zero? end break if should_break if idle? if pending_real_timer_count > 0 && @await_mutex.synchronize { @pending_awaits.zero? } # Only real timers pending — sleep until the next deadline. sleep_until_next_real_timer else # Pending blocking awaits (pool workers still running). # Wait for the completion signal broadcast by enqueue_fiber / # complete_blocking_await (30-second safety cap). @await_mutex.synchronize { @await_cond.wait(@await_mutex, 30) } end end end else tick until idle? end self end |
#schedule_after(delay) { ... } ⇒ self
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Schedules +callback+ to fire +delay+ seconds from now (virtual time).
267 268 269 |
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 267 def schedule_after(delay, &callback) schedule_at(@virtual_time + delay, &callback) end |
#schedule_at(absolute_time) { ... } ⇒ self
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Schedules +callback+ to fire at the given absolute virtual time.
253 254 255 256 257 258 259 |
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 253 def schedule_at(absolute_time, &callback) @mutex.synchronize do @timer_heap << {fire_at: absolute_time, callback: callback} @timer_heap.sort_by! { |e| e[:fire_at] } end self end |
#schedule_real_after(seconds) { ... } ⇒ self
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Schedules +callback+ to fire +seconds+ from now (wall-clock time).
Unlike #schedule_after (which uses virtual time), this method uses the real monotonic clock. Callbacks are fired during #run_until_idle when #autorun? is +true+, or explicitly via #fire_real_timers.
This is the integration point for TimerQueue replacement: when a Phronomy::Runtime is backed by a +DeterministicScheduler+, its Phronomy::Runtime#timer_queue returns a SchedulerTimerAdapter that delegates here instead of spawning a background OS thread.
325 326 327 328 329 330 331 332 |
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 325 def schedule_real_after(seconds, &callback) fire_at = @clock.call + seconds.to_f @mutex.synchronize do @real_timer_heap << [fire_at, callback] @real_timer_heap.sort_by! { |(t, _)| t } end self end |
#spawn(name:, parent:, &block) ⇒ Task
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Spawns a new Task backed by Task::FiberBackend and enqueues it. The task does NOT start executing until #tick is called.
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 118 def spawn(name:, parent:, &block) task = Task.spawn(name: name, parent: parent, backend_class: Task::FiberBackend, &block) backend = task.backend # Build a self-rescheduling step: after each step, re-enqueue if the # Fiber yielded cooperatively and is still alive. step_callable = nil step_callable = lambda do backend.step enqueue_fiber(step_callable) if backend.alive? && !backend.cooperative_suspend? end enqueue_fiber(step_callable) # Auto-run only when called from outside a running scheduler tick. # When SCHEDULER_KEY is set, the calling code is already inside a managed # Fiber; the outer run_until_idle loop will pick up the new task on the # next iteration without a recursive re-entry. run_until_idle if @autorun && Thread.current.thread_variable_get(SCHEDULER_KEY).nil? task end |
#tick ⇒ self
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Executes one ready entry (a fiber step or a timer callback). Sets the thread-local scheduler reference so that +FiberBackend#await+ can suspend cooperatively.
174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 174 def tick callable = @mutex.synchronize { @ready.shift } return self unless callable # Use thread_variable_set (not Thread#[]) so the value is accessible from # any Fiber running on this OS thread, not just the current Fiber. prev = Thread.current.thread_variable_get(SCHEDULER_KEY) Thread.current.thread_variable_set(SCHEDULER_KEY, self) callable.call ensure Thread.current.thread_variable_set(SCHEDULER_KEY, prev) end |
#track_blocking_await ⇒ self
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Registers one pending cooperative blocking-I/O await. Called by BlockingAdapterPool::PendingOperation#await before +Fiber.yield+ so that #run_until_idle knows not to exit yet. Each call must be balanced by a #complete_blocking_await call.
363 364 365 366 |
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 363 def track_blocking_await @await_mutex.synchronize { @pending_awaits += 1 } self end |
#wait_for_signal(signal) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Suspends the current Fiber until +signal+ is notified.
148 149 150 |
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 148 def wait_for_signal(signal) signal.wait end |