Class: Solana::Ruby::Kit::Subscribable::ReactiveStreamStore

Inherits:
Object
  • Object
show all
Extended by:
T::Sig
Defined in:
lib/solana/ruby/kit/subscribable/reactive_stream_store.rb

Overview

Thread-safe store that tracks the latest value published to a data channel and notifies subscribers on every state change.

Compatible with any observer pattern that expects a

{ subscribe, get_unified_state } contract.

Mirrors TypeScript’s ReactiveStreamStore<T>.

Direct Known Subclasses

RetryableReactiveStreamStore

Instance Method Summary collapse

Constructor Details

#initializeReactiveStreamStore

Returns a new instance of ReactiveStreamStore.



66
67
68
69
70
# File 'lib/solana/ruby/kit/subscribable/reactive_stream_store.rb', line 66

def initialize
  @state       = T.let(REACTIVE_LOADING_STATE, ReactiveState)
  @subscribers = T.let([], T::Array[T.proc.void])
  @mutex       = T.let(Mutex.new, Mutex)
end

Instance Method Details

#_set_state(new_state) ⇒ Object



110
111
112
113
114
115
116
117
118
# File 'lib/solana/ruby/kit/subscribable/reactive_stream_store.rb', line 110

def _set_state(new_state)
  subs = nil
  @mutex.synchronize do
    return if @state.equal?(new_state)
    @state = new_state
    subs = @subscribers.dup
  end
  subs&.each(&:call)
end

#get_errorObject



88
89
90
# File 'lib/solana/ruby/kit/subscribable/reactive_stream_store.rb', line 88

def get_error
  @mutex.synchronize { @state.error }
end

#get_stateObject



81
82
83
# File 'lib/solana/ruby/kit/subscribable/reactive_stream_store.rb', line 81

def get_state
  @mutex.synchronize { @state.data }
end

#get_unified_stateObject



74
75
76
# File 'lib/solana/ruby/kit/subscribable/reactive_stream_store.rb', line 74

def get_unified_state
  @mutex.synchronize { @state }
end

#retryObject



96
97
98
# File 'lib/solana/ruby/kit/subscribable/reactive_stream_store.rb', line 96

def retry
  Kernel.raise SolanaError.new(SolanaError::SUBSCRIBABLE__RETRY_NOT_SUPPORTED)
end

#subscribe(&callback) ⇒ Object



103
104
105
106
# File 'lib/solana/ruby/kit/subscribable/reactive_stream_store.rb', line 103

def subscribe(&callback)
  @mutex.synchronize { @subscribers << callback }
  lambda { @mutex.synchronize { @subscribers.delete(callback) } }
end