Class: Async::Service::Supervisor::MemoryMonitor

Inherits:
Monitor
  • Object
show all
Defined in:
lib/async/service/supervisor/memory_monitor.rb

Overview

Monitors worker memory usage and restarts workers that exceed limits.

Uses the `memory` gem to track process memory and detect leaks.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Monitor

#run, #status, #to_json

Constructor Details

#initialize(interval: 10, total_size_limit: nil, free_size_minimum: nil, **options) ⇒ MemoryMonitor

Create a new memory monitor.



24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/async/service/supervisor/memory_monitor.rb', line 24

# Create a new memory monitor.
#
# `interval` is passed to the superclass constructor. `total_size_limit` and `free_size_minimum` are passed to the memory leak cluster. Any remaining keyword options are kept and later splatted into the cluster's `add` call for each process.
def initialize(interval: 10, total_size_limit: nil, free_size_minimum: nil, **options)
	super(interval: interval)
	
	# Mutex used to serialize cluster modifications and prevent race conditions:
	@guard = Mutex.new
	
	# We use these options when adding processes to the cluster:
	@options = options
	
	# Maps each process ID to an identity-compared set, created lazily on first access:
	@processes = Hash.new do |table, process_id|
		table[process_id] = Set.new.compare_by_identity
	end
	
	@cluster = Memory::Leak::Cluster.new(
		total_size_limit: total_size_limit,
		free_size_minimum: free_size_minimum
	)
end

Instance Attribute Details

#cluster ⇒ Object (readonly)

Returns the value of attribute cluster.



38
39
40
# File 'lib/async/service/supervisor/memory_monitor.rb', line 38

# The cluster of processes being monitored.
#
# Plain reader: returns the value held in the @cluster instance variable.
def cluster
  @cluster
end

#cluster ⇒ Object (readonly)

The cluster of processes being monitored.



38
# File 'lib/async/service/supervisor/memory_monitor.rb', line 38

# The cluster of processes being monitored.
attr_reader :cluster

Instance Method Details

#add(process_id) ⇒ Object

Add a process to the memory monitor. You may override this to control how processes are added to the cluster.



43
44
45
# File 'lib/async/service/supervisor/memory_monitor.rb', line 43

# Add a process to the memory monitor. You may override this to control how processes are added to the cluster.
#
# The options captured by the constructor are forwarded to the cluster as keyword arguments, and the cluster's result is returned.
def add(process_id)
	cluster_options = @options
	
	@cluster.add(process_id, **cluster_options)
end

#as_json ⇒ Object

Serialize memory cluster data for JSON.



89
90
91
# File 'lib/async/service/supervisor/memory_monitor.rb', line 89

# Serialize memory cluster data for JSON.
#
# Delegates to the cluster's own as_json and returns its result unchanged.
def as_json
	@cluster.as_json
end

#memory_leak_detected(process_id, monitor) ⇒ Object

Invoked when a memory leak is detected.



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/async/service/supervisor/memory_monitor.rb', line 98

# Invoked when a memory leak is detected.
#
# Logs the detection and then sends SIGINT to the offending process so that it can shut down gracefully.
#
# Returns true when the signal was delivered, or when the process no longer exists (Errno::ESRCH). Returns false when the signal could not be delivered for any other reason (e.g. a permission error), so that the caller is not told a still-running process was killed.
def memory_leak_detected(process_id, monitor)
	Console.warn(self, "Memory leak detected!", child: {process_id: process_id}, monitor: monitor)
	
	# Kill the process gently:
	begin
		Console.info(self, "Killing process!", child: {process_id: process_id})
		Process.kill(:INT, process_id)
	rescue Errno::ESRCH
		# No such process - it has already exited, which counts as killed.
	rescue => error
		Console.warn(self, "Failed to kill process!", child: {process_id: process_id}, exception: error)
		
		# The process is presumably still alive, so don't report it as killed:
		return false
	end
	
	true
end

#register(supervisor_controller) ⇒ Object

Register a worker with the memory monitor.



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/async/service/supervisor/memory_monitor.rb', line 50

# Register a worker with the memory monitor.
#
# Controllers that report no process ID are ignored. The first controller registered for a given process ID also adds that process to the cluster (via #add); every controller is then recorded in the per-process set.
def register(supervisor_controller)
	process_id = supervisor_controller.process_id
	
	if process_id
		Console.debug(self, "Registering worker.", supervisor_controller: supervisor_controller, process_id: process_id)
		
		@guard.synchronize do
			workers = @processes[process_id]
			
			# An empty set means this process is not yet tracked by the cluster:
			first_registration = workers.empty?
			
			if first_registration
				Console.debug(self, "Registering process.", child: {process_id: process_id})
				add(process_id)
			end
			
			workers << supervisor_controller
		end
	end
end

#remove(supervisor_controller) ⇒ Object

Remove a worker from the memory monitor.



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/async/service/supervisor/memory_monitor.rb', line 71

# Remove a worker from the memory monitor.
#
# Controllers that report no process ID are ignored. When the last controller for a process ID is removed, the process is also removed from the cluster and its entry is dropped from the process table.
def remove(supervisor_controller)
	process_id = supervisor_controller.process_id
	return unless process_id
	
	@guard.synchronize do
		workers = @processes[process_id]
		workers.delete(supervisor_controller)
		
		# Other controllers still refer to this process, so keep monitoring it:
		next unless workers.empty?
		
		Console.debug(self, "Removing process.", child: {process_id: process_id})
		@cluster.remove(process_id)
		@processes.delete(process_id)
	end
end

#run_once ⇒ Object

Run one iteration of the memory monitor.



115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/async/service/supervisor/memory_monitor.rb', line 115

# Run one iteration of the memory monitor.
#
# Holds the guard while the cluster is checked. For each process the cluster yields, #memory_leak_detected is invoked; any error it raises is logged rather than propagated.
def run_once
	@guard.synchronize do
		# This block must return true if the process was killed.
		@cluster.check! do |process_id, monitor|
			memory_leak_detected(process_id, monitor)
		rescue => error
			Console.error(self, "Failed to handle memory leak!", child: {process_id: process_id}, exception: error)
		end
	end
end