Class: Tempest::Jetstream::StreamManager
- Inherits:
- Object
  - Tempest::Jetstream::StreamManager
- Defined in:
- lib/tempest/jetstream/stream_manager.rb
Overview
Runs a Jetstream::Client in a background thread so the REPL stays responsive. The transport itself is fiber-based, but we keep that fiber off the main thread to avoid interleaving with Reline’s blocking read. Owns reconnect-with-cursor so a flaky socket or sleep/wake cycle doesn’t silently strand the live feed.
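The background-thread and reconnect behaviour described above can be sketched as follows. This is a minimal illustration, not the class's actual run loop: `FlakyClient`, `run_with_backoff`, the use of `IOError` as the failure signal, and the choice to clamp to the last backoff entry are all assumptions made for the example. Only the backoff values and the injectable `sleeper` come from this page.

```ruby
# Hypothetical stand-in for a Jetstream client: fails twice, then yields events.
class FlakyClient
  def initialize
    @attempts = 0
  end

  def each_event
    @attempts += 1
    raise IOError, "socket dropped" if @attempts < 3

    3.times { |i| yield({ seq: i }) }
  end
end

BACKOFF = [1, 2, 5, 10, 30].freeze

# Assumed policy: walk the backoff list and stay on its last entry afterwards.
def run_with_backoff(client, backoff: BACKOFF, sleeper: ->(s) { sleep(s) })
  failures = 0
  begin
    client.each_event { |event| yield event }
  rescue IOError
    delay = backoff[[failures, backoff.size - 1].min]
    failures += 1
    sleeper.call(delay)
    retry
  end
end

delays = []
events = []

# The worker thread keeps the blocking event loop away from the caller's thread.
worker = Thread.new do
  run_with_backoff(FlakyClient.new, sleeper: ->(s) { delays << s }) { |e| events << e }
end
worker.join
```

Injecting the sleeper, as the constructor here also allows, lets the example record the delays instead of actually sleeping.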
Constant Summary
- DEFAULT_BACKOFF = [1, 2, 5, 10, 30].freeze
- CURSOR_WINDOW_SECONDS = 12 * 60 * 60
  Conservative replay window: Jetstream’s default event-ttl is 24h, but Bluesky doesn’t publicly commit to that for their hosted instances and boundary cases (clock skew, tail trim races) bite around the limit. If we’ve been offline longer than this, drop the cursor and let the Runner backfill via getTimeline.
- DEFAULT_CURSOR_SAVE_INTERVAL = 5.0
  How often we persist the cursor during a stable live-tail. 5s caps the worst-case event loss on crash to a few seconds of activity while keeping disk writes negligible on a busy stream.
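The two cursor constants can be read as a pair of small policies: discard a cursor that is older than the replay window, and write the cursor to disk at most once per save interval. The sketch below illustrates both under stated assumptions. `usable_cursor` and `ThrottledCursorSaver` are hypothetical names, not part of this class, and the code assumes the cursor is a Unix timestamp in microseconds; how the real class stores and compares cursors is not shown on this page.

```ruby
CURSOR_WINDOW_SECONDS = 12 * 60 * 60
DEFAULT_CURSOR_SAVE_INTERVAL = 5.0

# Assumption: the cursor is Unix time in microseconds.
# Returns the cursor if it is still inside the replay window, otherwise nil.
def usable_cursor(cursor_us, now:)
  return nil if cursor_us.nil?

  age_seconds = now.to_f - (cursor_us / 1_000_000.0)
  age_seconds <= CURSOR_WINDOW_SECONDS ? cursor_us : nil
end

# Hypothetical helper: accepts every cursor but persists at most one per interval.
class ThrottledCursorSaver
  attr_reader :saved

  def initialize(interval: DEFAULT_CURSOR_SAVE_INTERVAL, clock: -> { Time.now })
    @interval = interval
    @clock = clock
    @last_saved_at = nil
    @saved = []
  end

  def offer(cursor)
    now = @clock.call
    return false if @last_saved_at && (now - @last_saved_at) < @interval

    @saved << cursor # stands in for the disk write
    @last_saved_at = now
    true
  end
end

now = Time.at(1_700_000_000)
fresh = (now.to_i - 60) * 1_000_000            # one minute old
stale = (now.to_i - 13 * 60 * 60) * 1_000_000  # thirteen hours old
fresh_result = usable_cursor(fresh, now: now)
stale_result = usable_cursor(stale, now: now)

# A scripted clock makes the throttle deterministic for the example.
ticks = [0.0, 1.0, 4.9, 5.0, 6.0]
saver = ThrottledCursorSaver.new(clock: -> { ticks.shift })
results = [10, 11, 12, 13, 14].map { |cursor| saver.offer(cursor) }
```

With the scripted clock, only the cursors offered at 0.0s and 5.0s are persisted; the rest are skipped until the interval has elapsed.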
Instance Method Summary
-
#force_reconnect ⇒ Object
Break a stalled each_event so the reconnect loop can run.
-
#initialize(client:, backoff: DEFAULT_BACKOFF, sleeper: ->(s) { sleep(s) }, clock: -> { Time.now }, cursor_store: nil, cursor_save_interval: DEFAULT_CURSOR_SAVE_INTERVAL, filter: nil, logger: nil) ⇒ StreamManager
constructor
A new instance of StreamManager.
-
#last_event_at ⇒ Object
Time of the last event yielded by the underlying client, regardless of whether the filter accepted it.
- #running? ⇒ Boolean
- #start(&on_event) ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(client:, backoff: DEFAULT_BACKOFF, sleeper: ->(s) { sleep(s) }, clock: -> { Time.now }, cursor_store: nil, cursor_save_interval: DEFAULT_CURSOR_SAVE_INTERVAL, filter: nil, logger: nil) ⇒ StreamManager
Returns a new instance of StreamManager.
    # File 'lib/tempest/jetstream/stream_manager.rb', line 25

    def initialize(client:, backoff: DEFAULT_BACKOFF, sleeper: ->(s) { sleep(s) },
                   clock: -> { Time.now }, cursor_store: nil,
                   cursor_save_interval: DEFAULT_CURSOR_SAVE_INTERVAL,
                   filter: nil, logger: nil)
      @client = client
      @backoff = backoff
      @sleeper = sleeper
      @clock = clock
      @cursor_store = cursor_store
      @cursor_save_interval = cursor_save_interval
      @filter = filter
      @logger = logger || Tempest::DebugLog.build_null_logger
      @thread = nil
      @mutex = Mutex.new
      @stopping = false
      @cursor_state = { live: nil, saved: nil }
      @last_event_at = nil
    end
Instance Method Details
#force_reconnect ⇒ Object
Break a stalled each_event so the reconnect loop can run. Used by the Watchdog when the kernel hasn’t surfaced the disconnect (e.g., after macOS sleep/wake). Safe to call from another thread or when no worker is running.
    # File 'lib/tempest/jetstream/stream_manager.rb', line 84

    def force_reconnect
      thread = @mutex.synchronize { @thread }
      return unless thread&.alive?

      @logger.warn("stream") { "force_reconnect requested" }
      begin
        thread.raise(Stalled.new("forced reconnect"))
      rescue ThreadError
        # Thread already exited between alive? and raise; nothing to do.
      end
    end
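The mechanism behind force_reconnect is Ruby's `Thread#raise`, which delivers an exception into another thread even while that thread is blocked. The sketch below shows the technique in isolation. The `Stalled` class defined here is a stand-in for the error referenced in the source, and the worker loop with its bare `sleep` is a hypothetical substitute for a read that never returns.

```ruby
# Stand-in for the Stalled error used by force_reconnect.
class Stalled < StandardError; end

entered = Queue.new
recovered = []

worker = Thread.new do
  2.times do |attempt|
    begin
      entered << attempt # signal that this attempt has started
      sleep              # stands in for a read that never returns
    rescue Stalled => e
      recovered << e.message # a real loop would reconnect here
    end
  end
  :finished
end

2.times do
  entered.pop # wait until the worker is inside the protected section
  worker.raise(Stalled.new("forced reconnect"))
end

final = worker.value # joins the thread and returns its result
```

Signalling through the queue from inside the `begin` block ensures each raise lands where the `rescue` can catch it, which is the same reason the blocking call and the reconnect handling need to share a scope.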
#last_event_at ⇒ Object
Time of the last event yielded by the underlying client, regardless of whether the filter accepted it. Watchdog reads this to detect a stalled socket (kernel still thinks the TCP connection is alive but no bytes are arriving).
    # File 'lib/tempest/jetstream/stream_manager.rb', line 76

    def last_event_at
      @mutex.synchronize { @last_event_at }
    end
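A watchdog built on this reader might compare last_event_at against the current time and call force_reconnect once the gap grows too large. The following is a sketch only: the Watchdog's real logic is not shown on this page, and `FakeManager`, `check_stall`, and the 60-second threshold are hypothetical choices for illustration.

```ruby
# Hypothetical stand-in exposing the two methods a watchdog would need.
FakeManager = Struct.new(:last_event_at, :reconnects) do
  def force_reconnect
    self.reconnects += 1
  end
end

# Assumed threshold; the real Watchdog's value is not documented here.
STALL_AFTER_SECONDS = 60

# Returns true if a stall was detected and a reconnect was requested.
def check_stall(manager, now:, stall_after: STALL_AFTER_SECONDS)
  last = manager.last_event_at
  return false if last.nil?                 # no event yet; not treated as a stall in this sketch
  return false if now - last < stall_after  # Time - Time gives seconds as a Float

  manager.force_reconnect
  true
end

base = Time.at(1_700_000_000)
manager = FakeManager.new(base, 0)

quiet_for_30s = check_stall(manager, now: base + 30)
quiet_for_61s = check_stall(manager, now: base + 61)
```

Because last_event_at counts events the filter rejected as well, a heavily filtered but healthy stream would not trip a check like this one.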
#running? ⇒ Boolean
    # File 'lib/tempest/jetstream/stream_manager.rb', line 68

    def running?
      @mutex.synchronize { !!@thread&.alive? }
    end
#start(&on_event) ⇒ Object
    # File 'lib/tempest/jetstream/stream_manager.rb', line 44

    def start(&on_event)
      @mutex.synchronize do
        return if @thread&.alive?

        @stopping = false
        @thread = Thread.new { run(on_event) }
      end
    end
#stop ⇒ Object
    # File 'lib/tempest/jetstream/stream_manager.rb', line 52

    def stop
      @logger.info("stream") do
        live = @mutex.synchronize { @cursor_state[:live] }
        "stopping final_cursor=#{live.inspect}"
      end
      @mutex.synchronize { @stopping = true }
      thread = @mutex.synchronize do
        t = @thread
        @thread = nil
        t
      end
      thread&.kill
      thread&.join
      flush_cursor!
    end