Class: Async::Service::Supervisor::UtilizationMonitor

Inherits:
Monitor
  • Object
show all
Defined in:
lib/async/service/supervisor/utilization_monitor.rb,
lib/metrics/provider/async/service/supervisor/utilization_monitor.rb

Overview

Monitors worker utilization metrics aggregated by service name.

Uses shared memory to efficiently collect utilization metrics from workers and aggregates them by service name for monitoring and reporting.

Defined Under Namespace

Classes: SegmentAllocator

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Monitor

#run, #status, #to_json

Constructor Details

#initialize(path: "utilization.shm", interval: 10, size: IO::Buffer::PAGE_SIZE * 8, segment_size: 512) ⇒ UtilizationMonitor

Initialize a new utilization monitor.



197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/async/service/supervisor/utilization_monitor.rb', line 197

# Initialize a new utilization monitor.
#
# @parameter path [String] Path of the shared memory file; given to the segment allocator and later passed to workers.
# @parameter interval [Numeric] Interval forwarded to the parent monitor's initializer.
# @parameter size [Integer] Size passed to the segment allocator (defaults to eight pages).
# @parameter segment_size [Integer] Size of each segment handed out by the allocator.
def initialize(path: "utilization.shm", interval: 10, size: IO::Buffer::PAGE_SIZE * 8, segment_size: 512)
	super(interval: interval)
	
	# Guards the worker table and the allocator.
	@guard = Mutex.new
	
	# Registered workers: worker_id => supervisor_controller.
	@workers = {}
	
	# Kept so they can be handed to workers when they register.
	@path = path
	@segment_size = segment_size
	
	@allocator = SegmentAllocator.new(path, size: size, segment_size: segment_size)
end

Class Method Details

.monitor_type ⇒ Object

The key used when this monitor’s status is aggregated with others.



276
277
278
# File 'lib/async/service/supervisor/utilization_monitor.rb', line 276

# The key used when this monitor's status is aggregated with others.
#
# @returns [Symbol] Always `:utilization_monitor`.
def self.monitor_type
	:utilization_monitor
end

Instance Method Details

#as_json ⇒ Object

Serialize utilization data for JSON.



319
320
321
# File 'lib/async/service/supervisor/utilization_monitor.rb', line 319

# Serialize utilization data for JSON.
#
# Delegates to {sample}, so the result is a fresh sample taken at call time.
#
# @returns [Object] Whatever {sample} returns.
def as_json
	sample
end

#emit(metrics) ⇒ Object

Emit the utilization metrics.



326
327
328
# File 'lib/async/service/supervisor/utilization_monitor.rb', line 326

# Emit the utilization metrics.
#
# Logs the metrics at info level through Console, with this monitor as the subject.
#
# @parameter metrics [Object] The metrics to report; attached to the log entry under the `metrics` key.
def emit(metrics)
	Console.info(self, "Utilization:", metrics: metrics)
end

#register(supervisor_controller) ⇒ Object

Register a worker with the utilization monitor.

Allocates a segment of shared memory and instructs the worker to map the shared memory file and expose utilization information at the allocated offset. The worker maps the file independently and returns its schema.



217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/async/service/supervisor/utilization_monitor.rb', line 217

# Register a worker with the utilization monitor.
#
# Allocates a segment of shared memory and instructs the worker to map the shared memory file and expose utilization information at the allocated offset. The worker maps the file independently and returns its schema.
#
# The allocated segment is released again whenever registration does not complete: the controller has no worker, the worker returns no schema (nil or empty), or an error is raised during setup.
#
# @parameter supervisor_controller [Object] The controller for the worker; must respond to `id` and `worker`.
def register(supervisor_controller)
	@guard.synchronize do
		worker_id = supervisor_controller.id
		return unless worker_id
		
		# Allocate a segment first with an empty schema; the real schema comes from the worker.
		offset = @allocator.allocate(worker_id, [])
		
		unless offset
			Console.warn(self, "Failed to allocate utilization segment", worker_id: worker_id)
			return
		end
		
		# Inform the worker of the shared memory path, segment size, and allocated offset.
		# The worker will map the file itself and return its schema.
		begin
			worker = supervisor_controller.worker
			
			unless worker
				# Without a worker nobody will ever write to the segment, so release it now
				# rather than keeping it reserved until the worker is removed.
				@allocator.free(worker_id)
				Console.info(self, "No worker available for utilization setup", worker_id: worker_id)
				return
			end
			
			# Pass the segment size - observer will handle page alignment and file mapping
			schema = worker.setup_utilization_observer(@path, @segment_size, offset)
			
			if schema && !schema.empty?
				# Update the allocation with the actual schema and start tracking the worker.
				@allocator.update_schema(worker_id, schema)
				@workers[worker_id] = supervisor_controller
				
				Console.info(self, "Registered worker utilization", worker_id: worker_id, offset: offset, schema: schema)
			else
				# Worker didn't provide schema, free the allocation
				@allocator.free(worker_id)
				Console.info(self, "Worker did not provide utilization schema", worker_id: worker_id)
			end
		rescue => error
			Console.error(self, "Error setting up worker utilization", worker_id: worker_id, exception: error)
			@allocator.free(worker_id)
		end
	end
end

#remove(supervisor_controller) ⇒ Object

Remove a worker from the utilization monitor.

Returns the allocated segment back to the free list.



263
264
265
266
267
268
269
270
271
272
273
# File 'lib/async/service/supervisor/utilization_monitor.rb', line 263

# Remove a worker from the utilization monitor.
#
# Stops tracking the worker and asks the allocator to free its segment. Controllers without an id are ignored.
#
# @parameter supervisor_controller [Object] The controller for the worker; must respond to `id`.
def remove(supervisor_controller)
	@guard.synchronize do
		worker_id = supervisor_controller.id
		
		if worker_id
			# Forget the worker first, then release its segment.
			@workers.delete(worker_id)
			@allocator.free(worker_id)
			
			Console.debug(self, "Freed utilization segment", worker_id: worker_id)
		end
	end
end

#run_once ⇒ Object

Run one iteration of the utilization monitor.



331
332
333
# File 'lib/async/service/supervisor/utilization_monitor.rb', line 331

# Run one iteration of the utilization monitor.
#
# Takes the current JSON representation from {as_json} and passes it to {emit}.
#
# @returns [Object] Whatever {emit} returns.
def run_once
	metrics = as_json
	emit(metrics)
end

#sample ⇒ Object

Sample aggregated utilization data.



283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
# File 'lib/async/service/supervisor/utilization_monitor.rb', line 283

# Sample aggregated utilization data.
#
# Reads each registered worker's data from the allocator and groups it by the `:name` entry of the controller's state ("unknown" when absent). Numeric fields are summed per service; any other value is overwritten by the most recently read worker. Workers for which the allocator returns no data are skipped.
#
# @returns [Hash] Service name => hash of aggregated fields, plus `:worker_count` with the number of workers that contributed.
def sample
	@guard.synchronize do
		@workers.each_with_object({}) do |(worker_id, supervisor_controller), totals|
			service_name = supervisor_controller.state[:name] || "unknown"
			
			fields = @allocator.read(worker_id)
			next unless fields
			
			# Create the per-service bucket only once there is data to put in it.
			service = (totals[service_name] ||= {})
			
			fields.each do |key, value|
				service[key] =
					if value.is_a?(Numeric)
						# Numeric fields accumulate across workers of the same service.
						(service[key] || 0) + value
					else
						# Non-numeric fields simply keep the last value seen.
						value
					end
			end
			
			# Count workers per service (for utilization denominator).
			service[:worker_count] = (service[:worker_count] || 0) + 1
		end
	end
end