Class: Phronomy::Runtime
- Inherits: Object
  - Object
    - Phronomy::Runtime
- Defined in:
- lib/phronomy/runtime.rb,
lib/phronomy/runtime/scheduler.rb,
lib/phronomy/runtime/timer_queue.rb,
lib/phronomy/runtime/gate_registry.rb,
lib/phronomy/runtime/pool_registry.rb,
lib/phronomy/runtime/task_registry.rb,
lib/phronomy/runtime/timer_service.rb,
lib/phronomy/runtime/fake_scheduler.rb,
lib/phronomy/runtime/runtime_metrics.rb,
lib/phronomy/runtime/thread_scheduler.rb,
lib/phronomy/runtime/deterministic_scheduler.rb,
lib/phronomy/runtime/scheduler_timer_adapter.rb
Overview
Central authority for concurrent primitives.
+Runtime+ is the single place that creates Tasks and TaskGroups and manages the lifecycle of all concurrency in Phronomy. It owns:
- a pluggable Scheduler (default: ThreadScheduler)
- a task registry for graceful shutdown
- the shared BlockingAdapterPool
In production, use the process-wide singleton via Runtime.instance. In tests, construct a Runtime with a FakeScheduler to run tasks synchronously without spawning additional threads:
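A minimal sketch of that testing pattern follows. The classes +InlineScheduler+ and +SketchRuntime+ are hypothetical stand-ins written for this example, not Phronomy's FakeScheduler or Runtime; they only illustrate why a scheduler that runs each block inline makes tests synchronous. Unlike the real #spawn, which returns a Task, the stand-in returns the block's value directly.

```ruby
# Hypothetical stand-ins: NOT Phronomy's implementation.
# An inline scheduler runs every task on the calling thread.
class InlineScheduler
  # Runs the block immediately and returns its result.
  def spawn(name: nil)
    yield
  end
end

class SketchRuntime
  attr_reader :scheduler

  def initialize(scheduler:)
    @scheduler = scheduler
  end

  # Delegates to whichever scheduler was injected.
  def spawn(name: nil, &block)
    @scheduler.spawn(name: name, &block)
  end
end

runtime = SketchRuntime.new(scheduler: InlineScheduler.new)
caller_thread = Thread.current
ran_on = nil
result = runtime.spawn(name: "tool-demo") do
  ran_on = Thread.current
  21 * 2
end
```

With the library itself, the constructor signature documented on this page suggests the equivalent is Phronomy::Runtime.new(scheduler: Phronomy::Runtime::FakeScheduler.new), optionally installed process-wide through Runtime.instance=.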
Defined Under Namespace
Classes: DeterministicScheduler, FakeScheduler, GateRegistry, PoolRegistry, RuntimeMetrics, Scheduler, SchedulerTimerAdapter, TaskRegistry, ThreadScheduler, TimerQueue, TimerService
Instance Attribute Summary
- #scheduler ⇒ Scheduler (readonly)
  The scheduler backing this runtime instance.
Class Method Summary
- .in_scheduler_context? ⇒ Boolean (private)
  Returns +true+ when the calling thread is executing inside an active scheduler task (i.e. Task.current is non-nil).
- .instance ⇒ Runtime (private)
  Returns the process-wide default Runtime.
- .instance=(runtime) ⇒ Runtime (private)
  Replaces the process-wide default Runtime.
Instance Method Summary
- #blocking_io(pool_size: 10, queue_size: 100) ⇒ BlockingAdapterPool (private)
  Returns the shared BlockingAdapterPool for this Runtime.
- #gate(name) ⇒ ConcurrencyGate (private)
  Returns (or lazily creates) the ConcurrencyGate for the named resource.
- #initialize(scheduler: ThreadScheduler.new) ⇒ Runtime (constructor, private)
  A new instance of Runtime.
- #non_yield_threshold_violation_count ⇒ Integer (private)
  Number of times a task has exceeded the CPU-bound detection threshold (i.e. ran longer than +blocking_detect_threshold_ms+ without yielding).
- #pool(name, size: 10, queue_size: 100) ⇒ BlockingAdapterPool (private)
  Returns (or lazily creates) a named BlockingAdapterPool.
- #reset_gate(name) ⇒ void (private)
  Drops the cached gate for +name+ so that the next call to #gate rebuilds it from the current configuration.
- #shutdown ⇒ void (private)
  Waits for all registered tasks to finish, then shuts down the EventLoop (if active), blocking adapter pool, named pools, and timer queue (if they were started).
- #spawn(name: nil) { ... } ⇒ Task (private)
  Spawns a single Task using the runtime's scheduler.
- #task_group(limit: Float::INFINITY, failure_policy: :fail_fast) ⇒ TaskGroup (private)
  Creates a new TaskGroup with an optional concurrency cap.
- #task_snapshot ⇒ Hash{Symbol => Numeric} (private)
  Returns a snapshot of task-centric metrics for the current Runtime.
- #timer_queue ⇒ TimerQueue, SchedulerTimerAdapter (private)
  Returns the shared timer queue for this Runtime.
- #yield ⇒ void (private)
  Cooperative yield point.
- #yield_if_needed(every: 1000) ⇒ void (private)
  Cooperative yield point with a call-count gate.
Constructor Details
#initialize(scheduler: ThreadScheduler.new) ⇒ Runtime
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of Runtime.
# File 'lib/phronomy/runtime.rb', line 108
def initialize(scheduler: ThreadScheduler.new)
  @scheduler = scheduler
  @task_registry = TaskRegistry.new
  @metrics = RuntimeMetrics.new
  @gate_registry = GateRegistry.new
  @pool_registry = PoolRegistry.new
  @timer_service = TimerService.new(scheduler)
end
Instance Attribute Details
#scheduler ⇒ Scheduler (readonly)
The scheduler backing this runtime instance.
# File 'lib/phronomy/runtime.rb', line 104
def scheduler
  @scheduler
end
Class Method Details
.in_scheduler_context? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns +true+ when the calling thread is executing inside an active scheduler task (i.e. Task.current is non-nil). Code running inside a #spawn block is always in a scheduler context.
Use this to detect potential scheduler-blocking calls:
if Phronomy::Runtime.in_scheduler_context?
  Phronomy.configuration.logger&.warn("blocking call inside scheduler task")
end
# File 'lib/phronomy/runtime.rb', line 98
def self.in_scheduler_context?
  !Task.current.nil?
end
.instance ⇒ Runtime
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the process-wide default Runtime.
Auto-creates an instance using the scheduler backend specified by +Phronomy.configuration.runtime_backend+:
- +:thread+ (default) — ThreadScheduler (one OS thread per task)
- +:immediate+ — FakeScheduler (synchronous, no extra threads)
- +:fiber+ — DeterministicScheduler in autorun mode (EXPERIMENTAL; Fiber-based synchronous execution; not yet suitable for production because it uses virtual time rather than real wall-clock timers)
- +:cooperative+ — deprecated alias for +:immediate+
# File 'lib/phronomy/runtime.rb', line 52
def self.instance
  @instance ||= begin
    scheduler = case Phronomy.configuration.runtime_backend
    when :cooperative
      Phronomy.configuration.logger&.warn(
        "[phronomy] runtime_backend: :cooperative is a deprecated alias for :immediate. " \
        "Use :immediate for synchronous/test execution. " \
        ":cooperative will be reassigned when a real cooperative Fiber-based scheduler is available."
      )
      FakeScheduler.new
    when :immediate
      FakeScheduler.new
    when :fiber
      Phronomy.configuration.logger&.warn(
        "[phronomy] runtime_backend: :fiber uses DeterministicScheduler in autorun mode. " \
        "This is an EXPERIMENTAL Fiber-based cooperative scheduler. " \
        "Wall-clock timer integration is available via SchedulerTimerAdapter (Issues #331, #337). " \
        "Not recommended for production use."
      )
      DeterministicScheduler.new(autorun: true)
    else
      ThreadScheduler.new
    end
    new(scheduler: scheduler)
  end
end
.instance=(runtime) ⇒ Runtime
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Replaces the process-wide default Runtime. Useful in tests.
# File 'lib/phronomy/runtime.rb', line 83
def self.instance=(runtime)
  @instance = runtime
end
Instance Method Details
#blocking_io(pool_size: 10, queue_size: 100) ⇒ BlockingAdapterPool
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the shared BlockingAdapterPool for this Runtime. All blocking I/O (LLM HTTP, MCP, ActiveRecord, Redis) should be submitted through this pool.
Pool settings default to 10 workers and a 100-deep queue. Override them with the +pool_size:+ and +queue_size:+ keyword arguments, or replace the shared Runtime via .instance= in tests.
# File 'lib/phronomy/runtime.rb', line 295
def blocking_io(pool_size: 10, queue_size: 100)
  @pool_registry.default_pool(pool_size: pool_size, queue_size: queue_size)
end
#gate(name) ⇒ ConcurrencyGate
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns (or lazily creates) the ConcurrencyGate for the named resource.
Gate caps are read from the global Configuration when the gate is first accessed; subsequent calls return the cached gate. To change the cap at runtime, call #reset_gate first.
# File 'lib/phronomy/runtime.rb', line 126
def gate(name)
  @gate_registry.get(name.to_sym)
end
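The cache-then-reset behaviour described for #gate can be sketched as follows. +SketchGate+ and +SketchGateRegistry+ are hypothetical names, and a plain Hash stands in for the global Configuration; the internals of GateRegistry are not shown on this page and may differ.

```ruby
# Hypothetical sketch of a lazily populated, resettable gate cache.
# Not Phronomy's GateRegistry; the caps Hash stands in for Configuration.
SketchGate = Struct.new(:name, :cap)

class SketchGateRegistry
  def initialize(caps)
    @caps = caps
    @gates = {}
    @mutex = Mutex.new
  end

  # Builds the gate on first access, then returns the cached object.
  def get(name)
    @mutex.synchronize do
      @gates[name] ||= SketchGate.new(name, @caps.fetch(name, 1))
    end
  end

  # Forgets the cached gate so the next #get re-reads the cap.
  def reset(name)
    @mutex.synchronize { @gates.delete(name) }
  end
end

caps = { llm: 4 }
registry = SketchGateRegistry.new(caps)
first = registry.get(:llm)
caps[:llm] = 8                 # configuration changes after first access
cached = registry.get(:llm)    # still the gate that was built with cap 4
registry.reset(:llm)
rebuilt = registry.get(:llm)   # rebuilt from the current cap
```

The point of the sketch is that a configuration change is invisible until the cached entry is dropped, which is why a reset has to precede the next lookup.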
#non_yield_threshold_violation_count ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Number of times a task has exceeded the CPU-bound detection threshold (i.e. ran longer than +blocking_detect_threshold_ms+ without yielding). Resets to 0 when the Runtime is recreated.
# File 'lib/phronomy/runtime.rb', line 181
def non_yield_threshold_violation_count
  @metrics.starvation_count
end
#pool(name, size: 10, queue_size: 100) ⇒ BlockingAdapterPool
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns (or lazily creates) a named BlockingAdapterPool.
Named pools allow per-subsystem thread-budget control and observability. Recommended pool names: +:llm+, +:mcp+, +:db+, +:redis+, +:tool+. Each pool gets its own dedicated worker threads labelled with the pool name.
# File 'lib/phronomy/runtime.rb', line 314
def pool(name, size: 10, queue_size: 100)
  @pool_registry.named_pool(name, size: size, queue_size: queue_size)
end
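To illustrate what a named pool with a fixed thread budget and a bounded queue looks like, here is a minimal sketch built on Ruby's SizedQueue. +SketchPool+ and its +submit+ method are hypothetical; BlockingAdapterPool's real interface is not documented on this page.

```ruby
# Hypothetical sketch of a named, bounded worker pool.
# Not Phronomy's BlockingAdapterPool.
class SketchPool
  def initialize(name, size:, queue_size:)
    @queue = SizedQueue.new(queue_size)   # push blocks while the queue is full
    @workers = Array.new(size) do |i|
      Thread.new do
        # Label each worker with the pool name for observability.
        Thread.current.name = "#{name}-worker-#{i}"
        # Queue#pop returns nil once the queue is closed and drained.
        while (job = @queue.pop)
          job.call
        end
      end
    end
  end

  def submit(&job)
    @queue.push(job)
  end

  # Stops accepting work, lets queued jobs finish, then joins the workers.
  def shutdown
    @queue.close
    @workers.each(&:join)
  end
end

pool = SketchPool.new(:llm, size: 2, queue_size: 4)
results = Queue.new
5.times { |n| pool.submit { results << [n * n, Thread.current.name] } }
pool.shutdown

squares = []
names = []
until results.empty?
  value, thread_name = results.pop
  squares << value
  names << thread_name
end
```

Giving each subsystem its own pool means a slow dependency can exhaust only its own workers and queue, not those of other subsystems.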
#reset_gate(name) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Drops the cached gate for +name+ so that the next call to #gate rebuilds it from the current configuration. Useful in tests.
# File 'lib/phronomy/runtime.rb', line 136
def reset_gate(name)
  @gate_registry.reset(name.to_sym)
end
#shutdown ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Waits for all registered tasks to finish, then shuts down the EventLoop (if active), blocking adapter pool, named pools, and timer queue (if they were started).
When EventLoop mode is enabled, all pending Workflow and Agent FSM events are drained before pools are shut down, ensuring in-flight sessions complete cleanly.
Call this before process exit to avoid leaving orphaned threads or pending work items.
# File 'lib/phronomy/runtime.rb', line 351
def shutdown
  @task_registry.drain
  # Drain EventLoop events before stopping pools so that in-flight
  # Workflow / Agent FSM sessions can complete their final LLM calls.
  if Phronomy.configuration.event_loop
    Phronomy::EventLoop.instance.stop(drain: true)
  end
  @pool_registry.shutdown
  @timer_service.shutdown
end
#spawn(name: nil) { ... } ⇒ Task
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Spawns a single Task using the runtime's scheduler.
The spawned task is registered in the task registry so #shutdown can wait for it to complete. The task is automatically deregistered from the registry when it finishes (success, failure, or cancellation) so long-lived runtimes do not accumulate stale references.
Task names beginning with a recognised type prefix are counted in the task-centric metrics returned by #task_snapshot. Recognised prefixes: +agent-+, +tool-+, +workflow-+, +rag-+, +llm-+, +vector-+.
# File 'lib/phronomy/runtime.rb', line 233
def spawn(name: nil, &block)
  type = _task_type(name)
  spawn_at = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
  @metrics.record_start(type)
  task = @scheduler.spawn(name: name, parent: Task.current) do
    run_start = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
    @metrics.record_wait(run_start - spawn_at)
    begin
      result = block.call
      @metrics.record_end(type, :completed, run_start)
      result
    rescue CancellationError
      @metrics.record_end(type, :cancelled, run_start)
      raise
    rescue => e
      @metrics.record_end(type, :failed, run_start)
      raise e
    ensure
      current = Task.current
      @task_registry.deregister(current) if current
    end
  end
  @task_registry.register(task)
  task
end
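The listing calls a private +_task_type+ helper whose body is not shown on this page. One plausible way to map a task name to a metric type from the documented prefixes is sketched below; +sketch_task_type+ and +TASK_PREFIXES+ are hypothetical names, and the real helper may behave differently (for example in how it treats unrecognised names).

```ruby
# Hypothetical sketch of prefix-based task typing.
# Phronomy's private _task_type helper is not shown and may differ.
TASK_PREFIXES = %w[agent tool workflow rag llm vector].freeze

# Returns the type as a Symbol, or nil when the name has no recognised prefix.
def sketch_task_type(name)
  return nil if name.nil?

  prefix = TASK_PREFIXES.find { |p| name.to_s.start_with?("#{p}-") }
  prefix&.to_sym
end

agent_type   = sketch_task_type("agent-planner")
llm_type     = sketch_task_type("llm-call-1")
unknown_type = sketch_task_type("toolbox")   # no "tool-" prefix, so untyped
unnamed_type = sketch_task_type(nil)
```

Under this assumption, naming a task "agent-planner" is what would make it show up in the agent counters of #task_snapshot, while unnamed tasks would stay out of the per-type counts.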
#task_group(limit: Float::INFINITY, failure_policy: :fail_fast) ⇒ TaskGroup
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Creates a new TaskGroup with an optional concurrency cap.
# File 'lib/phronomy/runtime.rb', line 213
def task_group(limit: Float::INFINITY, failure_policy: :fail_fast)
  TaskGroup.new(limit: limit, failure_policy: failure_policy, runtime: self)
end
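As an illustration of what a concurrency cap means in practice, the sketch below limits the number of simultaneously running jobs with a counting semaphore. +SketchGroup+ is a hypothetical stand-in: it requires a finite Integer limit and does not model +failure_policy+, so it should not be read as TaskGroup's actual behaviour.

```ruby
# Hypothetical sketch: capping concurrency with a counting semaphore.
# Not Phronomy's TaskGroup; failure policies are not modelled here.
class SketchGroup
  def initialize(limit:)
    @slots = SizedQueue.new(limit)   # holds one token per running job
    @threads = []
  end

  def spawn(&block)
    @slots.push(:token)              # blocks while +limit+ jobs are running
    @threads << Thread.new do
      begin
        block.call
      ensure
        @slots.pop                   # free the slot even if the job raised
      end
    end
  end

  # Waits for every job and returns their results in spawn order.
  def wait
    @threads.map(&:value)
  end
end

lock = Mutex.new
running = 0
peak = 0
group = SketchGroup.new(limit: 2)
6.times do |n|
  group.spawn do
    lock.synchronize do
      running += 1
      peak = [peak, running].max
    end
    sleep 0.01
    lock.synchronize { running -= 1 }
    n * 2
  end
end
values = group.wait
```

In this sketch +peak+ never exceeds the limit, because a job can only start after it has acquired one of the two tokens.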
#task_snapshot ⇒ Hash{Symbol => Numeric}
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a snapshot of task-centric metrics for the current Runtime.
| Key | Description |
|---|---|
| active_agent_tasks | currently running agent spawns |
| active_tool_tasks | currently running tool spawns |
| active_workflow_tasks | currently running workflow spawns |
| active_rag_tasks | currently running RAG fetches |
| active_llm_tasks | currently running LLM calls |
| task_wait_time_p50_ms | p50 spawn-to-start latency (ms) |
| task_wait_time_p95_ms | p95 spawn-to-start latency (ms) |
| task_run_time_p50_ms | p50 execution duration (ms) |
| task_run_time_p95_ms | p95 execution duration (ms) |
| cancelled_tasks | total cancelled task count |
| failed_tasks | total failed task count |
| non_yield_threshold_violation_count | cumulative count of tasks that ran past blocking_detect_threshold_ms without yielding |
# File 'lib/phronomy/runtime.rb', line 279
def task_snapshot
  @metrics.snapshot
end
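For readers unfamiliar with the p50 and p95 keys, the sketch below computes them with the nearest-rank method over a list of recorded durations. This is an assumption made for illustration: how RuntimeMetrics actually derives its percentiles is not shown on this page, and the +percentile+ helper and sample data are hypothetical.

```ruby
# Hypothetical sketch: nearest-rank percentile over recorded durations.
# RuntimeMetrics may use a different method (e.g. interpolation or sampling).
def percentile(samples, pct)
  return nil if samples.empty?

  sorted = samples.sort
  rank = (pct / 100.0 * sorted.size).ceil   # 1-based nearest rank
  sorted[[rank, 1].max - 1]
end

wait_times_ms = [12, 3, 7, 40, 5, 9, 6, 4, 8, 11]
p50 = percentile(wait_times_ms, 50)   # => 7
p95 = percentile(wait_times_ms, 95)   # => 40
```

A large gap between p50 and p95, as in this sample, indicates that most tasks start quickly but a few wait far longer, which averages would hide.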
#timer_queue ⇒ TimerQueue, SchedulerTimerAdapter
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the shared timer queue for this Runtime.
When the scheduler is a DeterministicScheduler (e.g. the +:fiber+ runtime backend), returns a SchedulerTimerAdapter that integrates with the scheduler's tick cycle instead of spawning a background OS thread. This is the first concrete step of the TimerQueue scheduler-tick integration described in ADR-010 (Issue #331).
For all other schedulers, returns a TimerQueue backed by a single background thread.
All deadline-based cancellation should be registered here instead of spawning one-off sleep threads. Lazily created on first access.
# File 'lib/phronomy/runtime.rb', line 334
def timer_queue
  @timer_service.timer_queue
end
#yield ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Cooperative yield point.
Signals the scheduler that the current task is willing to give up CPU time so that other ready tasks can run. On the default ThreadScheduler this calls +Thread.pass+. On a future fiber-based scheduler this would switch to the next runnable fiber.
When +blocking_detect_threshold_ms+ is configured, checks whether the current task has exceeded that threshold without yielding; if so, emits a warning via the configured logger and increments +non_yield_threshold_violation_count+.
Call this inside tight loops or CPU-intensive sections of tool +execute+ methods and Workflow actions to keep the scheduler responsive.
# File 'lib/phronomy/runtime.rb', line 157
def yield
  if (threshold = Phronomy.configuration.blocking_detect_threshold_ms)
    slice_start = Task.current_cpu_slice_start_ms
    if slice_start
      elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond) - slice_start
      if elapsed > threshold
        name = Task.current&.name || "unknown"
        Phronomy.configuration.logger&.warn(
          "[Phronomy] CPU-bound task detected: '#{name}' ran #{elapsed.round}ms " \
          "without yielding (threshold: #{threshold}ms)"
        )
        @metrics.increment_starvation
      end
    end
  end
  Task.record_yield!
  @scheduler.yield
end
#yield_if_needed(every: 1000) ⇒ void
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
This method returns an undefined value.
Cooperative yield point with a call-count gate.
Increments a per-thread counter and calls #yield when the counter reaches a multiple of +every+. The counter is thread-local so concurrent tasks each maintain their own independent loop counter without requiring a mutex.
# File 'lib/phronomy/runtime.rb', line 201
def yield_if_needed(every: 1000)
  # Delegate Thread.current access to Task so that runtime.rb stays outside
  # the Thread.current allowlist (Issue #302).
  self.yield if (Task.increment_yield_counter! % every).zero?
end
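The call-count gate can be sketched with Ruby's thread-local variables. +SketchYield+ is a hypothetical module: Task.increment_yield_counter! is not shown on this page, so the storage mechanism here (Thread#thread_variable_get and #thread_variable_set) is an assumption, and Thread.pass stands in for the scheduler's yield.

```ruby
# Hypothetical sketch of a thread-local call counter gating a yield.
# Not Phronomy's Task.increment_yield_counter!.
module SketchYield
  KEY = :sketch_yield_counter

  # Increments and returns this thread's private counter; no mutex is needed
  # because no other thread can see the value.
  def self.increment!
    count = (Thread.current.thread_variable_get(KEY) || 0) + 1
    Thread.current.thread_variable_set(KEY, count)
    count
  end

  # Yields once every +every+ calls; returns true when it actually yielded.
  def self.yield_if_needed(every: 1000)
    return false unless (increment! % every).zero?

    Thread.pass
    true
  end
end

# 2500 iterations in one thread cross the 1000-call boundary twice.
yields = Thread.new do
  (1..2500).count { SketchYield.yield_if_needed(every: 1000) }
end.value

# A different thread starts from its own fresh counter.
other_first_count = Thread.new { SketchYield.increment! }.value
```

Gating on a counter keeps the cost of the check to one increment and one modulo per iteration, so it is cheap enough to leave inside a tight loop.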