Class: Phronomy::Runtime::DeterministicScheduler Private

Inherits:
Scheduler
  • Object
show all
Defined in:
lib/phronomy/runtime/deterministic_scheduler.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Tick-based deterministic cooperative scheduler for testing.

Unlike FakeScheduler (which runs every task synchronously to completion before +spawn+ returns), +DeterministicScheduler+ pushes each task to a ready queue and only advances execution one step at a time via #tick. This makes it possible to test:

  • Task interleaving (two tasks yielding control back and forth)
  • Virtual-time timer firing order
  • +await+ suspension and resumption
  • Cancellation while a task is suspended

EXPERIMENTAL Fiber-based cooperative scheduler.

Uses Task::FiberBackend to run tasks cooperatively without OS threads. Intended for deterministic testing and, in future, as a production cooperative scheduler. Not recommended for production use.

Activated via +runtime_backend: :fiber+ in Phronomy.configure.

Examples:

Basic usage

sched = Phronomy::Runtime::DeterministicScheduler.new
rt    = Phronomy::Runtime.new(scheduler: sched)

rt.spawn { Fiber.yield; :done }   # not started yet
sched.tick                        # runs until first Fiber.yield
sched.tick                        # runs to completion
sched.run_until_idle              # same as calling tick until empty

Virtual clock

sched.schedule_after(1.0) { puts "fired at T=1" }
sched.advance(1.0)    # moves virtual clock forward, fires the timer
sched.run_until_idle  # dispatches the timer callback

Defined Under Namespace

Classes: CoopSignal

Constant Summary

Constants inherited from Scheduler

Scheduler::SCHEDULER_KEY

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Scheduler

current, #yield

Constructor Details

#initialize(autorun: false) ⇒ DeterministicScheduler

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of DeterministicScheduler.

Parameters:

  • autorun (Boolean) (defaults to: false)

    when +true+, each call to #spawn automatically drains the ready queue via #run_until_idle before returning the task. This makes +DeterministicScheduler+ behave like FakeScheduler (tasks complete synchronously) while still executing them on real Fibers. Used internally by the +:fiber+ runtime backend.



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 88

def initialize(autorun: false)
  @autorun = autorun
  @ready = []  # Array of callables ({ fiber.resume } or timer callbacks)
  @mutex = Mutex.new
  @virtual_time = 0.0
  @timer_heap = []  # Array of { fire_at:, callback: }
  @real_timer_heap = []  # Array of [fire_at_monotonic, callback] for wall-clock timers
  @clock = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  # Tracks Fibers suspended in BlockingAdapterPool#await so that
  # run_until_idle knows to keep looping until worker threads complete.
  # Protected by @await_mutex (separate from @mutex to avoid contention).
  @pending_awaits = 0
  @await_mutex = Mutex.new
  @await_cond = ConditionVariable.new
end

Instance Attribute Details

#virtual_timeFloat (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns current virtual clock time (seconds since scheduler creation).

Returns:

  • (Float)

    current virtual clock time (seconds since scheduler creation)



80
81
82
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 80

def virtual_time
  @virtual_time
end

Instance Method Details

#advance(seconds) ⇒ self

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Advances the virtual clock by +seconds+ and enqueues any timer callbacks that are now due.

Parameters:

  • seconds (Numeric)

Returns:

  • (self)


241
242
243
244
245
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 241

def advance(seconds)
  @virtual_time += seconds
  fire_due_timers
  self
end

#autorun?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns +true+ when this scheduler is in autorun mode.

Returns:

  • (Boolean)


107
108
109
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 107

def autorun?
  @autorun
end

#complete_blocking_awaitself

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Marks one pending cooperative blocking-I/O await as complete. Called from the BlockingAdapterPool::PendingOperation#on_complete callback (on the pool worker thread) after the result is ready. Decrements the counter and broadcasts to wake #run_until_idle.

Returns:

  • (self)


374
375
376
377
378
379
380
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 374

def complete_blocking_await
  @await_mutex.synchronize do
    @pending_awaits -= 1
    @await_cond.broadcast
  end
  self
end

#enqueue_fiber(callable) ⇒ self

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Enqueues a callable (Fiber step or arbitrary block) onto the ready queue. Called by Task::FiberBackend#await to resume a waiting Fiber. Also wakes any thread blocked in #run_until_idle waiting for external completion signals (e.g. from BlockingAdapterPool worker threads).

Parameters:

  • callable (#call)

Returns:

  • (self)


279
280
281
282
283
284
285
286
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 279

def enqueue_fiber(callable)
  @mutex.synchronize { @ready << callable }
  # Broadcast to wake run_until_idle if it is sleeping on @await_cond.
  # @await_mutex is always acquired AFTER releasing @mutex (never nested)
  # to guarantee consistent lock ordering and avoid deadlocks.
  @await_mutex.synchronize { @await_cond.broadcast }
  self
end

#fire_real_timersself

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Fires all wall-clock timer callbacks whose deadline has passed. Enqueues each fired callback onto the ready queue for scheduler dispatch.

Returns:

  • (self)


339
340
341
342
343
344
345
346
347
348
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 339

def fire_real_timers
  now = @clock.call
  due = @mutex.synchronize do
    ready, pending = @real_timer_heap.partition { |(t, _)| t <= now }
    @real_timer_heap.replace(pending)
    ready
  end
  due.each { |(_, cb)| enqueue_fiber(cb) }
  self
end

#idle?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns +true+ when there are no ready entries to dispatch.

Returns:

  • (Boolean)


291
292
293
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 291

def idle?
  @mutex.synchronize { @ready.empty? }
end

#new_signalCoopSignal

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Creates a new cooperative signal backed by CoopSignal.

Returns:



140
141
142
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 140

def new_signal
  CoopSignal.new(self)
end

#pending_real_timer_countInteger

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the number of pending wall-clock timer entries (not yet fired).

Returns:

  • (Integer)


353
354
355
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 353

def pending_real_timer_count
  @mutex.synchronize { @real_timer_heap.size }
end

#pending_timersArray<Hash>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a list of pending timer entries (not yet fired). Each entry has +:fire_at+ and +:description+ (if set) keys.

Returns:

  • (Array<Hash>)


306
307
308
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 306

def pending_timers
  @mutex.synchronize { @timer_heap.dup }
end

#raise_signal(signal) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Wakes up one Fiber waiting on +signal+.

Parameters:



156
157
158
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 156

def raise_signal(signal)
  signal.notify_one
end

#raise_signal_all(signal) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Wakes up all Fibers waiting on +signal+.

Parameters:



164
165
166
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 164

def raise_signal_all(signal)
  signal.notify_all
end

#ready_countInteger

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the number of entries currently in the ready queue.

Returns:

  • (Integer)


298
299
300
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 298

def ready_count
  @mutex.synchronize { @ready.size }
end

#run_until_idleself

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Drains the ready queue by calling #tick until it is empty.

In autorun mode (#autorun? is +true+), also handles wall-clock timers and cooperative blocking-I/O awaits:

  • Fires any timers whose deadline has already passed on each iteration.
  • When all ready tasks are done but future timers remain pending, sleeps until the next deadline and fires them.
  • When Fibers are suspended in BlockingAdapterPool::PendingOperation#await (tracked via #track_blocking_await), waits on a condition variable that is broadcast by #enqueue_fiber when the worker thread completes (Issue #338). This ensures run_until_idle does not exit while blocking I/O operations are still in flight.

Does not fire pending virtual timers — call #advance for those.

Returns:

  • (self)


204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 204

def run_until_idle
  if @autorun
    loop do
      fire_real_timers
      tick until idle?

      # Atomically check all exit conditions.
      should_break = @await_mutex.synchronize do
        idle? && pending_real_timer_count.zero? && @pending_awaits.zero?
      end
      break if should_break

      if idle?
        if pending_real_timer_count > 0 &&
            @await_mutex.synchronize { @pending_awaits.zero? }
          # Only real timers pending — sleep until the next deadline.
          sleep_until_next_real_timer
        else
          # Pending blocking awaits (pool workers still running).
          # Wait for the completion signal broadcast by enqueue_fiber /
          # complete_blocking_await (30-second safety cap).
          @await_mutex.synchronize { @await_cond.wait(@await_mutex, 30) }
        end
      end
    end
  else
    tick until idle?
  end
  self
end

#schedule_after(delay) { ... } ⇒ self

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Schedules +callback+ to fire +delay+ seconds from now (virtual time).

Parameters:

  • delay (Numeric)

Yields:

  • callback

Returns:

  • (self)


267
268
269
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 267

def schedule_after(delay, &callback)
  schedule_at(@virtual_time + delay, &callback)
end

#schedule_at(absolute_time) { ... } ⇒ self

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Schedules +callback+ to fire at the given absolute virtual time.

Parameters:

  • absolute_time (Float)

Yields:

  • callback to invoke when the virtual clock reaches +absolute_time+

Returns:

  • (self)


253
254
255
256
257
258
259
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 253

def schedule_at(absolute_time, &callback)
  @mutex.synchronize do
    @timer_heap << {fire_at: absolute_time, callback: callback}
    @timer_heap.sort_by! { |e| e[:fire_at] }
  end
  self
end

#schedule_real_after(seconds) { ... } ⇒ self

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Schedules +callback+ to fire +seconds+ from now (wall-clock time).

Unlike #schedule_after (which uses virtual time), this method uses the real monotonic clock. Callbacks are fired during #run_until_idle when #autorun? is +true+, or explicitly via #fire_real_timers.

This is the integration point for TimerQueue replacement: when a Phronomy::Runtime is backed by a +DeterministicScheduler+, its Phronomy::Runtime#timer_queue returns a SchedulerTimerAdapter that delegates here instead of spawning a background OS thread.

Parameters:

  • seconds (Numeric)

    delay before the callback fires

Yields:

  • called when the deadline is reached

Returns:

  • (self)


325
326
327
328
329
330
331
332
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 325

def schedule_real_after(seconds, &callback)
  fire_at = @clock.call + seconds.to_f
  @mutex.synchronize do
    @real_timer_heap << [fire_at, callback]
    @real_timer_heap.sort_by! { |(t, _)| t }
  end
  self
end

#spawn(name:, parent:, &block) ⇒ Task

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Spawns a new Task backed by Task::FiberBackend and enqueues it. The task does NOT start executing until #tick is called.

Parameters:

  • name (String, nil)
  • parent (Task, nil)

Returns:



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 118

def spawn(name:, parent:, &block)
  task = Task.spawn(name: name, parent: parent, backend_class: Task::FiberBackend, &block)
  backend = task.backend
  # Build a self-rescheduling step: after each step, re-enqueue if the
  # Fiber yielded cooperatively and is still alive.
  step_callable = nil
  step_callable = lambda do
    backend.step
    enqueue_fiber(step_callable) if backend.alive? && !backend.cooperative_suspend?
  end
  enqueue_fiber(step_callable)
  # Auto-run only when called from outside a running scheduler tick.
  # When SCHEDULER_KEY is set, the calling code is already inside a managed
  # Fiber; the outer run_until_idle loop will pick up the new task on the
  # next iteration without a recursive re-entry.
  run_until_idle if @autorun && Thread.current.thread_variable_get(SCHEDULER_KEY).nil?
  task
end

#tickself

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Executes one ready entry (a fiber step or a timer callback). Sets the thread-local scheduler reference so that +FiberBackend#await+ can suspend cooperatively.

Returns:

  • (self)


174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 174

def tick
  callable = @mutex.synchronize { @ready.shift }
  return self unless callable

  # Use thread_variable_set (not Thread#[]) so the value is accessible from
  # any Fiber running on this OS thread, not just the current Fiber.
  prev = Thread.current.thread_variable_get(SCHEDULER_KEY)
  Thread.current.thread_variable_set(SCHEDULER_KEY, self)
  callable.call
ensure
  Thread.current.thread_variable_set(SCHEDULER_KEY, prev)
end

#track_blocking_awaitself

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Registers one pending cooperative blocking-I/O await. Called by BlockingAdapterPool::PendingOperation#await before +Fiber.yield+ so that #run_until_idle knows not to exit yet. Each call must be balanced by a #complete_blocking_await call.

Returns:

  • (self)


363
364
365
366
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 363

def track_blocking_await
  @await_mutex.synchronize { @pending_awaits += 1 }
  self
end

#wait_for_signal(signal) ⇒ void

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method returns an undefined value.

Suspends the current Fiber until +signal+ is notified.

Parameters:



148
149
150
# File 'lib/phronomy/runtime/deterministic_scheduler.rb', line 148

def wait_for_signal(signal)
  signal.wait
end