Class: Tempest::Jetstream::Watchdog
- Inherits:
-
Object
- Object
- Tempest::Jetstream::Watchdog
- Defined in:
- lib/tempest/jetstream/watchdog.rb
Overview
Detects stalled Jetstream connections and force-reconnects them.
Background: after macOS sleep/wake the kernel may still consider the WebSocket’s TCP socket alive, so Jetstream::Client#each_event blocks in ‘recv` indefinitely instead of raising. StreamManager’s reconnect loop therefore never runs. The watchdog periodically inspects ‘stream_manager.last_event_at` and, if no event has arrived within `threshold_seconds`, calls `force_reconnect` to break the stalled call.
The threshold has to be long enough that a quiet but healthy stream (e.g. ‘–feed=self`, where only the user’s own posts come through) doesn’t trip it. macOS sleeps are typically minutes to hours, so 10 minutes catches real stalls without flapping on idle streams.
Constant Summary collapse
- DEFAULT_THRESHOLD_SECONDS =
600- DEFAULT_INTERVAL_SECONDS =
30
Instance Method Summary collapse
-
#initialize(stream_manager:, threshold_seconds: DEFAULT_THRESHOLD_SECONDS, interval_seconds: DEFAULT_INTERVAL_SECONDS, clock: -> { Time.now }, sleeper: ->(s) { sleep(s) }, logger: nil) ⇒ Watchdog
constructor
A new instance of Watchdog.
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(stream_manager:, threshold_seconds: DEFAULT_THRESHOLD_SECONDS, interval_seconds: DEFAULT_INTERVAL_SECONDS, clock: -> { Time.now }, sleeper: ->(s) { sleep(s) }, logger: nil) ⇒ Watchdog
Returns a new instance of Watchdog.
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/tempest/jetstream/watchdog.rb', line 23 def initialize(stream_manager:, threshold_seconds: DEFAULT_THRESHOLD_SECONDS, interval_seconds: DEFAULT_INTERVAL_SECONDS, clock: -> { Time.now }, sleeper: ->(s) { sleep(s) }, logger: nil) @stream_manager = stream_manager @threshold_seconds = threshold_seconds @interval_seconds = interval_seconds @clock = clock @sleeper = sleeper @logger = logger || Tempest::DebugLog.build_null_logger @thread = nil @mutex = Mutex.new @stopping = false end |
Instance Method Details
#start ⇒ Object
40 41 42 43 44 45 46 |
# File 'lib/tempest/jetstream/watchdog.rb', line 40 def start @mutex.synchronize do return if @thread&.alive? @stopping = false @thread = Thread.new { run } end end |
#stop ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/tempest/jetstream/watchdog.rb', line 48 def stop thread = @mutex.synchronize do @stopping = true t = @thread @thread = nil t end return unless thread thread.kill thread.join end |