Class: Quonfig::SSEConfigClient::ReadDeadlineWatchdog

Inherits:
Object
  • Object
Defined in:
lib/quonfig/sse_config_client.rb

Overview

Background watchdog that interrupts the worker thread if no chunk arrives within deadline_s seconds. It uses Thread#raise, the only reliable cross-platform way to unblock a Ruby thread blocked in Net::HTTP's body read on macOS. (Closing or shutting down the underlying socket from another thread does NOT wake the reader on macOS; the kernel discards future reads, but the in-flight syscall stays blocked until something else interrupts it. sdk-go and sdk-node solve the equivalent problem with context cancellation and AbortController respectively, which Ruby lacks at the IO layer.) Thread#raise is essentially what Timeout.timeout does internally; using it directly avoids Timeout.timeout's sketchy reputation around ensure blocks.
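The interruption mechanism described above can be sketched in isolation. This is a minimal illustration, not the SDK's code: a read on an empty pipe stands in for the blocked Net::HTTP body read, and DemoReadDeadlineExceeded is a hypothetical error class invented for the example.

```ruby
# Hypothetical error class for this sketch; the SDK defines its own.
class DemoReadDeadlineExceeded < StandardError; end

# Nothing is ever written to the pipe, so a read on it blocks indefinitely.
reader, writer = IO.pipe

worker = Thread.new do
  begin
    reader.read(1) # stands in for the blocked body read
    :read_returned
  rescue DemoReadDeadlineExceeded
    :interrupted
  end
end

# Wait until the worker is actually parked in the blocking read.
sleep 0.01 until worker.stop?

# Inject the exception from another thread; this wakes the blocked reader.
worker.raise(DemoReadDeadlineExceeded, "no chunk within deadline")

result = worker.value
reader.close
writer.close
```

Because the exception surfaces inside the worker, the worker's own rescue and ensure blocks decide how to clean up and whether to reconnect.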

Constant Summary

POLL_INTERVAL = 0.25

Instance Method Summary

Constructor Details

#initialize(worker:, deadline_s:, stopped:, logger:) ⇒ ReadDeadlineWatchdog

Returns a new instance of ReadDeadlineWatchdog.



# File 'lib/quonfig/sse_config_client.rb', line 429

def initialize(worker:, deadline_s:, stopped:, logger:)
  @worker = worker
  @deadline_s = deadline_s
  @stopped = stopped
  @logger = logger
  @active = true
  # Mutex covers @active AND the decision to fire Thread#raise. stop()
  # holds the mutex when flipping @active false, so a +stop+ that
  # arrives mid-deadline-check cannot lose the race against the
  # watchdog's @worker.raise call (which would inject a spurious
  # SSEReadDeadlineExceeded into the worker thread right after a
  # clean read_body return).
  @mutex = Mutex.new
  @last_read_at = Concurrent::AtomicReference.new(Process.clock_gettime(Process::CLOCK_MONOTONIC))
end
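The race described in the constructor comment can be illustrated with a stand-in class. This is a sketch under stated assumptions: the private watch loop is not shown on this page, so DemoWatchdog, DemoDeadlineExceeded, and check_once are hypothetical names, and the timestamp is kept under the same mutex for simplicity instead of in a Concurrent::AtomicReference.

```ruby
# Hypothetical stand-in, NOT the SDK's class; it only illustrates the locking.
class DemoDeadlineExceeded < StandardError; end

class DemoWatchdog
  def initialize(worker:, deadline_s:)
    @worker = worker
    @deadline_s = deadline_s
    @active = true
    @mutex = Mutex.new
    @last_read_at = now
  end

  # Called by the worker each time a chunk arrives.
  def reset!
    @mutex.synchronize { @last_read_at = now }
  end

  # One deadline check. The @active test and the Thread#raise call share a
  # critical section, so stop cannot slip in between them.
  def check_once
    @mutex.synchronize do
      return false unless @active
      return false if now - @last_read_at < @deadline_s

      @active = false # fire at most once
      @worker.raise(DemoDeadlineExceeded, "no chunk within #{@deadline_s}s")
      true
    end
  end

  def stop
    @mutex.synchronize { @active = false }
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

worker = Thread.new do
  begin
    sleep # stands in for a blocked read
  rescue DemoDeadlineExceeded
    :interrupted
  end
end
sleep 0.01 until worker.stop?

# A watchdog that was stopped first never fires, even past its deadline.
stopped_dog = DemoWatchdog.new(worker: worker, deadline_s: 0.0)
stopped_dog.stop
fired_after_stop = stopped_dog.check_once

# An active watchdog past its deadline raises into the worker.
live_dog = DemoWatchdog.new(worker: worker, deadline_s: 0.0)
fired = live_dog.check_once
result = worker.value
```

Once stop has returned, no later check can raise into the worker, which is the guarantee the comment describes.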

Instance Method Details

#reset! ⇒ Object



# File 'lib/quonfig/sse_config_client.rb', line 449

def reset!
  @last_read_at.set(Process.clock_gettime(Process::CLOCK_MONOTONIC))
end
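As a rough illustration of why reset! records a monotonic timestamp, the deadline test reduces to a subtraction that is unaffected by wall-clock adjustments. The helper names below are hypothetical, and the exact comparison used by the SDK's watch loop is not shown on this page, so >= is an assumption.

```ruby
# Hypothetical helpers; only Process.clock_gettime is real API here.
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# True once at least deadline_s seconds have passed since the last read.
# The `now` parameter exists so the check can be exercised without sleeping.
def deadline_exceeded?(last_read_at, deadline_s, now = monotonic_now)
  now - last_read_at >= deadline_s
end

last_read_at = monotonic_now

# Just after a read, a 30 second deadline has not elapsed.
fresh = deadline_exceeded?(last_read_at, 30.0)

# Simulate 31 seconds of silence by passing a later timestamp.
stale = deadline_exceeded?(last_read_at, 30.0, last_read_at + 31.0)
```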

#start ⇒ Object



# File 'lib/quonfig/sse_config_client.rb', line 445

def start
  @thread = Thread.new { watch }
end

#stop ⇒ Object



# File 'lib/quonfig/sse_config_client.rb', line 453

def stop
  @mutex.synchronize { @active = false }
  @thread&.join(1)
  @thread = nil
end
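The join(1) call in stop bounds how long the caller waits for the watchdog thread. A small sketch of that standard Thread#join behavior, using throwaway threads instead of the SDK's watchdog:

```ruby
# A thread that outlives the join limit: join returns nil when the limit expires.
slow = Thread.new { sleep 0.5 }
timed_out = slow.join(0.05)

# A thread that finishes in time: join returns the thread itself.
quick = Thread.new { :done }
finished = quick.join(1)

# Clean up the thread that is still sleeping.
slow.kill
slow.join
```

So stop returns after at most about one second even if the watchdog thread has not yet noticed that @active is false.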