Class: Tempest::Jetstream::Watchdog

Inherits:
Object
Defined in:
lib/tempest/jetstream/watchdog.rb

Overview

Detects stalled Jetstream connections and force-reconnects them.

Background: after macOS sleep/wake the kernel may still consider the WebSocket’s TCP socket alive, so Jetstream::Client#each_event blocks in `recv` indefinitely instead of raising. StreamManager’s reconnect loop therefore never runs. The watchdog periodically inspects `stream_manager.last_event_at` and, if no event has arrived within `threshold_seconds`, calls `force_reconnect` to break the stalled call.
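
The check described above can be sketched as a single tick. This is a minimal illustration, not the class's actual `run` method, which this page does not show. `FakeStreamManager` and `check_stall` are hypothetical names, and treating a `nil` `last_event_at` as "not stalled" is an assumption.

```ruby
# Hypothetical stand-in for StreamManager; only the two members named in the
# overview (last_event_at, force_reconnect) are modeled.
FakeStreamManager = Struct.new(:last_event_at, :reconnects) do
  def force_reconnect
    self.reconnects += 1
  end
end

# One watchdog tick: force a reconnect if the last event is at least
# threshold_seconds old. Returns true when a reconnect was forced.
def check_stall(stream_manager, now:, threshold_seconds:)
  last = stream_manager.last_event_at
  return false if last.nil? # assumption: no event yet means nothing to compare
  return false if (now - last) < threshold_seconds

  stream_manager.force_reconnect
  true
end

now = Time.at(1_000_000)
healthy = FakeStreamManager.new(now - 30, 0)   # last event 30 s ago
stalled = FakeStreamManager.new(now - 900, 0)  # last event 15 min ago

check_stall(healthy, now: now, threshold_seconds: 600) # => false
check_stall(stalled, now: now, threshold_seconds: 600) # => true
```

Passing `now` in explicitly mirrors the injectable `clock:` in the constructor and keeps the check deterministic.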

The threshold has to be long enough that a quiet but healthy stream (e.g. `--feed=self`, where only the user’s own posts come through) doesn’t trip it. macOS sleeps are typically minutes to hours, so 10 minutes catches real stalls without flapping on idle streams.

Constant Summary

DEFAULT_THRESHOLD_SECONDS = 600
DEFAULT_INTERVAL_SECONDS = 30
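
To see how the two defaults interact, the sketch below computes the first check at which a stall would be noticed. It assumes the watchdog checks once every `interval_seconds` and compares with `>=`; neither detail is confirmed by this page, and `first_detection_tick` is a hypothetical helper.

```ruby
# Times are seconds on a timeline where checks happen at 30, 60, 90, ...
# Returns the time of the first check at which the stall is detected.
def first_detection_tick(last_event_at, threshold: 600, interval: 30)
  tick = interval
  tick += interval until tick - last_event_at >= threshold
  tick
end

first_detection_tick(0) # => 600 (event landed exactly on a check boundary)
first_detection_tick(1) # => 630 (the check at 600 sees only 599 s of silence)
```

Under these assumptions a stall is noticed between `threshold_seconds` and `threshold_seconds + interval_seconds` after the last event, so the defaults give a window of roughly 600 to 630 seconds.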

Instance Method Summary

#initialize ⇒ Watchdog
#start ⇒ Object
#stop ⇒ Object

Constructor Details

#initialize(stream_manager:, threshold_seconds: DEFAULT_THRESHOLD_SECONDS, interval_seconds: DEFAULT_INTERVAL_SECONDS, clock: -> { Time.now }, sleeper: ->(s) { sleep(s) }, logger: nil) ⇒ Watchdog

Returns a new instance of Watchdog.



# File 'lib/tempest/jetstream/watchdog.rb', line 23

def initialize(stream_manager:,
               threshold_seconds: DEFAULT_THRESHOLD_SECONDS,
               interval_seconds: DEFAULT_INTERVAL_SECONDS,
               clock: -> { Time.now },
               sleeper: ->(s) { sleep(s) },
               logger: nil)
  @stream_manager = stream_manager
  @threshold_seconds = threshold_seconds
  @interval_seconds = interval_seconds
  @clock = clock
  @sleeper = sleeper
  @logger = logger || Tempest::DebugLog.build_null_logger
  @thread = nil
  @mutex = Mutex.new
  @stopping = false
end
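
The `clock:` and `sleeper:` keywords accept any object that responds to `call`, which suggests they exist so tests can control time. The sketch below shows one way to build such doubles. `FakeClock` is a hypothetical helper, and the final wiring line is left commented out because it assumes the watchdog invokes both via `call`, which this page does not show.

```ruby
# Hypothetical test double: a clock that only moves when told to.
class FakeClock
  def initialize(start)
    @now = start
  end

  # Same calling convention as the default `-> { Time.now }`.
  def call
    @now
  end

  def advance(seconds)
    @now += seconds
  end
end

clock = FakeClock.new(Time.at(0))
slept = []
# Same shape as the default `->(s) { sleep(s) }`, but records the request and
# advances the fake clock instead of blocking.
sleeper = ->(s) { slept << s; clock.advance(s) }

sleeper.call(30)
sleeper.call(30)
clock.call.to_i # => 60
slept           # => [30, 30]

# Assumed wiring (not executed here; `manager` is a placeholder):
# Tempest::Jetstream::Watchdog.new(stream_manager: manager, clock: clock, sleeper: sleeper)
```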

Instance Method Details

#start ⇒ Object



# File 'lib/tempest/jetstream/watchdog.rb', line 40

def start
  @mutex.synchronize do
    # Idempotent: a second call while the thread is alive is a no-op.
    return if @thread&.alive?
    @stopping = false
    @thread = Thread.new { run }
  end
end

#stop ⇒ Object



# File 'lib/tempest/jetstream/watchdog.rb', line 48

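def stop
  # Detach the thread under the lock so a later start begins from a clean state.
  thread = @mutex.synchronize do
    @stopping = true
    t = @thread
    @thread = nil
    t
  end
  return unless thread
  # Kill instead of waiting for the loop to notice @stopping, since the
  # thread may be parked in the sleeper; then block until it has terminated.
  thread.kill
  thread.join
end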
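
The lifecycle implied by `start` and `stop` can be exercised in isolation. The sketch below copies their bodies into a hypothetical `LoopWorker` whose `run` is just an idle loop (the real `Watchdog#run` is not shown on this page), and exposes the thread through a reader purely for demonstration.

```ruby
# Hypothetical stand-in that mirrors the start/stop bodies shown above.
class LoopWorker
  attr_reader :thread # exposed only so the example can inspect it

  def initialize
    @thread = nil
    @mutex = Mutex.new
    @stopping = false
  end

  def start
    @mutex.synchronize do
      return if @thread&.alive?
      @stopping = false
      @thread = Thread.new { run }
    end
  end

  def stop
    thread = @mutex.synchronize do
      @stopping = true
      t = @thread
      @thread = nil
      t
    end
    return unless thread
    thread.kill
    thread.join
  end

  private

  # Placeholder loop; stands in for the periodic stall check.
  def run
    sleep 0.01 until @stopping
  end
end

worker = LoopWorker.new
worker.stop                          # never started: returns nil, nothing to join
worker.start
first = worker.thread
worker.start                         # already alive: no second thread is spawned
same = worker.thread.equal?(first)   # true
worker.stop                          # returns only after the thread has terminated
```

Because `stop` joins the killed thread, callers can rely on the background thread being dead once `stop` returns, and a subsequent `start` spawns a fresh one.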