Class: Phronomy::EventLoop

Inherits:
Object
  • Object
Defined in:
lib/phronomy/event_loop.rb

Overview

Singleton event loop that manages all FSMSession instances.

A single background thread reads from a global AsyncQueue and dispatches events to their target FSMSession. IO work (LLM calls, tool calls) must be dispatched via +Runtime.instance.spawn+ or +BlockingAdapterPool+, with the results posted back to the loop via #post.

Activated with: +Phronomy.configure { |c| c.event_loop = true }+

== Threading exception (see ADR-010 Rule 2)

+EventLoop+ is a deliberate exception to Phronomy's cooperative-first concurrency model. Its dispatch loop is an infinite +while @running+ loop that must never block the framework's own event processing. Running it on a shared scheduler task would consume the scheduler, preventing other tasks from running. Therefore #start creates a dedicated Runtime::ThreadScheduler — this is correct and intentional per ADR-010. No other framework component should do the same; see the ADR-010 checklist.

== Handler constraints

Handlers dispatched by the EventLoop run on the EventLoop thread. They must not:

  • Perform blocking operations directly (database queries, LLM calls, HTTP requests). Schedule blocking work via +Runtime.instance.spawn+ or +BlockingAdapterPool+, then post results back with #post.
  • Call +Workflow#invoke+ (or any synchronous +invoke+) from within a handler. That method would block waiting for the EventLoop to process events, causing a deadlock. Use the async pattern: post a follow-up event instead.
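The offload-then-post pattern described above can be sketched with nothing but Ruby's standard library. This is a minimal illustration, not Phronomy's implementation: +MiniLoop+, +slow_lookup+, and the +:fetch+/+:fetched+ event hashes are hypothetical stand-ins, and a plain +Thread+ takes the place of +Runtime.instance.spawn+ or +BlockingAdapterPool+.

```ruby
# Hypothetical stand-in for the EventLoop, built on stdlib Queue and Thread.
class MiniLoop
  attr_reader :results

  def initialize
    @queue = Queue.new     # thread-safe event queue
    @results = Queue.new   # collects handled results for inspection
    @thread = Thread.new { run_loop }
  end

  # Safe to call from any thread.
  def post(event)
    @queue.push(event)
  end

  def stop
    @queue.push(:__stop__) # sentinel unblocks the pop in run_loop
    @thread.join
  end

  private

  def run_loop
    loop do
      event = @queue.pop
      break if event == :__stop__
      handle(event)
    end
  end

  # Runs on the loop thread, so it must return quickly.
  def handle(event)
    case event[:type]
    when :fetch
      # Offload the slow call to a worker thread, then post the result back.
      Thread.new do
        value = slow_lookup(event[:key])
        post({type: :fetched, key: event[:key], value: value})
      end
    when :fetched
      @results.push([event[:key], event[:value]])
    end
  end

  # Placeholder for an LLM call, HTTP request, or database query.
  def slow_lookup(key)
    sleep 0.01
    key.to_s.upcase
  end
end

mini = MiniLoop.new
mini.post({type: :fetch, key: :alpha})
fetched = mini.results.pop # blocks the caller, not the loop thread
mini.stop
```

The handler for +:fetch+ returns immediately, so the loop thread stays free to dispatch other events while the worker is sleeping.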

== Fork safety

+EventLoop.instance+ is lazily initialized: the background thread is not created until the first call. As long as that first call happens after the fork, Puma worker forking does not duplicate the thread and no +after_fork+ hook is required.

== Deadlock warning

Do NOT call +Workflow#invoke+ (in EventLoop mode) from within a workflow entry action. The entry action runs on the EventLoop thread, so a nested +invoke+ would block waiting for that same thread to process events and deadlock. #register detects this case and raises +Phronomy::Error+. Use the async pattern instead: schedule work via +Runtime.instance.spawn+ or +BlockingAdapterPool+, then post events back via +Phronomy::EventLoop.instance.post(...)+.

Constructor Details

#initialize ⇒ EventLoop

Returns a new instance of EventLoop.



# File 'lib/phronomy/event_loop.rb', line 74

def initialize
  @queue = Phronomy::AsyncQueue.new  # global event queue (thread-safe; no Mutex needed)
  @fsms = {}                  # { id => FSMSession }     — EventLoop thread only
  @waiting = {}               # { id => completion_queue } — EventLoop thread only
  # Mutex-backed FSM count for drain-mode shutdown.
  @fsm_count_mutex = Mutex.new
  @fsm_count_cond = ConditionVariable.new
  @fsm_count = 0
  # Token cancelled when shutdown is requested; new child sessions receive it.
  @shutdown_token = Phronomy::CancellationToken.new
  # Fairness metrics (EventLoop thread only, except where noted)
  @lag_mutex = Mutex.new
  @last_lag_ns = 0
  @max_lag_ns = 0
  @dispatch_count = 0
  @total_lag_ns = 0
end

Class Method Details

.current? ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns true when called from within the EventLoop dispatch task. The check compares the current task's name (+Task.current&.name+) with +"event-loop"+, the name #start gives the dispatch task, rather than inspecting a thread, so it works for both thread-based and future fiber-based scheduler backends.

Returns:

  • (Boolean)


# File 'lib/phronomy/event_loop.rb', line 63

def self.current?
  Phronomy::Task.current&.name == "event-loop"
end
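A name-based "am I on the loop?" check, and the guard it enables, can be sketched as follows. This is an illustration under stated assumptions: +LoopGuard+ and +ensure_off_loop!+ are hypothetical names, and Ruby's +Thread#name+ stands in for the +Task#name+ used in the source.

```ruby
# Hypothetical stand-in: Thread#name plays the role of Task#name.
module LoopGuard
  LOOP_NAME = "event-loop"

  def self.current?
    Thread.current.name == LOOP_NAME
  end

  # Raises when a blocking entry point is called from the loop thread.
  def self.ensure_off_loop!
    raise "blocking call attempted on the #{LOOP_NAME} thread" if current?
  end
end

outside = LoopGuard.current? # evaluated on the calling thread

inside, guarded = Thread.new do
  Thread.current.name = LoopGuard::LOOP_NAME
  blocked = begin
    LoopGuard.ensure_off_loop!
    false
  rescue RuntimeError
    true
  end
  [LoopGuard.current?, blocked]
end.value
```

Failing fast with an error is easier to diagnose than the silent deadlock that would otherwise follow.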

.instance ⇒ Object

Returns the singleton instance, creating and starting it on first call.



# File 'lib/phronomy/event_loop.rb', line 53

def self.instance
  @instance ||= new.tap(&:start)
end
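The create-and-start-on-first-call behaviour, together with the effect of +reset!+, can be seen in a small stand-in class. +LazyLoop+ and its +start_count+ counter are hypothetical and exist only for this sketch; the +instance+ and +reset!+ bodies mirror the source shown here.

```ruby
# Hypothetical stand-in that counts how often #start runs.
class LazyLoop
  attr_reader :start_count

  def self.instance
    @instance ||= new.tap(&:start) # created and started on first call only
  end

  def self.reset!
    @instance&.stop
    @instance = nil
  end

  def initialize
    @start_count = 0
  end

  def start
    @start_count += 1
    self
  end

  def stop
    nil
  end
end

first = LazyLoop.instance
second = LazyLoop.instance # same object, not started again
LazyLoop.reset!
third = LazyLoop.instance  # a fresh instance after reset!
```

Note that +||=+ is not atomic on its own; whether concurrent first calls need extra protection depends on how the host application boots.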

.reset! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Stops and destroys the singleton. Primarily used in tests.



# File 'lib/phronomy/event_loop.rb', line 69

def self.reset!
  @instance&.stop
  @instance = nil
end

Instance Method Details

#average_lag_seconds ⇒ Float

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the mean event-loop lag across all dispatched events since the loop was started. Returns 0.0 when no events have been dispatched. Thread-safe.

Returns:

  • (Float)


# File 'lib/phronomy/event_loop.rb', line 114

def average_lag_seconds
  @lag_mutex.synchronize do
    return 0.0 if @dispatch_count.zero?

    @total_lag_ns.to_f / @dispatch_count / 1_000_000_000.0
  end
end

#enqueue_child(agent_fsm) ⇒ nil

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Enqueues an AgentFSM as a fire-and-forget child session.

Unlike #register, this method:

  • Is safe to call from the EventLoop thread (entry actions).
  • Does NOT block — no completion queue is created.
  • Delegates :finished/:error cleanup to the EventLoop via posted events.

Parameters:

  • agent_fsm (AgentFSM)

    the child session to enqueue

Returns:

  • (nil)


# File 'lib/phronomy/event_loop.rb', line 163

def enqueue_child(agent_fsm)
  @queue.push([Event.new(type: :start, target_id: agent_fsm.id,
    payload: {session: agent_fsm, completion: nil}),
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :nanosecond)])
  nil
end

#last_lag_seconds ⇒ Float

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the most recently measured event-loop lag in seconds. Lag is the elapsed time, measured on the monotonic clock, between #post and the moment the event is dequeued for dispatch. Thread-safe.

Returns:

  • (Float)


# File 'lib/phronomy/event_loop.rb', line 97

def last_lag_seconds
  @lag_mutex.synchronize { @last_lag_ns } / 1_000_000_000.0
end

#max_lag_seconds ⇒ Float

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the maximum event-loop lag seen since the loop was started. Thread-safe.

Returns:

  • (Float)


# File 'lib/phronomy/event_loop.rb', line 105

def max_lag_seconds
  @lag_mutex.synchronize { @max_lag_ns } / 1_000_000_000.0
end
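The three lag readers share one set of counters. The source does not show how the dispatch loop updates them, so the +record+ method below is an assumption about that side; +LagStats+ and its method names are likewise hypothetical. The readers follow the same mutex-and-divide shape as the methods documented here.

```ruby
# Hypothetical aggregator for per-event lag, stored as integer nanoseconds.
class LagStats
  NS_PER_SECOND = 1_000_000_000.0

  def initialize
    @mutex = Mutex.new
    @last_ns = 0
    @max_ns = 0
    @total_ns = 0
    @count = 0
  end

  # Assumed to be called by the dispatching thread once per dequeued event.
  def record(lag_ns)
    @mutex.synchronize do
      @last_ns = lag_ns
      @max_ns = lag_ns if lag_ns > @max_ns
      @total_ns += lag_ns
      @count += 1
    end
  end

  def last_seconds
    @mutex.synchronize { @last_ns } / NS_PER_SECOND
  end

  def max_seconds
    @mutex.synchronize { @max_ns } / NS_PER_SECOND
  end

  def average_seconds
    @mutex.synchronize do
      next 0.0 if @count.zero? # avoid dividing by zero before any dispatch
      @total_ns.to_f / @count / NS_PER_SECOND
    end
  end
end

stats = LagStats.new
empty_average = stats.average_seconds
[2_000_000, 6_000_000, 4_000_000].each { |ns| stats.record(ns) }
```

Keeping the counters as integer nanoseconds and converting only on read avoids accumulating floating-point error in the running total.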

#post(event) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Note:

Handler constraint: do not perform blocking operations or call +Workflow#invoke+ directly from within the handler that processes a posted event. Handlers run on the EventLoop thread; blocking there stalls all session processing. Run blocking work off the loop (via +Runtime.instance.spawn+ or +BlockingAdapterPool+) and post a new event once the result is ready.

Posts an event to the loop. Safe to call from any thread (including IO threads). The current monotonic clock time is recorded so that the EventLoop can measure the dispatch lag when it dequeues the event.

Parameters:

  • event (Event)

    the event to enqueue for dispatch



# File 'lib/phronomy/event_loop.rb', line 181

def post(event)
  @queue.push([event, Process.clock_gettime(Process::CLOCK_MONOTONIC, :nanosecond)])
end
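The reason for pushing a timestamp alongside the event is that the consumer can subtract it from its own clock reading at dequeue time. A minimal sketch with a stdlib +Queue+ follows; the +monotonic_ns+ helper and the +:tick+ event are hypothetical, and the +sleep+ merely simulates a busy consumer.

```ruby
# Hypothetical helper wrapping the same clock call used by #post.
def monotonic_ns
  Process.clock_gettime(Process::CLOCK_MONOTONIC, :nanosecond)
end

queue = Queue.new

# Producer side: stamp the event at enqueue time.
queue.push([{type: :tick}, monotonic_ns])

sleep 0.02 # simulate a consumer that is busy with an earlier event

# Consumer side: lag is the gap between the stamp and the dequeue.
event, enqueued_ns = queue.pop
lag_ns = monotonic_ns - enqueued_ns
lag_seconds = lag_ns / 1_000_000_000.0
```

The monotonic clock is used rather than +Time.now+ so that system clock adjustments cannot produce negative or inflated lag values.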

#register(fsm_session) ⇒ Phronomy::AsyncQueue

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Registers an FSMSession for execution and returns a completion queue.

The session and its completion queue are handed off to the EventLoop thread via the queue payload, so +@fsms+ and +@waiting+ are exclusively written and read by the EventLoop thread. No Mutex is required.

The caller blocks on +completion_queue.pop+ to receive the final context (WorkflowContext) once the workflow finishes or halts. If an error occurred, the popped value will be an Exception — callers are responsible for re-raising it.

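Parameters:

  • fsm_session (FSMSession)

    the session to register

Returns:

  • (Phronomy::AsyncQueue)

    the completion queue for this session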



# File 'lib/phronomy/event_loop.rb', line 135

def register(fsm_session)
  if Phronomy::EventLoop.current?
    raise Phronomy::Error,
      "Cannot call Workflow#invoke (EventLoop mode) from within an EventLoop " \
      "entry action. Schedule work via Runtime.instance.spawn or " \
      "BlockingAdapterPool, then post events back via " \
      "Phronomy::EventLoop.instance.post(...) instead."
  end

  completion_queue = Phronomy::AsyncQueue.new
  # Pass both session and completion_queue in the event payload so that the
  # EventLoop thread is the sole writer of @fsms and @waiting.
  @queue.push([Event.new(type: :start, target_id: fsm_session.id,
    payload: {session: fsm_session, completion: completion_queue}),
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :nanosecond)])
  completion_queue
end
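The completion-queue handoff can be sketched with stdlib queues. Everything here is a hypothetical stand-in (+MiniRegistry+, the +await+ helper, and the block passed as +work+), and for brevity the work runs directly on the loop thread, which a real handler should only do when the work is trivial. The point is the shape: the caller creates the completion queue, the loop thread alone owns the bookkeeping hash, and errors travel back as values for the caller to re-raise.

```ruby
# Hypothetical stand-in built on stdlib Queue.
class MiniRegistry
  def initialize
    @queue = Queue.new
    @waiting = {} # touched only by the loop thread, so no Mutex
    @thread = Thread.new { run_loop }
  end

  # Caller side: hand the completion queue to the loop via the payload.
  def register(id, &work)
    completion = Queue.new
    @queue.push({type: :start, id: id, work: work, completion: completion})
    completion
  end

  def stop
    @queue.push(:__stop__)
    @thread.join
  end

  private

  def run_loop
    loop do
      event = @queue.pop
      break if event == :__stop__
      @waiting[event[:id]] = event[:completion]
      result = begin
        event[:work].call
      rescue => e
        e # errors are delivered through the completion queue as values
      end
      @waiting.delete(event[:id]).push(result)
    end
  end
end

# Caller helper: block for the result and re-raise if it is an Exception.
def await(completion)
  result = completion.pop
  raise result if result.is_a?(Exception)
  result
end

registry = MiniRegistry.new
ok = await(registry.register(1) { :done })
failure = begin
  await(registry.register(2) { raise ArgumentError, "boom" })
rescue ArgumentError => e
  e.message
end
registry.stop
```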

#start ⇒ self

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Starts the EventLoop dispatch task under Runtime ownership.

The dispatch loop runs as a Task so that Runtime#shutdown can drain it together with all other in-flight tasks. The task is named +"event-loop"+ so that current? can identify it via +Task.current&.name+.

Returns:

  • (self)


# File 'lib/phronomy/event_loop.rb', line 193

def start
  return self if @task&.alive?

  # Reset shutdown state so the loop can be restarted after a stop.
  @shutdown_token = Phronomy::CancellationToken.new
  @fsm_count_mutex.synchronize { @fsm_count = 0 }
  @running = true
  # The dispatch loop must always run in a real background thread.
  # A cooperative scheduler (FakeScheduler/ImmediateBackend) executes tasks
  # synchronously on the caller's thread, which would block forever inside
  # the run_loop infinite loop.  Create a dedicated Runtime with
  # ThreadScheduler to guarantee async execution regardless of the global
  # runtime_backend setting.
  thread_runtime = Phronomy::Runtime.new(scheduler: Phronomy::Runtime::ThreadScheduler.new)
  @task = thread_runtime.spawn(name: "event-loop") do
    run_loop
  end
  self
end

#stop(timeout: Phronomy.configuration.event_loop_stop_grace_seconds, drain: false, force_kill: false) ⇒ Symbol

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Stops the EventLoop dispatch task.

Pushes a shutdown sentinel onto the event queue so that the dispatch task can finish any in-flight handler before exiting, then waits up to +timeout+ seconds for a clean shutdown. If the task is still alive afterwards, it is cancelled cooperatively via Task#cancel! when +force_kill:+ is +true+; otherwise it is abandoned with a logged warning and +:timeout+ is returned.

Parameters:

  • timeout (Numeric) (defaults to: Phronomy.configuration.event_loop_stop_grace_seconds)

    seconds to wait for cooperative shutdown. Defaults to +Phronomy.configuration.event_loop_stop_grace_seconds+ (5 s).

  • drain (Boolean) (defaults to: false)

    when +true+, wait for all active FSMSessions to complete before signalling the loop to stop. Bounded by +timeout+. Defaults to +false+.

  • force_kill (Boolean) (defaults to: false)

    deprecated — retained for backward compatibility. When +true+, the dispatch task is cancelled via Task#cancel! if it does not stop within +timeout+. +Thread#kill+ is no longer used; cooperative cancellation (raising CancellationError) replaces it.

Returns:

  • (Symbol)

    shutdown status:

    • +:clean+ — loop exited cooperatively with no active sessions discarded
    • +:drained_with_discards+ — drain mode requested but sessions remained; they were discarded and the loop was stopped
    • +:timeout+ — the task did not stop in time and +force_kill:+ is +false+
    • +:force_killed+ — the task was cancelled because it did not stop in time


# File 'lib/phronomy/event_loop.rb', line 236

def stop(timeout: Phronomy.configuration.event_loop_stop_grace_seconds, drain: false, force_kill: false)
  @shutdown_token.cancel!
  status = :clean

  if drain
    # Wait for active sessions to finish, bounded by timeout.
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @fsm_count_mutex.synchronize do
      while @fsm_count > 0
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break if remaining <= 0
        @fsm_count_cond.wait(@fsm_count_mutex, remaining)
      end
      status = :drained_with_discards if @fsm_count > 0
    end
  end

  @running = false
  @queue.push(:__stop__)   # unblock queue.pop so the task can see @running = false
  begin
    @task&.join(timeout)
  rescue
    # Task may have terminated with an error (e.g. simulated crash in tests).
    # Suppress the re-raise so the cleanup below always runs.
    nil
  end
  if @task&.alive?
    if force_kill
      Phronomy.configuration.logger&.warn(
        "[Phronomy] EventLoop task did not stop within #{timeout}s; cancelling. " \
        "This is a last resort — check for blocking operations in event handlers."
      )
      @task.cancel!
      status = :force_killed
    else
      Phronomy.configuration.logger&.warn(
        "[Phronomy] EventLoop task did not stop within #{timeout}s; abandoning " \
        "(force_kill: false). Check for blocking operations in event handlers."
      )
      status = :timeout
    end
  end
  @task = nil
  status
end
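The drain branch is a deadline-bounded wait on a condition variable. The sketch below isolates that loop in a hypothetical +SessionCounter+ class. The source does not show which code decrements the count or signals the condition variable, so +increment+ and +decrement+ here are assumptions about that side.

```ruby
# Hypothetical counter of active sessions with a bounded wait.
class SessionCounter
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @count = 0
  end

  def increment
    @mutex.synchronize { @count += 1 }
  end

  # Assumed to be called when a session finishes or errors.
  def decrement
    @mutex.synchronize do
      @count -= 1
      @cond.broadcast if @count.zero?
    end
  end

  # Returns true if the count reached zero before the deadline.
  def wait_until_zero(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      while @count > 0
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break if remaining <= 0
        @cond.wait(@mutex, remaining) # may wake early, hence the loop
      end
      @count.zero?
    end
  end
end

counter = SessionCounter.new
counter.increment
finisher = Thread.new { sleep 0.02; counter.decrement }
drained = counter.wait_until_zero(1.0)
finisher.join

counter.increment # a session that never finishes
timed_out = !counter.wait_until_zero(0.05)
```

Recomputing +remaining+ on every iteration keeps the total wait bounded by +timeout+ even if the condition variable wakes spuriously. In the terms of #stop, a +false+ result corresponds to the +:drained_with_discards+ status.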