Class: Phronomy::EventLoop
- Inherits:
-
Object
- Object
- Phronomy::EventLoop
- Defined in:
- lib/phronomy/event_loop.rb
Overview
Singleton event loop that manages all FSMSession instances.
A single background thread reads from a global Thread::Queue and dispatches events to their target FSMSession. IO work (LLM calls, tool calls) runs in separate IO threads that post events back to the loop via EventLoop#post.
Activated with: +Phronomy.configure { |c| c.event_loop = true }+
== Fork safety
+EventLoop.instance+ is lazily initialized. The background thread is not created until the first call, so Puma worker forking does not duplicate the thread. No +after_fork+ hook is required.
== Deadlock warning
Do NOT call +Workflow#invoke+ (in EventLoop mode) from within a workflow entry action. The entry action runs on the EventLoop thread; a nested +invoke+ would block waiting for the same thread to process events → deadlock. Use the async IO pattern instead (spawn a Thread, post events back to the EventLoop).
Class Method Summary collapse
-
.instance ⇒ Object
Returns the singleton instance, creating and starting it on first call.
-
.reset! ⇒ Object
private
Stops and destroys the singleton.
Instance Method Summary collapse
-
#enqueue_child(agent_fsm) ⇒ nil
private
Enqueues an AgentFSM as a fire-and-forget child session.
-
#initialize ⇒ EventLoop
constructor
A new instance of EventLoop.
-
#post(event) ⇒ Object
private
Posts an event to the loop.
-
#register(fsm_session) ⇒ Thread::Queue
private
Registers an FSMSession for execution and returns a completion queue.
-
#start ⇒ self
private
Starts the background event loop thread.
-
#stop(timeout: Phronomy.configuration.event_loop_stop_grace_seconds, drain: false, force_kill: false) ⇒ Symbol
private
Stops the background thread.
Constructor Details
#initialize ⇒ EventLoop
Returns a new instance of EventLoop.
38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/phronomy/event_loop.rb', line 38 def initialize @queue = Thread::Queue.new # global event queue (thread-safe; no Mutex needed) @fsms = {} # { id => FSMSession } — EventLoop thread only @waiting = {} # { id => completion_queue } — EventLoop thread only # Mutex-backed FSM count for drain-mode shutdown. @fsm_count_mutex = Mutex.new @fsm_count_cond = ConditionVariable.new @fsm_count = 0 # Token cancelled when shutdown is requested; new child sessions receive it. @shutdown_token = Phronomy::CancellationToken.new end |
Class Method Details
.instance ⇒ Object
Returns the singleton instance, creating and starting it on first call.
27 28 29 |
# File 'lib/phronomy/event_loop.rb', line 27 def self.instance @instance ||= new.tap(&:start) end |
.reset! ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Stops and destroys the singleton. Primarily used in tests.
33 34 35 36 |
# File 'lib/phronomy/event_loop.rb', line 33 def self.reset! @instance&.stop @instance = nil end |
Instance Method Details
#enqueue_child(agent_fsm) ⇒ nil
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Enqueues an AgentFSM as a fire-and-forget child session.
Unlike #register, this method:
- Is safe to call from the EventLoop thread (entry actions).
- Does NOT block — no completion queue is created.
- Delegates
:finished/:errorcleanup to the EventLoop via posted events.
89 90 91 92 93 |
# File 'lib/phronomy/event_loop.rb', line 89 def enqueue_child(agent_fsm) @queue.push(Event.new(type: :start, target_id: agent_fsm.id, payload: {session: agent_fsm, completion: nil})) nil end |
#post(event) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Posts an event to the loop. Safe to call from any thread (including IO threads).
99 100 101 |
# File 'lib/phronomy/event_loop.rb', line 99 def post(event) @queue.push(event) end |
#register(fsm_session) ⇒ Thread::Queue
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Registers an FSMSession for execution and returns a completion queue.
The session and its completion queue are handed off to the EventLoop thread via the queue payload, so +@fsms+ and +@waiting+ are exclusively written and read by the EventLoop thread. No Mutex is required.
The caller blocks on +completion_queue.pop+ to receive the final context (WorkflowContext) once the workflow finishes or halts. If an error occurred, the popped value will be an Exception — callers are responsible for re-raising it.
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/phronomy/event_loop.rb', line 63 def register(fsm_session) if Thread.current[:phronomy_event_loop_thread] raise Phronomy::Error, "Cannot call Workflow#invoke (EventLoop mode) from within an EventLoop " \ "entry action. Use the async IO pattern: spawn a Thread, post events " \ "back via Phronomy::EventLoop.instance.post(...) instead." end completion_queue = Thread::Queue.new # Pass both session and completion_queue in the event payload so that the # EventLoop thread is the sole writer of @fsms and @waiting. @queue.push(Event.new(type: :start, target_id: fsm_session.id, payload: {session: fsm_session, completion: completion_queue})) completion_queue end |
#start ⇒ self
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Starts the background event loop thread.
106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/phronomy/event_loop.rb', line 106 def start return self if @thread&.alive? # Reset shutdown state so the loop can be restarted after a stop. @shutdown_token = Phronomy::CancellationToken.new @fsm_count_mutex.synchronize { @fsm_count = 0 } @running = true @thread = Thread.new do Thread.current[:phronomy_event_loop_thread] = true run_loop end @thread.abort_on_exception = false self end |
#stop(timeout: Phronomy.configuration.event_loop_stop_grace_seconds, drain: false, force_kill: false) ⇒ Symbol
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Stops the background thread. Used in tests only.
Sends a cooperative shutdown sentinel to the event queue so that the worker thread can finish any in-flight handler before exiting. Waits up to +timeout+ seconds for a clean shutdown; if the thread is still alive afterwards it is force-killed as a last resort.
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
# File 'lib/phronomy/event_loop.rb', line 145 def stop(timeout: Phronomy.configuration.event_loop_stop_grace_seconds, drain: false, force_kill: false) @shutdown_token.cancel! status = :clean if drain # Wait for active sessions to finish, bounded by timeout. deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout @fsm_count_mutex.synchronize do while @fsm_count > 0 remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC) break if remaining <= 0 @fsm_count_cond.wait(@fsm_count_mutex, remaining) end status = :drained_with_discards if @fsm_count > 0 end end @running = false @queue.push(:__stop__) # unblock queue.pop so the worker can see @running = false begin @thread&.join(timeout) rescue # Thread may have terminated with an exception (e.g. simulated crash in # tests). Suppress the re-raise so the cleanup below always runs. nil end if @thread&.alive? if force_kill Phronomy.configuration.logger&.warn( "[Phronomy] EventLoop thread did not stop within #{timeout}s; force-killing. " \ "This is a last resort — check for blocking operations in event handlers." ) @thread.kill status = :force_killed else Phronomy.configuration.logger&.warn( "[Phronomy] EventLoop thread did not stop within #{timeout}s; abandoning " \ "(force_kill: false). Check for blocking operations in event handlers." ) status = :timeout end end @thread = nil status end |