Class: Tempest::Jetstream::StreamManager

Inherits:
Object
  • Object
show all
Defined in:
lib/tempest/jetstream/stream_manager.rb

Overview

Runs a Jetstream::Client in a background thread so the REPL stays responsive. The transport itself is fiber-based, but we keep that fiber off the main thread to avoid interleaving with Reline’s blocking read. Owns reconnect-with-cursor so a flaky socket or sleep/wake cycle doesn’t silently strand the live feed.

Constant Summary collapse

DEFAULT_BACKOFF =
[1, 2, 5, 10, 30].freeze
CURSOR_WINDOW_SECONDS =

Conservative replay window: Jetstream’s default event-ttl is 24h, but Bluesky doesn’t publicly commit to that for their hosted instances and boundary cases (clock skew, tail trim races) bite around the limit. If we’ve been offline longer than this, drop the cursor and let the Runner backfill via getTimeline.

12 * 60 * 60
DEFAULT_CURSOR_SAVE_INTERVAL =

How often we persist the cursor during a stable live-tail. 5s caps the worst-case event loss on crash to a few seconds of activity while keeping disk writes negligible on a busy stream.

5.0

Instance Method Summary collapse

Constructor Details

#initialize(client:, backoff: DEFAULT_BACKOFF, sleeper: ->(s) { sleep(s) }, clock: -> { Time.now }, cursor_store: nil, cursor_save_interval: DEFAULT_CURSOR_SAVE_INTERVAL, filter: nil, logger: nil) ⇒ StreamManager

Returns a new instance of StreamManager.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/tempest/jetstream/stream_manager.rb', line 25

def initialize(client:, backoff: DEFAULT_BACKOFF, sleeper: ->(s) { sleep(s) },
               clock: -> { Time.now }, cursor_store: nil,
               cursor_save_interval: DEFAULT_CURSOR_SAVE_INTERVAL,
               filter: nil, logger: nil)
  @client = client
  @backoff = backoff
  @sleeper = sleeper
  @clock = clock
  @cursor_store = cursor_store
  @cursor_save_interval = cursor_save_interval
  @filter = filter
  @logger = logger || Tempest::DebugLog.build_null_logger
  @thread = nil
  @mutex = Mutex.new
  @stopping = false
  @cursor_state = { live: nil, saved: nil }
  @last_event_at = nil
end

Instance Method Details

#force_reconnectObject

Break a stalled each_event so the reconnect loop can run. Used by the Watchdog when the kernel hasn’t surfaced the disconnect (e.g., after macOS sleep/wake). Safe to call from another thread or when no worker is running.



84
85
86
87
88
89
90
91
92
93
# File 'lib/tempest/jetstream/stream_manager.rb', line 84

def force_reconnect
  thread = @mutex.synchronize { @thread }
  return unless thread&.alive?
  @logger.warn("stream") { "force_reconnect requested" }
  begin
    thread.raise(Stalled.new("forced reconnect"))
  rescue ThreadError
    # Thread already exited between alive? and raise — nothing to do.
  end
end

#last_event_atObject

Time of the last event yielded by the underlying client, regardless of whether the filter accepted it. Watchdog reads this to detect a stalled socket (kernel still thinks the TCP connection is alive but no bytes are arriving).



76
77
78
# File 'lib/tempest/jetstream/stream_manager.rb', line 76

def last_event_at
  @mutex.synchronize { @last_event_at }
end

#running?Boolean

Returns:

  • (Boolean)


68
69
70
# File 'lib/tempest/jetstream/stream_manager.rb', line 68

def running?
  @mutex.synchronize { !!@thread&.alive? }
end

#start(&on_event) ⇒ Object



44
45
46
47
48
49
50
# File 'lib/tempest/jetstream/stream_manager.rb', line 44

def start(&on_event)
  @mutex.synchronize do
    return if @thread&.alive?
    @stopping = false
    @thread = Thread.new { run(on_event) }
  end
end

#stopObject



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/tempest/jetstream/stream_manager.rb', line 52

def stop
  @logger.info("stream") do
    live = @mutex.synchronize { @cursor_state[:live] }
    "stopping final_cursor=#{live.inspect}"
  end
  @mutex.synchronize { @stopping = true }
  thread = @mutex.synchronize do
    t = @thread
    @thread = nil
    t
  end
  thread&.kill
  thread&.join
  flush_cursor!
end