Class: Quonfig::SSEConfigClient::ReadDeadlineWatchdog
- Inherits:
-
Object
- Object
- Quonfig::SSEConfigClient::ReadDeadlineWatchdog
- Defined in:
- lib/quonfig/sse_config_client.rb
Overview
Background watchdog that interrupts the worker thread if no chunk arrives within deadline_s seconds. Uses Thread#raise — the only reliable cross-platform way to unblock a Ruby thread blocked in Net::HTTP‘s body-read on macOS. (Closing or shutting down the underlying socket from another thread does NOT wake the reader on macOS; the kernel discards future reads but the in-flight syscall stays blocked until something else trips. sdk-go and sdk-node solve the equivalent problem with context cancellation / AbortController, which Ruby lacks at the IO layer.) Thread#raise is essentially what Timeout.timeout does internally; using it directly avoids Timeout.timeout’s sketch reputation around ensure blocks.
Constant Summary collapse
- POLL_INTERVAL =
0.25
Instance Method Summary collapse
-
#initialize(worker:, deadline_s:, stopped:, logger:) ⇒ ReadDeadlineWatchdog
constructor
A new instance of ReadDeadlineWatchdog.
- #reset! ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(worker:, deadline_s:, stopped:, logger:) ⇒ ReadDeadlineWatchdog
Returns a new instance of ReadDeadlineWatchdog.
429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 |
# File 'lib/quonfig/sse_config_client.rb', line 429 def initialize(worker:, deadline_s:, stopped:, logger:) @worker = worker @deadline_s = deadline_s @stopped = stopped @logger = logger @active = true # Mutex covers @active AND the decision to fire Thread#raise. stop() # holds the mutex when flipping @active false, so a +stop+ that # arrives mid-deadline-check cannot lose the race against the # watchdog's @worker.raise call (which would inject a spurious # SSEReadDeadlineExceeded into the worker thread right after a # clean read_body return). @mutex = Mutex.new @last_read_at = Concurrent::AtomicReference.new(Process.clock_gettime(Process::CLOCK_MONOTONIC)) end |
Instance Method Details
#reset! ⇒ Object
449 450 451 |
# File 'lib/quonfig/sse_config_client.rb', line 449 def reset! @last_read_at.set(Process.clock_gettime(Process::CLOCK_MONOTONIC)) end |
#start ⇒ Object
445 446 447 |
# File 'lib/quonfig/sse_config_client.rb', line 445 def start @thread = Thread.new { watch } end |
#stop ⇒ Object
453 454 455 456 457 |
# File 'lib/quonfig/sse_config_client.rb', line 453 def stop @mutex.synchronize { @active = false } @thread&.join(1) @thread = nil end |