Class: Phronomy::FSMSession

Inherits:
Object
  • Object
show all
Defined in:
lib/phronomy/fsm_session.rb

Overview

Event-driven execution wrapper for a single workflow run.

Created by WorkflowRunner and registered with EventLoop. All public methods are called from the EventLoop thread — FSMSession is NOT thread-safe and must not be accessed concurrently from multiple threads.

== Lifecycle

register(session) → EventLoop posts :start → session.start ↓ (auto-transition present) EventLoop posts :state_completed → session.handle ↓ (repeat) session posts :finished or :halted ↓ EventLoop pushes ctx to completion_queue → caller unblocks

== Async IO pattern (EventLoop mode only)

When a state has no auto-transition and is not a wait_state, but has an external event registered (e.g. +transition from: :fetching, on: :fetch_done+), the FSMSession stays registered in the EventLoop and waits for that event. The entry action is expected to spawn an IO thread that posts the event back:

entry :fetching, ->(ctx) { Thread.new { ctx.result = http.get(ctx.url) Phronomy::EventLoop.instance.post( Phronomy::Event.new(type: :fetch_done, target_id: ctx.thread_id, payload: nil) ) } } transition from: :fetching, on: :fetch_done, to: :process

Constant Summary collapse

FINISH =
WorkflowRunner::FINISH

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id:, context:, entry_point:, entry_actions:, auto_state_set:, declared_states:, wait_state_names:, external_events:, phase_machine_class:, recursion_limit:, action_timeouts: {}, resume_event: nil, resume_phase: nil) ⇒ FSMSession

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of FSMSession.

Parameters:

  • id (String)
  • context (Object)

    includes Phronomy::WorkflowContext

  • entry_point (Symbol)

    initial state name

  • entry_actions (Hash)

    { state_name => [callable, ...] }

  • auto_state_set (Hash)

    { state_name => true }

  • declared_states (Array<Symbol>)

    all action state names

  • wait_state_names (Array<Symbol>)
  • external_events (Hash)

    { event_name => [to:, guard:] }

  • phase_machine_class (Class)

    state_machines-backed phase tracker class

  • recursion_limit (Integer)
  • action_timeouts (Hash) (defaults to: {})

    { state_name => seconds }

  • resume_event (Symbol, nil) (defaults to: nil)

    external event to fire when resuming

  • resume_phase (Symbol, nil) (defaults to: nil)

    wait state name to resume from



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/phronomy/fsm_session.rb', line 56

def initialize(id:, context:, entry_point:, entry_actions:, auto_state_set:,
  declared_states:, wait_state_names:, external_events:, phase_machine_class:,
  recursion_limit:, action_timeouts: {}, resume_event: nil, resume_phase: nil)
  @id = id
  @ctx = context
  @entry_point = entry_point
  @entry_actions = entry_actions
  @auto_state_set = auto_state_set
  @declared_states = declared_states
  @wait_state_names = wait_state_names
  @external_events = external_events
  @phase_machine_class = phase_machine_class
  @recursion_limit = recursion_limit
  @action_timeouts = action_timeouts
  @resume_event = resume_event
  @resume_phase = resume_phase
  @step = 0
  @done = false
  @current_state = nil
  @tracker = nil
end

Instance Attribute Details

#idString (readonly)

Returns workflow thread_id (matches WorkflowContext#thread_id).

Returns:

  • (String)

    workflow thread_id (matches WorkflowContext#thread_id)



40
41
42
# File 'lib/phronomy/fsm_session.rb', line 40

def id
  @id
end

Instance Method Details

#handle(event) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Processes an event dispatched from EventLoop. Called for :state_completed, :action_completed, and all user-defined external events.

Parameters:



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/phronomy/fsm_session.rb', line 136

def handle(event)
  return if @done

  if event.type == :action_completed
    # An awaitable entry action completed: update context and advance.
    @ctx = event.payload if event.payload.is_a?(Phronomy::WorkflowContext)
    @tracker.context = @ctx
    @tracker.async_pending = false  # Reset flag set by start or fire_and_advance!
    advance_or_halt
    return
  end

  fire_and_advance!(event.type)
rescue => e
  finish_with_error(e)
end

#startObject

Begins workflow execution. Called by EventLoop on :start event.



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/phronomy/fsm_session.rb', line 79

def start
  if @resume_event
    # Resume from wait state: position tracker at the wait state, then fire the
    # external event. state_machines fires before_transition (exit) and
    # after_transition (entry) callbacks, so both actions execute here.
    @current_state = @resume_phase
    @tracker = build_tracker(@current_state)
    @tracker.context = @ctx
    fire_and_advance!(@resume_event)
  else
    # Fresh start: state_machines does not fire callbacks on initialization,
    # so we invoke the entry action for the initial state manually.
    @current_state = @entry_point
    @tracker = build_tracker(@current_state)
    @tracker.context = @ctx
    (@entry_actions[@current_state] || []).each do |c|
      result = c.call(@ctx)
      if result.is_a?(Phronomy::Task)
        # Awaitable action: spawn a task to await without blocking EventLoop.
        @tracker.async_pending = true
        session_id = @id
        current_state_name = @current_state
        timeout_secs = @action_timeouts[current_state_name]
        Phronomy::Runtime.instance.spawn(name: "fsm-await-#{session_id}") do
          if timeout_secs
            if result.join(timeout_secs).nil?
              result.cancel!
              raise Phronomy::ActionTimeoutError,
                "Action in state #{current_state_name.inspect} timed out after #{timeout_secs}s"
            end
          end
          task_result = result.await
          if task_result.is_a?(Phronomy::WorkflowContext)
            event_loop.post(Event.new(type: :action_completed, target_id: session_id, payload: task_result))
          else
            event_loop.post(Event.new(type: :state_completed, target_id: session_id, payload: nil))
          end
        rescue => e
          event_loop.post(Event.new(type: :error, target_id: session_id, payload: e))
        end
        break # Only one async action at a time per state
      elsif result.is_a?(Phronomy::WorkflowContext)
        @ctx = result
      end
    end
    @tracker.context = @ctx
    advance_or_halt unless @tracker.async_pending
  end
rescue => e
  finish_with_error(e)
end