Class: Async::Service::Supervisor::UtilizationMonitor::SegmentAllocator

Inherits:
Object
Defined in:
lib/async/service/supervisor/utilization_monitor.rb

Overview

Allocates and manages shared memory segments for worker utilization data.

Manages a shared memory file that workers can write utilization metrics to. Allocates segments to workers and maintains a free list for reuse. Each process (supervisor and workers) maps the shared memory file independently.

Constructor Details

#initialize(path, size: IO::Buffer::PAGE_SIZE * 8, segment_size: 512, growth_factor: 2) ⇒ SegmentAllocator

Initialize a new shared memory manager.

Creates and maps the shared memory file. Workers will map the same file independently using the provided path.



# File 'lib/async/service/supervisor/utilization_monitor.rb', line 35

def initialize(path, size: IO::Buffer::PAGE_SIZE * 8, segment_size: 512, growth_factor: 2)
	@path = path
	@size = size
	@segment_size = segment_size
	@growth_factor = growth_factor
	
	File.unlink(path) rescue nil
	@file = File.open(path, "w+b")
	@file.truncate(size)
	# Supervisor maps the file for reading worker data
	@buffer = IO::Buffer.map(@file, size)
	
	# Track allocated segments: worker_id => {offset: Integer, schema: Array}
	@allocations = {}
	
	# Free list of segment offsets
	@free_list = []
	
	# Initialize free list with all segments
	(0...(@size / @segment_size)).each do |segment_index|
		@free_list << (segment_index * @segment_size)
	end
end
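To illustrate how the supervisor and a worker can each map the same file independently, here is a minimal sketch. It assumes Ruby 3.1 or later (for `IO::Buffer`) and a platform that supports shared file mappings. The method name `shared_mapping_roundtrip`, the file name, and the offset are hypothetical, and both mappings live in a single process purely for demonstration.

```ruby
require "tmpdir"

# Hypothetical demonstration: two independent mappings of one file see the same bytes.
def shared_mapping_roundtrip(value, offset: 512)
	Dir.mktmpdir do |directory|
		path = File.join(directory, "utilization.shm")
		size = IO::Buffer::PAGE_SIZE
		
		# "Supervisor" side: create the file, size it, and map it.
		supervisor_file = File.open(path, "w+b")
		supervisor_file.truncate(size)
		supervisor_buffer = IO::Buffer.map(supervisor_file, size)
		
		# "Worker" side: open the same path again and map it independently.
		worker_file = File.open(path, "r+b")
		worker_buffer = IO::Buffer.map(worker_file, size)
		
		# The worker writes a little-endian unsigned 32-bit value into its segment...
		worker_buffer.set_value(:u32, offset, value)
		
		# ...and the supervisor reads it back through its own mapping.
		result = supervisor_buffer.get_value(:u32, offset)
		
		worker_buffer.free
		supervisor_buffer.free
		worker_file.close
		supervisor_file.close
		
		result
	end
end

puts shared_mapping_roundtrip(42)
```

In a real deployment the two mappings would belong to separate processes; the worker would only need the path and the offset returned by the allocator.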

Instance Method Details

#allocate(worker_id, schema) ⇒ Object

Allocate a segment for a worker.

Grows the shared memory file by growth_factor when no free segments remain. Returns the byte offset of the allocated segment, or nil if the file could not be resized.



# File 'lib/async/service/supervisor/utilization_monitor.rb', line 66

def allocate(worker_id, schema)
	# Try to resize if we're out of segments
	if @free_list.empty?
		unless resize(@size * @growth_factor)
			return nil
		end
	end
	
	offset = @free_list.shift
	@allocations[worker_id] = {offset: offset, schema: schema}
	
	return offset
end
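The grow-on-exhaustion behaviour can be sketched without any shared memory. The class below is a hypothetical stand-in (`FreeListSketch` is not part of the library) that tracks only offsets. It omits the page alignment and error handling that `resize` performs.

```ruby
# Hypothetical stand-in for the allocator's bookkeeping; no shared memory is involved.
class FreeListSketch
	attr_reader :size
	
	def initialize(size:, segment_size:, growth_factor: 2)
		@size = size
		@segment_size = segment_size
		@growth_factor = growth_factor
		@free_list = (0...(size / segment_size)).map {|index| index * segment_size}
	end
	
	# Take the first free offset, growing the region first if none remain.
	def allocate
		grow if @free_list.empty?
		@free_list.shift
	end
	
	private
	
	# Append the offsets that become available once the region grows.
	def grow
		new_size = @size * @growth_factor
		((@size / @segment_size)...(new_size / @segment_size)).each do |index|
			@free_list << index * @segment_size
		end
		@size = new_size
	end
end

sketch = FreeListSketch.new(size: 1024, segment_size: 512)
offsets = 3.times.map {sketch.allocate}
p offsets
p sketch.size
```

With two initial segments, the third allocation triggers one growth step, so the region doubles from 1024 to 2048 bytes and the new segment starts at offset 1024.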

#allocation(worker_id) ⇒ Object

Get the allocation information for a worker.



# File 'lib/async/service/supervisor/utilization_monitor.rb', line 93

def allocation(worker_id)
	@allocations[worker_id]
end

#close ⇒ Object

Close the shared memory file.



# File 'lib/async/service/supervisor/utilization_monitor.rb', line 186

def close
	@file&.close
	@buffer = nil
end

#free(worker_id) ⇒ Object

Free a segment allocated to a worker.



# File 'lib/async/service/supervisor/utilization_monitor.rb', line 83

def free(worker_id)
	if allocation = @allocations.delete(worker_id)
		@free_list << allocation[:offset]
	end
end
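Because `allocate` takes offsets from the front of the free list with `shift` and `free` appends to the end, a released segment is reused only after the offsets already in the list have been handed out. A minimal sketch of that ordering, using plain arrays and hashes (the method name and worker names are hypothetical):

```ruby
# Hypothetical bookkeeping only: three 512-byte segments, no shared memory.
def reuse_order_demo
	free_list = [0, 512, 1024]
	allocations = {}
	
	# Mirrors allocate: take the first free offset.
	allocations[:worker_a] = free_list.shift
	allocations[:worker_b] = free_list.shift
	
	# Mirrors free: the released offset goes to the back of the list.
	free_list << allocations.delete(:worker_a)
	
	# The next allocation receives 1024, not the just-released 0.
	allocations[:worker_c] = free_list.shift
	
	[allocations, free_list]
end

allocations, free_list = reuse_order_demo
p allocations
p free_list
```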

#read(worker_id) ⇒ Object

Read utilization data from a worker’s allocated segment.



# File 'lib/async/service/supervisor/utilization_monitor.rb', line 118

def read(worker_id)
	allocation = @allocations[worker_id]
	return nil unless allocation
	
	offset = allocation[:offset]
	schema = allocation[:schema]
	
	result = {}
	schema.each do |key, type, field_offset|
		absolute_offset = offset + field_offset
		
		# Use IO::Buffer type symbols directly (e.g. u32, s32, u64, s64, f32, f64)
		# In IO::Buffer, lowercase symbols are little-endian and uppercase symbols are big-endian
		begin
			result[key] = @buffer.get_value(type, absolute_offset)
		rescue => error
			Console.warn(self, "Failed to read value", type: type, key: key, offset: absolute_offset, exception: error)
		end
	end
	
	return result
end
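The schema-driven read can be tried in isolation with an in-memory buffer. The sketch below assumes each schema entry is a `[key, type, field_offset]` triple, as the destructuring in `read` suggests. The field names, the `EXAMPLE_SCHEMA` constant, and the `read_segment` helper are hypothetical.

```ruby
# Hypothetical schema: each entry is [key, type, field_offset] relative to the segment start.
EXAMPLE_SCHEMA = [
	[:connections, :u32, 0],
	[:load, :f64, 8],
]

# Read every field described by the schema from the segment at segment_offset.
def read_segment(buffer, segment_offset, schema)
	schema.each_with_object({}) do |(key, type, field_offset), result|
		result[key] = buffer.get_value(type, segment_offset + field_offset)
	end
end

# An in-memory buffer stands in for the mapped file.
buffer = IO::Buffer.new(1024)
segment_offset = 512

# Simulate a worker writing its metrics into the segment.
buffer.set_value(:u32, segment_offset + 0, 7)
buffer.set_value(:f64, segment_offset + 8, 0.75)

p read_segment(buffer, segment_offset, EXAMPLE_SCHEMA)
```

The resulting hash maps `:connections` to 7 and `:load` to 0.75. Unlike the library method, this sketch does not rescue errors from invalid types or out-of-range offsets.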

#resize(new_size) ⇒ Object

Resize the shared memory file.

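Extends the file to the new size, remaps the buffer, and adds the new segments to the free list. The new size must be larger than the current size; it is rounded up to the nearest page boundary before the file is extended. Returns true on success, or false if the new size is not larger or the resize fails.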



# File 'lib/async/service/supervisor/utilization_monitor.rb', line 149

def resize(new_size)
	old_size = @size
	return false if new_size <= old_size
	
	# Ensure new size is page-aligned (rounds up to nearest page boundary)
	page_size = IO::Buffer::PAGE_SIZE
	new_size = (((new_size + page_size - 1) / page_size) * page_size).to_i
	
	begin
		# Extend the file:
		@file.truncate(new_size)
		
		# Remap the buffer to the new size:
		@buffer&.free
		@buffer = IO::Buffer.map(@file, new_size)
		
		# Calculate new segments to add to free list:
		old_segment_count = old_size / @segment_size
		new_segment_count = new_size / @segment_size
		
		# Add new segments to free list:
		(old_segment_count...new_segment_count).each do |segment_index|
			@free_list << (segment_index * @segment_size)
		end
		
		@size = new_size
		
		Console.info(self, "Resized shared memory", old_size: old_size, new_size: new_size, segments_added: new_segment_count - old_segment_count)
		
		return true
	rescue => error
		Console.error(self, "Failed to resize shared memory", old_size: old_size, new_size: new_size, exception: error)
		return false
	end
end
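The page rounding and the segment arithmetic in `resize` can be checked by hand. The sketch below uses two hypothetical helper methods and assumes a page size of 4096 bytes for illustration; the real value comes from `IO::Buffer::PAGE_SIZE` and depends on the platform.

```ruby
# Round a byte count up to the next multiple of page_size (integer arithmetic).
def round_up_to_page(bytes, page_size)
	((bytes + page_size - 1) / page_size) * page_size
end

# Offsets of the segments gained when growing from old_size to new_size.
def new_segment_offsets(old_size, new_size, segment_size)
	((old_size / segment_size)...(new_size / segment_size)).map {|index| index * segment_size}
end

page_size = 4096 # Assumed for illustration; the real value is IO::Buffer::PAGE_SIZE.

aligned_size = round_up_to_page(40_000, page_size)
p aligned_size
p new_segment_offsets(32_768, aligned_size, 512).size
```

Under these assumptions a request for 40,000 bytes is rounded up to 40,960 bytes (ten pages), and growing from 32,768 bytes adds 16 segments of 512 bytes, the first at offset 32,768.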

#size ⇒ Object

Get the current size of the shared memory file.



# File 'lib/async/service/supervisor/utilization_monitor.rb', line 100

def size
	@size
end

#update_schema(worker_id, schema) ⇒ Object

Update the schema for an existing allocation.



# File 'lib/async/service/supervisor/utilization_monitor.rb', line 108

def update_schema(worker_id, schema)
	if allocation = @allocations[worker_id]
		allocation[:schema] = schema
	end
end