Class: Phronomy::EventLoop
- Inherits: Object (Object → Phronomy::EventLoop)
- Defined in: lib/phronomy/event_loop.rb
Overview
Singleton event loop that manages all FSMSession instances.
A single background thread reads from a global AsyncQueue and dispatches events to their target FSMSession. IO work (LLM calls, tool calls) must be dispatched via +Runtime.instance.spawn+ or +BlockingAdapterPool+, and its results posted back to the loop via #post.
Activated with: +Phronomy.configure { |c| c.event_loop = true }+
== Threading exception (see ADR-010 Rule 2)
+EventLoop+ is a deliberate exception to Phronomy's cooperative-first concurrency model. Its dispatch loop is an infinite +while @running+ loop; running it as a task on a shared scheduler would occupy that scheduler indefinitely and prevent every other task from running. Therefore #start creates a dedicated Runtime backed by a Runtime::ThreadScheduler. This is correct and intentional per ADR-010. No other framework component should do the same; see the ADR-010 checklist.
== Handler constraints
Handlers dispatched by the EventLoop run on the EventLoop thread. They must not:
- Perform blocking operations directly (database queries, LLM calls, HTTP requests). Schedule blocking work via +Runtime.instance.spawn+ or +BlockingAdapterPool+, then post results back with #post.
- Call +Workflow#invoke+ (or any synchronous +invoke+) from within a handler. That method would block waiting for the EventLoop to process events, causing a deadlock. Use the async pattern: post a follow-up event instead.
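The async pattern can be sketched with a toy loop built only on Ruby's standard library. +MiniLoop+, +on+, and the worker +Thread.new+ are hypothetical stand-ins (for the EventLoop, its handlers, and +Runtime.instance.spawn+ / +BlockingAdapterPool+ respectively), not Phronomy's API. The point it illustrates: the handler never blocks; it hands the slow call to another thread, and that thread posts the result back as a new event.

```ruby
# MiniLoop is a hypothetical stand-in for Phronomy::EventLoop, not its real API.
# One dispatch thread pops [type, payload] pairs and runs the matching handler.
class MiniLoop
  def initialize
    @queue = Thread::Queue.new
    @handlers = {}
    @thread = Thread.new { run }
  end

  # Register a handler for an event type (do this before posting events).
  def on(type, &handler)
    @handlers[type] = handler
  end

  # Thread-safe: may be called from the loop thread or any worker thread.
  def post(type, payload = nil)
    @queue.push([type, payload])
  end

  def stop
    @queue.push(:__stop__)
    @thread.join
  end

  private

  def run
    loop do
      item = @queue.pop
      break if item == :__stop__
      type, payload = item
      @handlers.fetch(type).call(payload)
    end
  end
end

results = Thread::Queue.new
ev_loop = MiniLoop.new

ev_loop.on(:fetch) do |url|
  # Never block the dispatch thread: hand the slow call to a worker
  # (stand-in for Runtime.instance.spawn / BlockingAdapterPool).
  Thread.new do
    sleep 0.05 # simulated blocking IO (HTTP request, LLM call, ...)
    ev_loop.post(:fetched, "body of #{url}")
  end
end

# The result re-enters the loop as an ordinary event.
ev_loop.on(:fetched) { |body| results.push(body) }

ev_loop.post(:fetch, "https://example.com")
body = results.pop
puts body # prints "body of https://example.com"
ev_loop.stop
```

While the worker sleeps, the dispatch thread is free to process other events; that is the property the handler constraints protect.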
== Fork safety
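+EventLoop.instance+ is lazily initialized: the background thread is not created until the first call. Because only the forking thread survives in a child process, this matters for preforking servers such as Puma. As long as +EventLoop.instance+ is first called inside each worker (not in the master before workers are forked), every worker starts its own dispatch thread and no +after_fork+ hook is required.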
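The reason lazy initialization matters can be checked directly: after +fork+, only the calling thread exists in the child. The sketch below uses a plain sleeping thread as a hypothetical stand-in for an eagerly started dispatch thread; it needs a POSIX platform, since +Process.fork+ is unavailable on Windows.

```ruby
# Only the thread that calls fork exists in the child process, so a dispatch
# thread started in a preforking master would be missing from every worker.
background = Thread.new { sleep } # stand-in for an eagerly started dispatch thread
sleep 0.01 until background.status == "sleep"

parent_threads = Thread.list.size

reader, writer = IO.pipe
pid = fork do
  reader.close
  writer.puts(Thread.list.size) # count live threads inside the child
  writer.close
end
writer.close
child_threads = reader.read.to_i
reader.close
Process.wait(pid)
background.kill

puts "parent: #{parent_threads} threads, child: #{child_threads} thread"
```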
== Deadlock warning
Do NOT call +Workflow#invoke+ (in EventLoop mode) from within a workflow entry action. The entry action runs on the EventLoop thread, so a nested +invoke+ would block waiting for that same thread to process events, which is a deadlock. #register detects this case via .current? and raises Phronomy::Error instead of hanging. Use the async pattern instead: schedule work via +Runtime.instance.spawn+ or +BlockingAdapterPool+, then post events back via +Phronomy::EventLoop.instance.post(...)+.
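A minimal sketch of why the nested call deadlocks and how a thread-identity guard turns the hang into an error. +GuardedLoop+ and its +invoke+ are hypothetical stand-ins, not Phronomy's API; naming the dispatch thread +"event-loop"+ loosely mirrors how .current? checks the name of the current task.

```ruby
# GuardedLoop is a hypothetical stand-in; its dispatch thread is named
# "event-loop" so that current? can recognise it.
class GuardedLoop
  Error = Class.new(StandardError)

  def self.current?
    Thread.current.name == "event-loop"
  end

  def initialize
    @queue = Thread::Queue.new
    @thread = Thread.new do
      Thread.current.name = "event-loop"
      run
    end
  end

  # Synchronous API (like Workflow#invoke): blocks until the loop ran the job.
  def invoke(&job)
    # Without this guard, a call from the loop thread would wait forever on
    # done.pop, because the only thread able to run the job is the caller.
    raise Error, "invoke called on the event-loop thread (would deadlock)" if self.class.current?

    done = Thread::Queue.new
    @queue.push([job, done])
    result = done.pop
    raise result if result.is_a?(Exception)
    result
  end

  def stop
    @queue.push(:__stop__)
    @thread.join
  end

  private

  def run
    loop do
      item = @queue.pop
      break if item == :__stop__
      job, done = item
      value = begin
        job.call
      rescue => e
        e
      end
      done.push(value)
    end
  end
end

guarded = GuardedLoop.new
answer = guarded.invoke { 21 * 2 }
puts answer # prints 42

# A job that attempts a nested synchronous invoke is refused instead of hanging.
nested = guarded.invoke do
  begin
    guarded.invoke { :never_runs }
  rescue GuardedLoop::Error => e
    "refused: #{e.message}"
  end
end
puts nested
guarded.stop
```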
Class Method Summary
- .current? ⇒ Boolean (private API)
  Returns true when called from within the EventLoop dispatch task.
- .instance ⇒ Object
  Returns the singleton instance, creating and starting it on first call.
- .reset! ⇒ Object (private API)
  Stops and destroys the singleton.
Instance Method Summary
- #average_lag_seconds ⇒ Float (private API)
  Returns the mean event-loop lag across all dispatched events since the loop was started.
- #enqueue_child(agent_fsm) ⇒ nil (private API)
  Enqueues an AgentFSM as a fire-and-forget child session.
- #initialize ⇒ EventLoop (constructor)
  Returns a new instance of EventLoop.
- #last_lag_seconds ⇒ Float (private API)
  Returns the most recently measured event-loop lag in seconds.
- #max_lag_seconds ⇒ Float (private API)
  Returns the maximum event-loop lag seen since the loop was started.
- #post(event) ⇒ Object (private API)
  Posts an event to the loop.
- #register(fsm_session) ⇒ Phronomy::AsyncQueue (private API)
  Registers an FSMSession for execution and returns a completion queue.
- #start ⇒ self (private API)
  Starts the EventLoop dispatch task under Runtime ownership.
- #stop(timeout: Phronomy.configuration.event_loop_stop_grace_seconds, drain: false, force_kill: false) ⇒ Symbol (private API)
  Stops the EventLoop dispatch task.
Constructor Details
#initialize ⇒ EventLoop
Returns a new instance of EventLoop.
# File 'lib/phronomy/event_loop.rb', line 74
def initialize
  @queue = Phronomy::AsyncQueue.new # global event queue (thread-safe; no Mutex needed)
  @fsms = {}    # { id => FSMSession } (EventLoop thread only)
  @waiting = {} # { id => completion_queue } (EventLoop thread only)
  # Mutex-backed FSM count for drain-mode shutdown.
  @fsm_count_mutex = Mutex.new
  @fsm_count_cond = ConditionVariable.new
  @fsm_count = 0
  # Token cancelled when shutdown is requested; new child sessions receive it.
  @shutdown_token = Phronomy::CancellationToken.new
  # Fairness metrics (EventLoop thread only, except where noted)
  @lag_mutex = Mutex.new
  @last_lag_ns = 0
  @max_lag_ns = 0
  @dispatch_count = 0
  @total_lag_ns = 0
end
Class Method Details
.current? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns true when called from within the EventLoop dispatch task. It compares the name of the current task (+Phronomy::Task.current&.name+) with +"event-loop"+, the name #start gives the dispatch task, so the check works for both thread-based and future fiber-based scheduler backends.
# File 'lib/phronomy/event_loop.rb', line 63
def self.current?
  Phronomy::Task.current&.name == "event-loop"
end
.instance ⇒ Object
Returns the singleton instance, creating and starting it on first call.
# File 'lib/phronomy/event_loop.rb', line 53
def self.instance
  @instance ||= new.tap(&:start)
end
.reset! ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Stops and destroys the singleton. Primarily used in tests.
# File 'lib/phronomy/event_loop.rb', line 69
def self.reset!
  @instance&.stop
  @instance = nil
end
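The lazy-singleton lifecycle of .instance and .reset! can be sketched with a hypothetical +LazyLoop+ class of the same shape (not Phronomy's API). It shows that no thread exists until the first .instance call, that later calls reuse the same object, and that .reset! (as a test teardown might call it) stops and discards the thread.

```ruby
# LazyLoop is a hypothetical stand-in mirroring the .instance / .reset! shape.
class LazyLoop
  def self.instance
    @instance ||= new.tap(&:start) # the thread is created here, on first call
  end

  def self.reset!
    @instance&.stop
    @instance = nil
  end

  def start
    @queue = Thread::Queue.new
    @thread = Thread.new do
      loop { break if @queue.pop == :__stop__ }
    end
    self
  end

  def stop
    @queue.push(:__stop__)
    @thread.join
  end
end

baseline = Thread.list.size
first = LazyLoop.instance  # first call: starts the background thread
second = LazyLoop.instance # later calls reuse the same instance
after_start = Thread.list.size
LazyLoop.reset!            # e.g. in a test teardown
after_reset = Thread.list.size
puts [baseline, after_start, after_reset].inspect
```

Note that +||=+ is not atomic: two threads making the very first call at the same moment could each build a loop. The sketch, like the method it mirrors, does not guard against that.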
Instance Method Details
#average_lag_seconds ⇒ Float
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the mean event-loop lag across all dispatched events since the loop was started. Returns 0.0 when no events have been dispatched. Thread-safe.
# File 'lib/phronomy/event_loop.rb', line 114
def average_lag_seconds
  @lag_mutex.synchronize do
    return 0.0 if @dispatch_count.zero?
    @total_lag_ns.to_f / @dispatch_count / 1_000_000_000.0
  end
end
#enqueue_child(agent_fsm) ⇒ nil
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Enqueues an AgentFSM as a fire-and-forget child session.
Unlike #register, this method:
- Is safe to call from the EventLoop thread (entry actions).
- Does NOT block — no completion queue is created.
- Delegates :finished/:error cleanup to the EventLoop via posted events.
# File 'lib/phronomy/event_loop.rb', line 163
def enqueue_child(agent_fsm)
  @queue.push([Event.new(type: :start, target_id: agent_fsm.id,
                         payload: {session: agent_fsm, completion: nil}),
               Process.clock_gettime(Process::CLOCK_MONOTONIC, :nanosecond)])
  nil
end
#last_lag_seconds ⇒ Float
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the most recently measured event-loop lag in seconds. Lag is the elapsed monotonic-clock time between the moment an event is enqueued (by #post, #register, or #enqueue_child) and the moment it is dequeued for dispatch. Thread-safe.
# File 'lib/phronomy/event_loop.rb', line 97
def last_lag_seconds
  @lag_mutex.synchronize { @last_lag_ns } / 1_000_000_000.0
end
#max_lag_seconds ⇒ Float
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the maximum event-loop lag seen since the loop was started. Thread-safe.
# File 'lib/phronomy/event_loop.rb', line 105
def max_lag_seconds
  @lag_mutex.synchronize { @max_lag_ns } / 1_000_000_000.0
end
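The three lag readers share one pattern: the dispatcher records each event's lag in nanoseconds under a mutex, and readers convert to seconds. The sketch below assumes the dispatcher updates last, max, total, and count together for every dequeued event (that update code is not shown in this page); +LagStats+ and +record+ are hypothetical names, not Phronomy's API.

```ruby
# LagStats is a hypothetical stand-in for the EventLoop's fairness metrics
# (@last_lag_ns, @max_lag_ns, @total_lag_ns, @dispatch_count).
class LagStats
  NS_PER_SECOND = 1_000_000_000.0

  def initialize
    @mutex = Mutex.new
    @last_ns = 0
    @max_ns = 0
    @total_ns = 0
    @count = 0
  end

  # Called once per dequeued event with that event's lag in nanoseconds.
  def record(lag_ns)
    @mutex.synchronize do
      @last_ns = lag_ns
      @max_ns = lag_ns if lag_ns > @max_ns
      @total_ns += lag_ns
      @count += 1
    end
  end

  def last_seconds
    @mutex.synchronize { @last_ns } / NS_PER_SECOND
  end

  def max_seconds
    @mutex.synchronize { @max_ns } / NS_PER_SECOND
  end

  def average_seconds
    @mutex.synchronize do
      return 0.0 if @count.zero?
      @total_ns.to_f / @count / NS_PER_SECOND
    end
  end
end

stats = LagStats.new
empty_average = stats.average_seconds # 0.0 before any dispatch
[2_000_000, 8_000_000, 5_000_000].each { |ns| stats.record(ns) } # 2 ms, 8 ms, 5 ms
puts [stats.last_seconds, stats.max_seconds, stats.average_seconds].inspect # prints [0.005, 0.008, 0.005]
```

Reading the total and the count under a single lock, as +average_seconds+ does, guarantees both values come from the same snapshot.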
#post(event) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Posts an event to the loop. Safe to call from any thread (including IO threads). The current monotonic clock time is recorded with the event so that the EventLoop can measure the dispatch lag when it dequeues it.
Handler constraint: the handler that processes a posted event runs on the EventLoop thread, so it must not perform blocking operations or call +Workflow#invoke+ directly; blocking there stalls all session processing. Offload blocking work via +Runtime.instance.spawn+ or +BlockingAdapterPool+ and post a new event once the result is ready.
# File 'lib/phronomy/event_loop.rb', line 181
def post(event)
  @queue.push([event, Process.clock_gettime(Process::CLOCK_MONOTONIC, :nanosecond)])
end
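The timestamping in #post can be sketched with a plain +Thread::Queue+: the producer stores a monotonic nanosecond timestamp next to the event, and the consumer subtracts it from the current monotonic time on dequeue. +monotonic_ns+ and the +post+ lambda are hypothetical helpers for illustration; the monotonic clock is used so that wall-clock adjustments cannot produce negative or inflated lag values.

```ruby
# Hypothetical helper: monotonic time in integer nanoseconds.
def monotonic_ns
  Process.clock_gettime(Process::CLOCK_MONOTONIC, :nanosecond)
end

queue = Thread::Queue.new
# Producer side (like #post): push [event, enqueue timestamp].
post = ->(event) { queue.push([event, monotonic_ns]) }

post.call(:tick)
sleep 0.02 # pretend the dispatcher is busy with an earlier handler

# Consumer side: lag is "now" minus the stored enqueue timestamp.
event, enqueued_ns = queue.pop
lag_ns = monotonic_ns - enqueued_ns
puts format("%s waited %.1f ms", event, lag_ns / 1_000_000.0)
```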
#register(fsm_session) ⇒ Phronomy::AsyncQueue
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Registers an FSMSession for execution and returns a completion queue.
The session and its completion queue are handed off to the EventLoop thread via the queue payload, so +@fsms+ and +@waiting+ are exclusively written and read by the EventLoop thread. No Mutex is required.
The caller blocks on +completion_queue.pop+ to receive the final context (WorkflowContext) once the workflow finishes or halts. If an error occurred, the popped value will be an Exception — callers are responsible for re-raising it.
# File 'lib/phronomy/event_loop.rb', line 135
def register(fsm_session)
  if Phronomy::EventLoop.current?
    raise Phronomy::Error,
      "Cannot call Workflow#invoke (EventLoop mode) from within an EventLoop " \
      "entry action. Schedule work via Runtime.instance.spawn or " \
      "BlockingAdapterPool, then post events back via " \
      "Phronomy::EventLoop.instance.post(...) instead."
  end
  completion_queue = Phronomy::AsyncQueue.new
  # Pass both session and completion_queue in the event payload so that the
  # EventLoop thread is the sole writer of @fsms and @waiting.
  @queue.push([Event.new(type: :start, target_id: fsm_session.id,
                         payload: {session: fsm_session, completion: completion_queue}),
               Process.clock_gettime(Process::CLOCK_MONOTONIC, :nanosecond)])
  completion_queue
end
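The caller's side of #register, popping the completion queue and re-raising when the value is an Exception, can be sketched as follows. +await_completion+ is a hypothetical helper, +Thread::Queue+ stands in for Phronomy::AsyncQueue, and a plain Hash stands in for the WorkflowContext.

```ruby
# Hypothetical helper for the caller side of #register.
def await_completion(completion_queue)
  result = completion_queue.pop # blocks until the loop pushes the final value
  raise result if result.is_a?(Exception) # errors arrive as values; re-raise them
  result
end

# Success: the loop thread pushes the final context.
ok_queue = Thread::Queue.new
Thread.new { ok_queue.push({status: :done}) }
context = await_completion(ok_queue)
puts context.inspect

# Failure: the loop thread pushes the exception object instead.
failed_queue = Thread::Queue.new
Thread.new { failed_queue.push(RuntimeError.new("workflow failed")) }
begin
  await_completion(failed_queue)
rescue RuntimeError => e
  error_message = e.message
end
puts error_message # prints "workflow failed"
```

Passing the exception as a value keeps the EventLoop thread alive when a single workflow fails; only the waiting caller sees the error.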
#start ⇒ self
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Starts the EventLoop dispatch task under Runtime ownership.
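The dispatch loop runs as a Task so that Runtime#shutdown can drain it together with all other in-flight tasks. The task is named +"event-loop"+ so that .current? can identify it via +Task.current&.name+. Calling #start while the task is still alive is a no-op; after a #stop, #start resets the shutdown token and the session count so the loop can be restarted.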
# File 'lib/phronomy/event_loop.rb', line 193
def start
  return self if @task&.alive?

  # Reset shutdown state so the loop can be restarted after a stop.
  @shutdown_token = Phronomy::CancellationToken.new
  @fsm_count_mutex.synchronize { @fsm_count = 0 }
  @running = true
  # The dispatch loop must always run in a real background thread.
  # A cooperative scheduler (FakeScheduler/ImmediateBackend) executes tasks
  # synchronously on the caller's thread, which would block forever inside
  # the run_loop infinite loop. Create a dedicated Runtime with
  # ThreadScheduler to guarantee async execution regardless of the global
  # runtime_backend setting.
  thread_runtime = Phronomy::Runtime.new(scheduler: Phronomy::Runtime::ThreadScheduler.new)
  @task = thread_runtime.spawn(name: "event-loop") do
    run_loop
  end
  self
end
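The comment in #start about cooperative schedulers can be illustrated with two toy schedulers. +ImmediateScheduler+, +ThreadScheduler+, and +run_loop+ here are hypothetical stand-ins (for an immediate/cooperative backend, Runtime::ThreadScheduler, and the real dispatch loop), not Phronomy's classes. An immediate scheduler only returns once the loop exits, so it works only if the stop sentinel is already queued; a thread-backed one returns at once and lets the caller keep posting.

```ruby
# Toy scheduler that runs the block synchronously on the caller's thread.
class ImmediateScheduler
  def spawn
    yield
  end
end

# Toy scheduler that runs the block on a new background thread.
class ThreadScheduler
  def spawn(&block)
    Thread.new(&block)
  end
end

# A dispatch loop that runs until it dequeues :__stop__.
def run_loop(queue, log)
  while (item = queue.pop) != :__stop__
    log << item
  end
end

# Immediate: spawn returns only after the loop exits, so events and the stop
# sentinel must already be queued. With an empty queue it would block forever
# in queue.pop, since no other thread is left to post :__stop__.
pre_filled = Thread::Queue.new
[:a, :__stop__].each { |e| pre_filled.push(e) }
sync_log = []
ImmediateScheduler.new.spawn { run_loop(pre_filled, sync_log) }

# Thread-backed: spawn returns immediately and the caller keeps posting.
live = Thread::Queue.new
async_log = []
task = ThreadScheduler.new.spawn { run_loop(live, async_log) }
live.push(:b)
live.push(:c)
live.push(:__stop__)
task.join

puts sync_log.inspect  # prints [:a]
puts async_log.inspect # prints [:b, :c]
```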
#stop(timeout: Phronomy.configuration.event_loop_stop_grace_seconds, drain: false, force_kill: false) ⇒ Symbol
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Stops the EventLoop dispatch task.
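Cancels the shutdown token, then sends a cooperative shutdown sentinel (+:__stop__+) to the event queue so that the dispatch task can finish any in-flight handler before exiting, and waits up to +timeout+ seconds for the task to end.
- With +drain: true+, it first waits (also bounded by +timeout+) for active sessions to finish, so the total wait can approach twice +timeout+.
- If the task is still alive after the wait, it is cancelled via Task#cancel! (cooperative cancellation) only when +force_kill: true+; otherwise it is abandoned and a warning is logged.
Returns +:clean+, +:drained_with_discards+ (the drain wait timed out with sessions still active), +:force_killed+, or +:timeout+ (the task was abandoned).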
# File 'lib/phronomy/event_loop.rb', line 236
def stop(timeout: Phronomy.configuration.event_loop_stop_grace_seconds, drain: false, force_kill: false)
  @shutdown_token.cancel!
  status = :clean
  if drain
    # Wait for active sessions to finish, bounded by timeout.
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @fsm_count_mutex.synchronize do
      while @fsm_count > 0
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break if remaining <= 0
        @fsm_count_cond.wait(@fsm_count_mutex, remaining)
      end
      status = :drained_with_discards if @fsm_count > 0
    end
  end
  @running = false
  @queue.push(:__stop__) # unblock queue.pop so the task can see @running = false
  begin
    @task&.join(timeout)
  rescue
    # Task may have terminated with an error (e.g. simulated crash in tests).
    # Suppress the re-raise so the cleanup below always runs.
    nil
  end
  if @task&.alive?
    if force_kill
      Phronomy.configuration.logger&.warn(
        "[Phronomy] EventLoop task did not stop within #{timeout}s; cancelling. " \
        "This is a last resort — check for blocking operations in event handlers."
      )
      @task.cancel!
      status = :force_killed
    else
      Phronomy.configuration.logger&.warn(
        "[Phronomy] EventLoop task did not stop within #{timeout}s; abandoning " \
        "(force_kill: false). Check for blocking operations in event handlers."
      )
      status = :timeout
    end
  end
  @task = nil
  status
end
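The drain phase of #stop is a deadline-bounded ConditionVariable wait. The sketch below reproduces that loop with a hypothetical +SessionCounter+ (not Phronomy's API) and assumes that whoever finishes a session decrements the count and signals the condition variable, which the code shown here does not include. Re-checking the count in a +while+ loop and recomputing the remaining time handles spurious wakeups without extending the overall deadline.

```ruby
# SessionCounter is a hypothetical stand-in for @fsm_count and its
# Mutex/ConditionVariable pair.
class SessionCounter
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @count = 0
  end

  def increment
    @mutex.synchronize { @count += 1 }
  end

  # Assumed to be called when a session finishes; wakes any draining waiter.
  def decrement
    @mutex.synchronize do
      @count -= 1
      @cond.broadcast
    end
  end

  # Waits until the count reaches zero or `timeout` seconds elapse.
  def drain(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      while @count > 0
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break if remaining <= 0
        @cond.wait(@mutex, remaining)
      end
      @count > 0 ? :drained_with_discards : :clean
    end
  end
end

counter = SessionCounter.new
2.times { counter.increment }
# Both sessions finish well within the grace period.
2.times { |i| Thread.new { sleep 0.01 * (i + 1); counter.decrement } }
fast = counter.drain(1.0)

counter.increment # a session that never finishes
slow = counter.drain(0.05)
puts [fast, slow].inspect # prints [:clean, :drained_with_discards]
```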