Class: Phronomy::EventLoop
- Inherits:
-
Object
- Object
- Phronomy::EventLoop
- Defined in:
- lib/phronomy/event_loop.rb
Overview
Singleton event loop that manages all FSMSession instances.
A single background thread reads from a global Thread::Queue and dispatches events to their target FSMSession. IO work (LLM calls, tool calls) runs in separate IO threads that post events back to the loop via EventLoop#post.
Activated with: +Phronomy.configure { |c| c.event_loop = true }+
== Fork safety
+EventLoop.instance+ is lazily initialized. The background thread is not created until the first call, so Puma worker forking does not duplicate the thread. No +after_fork+ hook is required.
== Deadlock warning
Do NOT call +Workflow#invoke+ (in EventLoop mode) from within a workflow entry action. The entry action runs on the EventLoop thread; a nested +invoke+ would block waiting for the same thread to process events → deadlock. Use the async IO pattern instead (spawn a Thread, post events back to the EventLoop).
Class Method Summary collapse
-
.instance ⇒ Object
Returns the singleton instance, creating and starting it on first call.
-
.reset! ⇒ Object
private
Stops and destroys the singleton.
Instance Method Summary collapse
-
#enqueue_child(agent_fsm) ⇒ nil
Enqueues an AgentFSM as a fire-and-forget child session.
-
#initialize ⇒ EventLoop
constructor
A new instance of EventLoop.
-
#post(event) ⇒ Object
Posts an event to the loop.
-
#register(fsm_session) ⇒ Thread::Queue
Registers an FSMSession for execution and returns a completion queue.
-
#start ⇒ self
Starts the background event loop thread.
-
#stop ⇒ Object
private
Stops the background thread.
Constructor Details
#initialize ⇒ EventLoop
Returns a new instance of EventLoop.
38 39 40 41 42 |
# File 'lib/phronomy/event_loop.rb', line 38 def initialize @queue = Thread::Queue.new # global event queue (thread-safe; no Mutex needed) @fsms = {} # { id => FSMSession } — EventLoop thread only @waiting = {} # { id => completion_queue } — EventLoop thread only end |
Class Method Details
.instance ⇒ Object
Returns the singleton instance, creating and starting it on first call.
27 28 29 |
# File 'lib/phronomy/event_loop.rb', line 27 def self.instance @instance ||= new.tap(&:start) end |
.reset! ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Stops and destroys the singleton. Primarily used in tests.
33 34 35 36 |
# File 'lib/phronomy/event_loop.rb', line 33 def self.reset! @instance&.stop @instance = nil end |
Instance Method Details
#enqueue_child(agent_fsm) ⇒ nil
Enqueues an AgentFSM as a fire-and-forget child session.
Unlike #register, this method:
- Is safe to call from the EventLoop thread (entry actions).
- Does NOT block — no completion queue is created.
- Delegates
:finished/:errorcleanup to the EventLoop via posted events.
81 82 83 84 85 |
# File 'lib/phronomy/event_loop.rb', line 81 def enqueue_child(agent_fsm) @queue.push(Event.new(type: :start, target_id: agent_fsm.id, payload: {session: agent_fsm, completion: nil})) nil end |
#post(event) ⇒ Object
Posts an event to the loop. Safe to call from any thread (including IO threads).
90 91 92 |
# File 'lib/phronomy/event_loop.rb', line 90 def post(event) @queue.push(event) end |
#register(fsm_session) ⇒ Thread::Queue
Registers an FSMSession for execution and returns a completion queue.
The session and its completion queue are handed off to the EventLoop thread via the queue payload, so +@fsms+ and +@waiting+ are exclusively written and read by the EventLoop thread. No Mutex is required.
The caller blocks on +completion_queue.pop+ to receive the final context (WorkflowContext) once the workflow finishes or halts. If an error occurred, the popped value will be an Exception — callers are responsible for re-raising it.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/phronomy/event_loop.rb', line 56 def register(fsm_session) if Thread.current[:phronomy_event_loop_thread] raise Phronomy::Error, "Cannot call Workflow#invoke (EventLoop mode) from within an EventLoop " \ "entry action. Use the async IO pattern: spawn a Thread, post events " \ "back via Phronomy::EventLoop.instance.post(...) instead." end completion_queue = Thread::Queue.new # Pass both session and completion_queue in the event payload so that the # EventLoop thread is the sole writer of @fsms and @waiting. @queue.push(Event.new(type: :start, target_id: fsm_session.id, payload: {session: fsm_session, completion: completion_queue})) completion_queue end |
#start ⇒ self
Starts the background event loop thread.
96 97 98 99 100 101 102 103 104 |
# File 'lib/phronomy/event_loop.rb', line 96 def start @running = true @thread = Thread.new do Thread.current[:phronomy_event_loop_thread] = true run_loop end @thread.abort_on_exception = false self end |
#stop ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Stops the background thread. Used in tests only.
108 109 110 111 112 |
# File 'lib/phronomy/event_loop.rb', line 108 def stop @running = false @thread&.kill @thread = nil end |