Class: Quonfig::WorkerSupervisor

Inherits:
Object
Defined in:
lib/quonfig/worker_supervisor.rb

Overview

Supervises a single long-lived background worker (the SSE read loop or the fallback poller). The supervisor catches unhandled exceptions at the worker boundary, logs them, increments worker_restart_total, and restarts the worker with exponential backoff, capped at 30 s by default.

Contract: integration-test-data/chaos/supervisor-test-contract.md
Plan: project/plans/sdk-hardening-and-verification.md (Phase 1)

The worker is a Proc-like callable invoked as worker.call(notify_delivered) where notify_delivered is a Proc the worker calls when it has handed at least one envelope to the cache. That signal resets the backoff so a transient blip doesn’t double the delay on the next disconnect.
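The reset rule can be illustrated with a small, self-contained model. This is a hypothetical sketch, not the Quonfig implementation: simulate_supervision and flaky_worker are made-up names, the loop runs a fixed number of attempts, and it records each backoff delay instead of sleeping. It assumes the delay drops back to the initial value whenever the worker reported a delivery during its last run.

```ruby
# Hypothetical model of the supervise/restart cycle (not Quonfig's code).
# Records the delay that would be slept before each restart.
def simulate_supervision(worker, attempts:, initial: 0.5, max: 30.0, multiplier: 2.0)
  delays = []
  backoff = initial
  attempts.times do
    delivered = false
    notify_delivered = -> { delivered = true }
    begin
      worker.call(notify_delivered)
    rescue StandardError
      # An unhandled worker error ends this run; the supervisor restarts it.
    end
    # A delivery during this run means the stream was healthy, so the next
    # restart starts again from the initial delay instead of a grown one.
    backoff = initial if delivered
    delays << backoff
    backoff = [backoff * multiplier, max].min
  end
  delays
end

# Hypothetical worker that fails on every run and hands an envelope to the
# cache (calls notify_delivered) only on run number deliver_on.
def flaky_worker(deliver_on:)
  calls = 0
  lambda do |notify_delivered|
    calls += 1
    notify_delivered.call if calls == deliver_on
    raise IOError, "stream closed"
  end
end

p simulate_supervision(flaky_worker(deliver_on: 4), attempts: 5)
# => [0.5, 1.0, 2.0, 0.5, 1.0]
```

In this model the fourth run delivered before failing, so its restart delay falls back to 0.5 s instead of growing to 4.0 s.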

Shutdown is signaled by Thread#raise(Quonfig::Shutdown) into the supervisor thread. Logger writes and bookkeeping use Thread.handle_interrupt so a concurrent raise doesn’t trip Ruby’s “log writing failed” path.
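The deferral that Thread.handle_interrupt provides can be seen in isolation below. This is a generic Ruby sketch rather than Quonfig code: DemoShutdown stands in for Quonfig::Shutdown, and the sleep stands in for a slow logger write. Under the :never mask, an exception sent with Thread#raise is held until the protected block exits and is raised at that point.

```ruby
# Generic demonstration (not Quonfig code) of deferring an asynchronous
# Thread#raise until a critical section has finished.
class DemoShutdown < StandardError; end

events = []
entered = Queue.new

worker = Thread.new do
  begin
    Thread.handle_interrupt(DemoShutdown => :never) do
      entered << true
      sleep 0.2                 # stands in for a slow log write
      events << :write_finished # still runs: the raise is deferred
    end
    events << :unreachable      # the deferred exception fires before this line
  rescue DemoShutdown
    events << :shutdown_handled
  end
end

entered.pop                # wait until the worker is inside the masked block
worker.raise(DemoShutdown) # delivered while the "write" is in progress
worker.join
p events # => [:write_finished, :shutdown_handled]
```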

Constant Summary

METRIC_NAME = 'quonfig_sdk_worker_restart_total'
DEFAULT_INITIAL_BACKOFF = 0.5
DEFAULT_MAX_BACKOFF = 30.0
DEFAULT_MULTIPLIER = 2.0
SHUTDOWN_TIMEOUT_SEC = 5.0
LOG = Quonfig::InternalLogger.new(self)

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(name:, worker:, layer: '1', initial_backoff: DEFAULT_INITIAL_BACKOFF, max_backoff: DEFAULT_MAX_BACKOFF, multiplier: DEFAULT_MULTIPLIER, sleep_proc: nil, logger: nil) ⇒ WorkerSupervisor

Returns a new instance of WorkerSupervisor.

# File 'lib/quonfig/worker_supervisor.rb', line 37

def initialize(name:, worker:, layer: '1',
               initial_backoff: DEFAULT_INITIAL_BACKOFF,
               max_backoff: DEFAULT_MAX_BACKOFF,
               multiplier: DEFAULT_MULTIPLIER,
               sleep_proc: nil,
               logger: nil)
  @name = name
  @layer = layer.to_s
  @worker = worker
  @initial_backoff = initial_backoff
  @max_backoff = max_backoff
  @multiplier = multiplier
  @sleep_proc = sleep_proc || ->(seconds) { sleep(seconds) }
  @logger = logger || LOG
  @worker_restart_total = 0
  @worker_restart_labels = {
    sdk: 'ruby',
    sdk_version: Quonfig::VERSION,
    layer: @layer
  }.freeze
  @mutex = Mutex.new
  @stop_requested = false
  @thread = nil
  @current_backoff = @initial_backoff
end
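With the default arguments, the restart delays can be worked out ahead of time. This page does not show run_loop, so the sketch assumes that each consecutive restart multiplies the previous delay by multiplier and clamps it to max_backoff, starting from initial_backoff (which matches @current_backoff = @initial_backoff above). backoff_schedule is a hypothetical helper for illustration only.

```ruby
# Hypothetical helper: the delay before each of `count` consecutive restarts,
# assuming delay(n + 1) = min(delay(n) * multiplier, max).
def backoff_schedule(count, initial: 0.5, multiplier: 2.0, max: 30.0)
  delay = initial
  Array.new(count) do
    current = delay
    delay = [delay * multiplier, max].min
    current
  end
end

p backoff_schedule(8) # => [0.5, 1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

Under that assumption the 30 s cap is first reached on the seventh consecutive restart, after 31.5 s of cumulative backoff across the first six.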

Instance Attribute Details

#worker_restart_labels ⇒ Object (readonly)

Returns the value of attribute worker_restart_labels.

# File 'lib/quonfig/worker_supervisor.rb', line 35

def worker_restart_labels
  @worker_restart_labels
end

#worker_restart_total ⇒ Object (readonly)

Returns the value of attribute worker_restart_total.

# File 'lib/quonfig/worker_supervisor.rb', line 35

def worker_restart_total
  @worker_restart_total
end
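worker_restart_total and worker_restart_labels pair naturally with METRIC_NAME as a labeled counter. How Quonfig actually exports them is not shown on this page; the sketch below assumes a Prometheus-style text exposition purely for illustration, and both exposition_line and the version string "1.2.3" are placeholders.

```ruby
# Hypothetical exporter (not Quonfig's): renders one counter sample in the
# Prometheus text exposition format.
def exposition_line(name, labels, value)
  rendered = labels.map do |key, val|
    # Label values must escape backslash, double quote and newline.
    escaped = val.to_s.gsub(/[\\"\n]/) { |c| c == "\n" ? "\\n" : "\\#{c}" }
    %(#{key}="#{escaped}")
  end
  "#{name}{#{rendered.join(',')}} #{value}"
end

labels = { sdk: "ruby", sdk_version: "1.2.3", layer: "1" }.freeze
puts exposition_line("quonfig_sdk_worker_restart_total", labels, 3)
# quonfig_sdk_worker_restart_total{sdk="ruby",sdk_version="1.2.3",layer="1"} 3
```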

Instance Method Details

#alive? ⇒ Boolean

Reports whether a supervisor thread has been started and is still running.

Returns:

  • (Boolean)

# File 'lib/quonfig/worker_supervisor.rb', line 85

def alive?
  t = @thread
  !t.nil? && t.alive?
end

#start ⇒ Object

Starts the supervisor thread unless one is already alive, and blocks until the new thread signals that it is ready. Returns self.

# File 'lib/quonfig/worker_supervisor.rb', line 63

def start
  @mutex.synchronize do
    return self if @thread&.alive?

    @stop_requested = false
    ready = Queue.new
    @thread = Thread.new do
      # Set report_on_exception + signal "ready" BEFORE entering
      # run_loop. start() blocks on the ready queue so a racing stop()
      # can never raise into a thread that hasn't yet installed its
      # Shutdown rescue.
      Thread.current.report_on_exception = false
      ready << true
      run_loop
    rescue Quonfig::Shutdown
      # cooperative shutdown raced with thread startup; swallowed
    end
    ready.pop
  end
  self
end
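The ready-queue handshake in start is a general pattern: the caller does not return until the new thread has completed its setup. A minimal standalone version is sketched below; start_with_handshake is a hypothetical name, and the sketch omits the mutex and the Shutdown rescue that start uses.

```ruby
# Hypothetical standalone version of the handshake used by #start.
def start_with_handshake(&body)
  ready = Queue.new
  thread = Thread.new do
    Thread.current.report_on_exception = false # per-thread setup
    ready << true                              # setup done; release the caller
    body.call
  end
  ready.pop # blocks until the thread has pushed, so setup has already run
  thread
end

t = start_with_handshake { sleep 0.05 }
p t.report_on_exception # => false, guaranteed by the handshake
t.join
```

Without ready.pop, the caller could inspect or signal the thread before that setup has run.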

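#stop ⇒ Object
Also known as: close

Requests shutdown of the supervisor thread, waits up to SHUTDOWN_TIMEOUT_SEC seconds for it to exit, and kills it if it is still alive. Returns nil, including when no thread is running.
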
# File 'lib/quonfig/worker_supervisor.rb', line 90

def stop
  thread = @mutex.synchronize do
    @stop_requested = true
    t = @thread
    @thread = nil
    t
  end
  return if thread.nil?

  raise_shutdown(thread)
  thread.join(SHUTDOWN_TIMEOUT_SEC)
  thread.kill if thread.alive?
  nil
end
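The stop sequence (signal, bounded join, then kill) can also be shown in isolation. In this generic sketch DemoStop stands in for Quonfig::Shutdown, stop_thread is a hypothetical helper, and the timeouts are arbitrary. Thread#join re-raises any exception that terminated the target thread, which is one reason the thread body must rescue the shutdown exception itself.

```ruby
# Hypothetical helper mirroring the shape of #stop (not Quonfig code).
class DemoStop < StandardError; end

def stop_thread(thread, timeout:)
  thread.raise(DemoStop) if thread.alive? # ask the thread to stop
  thread.join(timeout)                    # returns nil if the timeout expires
  thread.kill if thread.alive?            # last resort for an unresponsive thread
  nil
end

ready = Queue.new

# Cooperative worker: treats DemoStop as a shutdown request and exits.
cooperative = Thread.new do
  begin
    ready << true
    sleep # wait indefinitely until signaled
  rescue DemoStop
    :stopped
  end
end
ready.pop
stop_thread(cooperative, timeout: 1.0)
p cooperative.value # => :stopped

# Stubborn worker: swallows DemoStop and keeps looping, so only kill ends it.
stubborn = Thread.new do
  begin
    ready << true
    loop { sleep 0.01 }
  rescue DemoStop
    retry
  end
end
ready.pop
stop_thread(stubborn, timeout: 0.1)
stubborn.join
p stubborn.alive? # => false
```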