Class: Phronomy::FSMSession
- Inherits: Object
  - Object
    - Phronomy::FSMSession
- Defined in: lib/phronomy/fsm_session.rb
Overview
Event-driven execution wrapper for a single workflow run.
Created by WorkflowRunner and registered with EventLoop. All public methods are called from the EventLoop thread — FSMSession is NOT thread-safe and must not be accessed concurrently from multiple threads.
== Lifecycle
  register(session)
    → EventLoop posts :start → session.start
        ↓ (auto-transition present)
      EventLoop posts :state_completed → session.handle
        ↓ (repeat)
      session posts :finished or :halted
        ↓
      EventLoop pushes ctx to completion_queue → caller unblocks
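The lifecycle above can be sketched with a toy loop. This is a minimal illustration only: ToyEvent, ToySession and ToyLoop are hypothetical stand-ins written for this example, not Phronomy classes, and the real EventLoop may differ in how it routes events.

```ruby
# Toy model of the register → start → handle → finished flow.
# All class names here are hypothetical stand-ins, not the Phronomy API.
ToyEvent = Struct.new(:type, :target_id, :payload)

class ToySession
  attr_reader :id, :ctx

  def initialize(id, states, event_loop)
    @id = id
    @states = states          # ordered states, each with an auto-transition
    @event_loop = event_loop
    @ctx = { visited: [] }
    @index = 0
  end

  # Called by the loop on :start.
  def start
    visit
  end

  # Called by the loop on :state_completed.
  def handle(event)
    return unless event.type == :state_completed
    @index += 1
    visit
  end

  private

  # Record the current state, then post either the next step or :finished.
  def visit
    @ctx[:visited] << @states[@index]
    type = @index == @states.size - 1 ? :finished : :state_completed
    @event_loop.post(ToyEvent.new(type, @id, nil))
  end
end

class ToyLoop
  attr_reader :completion_queue

  def initialize
    @inbox = Queue.new
    @sessions = {}
    @completion_queue = Queue.new
  end

  def post(event)
    @inbox << event
  end

  def register(session)
    @sessions[session.id] = session
    post(ToyEvent.new(:start, session.id, nil))
  end

  # Drain the inbox on the calling thread (the "EventLoop thread").
  def run_until_idle
    until @inbox.empty?
      event = @inbox.pop
      session = @sessions[event.target_id]
      next unless session
      case event.type
      when :start then session.start
      when :finished, :halted
        @sessions.delete(session.id)
        @completion_queue << session.ctx   # caller unblocks on this queue
      else
        session.handle(event)
      end
    end
  end
end

def run_toy_workflow(states)
  toy_loop = ToyLoop.new
  toy_loop.register(ToySession.new("t1", states, toy_loop))
  toy_loop.run_until_idle
  toy_loop.completion_queue.pop
end
```

Calling run_toy_workflow([:a, :b, :c]) visits each state in order and returns the context from the completion queue. Because every session method runs inside run_until_idle, the session itself needs no locking, which mirrors the single-thread rule stated above.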
== Async IO pattern (EventLoop mode only)
When a state has no auto-transition and is not a wait_state, but has an external event registered (e.g. +transition from: :fetching, on: :fetch_done+), the FSMSession stays registered in the EventLoop and waits for that event. The entry action is expected to spawn an IO thread that posts the event back:
  entry :fetching, ->(ctx) {
    Thread.new {
      ctx.result = http.get(ctx.url)
      Phronomy::EventLoop.instance.post(
        Phronomy::Event.new(type: :fetch_done, target_id: ctx.thread_id, payload: nil)
      )
    }
  }
  transition from: :fetching, on: :fetch_done, to: :process
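The post-back pattern can be tried in isolation with plain Ruby threads. In this sketch SketchEvent and fetch_async are hypothetical names, a plain Queue stands in for the EventLoop inbox, and the block passed to fetch_async replaces the real HTTP call. Posting an :error event on failure is an assumption made here so that the waiting side is never left blocked.

```ruby
# Hypothetical stand-ins: SketchEvent replaces Phronomy::Event and a plain
# Queue replaces the EventLoop inbox.
SketchEvent = Struct.new(:type, :target_id, :payload, keyword_init: true)

# Run the IO block on its own thread and post an event when it completes.
def fetch_async(inbox, ctx, &io)
  Thread.new do
    ctx[:result] = io.call(ctx[:url])
    inbox << SketchEvent.new(type: :fetch_done, target_id: ctx[:thread_id], payload: nil)
  rescue => e
    # Assumption: report failures as an event so the waiting side is not stuck.
    inbox << SketchEvent.new(type: :error, target_id: ctx[:thread_id], payload: e)
  end
end

inbox = Queue.new
ctx = { thread_id: "wf-1", url: "https://example.test/data" }
fetch_async(inbox, ctx) { |url| "body of #{url}" }

event = inbox.pop # blocks until the IO thread posts back
```

The IO thread writes to ctx before it posts, and the loop side reads ctx only after popping the event, so the queue hand-off orders the two accesses. Only the entry action's spawned thread touches shared state outside the loop thread.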
Constant Summary
- FINISH = WorkflowRunner::FINISH
Instance Attribute Summary
- #id ⇒ String (readonly)
  Workflow thread_id (matches WorkflowContext#thread_id).
Instance Method Summary
- #handle(event) ⇒ Object (private)
  Processes an event dispatched from EventLoop.
- #initialize(id:, context:, entry_point:, entry_actions:, auto_state_set:, declared_states:, wait_state_names:, external_events:, phase_machine_class:, recursion_limit:, action_timeouts: {}, resume_event: nil, resume_phase: nil) ⇒ FSMSession (constructor, private)
  A new instance of FSMSession.
- #start ⇒ Object
  Begins workflow execution.
Constructor Details
#initialize(id:, context:, entry_point:, entry_actions:, auto_state_set:, declared_states:, wait_state_names:, external_events:, phase_machine_class:, recursion_limit:, action_timeouts: {}, resume_event: nil, resume_phase: nil) ⇒ FSMSession
This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.
Returns a new instance of FSMSession.
  # File 'lib/phronomy/fsm_session.rb', line 56

  def initialize(id:, context:, entry_point:, entry_actions:, auto_state_set:,
    declared_states:, wait_state_names:, external_events:, phase_machine_class:,
    recursion_limit:, action_timeouts: {}, resume_event: nil, resume_phase: nil)
    @id = id
    @ctx = context
    @entry_point = entry_point
    @entry_actions = entry_actions
    @auto_state_set = auto_state_set
    @declared_states = declared_states
    @wait_state_names = wait_state_names
    @external_events = external_events
    @phase_machine_class = phase_machine_class
    @recursion_limit = recursion_limit
    @action_timeouts = action_timeouts
    @resume_event = resume_event
    @resume_phase = resume_phase
    @step = 0
    @done = false
    @current_state = nil
    @tracker = nil
  end
Instance Attribute Details
#id ⇒ String (readonly)
Returns workflow thread_id (matches WorkflowContext#thread_id).
  # File 'lib/phronomy/fsm_session.rb', line 40

  def id
    @id
  end
Instance Method Details
#handle(event) ⇒ Object
This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.
Processes an event dispatched from EventLoop. Called for :state_completed, :action_completed, and all user-defined external events.
  # File 'lib/phronomy/fsm_session.rb', line 136

  def handle(event)
    return if @done

    if event.type == :action_completed
      # An awaitable entry action completed: update context and advance.
      @ctx = event.payload if event.payload.is_a?(Phronomy::WorkflowContext)
      @tracker.context = @ctx
      @tracker.async_pending = false # Reset flag set by start or fire_and_advance!
      advance_or_halt
      return
    end

    fire_and_advance!(event.type)
  rescue => e
    finish_with_error(e)
  end
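The control flow of #handle (ignore events once done, special-case :action_completed, otherwise fire the transition, and turn any exception into a terminal error) can be illustrated with a small stand-alone class. SketchSession and its fire method are hypothetical. The real fire_and_advance!, advance_or_halt and finish_with_error are not shown in this page, so their behaviour is only approximated here.

```ruby
# Hypothetical stand-in that mirrors the branching of #handle only.
class SketchSession
  attr_reader :log, :error

  def initialize
    @done = false
    @log = []
  end

  def handle(type, payload = nil)
    return if @done                 # finished sessions ignore late events

    if type == :action_completed
      @log << [:advance, payload]   # stands in for advance_or_halt
      return
    end

    fire(type)
  rescue => e
    # Stands in for finish_with_error: record the error and stop.
    @error = e
    @done = true
  end

  private

  # Stands in for fire_and_advance!; :unknown simulates a missing transition.
  def fire(type)
    raise ArgumentError, "no transition for #{type.inspect}" if type == :unknown
    @log << [:fire, type]
  end
end

session = SketchSession.new
session.handle(:action_completed, { n: 1 })
session.handle(:fetch_done)
session.handle(:unknown)     # raises inside fire, captured by the rescue
session.handle(:fetch_done)  # ignored: the session is already done
```

The method-level rescue means an exception never escapes into the loop that dispatched the event. In this sketch the failure is stored on the session instead.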
#start ⇒ Object
Begins workflow execution. Called by EventLoop on :start event.
  # File 'lib/phronomy/fsm_session.rb', line 79

  def start
    if @resume_event
      # Resume from wait state: position tracker at the wait state, then fire the
      # external event. state_machines fires before_transition (exit) and
      # after_transition (entry) callbacks, so both actions execute here.
      @current_state = @resume_phase
      @tracker = build_tracker(@current_state)
      @tracker.context = @ctx
      fire_and_advance!(@resume_event)
    else
      # Fresh start: state_machines does not fire callbacks on initialization,
      # so we invoke the entry action for the initial state manually.
      @current_state = @entry_point
      @tracker = build_tracker(@current_state)
      @tracker.context = @ctx
      (@entry_actions[@current_state] || []).each do |c|
        result = c.call(@ctx)
        if result.is_a?(Phronomy::Task)
          # Awaitable action: spawn a task to await without blocking EventLoop.
          @tracker.async_pending = true
          session_id = @id
          current_state_name = @current_state
          timeout_secs = @action_timeouts[current_state_name]
          Phronomy::Runtime.instance.spawn(name: "fsm-await-#{session_id}") do
            if timeout_secs
              if result.join(timeout_secs).nil?
                result.cancel!
                raise Phronomy::ActionTimeoutError,
                  "Action in state #{current_state_name.inspect} timed out after #{timeout_secs}s"
              end
            end
            task_result = result.await
            if task_result.is_a?(Phronomy::WorkflowContext)
              event_loop.post(Event.new(type: :action_completed, target_id: session_id, payload: task_result))
            else
              event_loop.post(Event.new(type: :state_completed, target_id: session_id, payload: nil))
            end
          rescue => e
            event_loop.post(Event.new(type: :error, target_id: session_id, payload: e))
          end
          break # Only one async action at a time per state
        elsif result.is_a?(Phronomy::WorkflowContext)
          @ctx = result
        end
      end
      @tracker.context = @ctx
      advance_or_halt unless @tracker.async_pending
    end
  rescue => e
    finish_with_error(e)
  end
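The timeout branch in #start relies on join returning nil when the deadline passes. Ruby's own Thread#join(limit) has that contract, so the idea can be sketched with plain threads. SketchTimeoutError and await_with_timeout are hypothetical names, and Thread#kill stands in for Task#cancel!. Whether Phronomy::Task#join matches Thread#join in every detail is an assumption based on the nil check in the listing above.

```ruby
# Hypothetical error class standing in for Phronomy::ActionTimeoutError.
class SketchTimeoutError < StandardError; end

# Wait for worker to finish. If timeout_secs is set and the worker is still
# running when it elapses, stop the worker and raise.
def await_with_timeout(worker, timeout_secs, label)
  # Thread#join(limit) returns nil on timeout and the thread otherwise.
  if timeout_secs && worker.join(timeout_secs).nil?
    worker.kill # stands in for Task#cancel!
    raise SketchTimeoutError,
      "Action in state #{label.inspect} timed out after #{timeout_secs}s"
  end
  worker.value # joins (if still needed) and returns the block's result
end

fast_result = await_with_timeout(Thread.new { 42 }, 1, :fast)

slow_error =
  begin
    await_with_timeout(Thread.new { sleep 5 }, 0.05, :slow)
    nil
  rescue SketchTimeoutError => e
    e
  end
```

When timeout_secs is nil the helper skips the join and waits indefinitely in value, which corresponds to states without an entry in action_timeouts.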