Class: Solana::Ruby::Kit::Subscribable::ReactiveActionStore

Inherits:
Object
  • Object
show all
Extended by:
T::Sig
Defined in:
lib/solana/ruby/kit/subscribable/reactive_action_store.rb

Overview

A thread-safe state machine wrapping a callable. Exposes

dispatch / dispatch_async / get_state / subscribe / reset

so that observers can react to in-flight, succeeded, or failed calls.

dispatch — fire-and-forget; runs the callable in a background thread. dispatch_async — blocking; runs the callable in the current thread and

returns the result (or raises on failure / supersession).

Only the most recent dispatch can mutate state — superseded calls silently drop their result via a generation counter.

Mirrors TypeScript’s ReactiveActionStore<TArgs, TResult>.

Instance Method Summary collapse

Constructor Details

#initialize(fn) ⇒ ReactiveActionStore

Returns a new instance of ReactiveActionStore.



67
68
69
70
71
72
73
# File 'lib/solana/ruby/kit/subscribable/reactive_action_store.rb', line 67

def initialize(fn)
  @fn          = T.let(fn,   T.proc.params(args: T.untyped).returns(T.untyped))
  @state       = T.let(REACTIVE_ACTION_IDLE_STATE, ReactiveActionState)
  @listeners   = T.let([], T::Array[T.proc.void])
  @mutex       = T.let(Mutex.new, Mutex)
  @current_gen = T.let(0, Integer)
end

Instance Method Details

#dispatch(*args) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/solana/ruby/kit/subscribable/reactive_action_store.rb', line 84

def dispatch(*args)
  gen        = nil
  prev_data  = nil
  @mutex.synchronize do
    @current_gen += 1
    gen       = @current_gen
    prev_data = @state.data
    @state    = ReactiveActionState.new(status: 'running', data: prev_data)
  end
  _notify

  Thread.new do
    begin
      result  = @fn.call(*args)
      active  = @mutex.synchronize do
        if @current_gen == gen
          @state = ReactiveActionState.new(status: 'success', data: result)
          true
        else
          false
        end
      end
      _notify if active
    rescue => e
      active = @mutex.synchronize do
        if @current_gen == gen
          @state = ReactiveActionState.new(status: 'error', data: prev_data, error: e)
          true
        else
          false
        end
      end
      _notify if active
    end
  end

  nil
end

#dispatch_async(*args) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/solana/ruby/kit/subscribable/reactive_action_store.rb', line 127

def dispatch_async(*args)
  gen        = nil
  prev_data  = nil
  @mutex.synchronize do
    @current_gen += 1
    gen       = @current_gen
    prev_data = @state.data
    @state    = ReactiveActionState.new(status: 'running', data: prev_data)
  end
  _notify

  result = @fn.call(*args)

  active = @mutex.synchronize do
    if @current_gen == gen
      @state = ReactiveActionState.new(status: 'success', data: result)
      true
    else
      false
    end
  end
  _notify if active
  Kernel.raise 'ReactiveActionStore: call was superseded' unless active
  result
rescue RuntimeError
  Kernel.raise
rescue => e
  active = @mutex.synchronize do
    if @current_gen == gen
      @state = ReactiveActionState.new(status: 'error', data: prev_data, error: e)
      true
    else
      false
    end
  end
  _notify if active
  Kernel.raise e
end

#get_stateObject



77
78
79
# File 'lib/solana/ruby/kit/subscribable/reactive_action_store.rb', line 77

def get_state
  @mutex.synchronize { @state }
end

#resetObject



169
170
171
172
173
174
175
# File 'lib/solana/ruby/kit/subscribable/reactive_action_store.rb', line 169

def reset
  @mutex.synchronize do
    @current_gen += 1
    @state = REACTIVE_ACTION_IDLE_STATE
  end
  _notify
end

#subscribe(&listener) ⇒ Object



180
181
182
183
# File 'lib/solana/ruby/kit/subscribable/reactive_action_store.rb', line 180

def subscribe(&listener)
  @mutex.synchronize { @listeners << listener }
  lambda { @mutex.synchronize { @listeners.delete(listener) } }
end