Class: Phronomy::Runtime
- Inherits:
-
Object
- Object
- Phronomy::Runtime
- Defined in:
- lib/phronomy/runtime.rb,
lib/phronomy/runtime/scheduler.rb,
lib/phronomy/runtime/timer_queue.rb,
lib/phronomy/runtime/task_registry.rb,
lib/phronomy/runtime/timer_service.rb,
lib/phronomy/runtime/fake_scheduler.rb,
lib/phronomy/runtime/runtime_metrics.rb,
lib/phronomy/runtime/thread_scheduler.rb,
lib/phronomy/runtime/deterministic_scheduler.rb,
lib/phronomy/runtime/scheduler_timer_adapter.rb
Overview
Central authority for concurrent primitives.
+Runtime+ is the single place that creates Tasks, TaskGroups, and manages the lifecycle of all concurrency in Phronomy. It owns:
- a pluggable Scheduler (default: ThreadScheduler)
- a task registry for graceful shutdown
- the shared BlockingAdapterPool
In production, use the process-wide singleton via Runtime.instance. In tests, construct a Runtime with a FakeScheduler to run tasks synchronously without spawning additional threads:
Defined Under Namespace
Classes: DeterministicScheduler, FakeScheduler, RuntimeMetrics, Scheduler, SchedulerTimerAdapter, TaskRegistry, ThreadScheduler, TimerQueue, TimerService
Instance Attribute Summary collapse
-
#scheduler ⇒ Scheduler
readonly
The scheduler backing this runtime instance.
Class Method Summary collapse
-
.in_scheduler_context? ⇒ Boolean
private
Returns +true+ when the calling thread is executing inside an active scheduler task (i.e. Task.current is non-nil).
-
.instance ⇒ Runtime
private
Returns the process-wide default Runtime.
-
.instance=(runtime) ⇒ Runtime
private
Replaces the process-wide default Runtime.
-
.measure_ms { ... } ⇒ Array(Object, Integer)
private
Executes +block+ and returns +[result, elapsed_ms]+ where +elapsed_ms+ is the wall-clock duration in milliseconds (Integer, rounded).
Instance Method Summary collapse
-
#blocking_io(pool_size: 10, queue_size: 100) ⇒ BlockingAdapterPool
private
Returns the shared BlockingAdapterPool for this Runtime.
-
#gate(name) ⇒ ConcurrencyGate
private
Returns (or lazily creates) the ConcurrencyGate for the named resource.
-
#initialize(scheduler: ThreadScheduler.new) ⇒ Runtime
constructor
private
A new instance of Runtime.
-
#non_yield_threshold_violation_count ⇒ Integer
private
Number of times a task has exceeded the CPU-bound detection threshold (i.e. ran longer than +blocking_detect_threshold_ms+ without yielding).
-
#pool(name, size: 10, queue_size: 100) ⇒ BlockingAdapterPool
private
Returns (or lazily creates) a named BlockingAdapterPool.
-
#reset_gate(name) ⇒ void
private
Drops the cached gate for +name+ so that the next call to #gate rebuilds it from the current configuration.
-
#shutdown ⇒ void
private
Waits for all registered tasks to finish, then shuts down the EventLoop (if active), blocking adapter pool, named pools, and timer queue (if they were started).
-
#spawn(name: nil) { ... } ⇒ Task
private
Spawns a single Task using the runtime's scheduler.
-
#task_group(limit: Float::INFINITY, failure_policy: :fail_fast) ⇒ TaskGroup
private
Creates a new TaskGroup with an optional concurrency cap.
-
#task_snapshot ⇒ Hash{Symbol => Numeric}
private
Returns a snapshot of task-centric metrics for the current Runtime.
-
#timer_queue ⇒ TimerQueue, SchedulerTimerAdapter
private
Returns the shared timer queue for this Runtime.
-
#yield ⇒ void
private
Cooperative yield point.
-
#yield_if_needed(every: 1000) ⇒ void
private
Cooperative yield point with a call-count gate.
Constructor Details
#initialize(scheduler: ThreadScheduler.new) ⇒ Runtime
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of Runtime.
123 124 125 126 127 128 129 130 |
# File 'lib/phronomy/runtime.rb', line 123 def initialize(scheduler: ThreadScheduler.new) @scheduler = scheduler @task_registry = TaskRegistry.new @metrics = RuntimeMetrics.new @gate_registry = Phronomy::Concurrency::GateRegistry.new @pool_registry = Phronomy::Concurrency::PoolRegistry.new @timer_service = TimerService.new(scheduler) end |
Instance Attribute Details
#scheduler ⇒ Scheduler (readonly)
The scheduler backing this runtime instance.
119 120 121 |
# File 'lib/phronomy/runtime.rb', line 119 def scheduler @scheduler end |
Class Method Details
.in_scheduler_context? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns +true+ when the calling thread is executing inside an active scheduler task (i.e. Task.current is non-nil). Code running inside a #spawn block is always in a scheduler context.
Use this to detect potential scheduler-blocking calls: if Phronomy::Runtime.in_scheduler_context? Phronomy.configuration.logger&.warn("blocking call inside scheduler task") end
96 97 98 |
# File 'lib/phronomy/runtime.rb', line 96 def self.in_scheduler_context? !Task.current.nil? end |
.instance ⇒ Runtime
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the process-wide default Runtime.
Auto-creates an instance using the scheduler backend specified by +Phronomy.configuration.runtime_backend+:
- +:thread+ (default) — ThreadScheduler (one OS thread per task)
- +:immediate+ — FakeScheduler (synchronous, no extra threads)
- +:fiber+ — DeterministicScheduler in autorun mode (EXPERIMENTAL; Fiber-based synchronous execution; not yet suitable for production because it uses virtual time rather than real wall-clock timers)
- +:cooperative+ — deprecated alias for +:immediate+
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/phronomy/runtime.rb', line 50 def self.instance @instance ||= begin scheduler = case Phronomy.configuration.runtime_backend when :cooperative Phronomy.configuration.logger&.warn( "[phronomy] runtime_backend: :cooperative is a deprecated alias for :immediate. " \ "Use :immediate for synchronous/test execution. " \ ":cooperative will be reassigned when a real cooperative Fiber-based scheduler is available." ) FakeScheduler.new when :immediate FakeScheduler.new when :fiber Phronomy.configuration.logger&.warn( "[phronomy] runtime_backend: :fiber uses DeterministicScheduler in autorun mode. " \ "This is an EXPERIMENTAL Fiber-based cooperative scheduler. " \ "Wall-clock timer integration is available via SchedulerTimerAdapter (Issues #331, #337). " \ "Not recommended for production use." ) DeterministicScheduler.new(autorun: true) else ThreadScheduler.new end new(scheduler: scheduler) end end |
.instance=(runtime) ⇒ Runtime
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Replaces the process-wide default Runtime. Useful in tests.
81 82 83 |
# File 'lib/phronomy/runtime.rb', line 81 def self.instance=(runtime) @instance = runtime end |
.measure_ms { ... } ⇒ Array(Object, Integer)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Executes +block+ and returns +[result, elapsed_ms]+ where +elapsed_ms+ is the wall-clock duration in milliseconds (Integer, rounded).
Isolates all direct references to +Process.clock_gettime+ / +Process::CLOCK_MONOTONIC+ in one place so that callers stay at the framework abstraction level.
110 111 112 113 114 115 |
# File 'lib/phronomy/runtime.rb', line 110 def self.measure_ms t0 = Process.clock_gettime(Process::CLOCK_MONOTONIC) result = yield elapsed_ms = ((Process.clock_gettime(Process::CLOCK_MONOTONIC) - t0) * 1000).round [result, elapsed_ms] end |
Instance Method Details
#blocking_io(pool_size: 10, queue_size: 100) ⇒ BlockingAdapterPool
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the shared BlockingAdapterPool for this Runtime. All blocking I/O (LLM HTTP, MCP, ActiveRecord, Redis) should be submitted through this pool.
Pool settings default to 10 workers / 100-deep queue. Override by constructing a Runtime with custom pool options or by replacing the shared Runtime via instance= in tests.
310 311 312 |
# File 'lib/phronomy/runtime.rb', line 310 def blocking_io(pool_size: 10, queue_size: 100) @pool_registry.default_pool(pool_size: pool_size, queue_size: queue_size) end |
#gate(name) ⇒ ConcurrencyGate
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns (or lazily creates) the ConcurrencyGate for the named resource.
Gate caps are read from the global Configuration when the gate is first accessed; subsequent calls return the cached gate. To change the cap at runtime, call #reset_gate first.
141 142 143 |
# File 'lib/phronomy/runtime.rb', line 141 def gate(name) @gate_registry.get(name.to_sym) end |
#non_yield_threshold_violation_count ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Number of times a task has exceeded the CPU-bound detection threshold (i.e. ran longer than +blocking_detect_threshold_ms+ without yielding). Resets to 0 when the Runtime is recreated.
196 197 198 |
# File 'lib/phronomy/runtime.rb', line 196 def non_yield_threshold_violation_count @metrics.starvation_count end |
#pool(name, size: 10, queue_size: 100) ⇒ BlockingAdapterPool
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns (or lazily creates) a named BlockingAdapterPool.
Named pools allow per-subsystem thread-budget control and observability. Recommended pool names: +:llm+, +:mcp+, +:db+, +:redis+, +:tool+. Each pool gets its own dedicated worker threads labelled with the pool name.
329 330 331 |
# File 'lib/phronomy/runtime.rb', line 329 def pool(name, size: 10, queue_size: 100) @pool_registry.named_pool(name, size: size, queue_size: queue_size) end |
#reset_gate(name) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Drops the cached gate for +name+ so that the next call to #gate rebuilds it from the current configuration. Useful in tests.
151 152 153 |
# File 'lib/phronomy/runtime.rb', line 151 def reset_gate(name) @gate_registry.reset(name.to_sym) end |
#shutdown ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Waits for all registered tasks to finish, then shuts down the EventLoop (if active), blocking adapter pool, named pools, and timer queue (if they were started).
When EventLoop mode is enabled, all pending Workflow and Agent FSM events are drained before pools are shut down, ensuring in-flight sessions complete cleanly.
Call this before process exit to avoid leaving orphaned threads or pending work items.
366 367 368 369 370 371 372 373 374 375 |
# File 'lib/phronomy/runtime.rb', line 366 def shutdown @task_registry.drain # Drain EventLoop events before stopping pools so that in-flight # Workflow / Agent FSM sessions can complete their final LLM calls. if Phronomy.configuration.event_loop Phronomy::EventLoop.instance.stop(drain: true) end @pool_registry.shutdown @timer_service.shutdown end |
#spawn(name: nil) { ... } ⇒ Task
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Spawns a single Task using the runtime's scheduler.
The spawned task is registered in the task registry so #shutdown can wait for it to complete. The task is automatically deregistered from the registry when it finishes (success, failure, or cancellation) so long-lived runtimes do not accumulate stale references.
Task names beginning with a recognised type prefix are counted in the task-centric metrics returned by #task_snapshot. Recognised prefixes: +agent-+, +tool-+, +workflow-+, +rag-+, +llm-+, +vector-+.
248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 |
# File 'lib/phronomy/runtime.rb', line 248 def spawn(name: nil, &block) type = _task_type(name) spawn_at = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond) @metrics.record_start(type) task = @scheduler.spawn(name: name, parent: Task.current) do run_start = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond) @metrics.record_wait(run_start - spawn_at) begin result = block.call @metrics.record_end(type, :completed, run_start) result rescue CancellationError @metrics.record_end(type, :cancelled, run_start) raise rescue => e @metrics.record_end(type, :failed, run_start) raise e ensure current = Task.current @task_registry.deregister(current) if current end end @task_registry.register(task) task end |
#task_group(limit: Float::INFINITY, failure_policy: :fail_fast) ⇒ TaskGroup
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Creates a new TaskGroup with an optional concurrency cap.
228 229 230 |
# File 'lib/phronomy/runtime.rb', line 228 def task_group(limit: Float::INFINITY, failure_policy: :fail_fast) TaskGroup.new(limit: limit, failure_policy: failure_policy, runtime: self) end |
#task_snapshot ⇒ Hash{Symbol => Numeric}
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a snapshot of task-centric metrics for the current Runtime.
| Key | Description |
|---|---|
active_agent_tasks |
currently running agent spawns |
active_tool_tasks |
currently running tool spawns |
active_workflow_tasks |
currently running workflow spawns |
active_rag_tasks |
currently running RAG fetches |
active_llm_tasks |
currently running LLM calls |
task_wait_time_p50_ms |
p50 spawn-to-start latency (ms) |
task_wait_time_p95_ms |
p95 spawn-to-start latency (ms) |
task_run_time_p50_ms |
p50 execution duration (ms) |
task_run_time_p95_ms |
p95 execution duration (ms) |
cancelled_tasks |
total cancelled task count |
failed_tasks |
total failed task count |
non_yield_threshold_violation_count |
cumulative count of tasks that ran past blocking_detect_threshold_ms without yielding |
294 295 296 |
# File 'lib/phronomy/runtime.rb', line 294 def task_snapshot @metrics.snapshot end |
#timer_queue ⇒ TimerQueue, SchedulerTimerAdapter
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the shared timer queue for this Runtime.
When the scheduler is a DeterministicScheduler (e.g. the +:fiber+ runtime backend), returns a SchedulerTimerAdapter that integrates with the scheduler's tick cycle instead of spawning a background OS thread. This is the first concrete step of the TimerQueue scheduler-tick integration described in ADR-010 (Issue #331).
For all other schedulers, returns a TimerQueue backed by a single background thread.
All deadline-based cancellation should be registered here instead of spawning one-off sleep threads. Lazily created on first access.
349 350 351 |
# File 'lib/phronomy/runtime.rb', line 349 def timer_queue @timer_service.timer_queue end |
#yield ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Cooperative yield point.
Signals the scheduler that the current task is willing to give up CPU time so that other ready tasks can run. On the default ThreadScheduler this calls +Thread.pass+. On a future fiber-based scheduler this would switch to the next runnable fiber.
When +blocking_detect_threshold_ms+ is configured, checks whether the current task has exceeded that threshold without yielding; if so, emits a warning via the configured logger and increments +non_yield_threshold_violation_count+.
Call this inside tight loops or CPU-intensive sections of tool +execute+ methods and Workflow actions to keep the scheduler responsive.
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
# File 'lib/phronomy/runtime.rb', line 172 def yield if (threshold = Phronomy.configuration.blocking_detect_threshold_ms) slice_start = Task.current_cpu_slice_start_ms if slice_start elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond) - slice_start if elapsed > threshold name = Task.current&.name || "unknown" Phronomy.configuration.logger&.warn( "[Phronomy] CPU-bound task detected: '#{name}' ran #{elapsed.round}ms " \ "without yielding (threshold: #{threshold}ms)" ) @metrics.increment_starvation end end end Task.record_yield! @scheduler.yield end |
#yield_if_needed(every: 1000) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Cooperative yield point with a call-count gate.
Increments a per-thread counter and calls #yield when the counter reaches a multiple of +every+. The counter is thread-local so concurrent tasks each maintain their own independent loop counter without requiring a mutex.
216 217 218 219 220 |
# File 'lib/phronomy/runtime.rb', line 216 def yield_if_needed(every: 1000) # Delegate Thread.current access to Task so that runtime.rb stays outside # the Thread.current allowlist (Issue #302). self.yield if (Task.increment_yield_counter! % every).zero? end |