Class: Async::Service::Supervisor::MemoryMonitor
- Defined in:
- lib/async/service/supervisor/memory_monitor.rb
Overview
Monitors worker memory usage and restarts workers that exceed limits.
Uses the `memory` gem to track process memory and detect leaks.
Instance Attribute Summary
-
#cluster ⇒ Object
readonly
The cluster of processes being monitored.
Instance Method Summary
-
#add(process_id) ⇒ Object
Add a process to the memory monitor.
-
#as_json ⇒ Object
Serialize memory cluster data for JSON.
-
#initialize(interval: 10, total_size_limit: nil, free_size_minimum: nil, **options) ⇒ MemoryMonitor
constructor
Create a new memory monitor.
-
#memory_leak_detected(process_id, monitor) ⇒ Object
Invoked when a memory leak is detected.
-
#register(supervisor_controller) ⇒ Object
Register a worker with the memory monitor.
-
#remove(supervisor_controller) ⇒ Object
Remove a worker from the memory monitor.
-
#run_once ⇒ Object
Run one iteration of the memory monitor.
Methods inherited from Monitor
Constructor Details
#initialize(interval: 10, total_size_limit: nil, free_size_minimum: nil, **options) ⇒ MemoryMonitor
Create a new memory monitor.
# File 'lib/async/service/supervisor/memory_monitor.rb', line 24
def initialize(interval: 10, total_size_limit: nil, free_size_minimum: nil, **options)
  super(interval: interval)

  @cluster = Memory::Leak::Cluster.new(total_size_limit: total_size_limit, free_size_minimum: free_size_minimum)

  # We use these options when adding processes to the cluster:
  @options = options

  # Each process id maps to the set of controllers registered for it (compared by identity):
  @processes = Hash.new{|hash, key| hash[key] = Set.new.compare_by_identity}

  # Mutex to serialize cluster modifications and prevent race conditions:
  @guard = Mutex.new
end
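The constructor keeps one identity-compared set of controllers per process id. The following is a minimal sketch of why identity comparison matters there; `Controller` is a hypothetical stand-in struct, not the real supervisor controller class.

```ruby
require "set"

# Hypothetical stand-in for a supervisor controller. Struct instances with
# equal fields are == and eql?, which is what makes identity comparison matter.
Controller = Struct.new(:process_id)

# Same shape as @processes in the constructor: a missing key gets a fresh identity set.
processes = Hash.new{|hash, key| hash[key] = Set.new.compare_by_identity}

first = Controller.new(1234)
second = Controller.new(1234) # Equal by value, but a distinct object.

processes[1234].add(first)
processes[1234].add(second)
processes[1234].add(first) # Adding the same object again is a no-op.

identity_count = processes[1234].size

# An ordinary Set would treat the two equal structs as a single member:
value_count = Set.new([first, second]).size

puts "identity set: #{identity_count}, value set: #{value_count}"
```

With identity comparison, two distinct controllers that happen to compare equal are still tracked separately, so removing one does not silently drop the other.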
Instance Attribute Details
#cluster ⇒ Object (readonly)
The cluster of processes being monitored.
# File 'lib/async/service/supervisor/memory_monitor.rb', line 38
attr_reader :cluster
Instance Method Details
#add(process_id) ⇒ Object
Add a process to the memory monitor. You may override this to control how processes are added to the cluster.
# File 'lib/async/service/supervisor/memory_monitor.rb', line 43
def add(process_id)
  @cluster.add(process_id, **@options)
end
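As a rough illustration of overriding add, the sketch below uses stand-in classes rather than the real library: `RecordingCluster`, `SketchMonitor` and `AuditedMonitor` are hypothetical names, and the `maximum_size_limit` option is an assumption used only to show options being forwarded.

```ruby
# Stand-in for the cluster: it only records what it was asked to add.
class RecordingCluster
  attr_reader :added

  def initialize
    @added = {}
  end

  def add(process_id, **options)
    @added[process_id] = options
  end
end

# Minimal stand-in that mirrors the shape of MemoryMonitor#add (not the real class).
class SketchMonitor
  attr_reader :cluster

  def initialize(**options)
    @cluster = RecordingCluster.new
    @options = options
  end

  def add(process_id)
    @cluster.add(process_id, **@options)
  end
end

# Override add to observe each addition, then defer to the default behaviour.
class AuditedMonitor < SketchMonitor
  attr_reader :audit

  def initialize(**options)
    super
    @audit = []
  end

  def add(process_id)
    @audit << process_id
    super
  end
end

# The option name below is assumed for illustration only.
monitor = AuditedMonitor.new(maximum_size_limit: 512 * 1024 * 1024)
monitor.add(1001)
monitor.add(1002)

puts monitor.audit.inspect
```

Calling super keeps the default forwarding of options intact, so the override only adds behaviour around it.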
#as_json ⇒ Object
Serialize memory cluster data for JSON.
# File 'lib/async/service/supervisor/memory_monitor.rb', line 89
def as_json
  @cluster.as_json
end
#memory_leak_detected(process_id, monitor) ⇒ Object
Invoked when a memory leak is detected. The default implementation logs a warning, sends INT to the process, and returns true.
# File 'lib/async/service/supervisor/memory_monitor.rb', line 98
def memory_leak_detected(process_id, monitor)
  Console.warn(self, "Memory leak detected!", child: {process_id: process_id}, monitor: monitor)

  # Kill the process gently:
  begin
    Console.info(self, "Killing process!", child: {process_id: process_id})
    Process.kill(:INT, process_id)
  rescue Errno::ESRCH
    # No such process - he's dead Jim.
  rescue => error
    Console.warn(self, "Failed to kill process!", child: {process_id: process_id}, exception: error)
  end

  true
end
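The rescue structure above can be tried in isolation. This sketch assumes a POSIX-like system and uses a hypothetical helper, `interrupt_gently`, which returns a symbol instead of logging; it is not part of the library.

```ruby
require "rbconfig"

# Send INT and classify the outcome, mirroring the rescue clauses above.
def interrupt_gently(process_id)
  Process.kill(:INT, process_id)
  :signalled
rescue Errno::ESRCH
  # The process has already exited and been reaped.
  :already_gone
rescue StandardError
  # For example Errno::EPERM when the process belongs to another user.
  :failed
end

# Spawn a short-lived child so there is a real process id to signal.
child = Process.spawn(RbConfig.ruby, "-e", "sleep 5")

first_result = interrupt_gently(child)
Process.wait(child) # Reap the child so its process id is released.
second_result = interrupt_gently(child)

puts "first: #{first_result}, second: #{second_result}"
```

Treating Errno::ESRCH as a non-error reflects that the goal, the process being gone, has already been met.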
#register(supervisor_controller) ⇒ Object
Register a worker with the memory monitor.
# File 'lib/async/service/supervisor/memory_monitor.rb', line 50
def register(supervisor_controller)
  process_id = supervisor_controller.process_id
  return unless process_id

  Console.debug(self, "Registering worker.", supervisor_controller: supervisor_controller, process_id: process_id)

  @guard.synchronize do
    controllers = @processes[process_id]

    if controllers.empty?
      Console.debug(self, "Registering process.", child: {process_id: process_id})
      self.add(process_id)
    end

    controllers.add(supervisor_controller)
  end
end
#remove(supervisor_controller) ⇒ Object
Remove a worker from the memory monitor.
# File 'lib/async/service/supervisor/memory_monitor.rb', line 71
def remove(supervisor_controller)
  process_id = supervisor_controller.process_id
  return unless process_id

  @guard.synchronize do
    controllers = @processes[process_id]

    controllers.delete(supervisor_controller)

    if controllers.empty?
      Console.debug(self, "Removing process.", child: {process_id: process_id})
      @cluster.remove(process_id)
      @processes.delete(process_id)
    end
  end
end
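Taken together, register and remove behave like a reference count per process id: the process enters the cluster with its first controller and leaves with its last. The sketch below reproduces that lifecycle with hypothetical stand-ins (`TrackingCluster`, `Registry`, `Connection`), so it shows the pattern only and does not use the real classes.

```ruby
require "set"

# Stand-in cluster that just tracks which process ids are present.
class TrackingCluster
  attr_reader :process_ids

  def initialize
    @process_ids = []
  end

  def add(process_id)
    @process_ids << process_id
  end

  def remove(process_id)
    @process_ids.delete(process_id)
  end
end

# Simplified version of the register/remove bookkeeping shown above.
class Registry
  attr_reader :cluster

  def initialize
    @cluster = TrackingCluster.new
    @processes = Hash.new{|hash, key| hash[key] = Set.new.compare_by_identity}
    @guard = Mutex.new
  end

  def register(controller)
    process_id = controller.process_id
    return unless process_id

    @guard.synchronize do
      controllers = @processes[process_id]
      # Only the first controller for a process adds it to the cluster.
      @cluster.add(process_id) if controllers.empty?
      controllers.add(controller)
    end
  end

  def remove(controller)
    process_id = controller.process_id
    return unless process_id

    @guard.synchronize do
      controllers = @processes[process_id]
      controllers.delete(controller)

      # Only the last controller for a process removes it from the cluster.
      if controllers.empty?
        @cluster.remove(process_id)
        @processes.delete(process_id)
      end
    end
  end
end

# Hypothetical controller type: anything that responds to process_id.
Connection = Struct.new(:process_id)

registry = Registry.new
a = Connection.new(42)
b = Connection.new(42)

registry.register(a)
registry.register(b)
after_register = registry.cluster.process_ids.dup

registry.remove(a)
after_first_remove = registry.cluster.process_ids.dup

registry.remove(b)
after_last_remove = registry.cluster.process_ids.dup

puts [after_register, after_first_remove, after_last_remove].inspect
```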
#run_once ⇒ Object
Run one iteration of the memory monitor.
# File 'lib/async/service/supervisor/memory_monitor.rb', line 115
def run_once
  @guard.synchronize do
    # This block must return true if the process was killed.
    @cluster.check! do |process_id, monitor|
      begin
        memory_leak_detected(process_id, monitor)
      rescue => error
        Console.error(self, "Failed to handle memory leak!", child: {process_id: process_id}, exception: error)
      end
    end
  end
end
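The block contract in run_once (return true when the process was killed, and never let a handler error escape) can be sketched as follows. `LeakyCluster` is a hypothetical stand-in whose check! simply yields each id and collects those reported as killed; the real cluster's internals are not shown in this page and may differ.

```ruby
# Stand-in for the cluster: yields each "leaking" process to the block and
# returns the ids for which the block returned true. Hypothetical behaviour.
class LeakyCluster
  def initialize(leaking)
    @leaking = leaking
  end

  def check!
    @leaking.select{|process_id| yield(process_id, nil) == true}
  end
end

# Hypothetical handler that fails for one particular process.
def handle_leak(process_id)
  raise "boom" if process_id == 2
  true
end

errors = []

killed = LeakyCluster.new([1, 2, 3]).check! do |process_id, monitor|
  begin
    handle_leak(process_id)
  rescue => error
    # A failing handler is recorded, and the block yields a non-true value.
    errors << [process_id, error.message]
    nil
  end
end

puts "killed: #{killed.inspect}, errors: #{errors.inspect}"
```

Rescuing inside the block means one failing handler does not prevent the remaining processes from being checked in the same iteration.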