Class: Tempest::Jetstream::StreamManager
- Inherits:
-
Object
- Object
- Tempest::Jetstream::StreamManager
- Defined in:
- lib/tempest/jetstream/stream_manager.rb
Overview
Runs a Jetstream::Client in a background thread so the REPL stays responsive. The transport itself is fiber-based, but we keep that fiber off the main thread to avoid interleaving with Reline’s blocking read. Owns reconnect-with-cursor so a flaky socket or sleep/wake cycle doesn’t silently strand the live feed.
Constant Summary collapse
- DEFAULT_BACKOFF =
[1, 2, 5, 10, 30].freeze
- CURSOR_WINDOW_SECONDS =
Conservative replay window: Jetstream’s default event-ttl is 24h, but Bluesky doesn’t publicly commit to that for their hosted instances and boundary cases (clock skew, tail trim races) bite around the limit. If we’ve been offline longer than this, drop the cursor and let the Runner backfill via getTimeline.
12 * 60 * 60
- DEFAULT_CURSOR_SAVE_INTERVAL =
How often we persist the cursor during a stable live-tail. 5s caps the worst-case event loss on crash to a few seconds of activity while keeping disk writes negligible on a busy stream.
5.0
Instance Method Summary collapse
-
#force_reconnect ⇒ Object
Break a stalled each_event so the reconnect loop can run.
-
#initialize(client:, backoff: DEFAULT_BACKOFF, sleeper: ->(s) { sleep(s) }, clock: -> { Time.now }, cursor_store: nil, cursor_save_interval: DEFAULT_CURSOR_SAVE_INTERVAL, filter: nil, logger: nil) ⇒ StreamManager
constructor
A new instance of StreamManager.
-
#last_event_at ⇒ Object
Time of the last event yielded by the underlying client, regardless of whether the filter accepted it.
- #running? ⇒ Boolean
- #start(&on_event) ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(client:, backoff: DEFAULT_BACKOFF, sleeper: ->(s) { sleep(s) }, clock: -> { Time.now }, cursor_store: nil, cursor_save_interval: DEFAULT_CURSOR_SAVE_INTERVAL, filter: nil, logger: nil) ⇒ StreamManager
Returns a new instance of StreamManager.
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/tempest/jetstream/stream_manager.rb', line 25 def initialize(client:, backoff: DEFAULT_BACKOFF, sleeper: ->(s) { sleep(s) }, clock: -> { Time.now }, cursor_store: nil, cursor_save_interval: DEFAULT_CURSOR_SAVE_INTERVAL, filter: nil, logger: nil) @client = client @backoff = backoff @sleeper = sleeper @clock = clock @cursor_store = cursor_store @cursor_save_interval = cursor_save_interval @filter = filter @logger = logger || Tempest::DebugLog.null_channel @thread = nil @mutex = Mutex.new @stopping = false @cursor_state = { live: nil, saved: nil } @last_event_at = nil end |
Instance Method Details
#force_reconnect ⇒ Object
Break a stalled each_event so the reconnect loop can run. Used by the Watchdog when the kernel hasn’t surfaced the disconnect (e.g., after macOS sleep/wake). Safe to call from another thread or when no worker is running.
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/tempest/jetstream/stream_manager.rb', line 82 def force_reconnect thread = @mutex.synchronize { @thread } return unless thread&.alive? @logger.warn("stream", event: "force_reconnect_requested") # Pre-advance last_event_at so the watchdog's next tick sees a fresh # connection and doesn't re-fire while the worker is still recovering. # Without this, a second Stalled can land in the backoff sleep — which # is outside the inner `rescue Stalled` block — and would historically # take down the worker. The outer rescue in `run` now catches that # case too, but suppressing the duplicate force_reconnect is still the # right thing to do. @mutex.synchronize { @last_event_at = @clock.call } begin thread.raise(Stalled.new("forced reconnect")) rescue ThreadError # Thread already exited between alive? and raise — nothing to do. end end |
#last_event_at ⇒ Object
Time of the last event yielded by the underlying client, regardless of whether the filter accepted it. Watchdog reads this to detect a stalled socket (kernel still thinks the TCP connection is alive but no bytes are arriving).
74 75 76 |
# File 'lib/tempest/jetstream/stream_manager.rb', line 74 def last_event_at @mutex.synchronize { @last_event_at } end |
#running? ⇒ Boolean
66 67 68 |
# File 'lib/tempest/jetstream/stream_manager.rb', line 66 def running? @mutex.synchronize { !!@thread&.alive? } end |
#start(&on_event) ⇒ Object
44 45 46 47 48 49 50 |
# File 'lib/tempest/jetstream/stream_manager.rb', line 44 def start(&on_event) @mutex.synchronize do return if @thread&.alive? @stopping = false @thread = Thread.new { run(on_event) } end end |
#stop ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/tempest/jetstream/stream_manager.rb', line 52 def stop live = @mutex.synchronize { @cursor_state[:live] } @logger.info("stream", event: "stopping", final_cursor: live) @mutex.synchronize { @stopping = true } thread = @mutex.synchronize do t = @thread @thread = nil t end thread&.kill thread&.join flush_cursor! end |