Class: Solana::Ruby::Kit::Subscribable::RetryableReactiveStreamStore

Inherits:
ReactiveStreamStore show all
Extended by:
T::Sig
Defined in:
lib/solana/ruby/kit/subscribable/reactive_stream_store.rb

Overview

A ReactiveStreamStore that supports retry() by re-invoking a factory proc to get a fresh DataPublisher on each connection attempt. Mirrors TypeScript’s createReactiveStoreFromDataPublisherFactory result.

Instance Method Summary collapse

Methods inherited from ReactiveStreamStore

#_set_state, #get_error, #get_state, #get_unified_state, #subscribe

Constructor Details

#initialize(data_channel:, error_channel:, create_publisher:, signal: nil) ⇒ RetryableReactiveStreamStore

Returns a new instance of RetryableReactiveStreamStore.



138
139
140
141
142
143
144
145
146
147
148
# File 'lib/solana/ruby/kit/subscribable/reactive_stream_store.rb', line 138

def initialize(data_channel:, error_channel:, create_publisher:, signal: nil)
  super()
  @data_channel     = T.let(data_channel,     T.untyped)
  @error_channel    = T.let(error_channel,    T.untyped)
  @outer_signal     = T.let(signal,            T.nilable(T.proc.void))
  @create_publisher = T.let(create_publisher,  T.proc.returns(DataPublisher))
  @stopped          = T.let(false,             T::Boolean)
  # Per-connection active flag shared with subscriber lambdas via closure.
  @conn_active      = T.let([false],            T::Array[T::Boolean])
  _connect
end

Instance Method Details

#retryObject



151
152
153
154
155
156
157
158
159
160
161
# File 'lib/solana/ruby/kit/subscribable/reactive_stream_store.rb', line 151

def retry
  stopped = @mutex.synchronize { @stopped }
  return if stopped
  return unless get_unified_state.status == 'error'

  stale_data = get_unified_state.data
  # Deactivate old connection's subscribers
  @mutex.synchronize { @conn_active[0] = false }
  _set_state(ReactiveState.new(status: 'retrying', data: stale_data))
  _connect
end