Class: Phronomy::EventLoop

Inherits:
Object
  • Object
show all
Defined in:
lib/phronomy/event_loop.rb

Overview

Singleton event loop that manages all FSMSession instances.

A single background thread reads from a global Thread::Queue and dispatches events to their target FSMSession. IO work (LLM calls, tool calls) runs in separate IO threads that post events back to the loop via EventLoop#post.

Activated with: +Phronomy.configure { |c| c.event_loop = true }+

== Fork safety

+EventLoop.instance+ is lazily initialized. The background thread is not created until the first call, so Puma worker forking does not duplicate the thread. No +after_fork+ hook is required.

== Deadlock warning

Do NOT call +Workflow#invoke+ (in EventLoop mode) from within a workflow entry action. The entry action runs on the EventLoop thread; a nested +invoke+ would block waiting for the same thread to process events → deadlock. Use the async IO pattern instead (spawn a Thread, post events back to the EventLoop).

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeEventLoop

Returns a new instance of EventLoop.



38
39
40
41
42
# File 'lib/phronomy/event_loop.rb', line 38

def initialize
  @queue = Thread::Queue.new  # global event queue (thread-safe; no Mutex needed)
  @fsms = {}                 # { id => FSMSession }     — EventLoop thread only
  @waiting = {}                 # { id => completion_queue } — EventLoop thread only
end

Class Method Details

.instanceObject

Returns the singleton instance, creating and starting it on first call.



27
28
29
# File 'lib/phronomy/event_loop.rb', line 27

def self.instance
  @instance ||= new.tap(&:start)
end

.reset!Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Stops and destroys the singleton. Primarily used in tests.



33
34
35
36
# File 'lib/phronomy/event_loop.rb', line 33

def self.reset!
  @instance&.stop
  @instance = nil
end

Instance Method Details

#enqueue_child(agent_fsm) ⇒ nil

Enqueues an AgentFSM as a fire-and-forget child session.

Unlike #register, this method:

  • Is safe to call from the EventLoop thread (entry actions).
  • Does NOT block — no completion queue is created.
  • Delegates :finished/:error cleanup to the EventLoop via posted events.

Parameters:

Returns:

  • (nil)


81
82
83
84
85
# File 'lib/phronomy/event_loop.rb', line 81

def enqueue_child(agent_fsm)
  @queue.push(Event.new(type: :start, target_id: agent_fsm.id,
    payload: {session: agent_fsm, completion: nil}))
  nil
end

#post(event) ⇒ Object

Posts an event to the loop. Safe to call from any thread (including IO threads).

Parameters:



90
91
92
# File 'lib/phronomy/event_loop.rb', line 90

def post(event)
  @queue.push(event)
end

#register(fsm_session) ⇒ Thread::Queue

Registers an FSMSession for execution and returns a completion queue.

The session and its completion queue are handed off to the EventLoop thread via the queue payload, so +@fsms+ and +@waiting+ are exclusively written and read by the EventLoop thread. No Mutex is required.

The caller blocks on +completion_queue.pop+ to receive the final context (WorkflowContext) once the workflow finishes or halts. If an error occurred, the popped value will be an Exception — callers are responsible for re-raising it.

Parameters:

Returns:

  • (Thread::Queue)

    resolves to final/halted context, or an Exception



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/phronomy/event_loop.rb', line 56

def register(fsm_session)
  if Thread.current[:phronomy_event_loop_thread]
    raise Phronomy::Error,
      "Cannot call Workflow#invoke (EventLoop mode) from within an EventLoop " \
      "entry action. Use the async IO pattern: spawn a Thread, post events " \
      "back via Phronomy::EventLoop.instance.post(...) instead."
  end

  completion_queue = Thread::Queue.new
  # Pass both session and completion_queue in the event payload so that the
  # EventLoop thread is the sole writer of @fsms and @waiting.
  @queue.push(Event.new(type: :start, target_id: fsm_session.id,
    payload: {session: fsm_session, completion: completion_queue}))
  completion_queue
end

#startself

Starts the background event loop thread.

Returns:

  • (self)


96
97
98
99
100
101
102
103
104
# File 'lib/phronomy/event_loop.rb', line 96

def start
  @running = true
  @thread = Thread.new do
    Thread.current[:phronomy_event_loop_thread] = true
    run_loop
  end
  @thread.abort_on_exception = false
  self
end

#stopObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Stops the background thread. Used in tests only.



108
109
110
111
112
# File 'lib/phronomy/event_loop.rb', line 108

def stop
  @running = false
  @thread&.kill
  @thread = nil
end