Class: Solana::Ruby::Kit::Subscribable::RetryableReactiveStreamStore
- Inherits:
- ReactiveStreamStore
- Object
- ReactiveStreamStore
- Solana::Ruby::Kit::Subscribable::RetryableReactiveStreamStore
- Extended by:
- T::Sig
- Defined in:
- lib/solana/ruby/kit/subscribable/reactive_stream_store.rb
Overview
A ReactiveStreamStore that supports #retry by re-invoking a factory proc to obtain a fresh DataPublisher on each connection attempt. Mirrors the store returned by TypeScript’s createReactiveStoreFromDataPublisherFactory.
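The factory idea can be illustrated in isolation. The following is a minimal sketch, not the library's code: `FakePublisher` is a hypothetical stand-in for DataPublisher, and the proc only plays the role of the `create_publisher:` argument. It shows that each call to the factory yields a distinct object, which is what lets a retry start from a clean connection.

```ruby
# Hypothetical stand-in for a DataPublisher (assumption, not the real class).
class FakePublisher
  attr_reader :id

  def initialize(id)
    @id = id
  end
end

attempts = 0

# Factory proc in the role of create_publisher: every call builds a new publisher.
create_publisher = proc do
  attempts += 1
  FakePublisher.new(attempts)
end

first  = create_publisher.call # initial connection attempt
second = create_publisher.call # connection attempt after a retry

puts "attempts: #{attempts}"
puts "distinct publishers: #{!first.equal?(second)}"
```

Passing a proc rather than a publisher instance means the store never has to reuse a publisher that has already reported an error.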
Instance Method Summary
- #initialize(data_channel:, error_channel:, create_publisher:, signal: nil) ⇒ RetryableReactiveStreamStore
constructor
A new instance of RetryableReactiveStreamStore.
- #retry ⇒ Object
Methods inherited from ReactiveStreamStore
#_set_state, #get_error, #get_state, #get_unified_state, #subscribe
Constructor Details
#initialize(data_channel:, error_channel:, create_publisher:, signal: nil) ⇒ RetryableReactiveStreamStore
Returns a new instance of RetryableReactiveStreamStore.
# File 'lib/solana/ruby/kit/subscribable/reactive_stream_store.rb', line 138

def initialize(data_channel:, error_channel:, create_publisher:, signal: nil)
  super()
  @data_channel = T.let(data_channel, T.untyped)
  @error_channel = T.let(error_channel, T.untyped)
  @outer_signal = T.let(signal, T.nilable(T.proc.void))
  @create_publisher = T.let(create_publisher, T.proc.returns(DataPublisher))
  @stopped = T.let(false, T::Boolean)
  # Per-connection active flag shared with subscriber lambdas via closure.
  @conn_active = T.let([false], T::Array[T::Boolean])
  _connect
end
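The constructor's comment describes a per-connection active flag shared with subscriber lambdas via closure. The `_connect` method is not shown on this page, so the following is only a sketch of the general technique under that assumption: a one-element array acts as a mutable flag that a lambda captures, so flipping the element later silences that lambda. The names `make_subscriber`, `old_flag`, and `new_flag` are hypothetical.

```ruby
received = []

# Builds a subscriber that forwards values only while its flag is set.
# The lambda captures the array itself, so later mutation is visible inside it.
make_subscriber = lambda do |active|
  ->(value) { received << value if active[0] }
end

old_flag = [true]
old_subscriber = make_subscriber.call(old_flag)
old_subscriber.call(:a) # delivered while the old connection is active

old_flag[0] = false     # deactivate the old connection's subscriber

new_flag = [true]
new_subscriber = make_subscriber.call(new_flag)

old_subscriber.call(:stale) # ignored: its flag is now false
new_subscriber.call(:b)     # delivered through the new connection

puts received.inspect
```

A plain boolean local would be copied by value into nothing useful here; wrapping it in an array gives the closure and the owner a shared object to mutate.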
Instance Method Details
#retry ⇒ Object
# File 'lib/solana/ruby/kit/subscribable/reactive_stream_store.rb', line 151

def retry
  stopped = @mutex.synchronize { @stopped }
  return if stopped
  return unless get_unified_state.status == 'error'

  stale_data = get_unified_state.data
  # Deactivate old connection's subscribers
  @mutex.synchronize { @conn_active[0] = false }
  _set_state(ReactiveState.new(status: 'retrying', data: stale_data))
  _connect
end
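The guard-then-transition flow of #retry can be sketched with a toy store. This is an illustration under stated assumptions, not the library's implementation: `MiniStore`, `State`, `fail_with_data`, and the `'loading'` status are hypothetical, and the real class additionally uses a mutex and ReactiveState. The sketch shows that retry is a no-op unless the status is `'error'`, and that the last known data is carried into the `'retrying'` state.

```ruby
# Hypothetical state record (the real code uses ReactiveState).
State = Struct.new(:status, :data, keyword_init: true)

class MiniStore
  attr_reader :state, :connects

  def initialize
    @state = State.new(status: 'loading', data: nil) # 'loading' is an assumed status
    @stopped = false
    @connects = 0
    connect
  end

  # Test helper simulating a connection error that leaves stale data behind.
  def fail_with_data(data)
    @state = State.new(status: 'error', data: data)
  end

  def retry
    return if @stopped
    return unless @state.status == 'error'

    # Keep the stale data visible while a new connection is attempted.
    @state = State.new(status: 'retrying', data: @state.data)
    connect
  end

  private

  def connect
    @connects += 1
  end
end

store = MiniStore.new
store.retry              # no-op: status is not 'error'
store.fail_with_data(42)
store.retry              # 'error' -> 'retrying', then reconnects

puts store.state.status
puts store.connects
```

Carrying the stale data forward lets consumers keep rendering the last value while the new connection is being established.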