Class: Tempest::Jetstream::Watchdog

Inherits:
Object
Defined in:
lib/tempest/jetstream/watchdog.rb

Overview

Detects stalled Jetstream connections and force-reconnects them.

Background: after macOS sleep/wake, the kernel may still consider the WebSocket’s TCP socket alive, so Jetstream::Client#each_event blocks in `recv` indefinitely instead of raising. StreamManager’s reconnect loop therefore never runs. The watchdog periodically inspects `stream_manager.last_event_at` and, if no event has arrived within `threshold_seconds`, calls `force_reconnect` to break the stalled call.
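The check described above can be sketched as a single tick. This is a minimal illustration, not the library's actual private loop, which is not shown on this page: `FakeStreamManager` and `check_once` are hypothetical names, and both the handling of a `nil` timestamp and the treatment of an age exactly equal to the threshold are assumptions.

```ruby
# Hypothetical stand-in for StreamManager. Only the two members named in the
# overview (last_event_at and force_reconnect) are modeled.
class FakeStreamManager
  attr_accessor :last_event_at
  attr_reader :reconnects

  def initialize(last_event_at)
    @last_event_at = last_event_at
    @reconnects = 0
  end

  # Counts calls instead of touching a real socket.
  def force_reconnect
    @reconnects += 1
  end
end

# One watchdog tick (assumed logic). Returns true if a reconnect was forced.
def check_once(stream_manager, now:, threshold_seconds:)
  last = stream_manager.last_event_at
  return false if last.nil?                         # assumption: no event yet, nothing to compare
  return false if (now - last) < threshold_seconds  # an event arrived recently enough

  stream_manager.force_reconnect
  true
end

t0 = Time.at(0)
fresh = FakeStreamManager.new(t0)
check_once(fresh, now: t0 + 30, threshold_seconds: 90)   # => false
stale = FakeStreamManager.new(t0)
check_once(stale, now: t0 + 120, threshold_seconds: 90)  # => true
```

Passing `now` in explicitly keeps the sketch deterministic, which is the same reason the constructor accepts a `clock` callable.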

Constant Summary

DEFAULT_THRESHOLD_SECONDS = 90
DEFAULT_INTERVAL_SECONDS = 30

Instance Method Summary

#initialize ⇒ Watchdog
#start ⇒ Object
#stop ⇒ Object

Constructor Details

#initialize(stream_manager:, threshold_seconds: DEFAULT_THRESHOLD_SECONDS, interval_seconds: DEFAULT_INTERVAL_SECONDS, clock: -> { Time.now }, sleeper: ->(s) { sleep(s) }, logger: nil) ⇒ Watchdog

Returns a new instance of Watchdog.



# File 'lib/tempest/jetstream/watchdog.rb', line 18

def initialize(stream_manager:,
               threshold_seconds: DEFAULT_THRESHOLD_SECONDS,
               interval_seconds: DEFAULT_INTERVAL_SECONDS,
               clock: -> { Time.now },
               sleeper: ->(s) { sleep(s) },
               logger: nil)
  @stream_manager = stream_manager
  @threshold_seconds = threshold_seconds
  @interval_seconds = interval_seconds
  @clock = clock
  @sleeper = sleeper
  @logger = logger || Tempest::DebugLog.build_null_logger
  @thread = nil
  @mutex = Mutex.new
  @stopping = false
end
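Because `clock` and `sleeper` are plain callables, they can be replaced with deterministic stand-ins, for example in tests. The sketch below is one possible way to do that; `FakeTime` is a hypothetical helper and is not part of Tempest.

```ruby
# Hypothetical test helper: a controllable clock plus a sleeper that advances
# it instead of blocking. Both lambdas have the same call shapes as the
# constructor defaults (-> { Time.now } and ->(s) { sleep(s) }).
class FakeTime
  attr_reader :now, :slept

  def initialize(start = Time.at(0))
    @now = start
    @slept = []
  end

  # Zero-argument callable returning the simulated current time.
  def clock
    -> { @now }
  end

  # One-argument callable that records the request and moves time forward.
  def sleeper
    lambda do |seconds|
      @slept << seconds
      @now += seconds
    end
  end
end

fake = FakeTime.new
3.times { fake.sleeper.call(30) }  # three simulated 30-second intervals
fake.clock.call                    # => Time.at(90)

# With the real class loaded, the stand-ins would be passed like this
# (stream_manager is whatever manager object the application already has):
#   Tempest::Jetstream::Watchdog.new(stream_manager: stream_manager,
#                                    clock: fake.clock, sleeper: fake.sleeper)
```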

Instance Method Details

#start ⇒ Object



# File 'lib/tempest/jetstream/watchdog.rb', line 35

def start
  @mutex.synchronize do
    return if @thread&.alive?
    @stopping = false
    @thread = Thread.new { run }
  end
end

#stop ⇒ Object



# File 'lib/tempest/jetstream/watchdog.rb', line 43

def stop
  thread = @mutex.synchronize do
    @stopping = true
    t = @thread
    @thread = nil
    t
  end
  return unless thread
  thread.kill
  thread.join
end