Class: Quonfig::WorkerSupervisor
- Inherits:
-
Object
- Object
- Quonfig::WorkerSupervisor
- Defined in:
- lib/quonfig/worker_supervisor.rb
Overview
Single supervisor for a long-lived background worker (SSE read loop, fallback poller). Catches unhandled exceptions at the worker boundary, logs them, increments worker_restart_total, and restarts with exponential backoff capped at 30s.
Contract: integration-test-data/chaos/supervisor-test-contract.md Plan: project/plans/sdk-hardening-and-verification.md (Phase 1)
The worker is a Proc-like callable invoked as worker.call(notify_delivered) where notify_delivered is a Proc the worker calls when it has handed at least one envelope to the cache. That signal resets the backoff so a transient blip doesn’t double the delay on the next disconnect.
Shutdown is signaled by Thread#raise(Quonfig::Shutdown) into the supervisor thread. Logger writes and bookkeeping use Thread.handle_interrupt so a concurrent raise doesn’t trip Ruby’s “log writing failed” path.
Constant Summary collapse
- METRIC_NAME =
'quonfig_sdk_worker_restart_total'- DEFAULT_INITIAL_BACKOFF =
0.5- DEFAULT_MAX_BACKOFF =
30.0- DEFAULT_MULTIPLIER =
2.0- SHUTDOWN_TIMEOUT_SEC =
5.0- LOG =
Quonfig::InternalLogger.new(self)
Instance Attribute Summary collapse
-
#worker_restart_labels ⇒ Object
readonly
Returns the value of attribute worker_restart_labels.
-
#worker_restart_total ⇒ Object
readonly
Returns the value of attribute worker_restart_total.
Instance Method Summary collapse
- #alive? ⇒ Boolean
-
#initialize(name:, worker:, layer: '1', initial_backoff: DEFAULT_INITIAL_BACKOFF, max_backoff: DEFAULT_MAX_BACKOFF, multiplier: DEFAULT_MULTIPLIER, sleep_proc: nil, logger: nil) ⇒ WorkerSupervisor
constructor
A new instance of WorkerSupervisor.
- #start ⇒ Object
- #stop ⇒ Object (also: #close)
Constructor Details
#initialize(name:, worker:, layer: '1', initial_backoff: DEFAULT_INITIAL_BACKOFF, max_backoff: DEFAULT_MAX_BACKOFF, multiplier: DEFAULT_MULTIPLIER, sleep_proc: nil, logger: nil) ⇒ WorkerSupervisor
Returns a new instance of WorkerSupervisor.
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/quonfig/worker_supervisor.rb', line 37 def initialize(name:, worker:, layer: '1', initial_backoff: DEFAULT_INITIAL_BACKOFF, max_backoff: DEFAULT_MAX_BACKOFF, multiplier: DEFAULT_MULTIPLIER, sleep_proc: nil, logger: nil) @name = name @layer = layer.to_s @worker = worker @initial_backoff = initial_backoff @max_backoff = max_backoff @multiplier = multiplier @sleep_proc = sleep_proc || ->(seconds) { sleep(seconds) } @logger = logger || LOG @worker_restart_total = 0 @worker_restart_labels = { sdk: 'ruby', sdk_version: Quonfig::VERSION, layer: @layer }.freeze @mutex = Mutex.new @stop_requested = false @thread = nil @current_backoff = @initial_backoff end |
Instance Attribute Details
#worker_restart_labels ⇒ Object (readonly)
Returns the value of attribute worker_restart_labels.
35 36 37 |
# File 'lib/quonfig/worker_supervisor.rb', line 35 def worker_restart_labels @worker_restart_labels end |
#worker_restart_total ⇒ Object (readonly)
Returns the value of attribute worker_restart_total.
35 36 37 |
# File 'lib/quonfig/worker_supervisor.rb', line 35 def worker_restart_total @worker_restart_total end |
Instance Method Details
#alive? ⇒ Boolean
85 86 87 88 |
# File 'lib/quonfig/worker_supervisor.rb', line 85 def alive? t = @thread !t.nil? && t.alive? end |
#start ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/quonfig/worker_supervisor.rb', line 63 def start @mutex.synchronize do return self if @thread&.alive? @stop_requested = false ready = Queue.new @thread = Thread.new do # Set report_on_exception + signal "ready" BEFORE entering # run_loop. start() blocks on the ready queue so a racing stop() # can never raise into a thread that hasn't yet installed its # Shutdown rescue. Thread.current.report_on_exception = false ready << true run_loop rescue Quonfig::Shutdown # cooperative shutdown raced with thread startup; swallowed end ready.pop end self end |
#stop ⇒ Object Also known as: close
90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/quonfig/worker_supervisor.rb', line 90 def stop thread = @mutex.synchronize do @stop_requested = true t = @thread @thread = nil t end return if thread.nil? raise_shutdown(thread) thread.join(SHUTDOWN_TIMEOUT_SEC) thread.kill if thread.alive? nil end |