Class: Async::Service::Supervisor::UtilizationMonitor::SegmentAllocator
- Inherits: Object
  - Object
  - Async::Service::Supervisor::UtilizationMonitor::SegmentAllocator
- Defined in: lib/async/service/supervisor/utilization_monitor.rb
Overview
Allocates and manages shared memory segments for worker utilization data.
Manages a shared memory file that workers can write utilization metrics to. Allocates segments to workers and maintains a free list for reuse. Each process (supervisor and workers) maps the shared memory file independently.
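The idea of several parties mapping one file independently can be illustrated without the allocator itself. The following is a minimal sketch, assuming a Ruby version that ships `IO::Buffer` and a platform with shared file mappings; the helper name `shared_round_trip` and the temporary file are hypothetical and not part of this class. Both mappings are created in a single process here, standing in for the supervisor and a worker.

```ruby
require "tempfile"

# IO::Buffer is still flagged experimental on some Ruby versions; silence the warning.
Warning[:experimental] = false

# Hypothetical helper: write `value` through one mapping and read it back
# through a second, independent mapping of the same file.
def shared_round_trip(value, size: IO::Buffer::PAGE_SIZE)
  Tempfile.create("utilization") do |file|
    file.truncate(size)

    # "Supervisor" side: map the file it created.
    supervisor_buffer = IO::Buffer.map(file, size)

    # "Worker" side: open the same path separately and map it again.
    File.open(file.path, "r+b") do |worker_file|
      worker_buffer = IO::Buffer.map(worker_file, size)
      worker_buffer.set_value(:u32, 0, value)
      worker_buffer.free
    end

    result = supervisor_buffer.get_value(:u32, 0)
    supervisor_buffer.free
    result
  end
end

puts shared_round_trip(42)
```

Because both mappings are backed by the same file, a value written through one should be visible through the other without any explicit copy.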
Instance Method Summary
- #allocate(worker_id, schema) ⇒ Object
  Allocate a segment for a worker.
- #allocation(worker_id) ⇒ Object
  Get the allocation information for a worker.
- #close ⇒ Object
  Close the shared memory file.
- #free(worker_id) ⇒ Object
  Free a segment allocated to a worker.
- #initialize(path, size: IO::Buffer::PAGE_SIZE * 8, segment_size: 512, growth_factor: 2) ⇒ SegmentAllocator (constructor)
  Initialize a new shared memory manager.
- #read(worker_id) ⇒ Object
  Read utilization data from a worker’s allocated segment.
- #resize(new_size) ⇒ Object
  Resize the shared memory file.
- #size ⇒ Object
  Get the current size of the shared memory file.
- #update_schema(worker_id, schema) ⇒ Object
  Update the schema for an existing allocation.
Constructor Details
#initialize(path, size: IO::Buffer::PAGE_SIZE * 8, segment_size: 512, growth_factor: 2) ⇒ SegmentAllocator
Initialize a new shared memory manager.
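Removes any existing file at the given path, then creates, sizes, and maps the shared memory file. The free list starts out containing the offset of every segment in the file. Workers map the same file independently using the provided path.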
# File 'lib/async/service/supervisor/utilization_monitor.rb', line 35
def initialize(path, size: IO::Buffer::PAGE_SIZE * 8, segment_size: 512, growth_factor: 2)
  @path = path
  @size = size
  @segment_size = segment_size
  @growth_factor = growth_factor

  File.unlink(path) rescue nil
  @file = File.open(path, "w+b")
  @file.truncate(size)

  # Supervisor maps the file for reading worker data
  @buffer = IO::Buffer.map(@file, size)

  # Track allocated segments: worker_id => {offset: Integer, schema: Array}
  @allocations = {}

  # Free list of segment offsets
  @free_list = []

  # Initialize free list with all segments
  (0...(@size / @segment_size)).each do |segment_index|
    @free_list << (segment_index * @segment_size)
  end
end
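The free list arithmetic in the constructor can be checked in isolation. The sketch below reproduces only that calculation; the helper name `initial_free_list` is hypothetical, and the 4096-byte page size is an assumption (the real value comes from `IO::Buffer::PAGE_SIZE` and varies by platform).

```ruby
# Hypothetical helper mirroring the constructor's free list setup:
# one byte offset per whole segment that fits in the file.
def initial_free_list(size, segment_size)
  (0...(size / segment_size)).map { |segment_index| segment_index * segment_size }
end

# Assumed page size of 4096 bytes, so the default size is 4096 * 8 = 32768 bytes.
free_list = initial_free_list(4096 * 8, 512)

puts free_list.length          # 64
puts free_list.first(3).inspect # [0, 512, 1024]
puts free_list.last            # 32256
```

Under that assumption, the default arguments give room for 64 workers before the file has to grow.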
Instance Method Details
#allocate(worker_id, schema) ⇒ Object
Allocate a segment for a worker.
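Automatically resizes the shared memory file (multiplying its size by the growth factor) if no segments are available. Returns the byte offset of the allocated segment, or nil if the resize fails.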
# File 'lib/async/service/supervisor/utilization_monitor.rb', line 66
def allocate(worker_id, schema)
  # Try to resize if we're out of segments
  if @free_list.empty?
    unless resize(@size * @growth_factor)
      return nil
    end
  end

  offset = @free_list.shift
  @allocations[worker_id] = {offset: offset, schema: schema}

  return offset
end
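To see how allocation, growth, and reuse interact, here is a simplified in-memory stand-in. `ToyAllocator` is hypothetical: it keeps only the bookkeeping (no file, no mapping, no page alignment, no schema) and is meant as a sketch of the free list behaviour shown above, not as the library's implementation.

```ruby
# Hypothetical stand-in that tracks offsets only.
class ToyAllocator
  attr_reader :size

  def initialize(size:, segment_size:, growth_factor: 2)
    @size = size
    @segment_size = segment_size
    @growth_factor = growth_factor
    @allocations = {}
    @free_list = (0...(size / segment_size)).map { |index| index * segment_size }
  end

  # Take the oldest free offset, growing first if none are left.
  def allocate(worker_id)
    grow(@size * @growth_factor) if @free_list.empty?
    offset = @free_list.shift
    @allocations[worker_id] = offset
    offset
  end

  # Return the worker's offset to the end of the free list.
  def free(worker_id)
    if (offset = @allocations.delete(worker_id))
      @free_list << offset
    end
  end

  private

  # Append offsets for the segments gained by growing (no page rounding here).
  def grow(new_size)
    old_count = @size / @segment_size
    new_count = new_size / @segment_size
    (old_count...new_count).each { |index| @free_list << (index * @segment_size) }
    @size = new_size
  end
end

allocator = ToyAllocator.new(size: 1024, segment_size: 512)
a = allocator.allocate(:a) # 0
b = allocator.allocate(:b) # 512
c = allocator.allocate(:c) # free list empty, grows to 2048, returns 1024
allocator.free(:a)         # offset 0 goes to the back of the free list
d = allocator.allocate(:d) # 1536
e = allocator.allocate(:e) # 0 (reused)
puts [a, b, c, d, e].inspect
```

Because freed offsets are appended and allocations are taken from the front, a freed segment is reused only after the offsets already waiting in the list.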
#allocation(worker_id) ⇒ Object
Get the allocation information for a worker.
# File 'lib/async/service/supervisor/utilization_monitor.rb', line 93
def allocation(worker_id)
  @allocations[worker_id]
end
#close ⇒ Object
Close the shared memory file.
# File 'lib/async/service/supervisor/utilization_monitor.rb', line 186
def close
  @file&.close
  @buffer = nil
end
#free(worker_id) ⇒ Object
Free a segment allocated to a worker.
# File 'lib/async/service/supervisor/utilization_monitor.rb', line 83
def free(worker_id)
  if allocation = @allocations.delete(worker_id)
    @free_list << allocation[:offset]
  end
end
#read(worker_id) ⇒ Object
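Read utilization data from a worker’s allocated segment.
Each schema entry is a (key, type, field offset) triple; the field offset is relative to the start of the worker’s segment. Returns a hash of values keyed by schema key, or nil if the worker has no allocation. A field that fails to read is logged as a warning and omitted from the result.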
# File 'lib/async/service/supervisor/utilization_monitor.rb', line 118
def read(worker_id)
  allocation = @allocations[worker_id]
  return nil unless allocation

  offset = allocation[:offset]
  schema = allocation[:schema]

  result = {}

  schema.each do |key, type, field_offset|
    absolute_offset = offset + field_offset

    # Use IO::Buffer type symbols directly (i32, u32, i64, u64, f32, f64)
    # IO::Buffer accepts both lowercase and uppercase versions
    begin
      result[key] = @buffer.get_value(type, absolute_offset)
    rescue => error
      Console.warn(self, "Failed to read value", type: type, key: key, offset: absolute_offset, exception: error)
    end
  end

  return result
end
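The schema-driven read can be tried against a plain in-memory buffer. This is a sketch under assumptions: the field names `:connections` and `:load`, the layout, and the helper `read_fields` are made up for illustration, and an `IO::Buffer.new` buffer stands in for the mapped file. Note that in `IO::Buffer`, lowercase type symbols such as `:u32` are little-endian and uppercase ones such as `:U32` are big-endian, so writer and reader need to agree on the exact symbol.

```ruby
# IO::Buffer is still flagged experimental on some Ruby versions; silence the warning.
Warning[:experimental] = false

# Hypothetical helper: decode each [key, type, field_offset] entry
# relative to the start of a segment.
def read_fields(buffer, segment_offset, schema)
  schema.each_with_object({}) do |(key, type, field_offset), result|
    result[key] = buffer.get_value(type, segment_offset + field_offset)
  end
end

# Assumed layout: a 32-bit counter at byte 0 and a 64-bit float at byte 8.
schema = [
  [:connections, :u32, 0],
  [:load, :f64, 8],
]

buffer = IO::Buffer.new(1024)

# Pretend a worker owns the segment at offset 512 and has written its metrics.
buffer.set_value(:u32, 512 + 0, 7)
buffer.set_value(:f64, 512 + 8, 0.25)

fields = read_fields(buffer, 512, schema)
puts fields.inspect
```

Unlike `#read`, this helper does not rescue per-field errors; an out-of-range offset would raise instead of being logged.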
#resize(new_size) ⇒ Object
Resize the shared memory file.
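Extends the file to the new size, remaps the buffer, and adds the new segments to the free list. The new size must be larger than the current size; otherwise the method returns false without changing anything. The requested size is rounded up to the nearest page boundary before the file is extended. Returns true on success. If extending or remapping raises, the error is logged and the method returns false.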
# File 'lib/async/service/supervisor/utilization_monitor.rb', line 149
def resize(new_size)
  old_size = @size
  return false if new_size <= old_size

  # Ensure new size is page-aligned (rounds up to nearest page boundary)
  page_size = IO::Buffer::PAGE_SIZE
  new_size = (((new_size + page_size - 1) / page_size) * page_size).to_i

  begin
    # Extend the file:
    @file.truncate(new_size)

    # Remap the buffer to the new size:
    @buffer&.free
    @buffer = IO::Buffer.map(@file, new_size)

    # Calculate new segments to add to free list:
    old_segment_count = old_size / @segment_size
    new_segment_count = new_size / @segment_size

    # Add new segments to free list:
    (old_segment_count...new_segment_count).each do |segment_index|
      @free_list << (segment_index * @segment_size)
    end

    @size = new_size

    Console.info(self, "Resized shared memory", old_size: old_size, new_size: new_size, segments_added: new_segment_count - old_segment_count)

    return true
  rescue => error
    Console.error(self, "Failed to resize shared memory", old_size: old_size, new_size: new_size, exception: error)
    return false
  end
end
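The page rounding in `#resize` is ordinary integer ceiling division. The sketch below isolates it; the helper name `page_align` is hypothetical and the 4096-byte page size is an assumption used to make the numbers concrete.

```ruby
# Hypothetical helper: round `size` up to the next multiple of `page_size`
# using integer arithmetic only.
def page_align(size, page_size)
  ((size + page_size - 1) / page_size) * page_size
end

puts page_align(5000, 4096) # 8192
puts page_align(8192, 4096) # 8192 (already aligned, unchanged)
puts page_align(1, 4096)    # 4096
```

Since the result is never smaller than the request, a caller asking for a size just above the current one may get a whole extra page of segments.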
#size ⇒ Object
Get the current size of the shared memory file.
# File 'lib/async/service/supervisor/utilization_monitor.rb', line 100
def size
  @size
end
#update_schema(worker_id, schema) ⇒ Object
Update the schema for an existing allocation.
# File 'lib/async/service/supervisor/utilization_monitor.rb', line 108
def update_schema(worker_id, schema)
  if allocation = @allocations[worker_id]
    allocation[:schema] = schema
  end
end