Class: Solana::Ruby::Kit::Subscribable::ReactiveStreamStore
- Inherits:
-
Object
- Object
- Solana::Ruby::Kit::Subscribable::ReactiveStreamStore
show all
- Extended by:
- T::Sig
- Defined in:
- lib/solana/ruby/kit/subscribable/reactive_stream_store.rb
Overview
Thread-safe store that tracks the latest value published to a data channel and notifies subscribers on every state change.
Compatible with any observer pattern that expects a
{ subscribe, get_unified_state } contract.
Mirrors TypeScript’s ReactiveStreamStore<T>.
Instance Method Summary
collapse
Constructor Details
Returns a new instance of ReactiveStreamStore.
66
67
68
69
70
|
# File 'lib/solana/ruby/kit/subscribable/reactive_stream_store.rb', line 66
def initialize
@state = T.let(REACTIVE_LOADING_STATE, ReactiveState)
@subscribers = T.let([], T::Array[T.proc.void])
@mutex = T.let(Mutex.new, Mutex)
end
|
Instance Method Details
#_set_state(new_state) ⇒ Object
110
111
112
113
114
115
116
117
118
|
# File 'lib/solana/ruby/kit/subscribable/reactive_stream_store.rb', line 110
def _set_state(new_state)
subs = nil
@mutex.synchronize do
return if @state.equal?(new_state)
@state = new_state
subs = @subscribers.dup
end
subs&.each(&:call)
end
|
#get_error ⇒ Object
88
89
90
|
# File 'lib/solana/ruby/kit/subscribable/reactive_stream_store.rb', line 88
def get_error
@mutex.synchronize { @state.error }
end
|
#get_state ⇒ Object
81
82
83
|
# File 'lib/solana/ruby/kit/subscribable/reactive_stream_store.rb', line 81
def get_state
@mutex.synchronize { @state.data }
end
|
#get_unified_state ⇒ Object
74
75
76
|
# File 'lib/solana/ruby/kit/subscribable/reactive_stream_store.rb', line 74
def get_unified_state
@mutex.synchronize { @state }
end
|
#subscribe(&callback) ⇒ Object
103
104
105
106
|
# File 'lib/solana/ruby/kit/subscribable/reactive_stream_store.rb', line 103
def subscribe(&callback)
@mutex.synchronize { @subscribers << callback }
lambda { @mutex.synchronize { @subscribers.delete(callback) } }
end
|