Class: Tempest::Jetstream::Watchdog

Inherits:
Object
Defined in:
lib/tempest/jetstream/watchdog.rb

Overview

Detects stalled Jetstream connections and force-reconnects them.

Background: after macOS sleep/wake the kernel may still consider the WebSocket’s TCP socket alive, so Jetstream::Client#each_event blocks in `recv` indefinitely instead of raising. StreamManager’s reconnect loop therefore never runs. The watchdog periodically inspects `stream_manager.last_event_at` and, if no event has arrived within `threshold_seconds`, calls `force_reconnect` to break the stalled call.
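The watchdog's internal loop is not shown on this page. A minimal sketch of one check, assuming only what the overview states (the stream manager exposes `last_event_at` and `force_reconnect`); `FakeStreamManager` and `check_once` are hypothetical names for illustration, and treating a `nil` timestamp as "not stalled" is an assumption:

```ruby
# Hypothetical stand-in for StreamManager; only the two members named in
# the overview are modeled.
class FakeStreamManager
  attr_accessor :last_event_at
  attr_reader :reconnects

  def initialize(last_event_at)
    @last_event_at = last_event_at
    @reconnects = 0
  end

  def force_reconnect
    @reconnects += 1
  end
end

# One watchdog tick (illustrative, not the library's code): reconnect only
# when an event has been seen before and the gap exceeds the threshold.
def check_once(stream_manager, now:, threshold_seconds:)
  last = stream_manager.last_event_at
  return false if last.nil?                       # assumption: no event yet
  return false if now - last <= threshold_seconds # stream is still fresh

  stream_manager.force_reconnect
  true
end

manager = FakeStreamManager.new(Time.at(0))
check_once(manager, now: Time.at(599), threshold_seconds: 600) # => false
check_once(manager, now: Time.at(601), threshold_seconds: 600) # => true
```

Passing `now` in explicitly mirrors the injectable `clock:` in the constructor and keeps the check testable without real waiting.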

The threshold has to be long enough that a quiet but healthy stream (e.g. `--feed=self`, where only the user’s own posts come through) doesn’t trip it. macOS sleeps typically last minutes to hours, so the default of 10 minutes (`DEFAULT_THRESHOLD_SECONDS`) catches real stalls without flapping on idle streams.
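A rough worked example of the timing, assuming the watchdog checks once per `interval_seconds` and reconnects when the gap strictly exceeds the threshold (neither detail is shown on this page); `detection_window` is a hypothetical helper:

```ruby
# Returns the range of seconds after the last event in which a stall would
# be noticed, under the assumptions stated above. The extra interval is the
# worst case of a check landing just before the threshold elapses.
def detection_window(threshold_seconds, interval_seconds)
  threshold_seconds..(threshold_seconds + interval_seconds)
end

window = detection_window(600, 30) # the documented defaults
puts "stall noticed #{window.begin} to #{window.end} seconds after the last event"
```

Under these assumptions, lowering the interval tightens the upper bound but does not change how long a quiet stream can stay silent before being reconnected; only the threshold controls that.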

Constant Summary

DEFAULT_THRESHOLD_SECONDS = 600
DEFAULT_INTERVAL_SECONDS = 30

Instance Method Summary

Constructor Details

#initialize(stream_manager:, threshold_seconds: DEFAULT_THRESHOLD_SECONDS, interval_seconds: DEFAULT_INTERVAL_SECONDS, clock: -> { Time.now }, sleeper: ->(s) { sleep(s) }, logger: nil, did: nil) ⇒ Watchdog

Returns a new instance of Watchdog.



# File 'lib/tempest/jetstream/watchdog.rb', line 23

def initialize(stream_manager:,
               threshold_seconds: DEFAULT_THRESHOLD_SECONDS,
               interval_seconds: DEFAULT_INTERVAL_SECONDS,
               clock: -> { Time.now },
               sleeper: ->(s) { sleep(s) },
               logger: nil, did: nil)
  @stream_manager = stream_manager
  @threshold_seconds = threshold_seconds
  @interval_seconds = interval_seconds
  @clock = clock
  @sleeper = sleeper
  base = logger || Tempest::DebugLog.null_channel
  @logger = did ? base.with(did: did) : base
  @thread = nil
  @mutex = Mutex.new
  @stopping = false
end
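The `clock:` and `sleeper:` keywords default to real time but accept any callables of the same shape, which suggests they exist so tests can control time. A sketch of fakes that advance virtual time instead of blocking; `FakeTime` is a hypothetical helper, not part of the library:

```ruby
# Supplies a clock lambda and a sleeper lambda that share one virtual
# timeline. Sleeping records the duration and moves the clock forward.
class FakeTime
  attr_reader :now, :sleeps

  def initialize(start = Time.at(0))
    @now = start
    @sleeps = []
  end

  # Same shape as the default `-> { Time.now }`.
  def clock
    -> { @now }
  end

  # Same shape as the default `->(s) { sleep(s) }`, but returns immediately.
  def sleeper
    lambda do |seconds|
      @sleeps << seconds
      @now += seconds
    end
  end
end

fake = FakeTime.new
fake.sleeper.call(30)
fake.sleeper.call(30)
fake.clock.call == Time.at(60) # => true
fake.sleeps                    # => [30, 30]
```

These lambdas could then be passed as `clock: fake.clock, sleeper: fake.sleeper` alongside `stream_manager:` when constructing a watchdog in a test.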

Instance Method Details

#start ⇒ Object



# File 'lib/tempest/jetstream/watchdog.rb', line 41

def start
  @mutex.synchronize do
    # Idempotent: do nothing if the watchdog thread is already alive.
    return if @thread&.alive?
    @stopping = false
    @thread = Thread.new { run }
  end
end

#stop ⇒ Object



# File 'lib/tempest/jetstream/watchdog.rb', line 49

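def stop
  # Detach the thread under the lock so a later #start creates a new one.
  thread = @mutex.synchronize do
    @stopping = true
    t = @thread
    @thread = nil
    t
  end
  return unless thread
  # Kill terminates the thread even while it is sleeping; join waits for exit.
  thread.kill
  thread.join
end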
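The lifecycle implied by `#start` and `#stop` can be exercised in isolation. The sketch below reuses the same mutex-guarded shape in a hypothetical `TinyWorker` class; its `run` body, `spawned` counter, and `running?` helper are placeholders for illustration, not the watchdog's real code:

```ruby
class TinyWorker
  attr_reader :spawned

  def initialize
    @thread = nil
    @mutex = Mutex.new
    @spawned = 0
  end

  # Starts the background thread unless one is already alive.
  def start
    @mutex.synchronize do
      return if @thread&.alive?
      @spawned += 1
      @thread = Thread.new { run }
    end
  end

  # Detaches the thread under the lock, then kills it and waits for exit.
  def stop
    thread = @mutex.synchronize do
      t = @thread
      @thread = nil
      t
    end
    return unless thread
    thread.kill
    thread.join
  end

  def running?
    @mutex.synchronize { !!@thread&.alive? }
  end

  private

  # Placeholder loop standing in for the periodic check.
  def run
    loop { sleep 0.01 }
  end
end

worker = TinyWorker.new
worker.start
worker.start    # no-op: the first thread is still alive
worker.spawned  # => 1
worker.stop     # kills the thread and waits for it to exit
worker.running? # => false
worker.stop     # safe to call again; returns nil
```

Clearing `@thread` inside the lock before killing it means a concurrent `start` never sees a thread that is about to die, at the cost of `stop` not waiting for any in-progress work to finish.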