Class: Phronomy::EventLoop

Inherits:
Object
Defined in:
lib/phronomy/event_loop.rb

Overview

Singleton event loop that manages all FSMSession instances.

A single background thread reads from a global Thread::Queue and dispatches events to their target FSMSession. IO work (LLM calls, tool calls) runs in separate IO threads that post events back to the loop via EventLoop#post.
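
The dispatch model described above can be pictured with a minimal, self-contained sketch. +SketchLoop+, +SketchSession+, and +SketchEvent+ are hypothetical stand-ins, not Phronomy classes; the real loop body (+run_loop+) is not shown on this page and may differ.

```ruby
# Hypothetical stand-ins; none of these names are part of Phronomy.
SketchEvent = Struct.new(:type, :target_id, :payload, keyword_init: true)

class SketchSession
  attr_reader :id, :received

  def initialize(id)
    @id = id
    @received = []
  end

  # Called only on the loop thread, so no locking is needed here.
  def handle(event)
    @received << event.type
  end
end

class SketchLoop
  def initialize
    @queue = Thread::Queue.new # thread-safe; producers push from any thread
    @sessions = {}             # touched only by the loop thread
  end

  def start
    @thread = Thread.new do
      while (event = @queue.pop) # blocks until an event arrives
        break if event == :stop
        if event.type == :start
          @sessions[event.target_id] = event.payload[:session]
        else
          @sessions[event.target_id]&.handle(event)
        end
      end
    end
    self
  end

  # Safe from any thread because Thread::Queue#push is thread-safe.
  def post(event)
    @queue.push(event)
  end

  def stop
    @queue.push(:stop)
    @thread.join
  end
end

session = SketchSession.new("s1")
event_loop = SketchLoop.new.start
event_loop.post(SketchEvent.new(type: :start, target_id: "s1", payload: {session: session}))

# An "IO thread" does its slow work off the loop, then posts the result back.
io = Thread.new do
  event_loop.post(SketchEvent.new(type: :llm_response, target_id: "s1", payload: {text: "hi"}))
end
io.join
event_loop.stop
session.received # => [:llm_response]
```

Because the session table is only ever read and written on the loop thread, the sketch needs no Mutex around it; the queue is the single synchronization point.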

Activated with: +Phronomy.configure { |c| c.event_loop = true }+

== Fork safety

+EventLoop.instance+ is lazily initialized. The background thread is not created until the first call, so when that call happens inside a Puma worker (after the fork), each worker starts its own thread. No +after_fork+ hook is required.
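
A small sketch of the lazy-singleton idea, assuming nothing beyond the Ruby standard library. +LazyLoop+ and its +started?+ helper are hypothetical and exist only to make the laziness observable; they are not part of Phronomy.

```ruby
# Hypothetical stand-in for the lazy-singleton pattern; not Phronomy code.
class LazyLoop
  class << self
    # True once .instance has been called at least once.
    def started?
      !@instance.nil?
    end

    def instance
      @instance ||= new.tap(&:start)
    end

    def reset!
      @instance&.stop
      @instance = nil
    end
  end

  attr_reader :thread

  def start
    @queue = Thread::Queue.new
    @thread = Thread.new { @queue.pop } # parks until stop pushes a value
    self
  end

  def stop
    @queue.push(:stop)
    @thread.join
  end
end

LazyLoop.started?               # false: loading the class creates no thread
first = LazyLoop.instance       # first call builds the instance and starts its thread
first.equal?(LazyLoop.instance) # true: later calls reuse the same object
LazyLoop.reset!
```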

== Deadlock warning

Do NOT call +Workflow#invoke+ (in EventLoop mode) from within a workflow entry action. The entry action runs on the EventLoop thread, so a nested +invoke+ would block waiting for that same thread to process events and deadlock. +EventLoop#register+ detects this case and raises +Phronomy::Error+. Use the async IO pattern instead: spawn a Thread and post events back to the EventLoop.
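
The async IO pattern can be sketched in isolation as follows. This is an illustration under stated assumptions: +AsyncSketchLoop+, its hash-shaped events, and +slow_call+ are all hypothetical, and the loop runs on the calling thread to keep the example short.

```ruby
# Hypothetical miniature loop; `post`, the event hashes, and `slow_call` are
# illustrative names, not Phronomy APIs.
class AsyncSketchLoop
  attr_reader :log

  def initialize
    @queue = Thread::Queue.new
    @log = [] # appended to only by the thread that runs the loop
  end

  def post(event)
    @queue.push(event)
  end

  # Runs the loop on the calling thread until a :done event arrives.
  def run
    loop do
      event = @queue.pop
      case event[:type]
      when :enter
        on_enter(event)
      when :io_result
        @log << [:io_result, event[:value]]
        post({type: :done})
      when :done
        break
      end
    end
    @log
  end

  private

  # Entry action: must return quickly because it runs on the loop thread.
  def on_enter(event)
    @log << [:enter, event[:input]]
    # Do the blocking work elsewhere and hand the result back as an event.
    Thread.new do
      value = slow_call(event[:input])
      post({type: :io_result, value: value})
    end
  end

  # Stand-in for an LLM or tool call.
  def slow_call(input)
    sleep 0.01
    input.upcase
  end
end

sketch = AsyncSketchLoop.new
sketch.post({type: :enter, input: "hello"})
sketch.run # => [[:enter, "hello"], [:io_result, "HELLO"]]
```

The entry action never waits for the result itself; it only schedules the work, which is what keeps the loop thread free to process the event that eventually carries the answer.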

Constructor Details

#initialize ⇒ EventLoop

Returns a new instance of EventLoop.



# File 'lib/phronomy/event_loop.rb', line 38

def initialize
  @queue = Thread::Queue.new  # global event queue (thread-safe; no Mutex needed)
  @fsms = {}                  # { id => FSMSession }     — EventLoop thread only
  @waiting = {}               # { id => completion_queue } — EventLoop thread only
  # Mutex-backed FSM count for drain-mode shutdown.
  @fsm_count_mutex = Mutex.new
  @fsm_count_cond = ConditionVariable.new
  @fsm_count = 0
  # Token cancelled when shutdown is requested; new child sessions receive it.
  @shutdown_token = Phronomy::CancellationToken.new
end

Class Method Details

.instance ⇒ Object

Returns the singleton instance, creating and starting it on first call.



# File 'lib/phronomy/event_loop.rb', line 27

def self.instance
  @instance ||= new.tap(&:start)
end

.reset! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Stops and destroys the singleton. Primarily used in tests.



# File 'lib/phronomy/event_loop.rb', line 33

def self.reset!
  @instance&.stop
  @instance = nil
end

Instance Method Details

#enqueue_child(agent_fsm) ⇒ nil

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Enqueues an AgentFSM as a fire-and-forget child session.

Unlike #register, this method:

  • Is safe to call from the EventLoop thread (entry actions).
  • Does NOT block — no completion queue is created.
  • Delegates :finished/:error cleanup to the EventLoop via posted events.

Parameters:

  • agent_fsm

    the AgentFSM to enqueue as a child session

Returns:

  • (nil)


# File 'lib/phronomy/event_loop.rb', line 89

def enqueue_child(agent_fsm)
  @queue.push(Event.new(type: :start, target_id: agent_fsm.id,
    payload: {session: agent_fsm, completion: nil}))
  nil
end

#post(event) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Posts an event to the loop. Safe to call from any thread (including IO threads).

Parameters:

  • event

    the event to push onto the global queue



# File 'lib/phronomy/event_loop.rb', line 99

def post(event)
  @queue.push(event)
end

#register(fsm_session) ⇒ Thread::Queue

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Registers an FSMSession for execution and returns a completion queue.

The session and its completion queue are handed off to the EventLoop thread via the queue payload, so +@fsms+ and +@waiting+ are exclusively written and read by the EventLoop thread. No Mutex is required.

The caller blocks on +completion_queue.pop+ to receive the final context (WorkflowContext) once the workflow finishes or halts. If an error occurred, the popped value will be an Exception — callers are responsible for re-raising it.

Parameters:

  • fsm_session

    the FSMSession to register

Returns:

  • (Thread::Queue)

    resolves to final/halted context, or an Exception



# File 'lib/phronomy/event_loop.rb', line 63

def register(fsm_session)
  if Thread.current[:phronomy_event_loop_thread]
    raise Phronomy::Error,
      "Cannot call Workflow#invoke (EventLoop mode) from within an EventLoop " \
      "entry action. Use the async IO pattern: spawn a Thread, post events " \
      "back via Phronomy::EventLoop.instance.post(...) instead."
  end

  completion_queue = Thread::Queue.new
  # Pass both session and completion_queue in the event payload so that the
  # EventLoop thread is the sole writer of @fsms and @waiting.
  @queue.push(Event.new(type: :start, target_id: fsm_session.id,
    payload: {session: fsm_session, completion: completion_queue}))
  completion_queue
end
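
The completion-queue contract described for #register (pop the final value, re-raise it if it is an Exception) might be wrapped by callers along these lines. +await_completion+ is a hypothetical helper, not a Phronomy method, and the pushed values below merely simulate what the loop thread would deliver.

```ruby
# Hypothetical helper; `await_completion` is not part of Phronomy.
# Pops the final value from a completion queue and re-raises it if the
# workflow reported a failure by pushing an Exception.
def await_completion(completion_queue)
  result = completion_queue.pop # blocks until a value is pushed
  raise result if result.is_a?(Exception)
  result
end

# Simulate the loop thread finishing a workflow successfully...
ok_queue = Thread::Queue.new
Thread.new { ok_queue.push(:final_context) }
await_completion(ok_queue) # => :final_context

# ...and failing.
failed_queue = Thread::Queue.new
Thread.new { failed_queue.push(RuntimeError.new("tool crashed")) }
begin
  await_completion(failed_queue)
rescue RuntimeError => e
  e.message # => "tool crashed"
end
```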

#start ⇒ self

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Starts the background event loop thread.

Returns:

  • (self)


# File 'lib/phronomy/event_loop.rb', line 106

def start
  return self if @thread&.alive?

  # Reset shutdown state so the loop can be restarted after a stop.
  @shutdown_token = Phronomy::CancellationToken.new
  @fsm_count_mutex.synchronize { @fsm_count = 0 }
  @running = true
  @thread = Thread.new do
    Thread.current[:phronomy_event_loop_thread] = true
    run_loop
  end
  @thread.abort_on_exception = false
  self
end
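
#start tags its worker thread with +Thread.current[:phronomy_event_loop_thread]+, and #register checks that flag before blocking. The same guard idiom, reduced to a standalone sketch with hypothetical names (+LOOP_FLAG+, +guarded_blocking_call+):

```ruby
# Hypothetical names throughout; only the Thread.current[...] idiom is taken
# from the source listings on this page.
LOOP_FLAG = :sketch_loop_thread

def guarded_blocking_call
  if Thread.current[LOOP_FLAG]
    raise "would deadlock: called from the loop thread"
  end
  :ok
end

loop_thread = Thread.new do
  Thread.current[LOOP_FLAG] = true # mark this thread as the loop thread
  begin
    guarded_blocking_call
  rescue RuntimeError => e
    e.message
  end
end

guarded_blocking_call # => :ok on any unmarked thread
loop_thread.value     # => "would deadlock: called from the loop thread"
```

Failing fast with an error is preferable to the alternative, which is a silent hang that only shows up as a stuck workflow.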

#stop(timeout: Phronomy.configuration.event_loop_stop_grace_seconds, drain: false, force_kill: false) ⇒ Symbol

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Stops the background thread. Used in tests only.

Sends a cooperative shutdown sentinel to the event queue so that the worker thread can finish any in-flight handler before exiting. Waits up to +timeout+ seconds for a clean shutdown. If the thread is still alive afterwards, it is force-killed only when +force_kill:+ is +true+; otherwise the thread is left running and +:timeout+ is returned.

Parameters:

  • timeout (Numeric) (defaults to: Phronomy.configuration.event_loop_stop_grace_seconds)

    seconds to wait for cooperative shutdown. Defaults to +Phronomy.configuration.event_loop_stop_grace_seconds+ (5 s).

  • drain (Boolean) (defaults to: false)

    when +true+, wait for all active FSMSessions to complete before signalling the loop to stop. The wait is bounded by +timeout+; the subsequent thread join may then wait up to +timeout+ again. Defaults to +false+.

  • force_kill (Boolean) (defaults to: false)

    when +true+, the worker thread is killed with +Thread#kill+ if it does not stop within +timeout+. When +false+ (default), the thread is never killed; the method returns +:timeout+ instead. +false+ is safer for production because +Thread#kill+ can interrupt +ensure+ blocks.

Returns:

  • (Symbol)

    shutdown status:

    • +:clean+ — loop exited cooperatively with no active sessions discarded
    • +:drained_with_discards+ — drain mode requested but sessions remained; they were discarded and the loop was stopped
    • +:timeout+ — the worker thread did not stop in time and +force_kill:+ is +false+
    • +:force_killed+ — the worker thread did not stop in time and was killed
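
The sentinel-plus-timeout behaviour and three of the statuses above can be reproduced with a short standalone sketch. +stop_worker+ is a hypothetical function written for illustration; it omits drain mode, logging, and the exception handling that the real #stop performs.

```ruby
# Hypothetical sketch of sentinel-based shutdown; not the Phronomy method.
def stop_worker(thread, queue, timeout:, force_kill: false)
  queue.push(:__stop__)                 # wakes a worker blocked in queue.pop
  return :clean if thread.join(timeout) # Thread#join returns nil on timeout
  return :timeout unless force_kill

  thread.kill # last resort: may interrupt ensure blocks in the worker
  :force_killed
end

queue = Thread::Queue.new
responsive = Thread.new { nil until queue.pop == :__stop__ }
stop_worker(responsive, queue, timeout: 1) # => :clean

stuck = Thread.new { sleep } # never looks at its queue
stop_worker(stuck, Thread::Queue.new, timeout: 0.05)                   # => :timeout
stop_worker(stuck, Thread::Queue.new, timeout: 0.05, force_kill: true) # => :force_killed
```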


# File 'lib/phronomy/event_loop.rb', line 145

def stop(timeout: Phronomy.configuration.event_loop_stop_grace_seconds, drain: false, force_kill: false)
  @shutdown_token.cancel!
  status = :clean

  if drain
    # Wait for active sessions to finish, bounded by timeout.
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @fsm_count_mutex.synchronize do
      while @fsm_count > 0
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break if remaining <= 0
        @fsm_count_cond.wait(@fsm_count_mutex, remaining)
      end
      status = :drained_with_discards if @fsm_count > 0
    end
  end

  @running = false
  @queue.push(:__stop__)   # unblock queue.pop so the worker can see @running = false
  begin
    @thread&.join(timeout)
  rescue
    # Thread may have terminated with an exception (e.g. simulated crash in
    # tests). Suppress the re-raise so the cleanup below always runs.
    nil
  end
  if @thread&.alive?
    if force_kill
      Phronomy.configuration.logger&.warn(
        "[Phronomy] EventLoop thread did not stop within #{timeout}s; force-killing. " \
        "This is a last resort — check for blocking operations in event handlers."
      )
      @thread.kill
      status = :force_killed
    else
      Phronomy.configuration.logger&.warn(
        "[Phronomy] EventLoop thread did not stop within #{timeout}s; abandoning " \
        "(force_kill: false). Check for blocking operations in event handlers."
      )
      status = :timeout
    end
  end
  @thread = nil
  status
end
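
The drain branch of #stop waits on a ConditionVariable against a monotonic deadline. The same technique in a self-contained form, assuming a hypothetical +ActiveCounter+ class in place of the loop's internal session count:

```ruby
# Hypothetical class; mirrors the Mutex + ConditionVariable + monotonic
# deadline idea from #stop but is not Phronomy code.
class ActiveCounter
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @count = 0
  end

  def increment
    @mutex.synchronize { @count += 1 }
  end

  def decrement
    @mutex.synchronize do
      @count -= 1
      @cond.broadcast if @count.zero? # wake any thread waiting to drain
    end
  end

  # Waits until the count reaches zero or `timeout` seconds elapse.
  # Returns true if drained, false if work was still active at the deadline.
  def wait_until_zero(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      while @count > 0
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break if remaining <= 0
        @cond.wait(@mutex, remaining) # may wake early, hence the loop
      end
      @count.zero?
    end
  end
end

counter = ActiveCounter.new
counter.increment
worker = Thread.new { sleep 0.05; counter.decrement }
counter.wait_until_zero(1.0)  # => true: the worker finished before the deadline
worker.join

counter.increment
counter.wait_until_zero(0.05) # => false: nothing decrements, so the wait times out
```

Recomputing +remaining+ on every pass keeps the total wait bounded even when the condition variable wakes before the count reaches zero, and the monotonic clock makes the deadline immune to wall-clock adjustments.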