Class: Ignis::Collective::Communicator
- Inherits: Object
  - Object
  - Ignis::Collective::Communicator
- Defined in: lib/nvruby/collective/communicator.rb
Overview
Primary user-facing abstraction for collective operations. Provides AllReduce, Broadcast, Reduce, and other collective primitives.
Constant Summary
- REDUCTION_OPS = [:sum, :prod, :min, :max, :avg].freeze
  Reduction operations.
Instance Attribute Summary
-
#device_manager ⇒ DeviceManager
readonly
Device manager.
-
#gpu_ids ⇒ Array<Integer>
readonly
GPU device IDs in this communicator.
-
#rank ⇒ Integer
readonly
Rank of this communicator (for multi-process).
-
#transport_selector ⇒ TransportSelector
readonly
Transport selector.
-
#world_size ⇒ Integer
readonly
Total number of ranks.
Instance Method Summary
-
#all_gather(tensors, stream: nil) ⇒ Array<Array<NvArray>>
AllGather - gather tensors from all GPUs to all GPUs.
-
#all_reduce(tensors, op: :sum, stream: nil) ⇒ Array<NvArray>
Perform AllReduce operation - reduce and distribute result to all GPUs.
-
#all_reduce_async(tensors, op: :sum, stream:) ⇒ Array<NvArray>
Async AllReduce - requires explicit synchronization.
-
#all_to_all(send_buffers, recv_buffers, chunk_size:, stream: nil) ⇒ void
AllToAll - full exchange between all GPUs. Each GPU sends N chunks (one to each GPU) and receives N chunks (one from each GPU).
-
#barrier ⇒ void
Barrier synchronization across all GPUs.
-
#broadcast(tensor, root: 0, stream: nil) ⇒ Array<NvArray>
Broadcast tensor from root GPU to all GPUs.
-
#destroy! ⇒ void
Clean up all resources.
-
#initialize(gpu_ids:, rank: 0, world_size: 1) ⇒ Communicator
constructor
Create a new communicator for the specified GPUs.
-
#initialize! ⇒ self
Initialize the communicator (detect topology, enable P2P, etc.).
-
#inspect ⇒ String
Detailed inspection.
-
#performance_summary ⇒ Hash
Get performance summary.
-
#ready? ⇒ Boolean
Check if communicator is ready.
-
#recv(buffer, src_rank:, size:, stream: nil) ⇒ void
Point-to-point receive (no-op, actual receive happens in send_recv).
-
#reduce(tensors, root: 0, op: :sum, stream: nil) ⇒ NvArray
Reduce tensors to root GPU.
-
#reduce_scatter(tensors, op: :sum, stream: nil) ⇒ Array<FFI::Pointer>
ReduceScatter - reduce and scatter result.
-
#send(tensor, dest_rank:, size: nil, stream: nil) ⇒ void
Point-to-point send from current rank to destination.
-
#send_recv(buffer, src_rank:, dst_buffer:, dst_rank:, size:, stream: nil) ⇒ void
Point-to-point send from specific source rank.
-
#to_s ⇒ String
Human-readable description.
-
#topology ⇒ Topology::Matrix
Get the topology matrix.
Constructor Details
#initialize(gpu_ids:, rank: 0, world_size: 1) ⇒ Communicator
Create a new communicator for the specified GPUs
# File 'lib/nvruby/collective/communicator.rb', line 37
def initialize(gpu_ids:, rank: 0, world_size: 1)
  @gpu_ids = gpu_ids.dup.freeze
  @rank = rank
  @world_size = world_size

  validate_gpu_ids!

  @device_manager = DeviceManager.new(device_ids: @gpu_ids)
  @transport_selector = TransportSelector.new(@gpu_ids)
  @ring_order = nil
  @initialized = false
end
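The constructor stores `gpu_ids.dup.freeze` rather than the caller's array. A minimal sketch of what that buys, using a hypothetical stand-in class (`GpuIdHolder` is not part of the library):

```ruby
# Hypothetical stand-in class, used only to illustrate `dup.freeze`.
class GpuIdHolder
  attr_reader :gpu_ids

  def initialize(gpu_ids:)
    # Copy first so the caller's array is left unfrozen, then freeze the copy.
    @gpu_ids = gpu_ids.dup.freeze
  end
end

caller_ids = [0, 1, 2]
holder = GpuIdHolder.new(gpu_ids: caller_ids)

caller_ids << 3            # the caller can still mutate its own array
p holder.gpu_ids           # the stored copy is unaffected: [0, 1, 2]
p holder.gpu_ids.frozen?   # true
```

Freezing the copy means later code cannot change the device list out from under the device manager or the transport selector, which were both built from it.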
Instance Attribute Details
#device_manager ⇒ DeviceManager (readonly)
Returns Device manager.
# File 'lib/nvruby/collective/communicator.rb', line 22
def device_manager
  @device_manager
end
#gpu_ids ⇒ Array<Integer> (readonly)
Returns GPU device IDs in this communicator.
# File 'lib/nvruby/collective/communicator.rb', line 19
def gpu_ids
  @gpu_ids
end
#rank ⇒ Integer (readonly)
Returns Rank of this communicator (for multi-process).
# File 'lib/nvruby/collective/communicator.rb', line 28
def rank
  @rank
end
#transport_selector ⇒ TransportSelector (readonly)
Returns Transport selector.
# File 'lib/nvruby/collective/communicator.rb', line 25
def transport_selector
  @transport_selector
end
#world_size ⇒ Integer (readonly)
Returns Total number of ranks.
# File 'lib/nvruby/collective/communicator.rb', line 31
def world_size
  @world_size
end
Instance Method Details
#all_gather(tensors, stream: nil) ⇒ Array<Array<NvArray>>
AllGather - gather tensors from all GPUs to all GPUs
# File 'lib/nvruby/collective/communicator.rb', line 130
def all_gather(tensors, stream: nil)
  validate_tensors!(tensors)
  ensure_initialized!

  return [tensors] if @gpu_ids.size == 1

  # TODO: Implement ring all-gather
  simple_all_gather(tensors, stream)
end
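The nested return type is easier to read with a concrete shape in hand. The sketch below models only the semantics of AllGather, with plain Ruby values standing in for per-GPU tensors; `reference_all_gather` is a hypothetical helper, and the private `simple_all_gather` is not shown in this page, so nothing here describes how the library moves data.

```ruby
# Reference semantics only: plain Ruby values stand in for per-GPU tensors.
def reference_all_gather(per_gpu)
  # Every participant ends up with its own copy of the full list.
  per_gpu.map { per_gpu.dup }
end

p reference_all_gather(%i[t0 t1 t2])
p reference_all_gather(%i[t0])   # same shape as the single-GPU `[tensors]` path
```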
#all_reduce(tensors, op: :sum, stream: nil) ⇒ Array<NvArray>
Perform AllReduce operation - reduce and distribute result to all GPUs
# File 'lib/nvruby/collective/communicator.rb', line 69
def all_reduce(tensors, op: :sum, stream: nil)
  validate_operation!(op)
  validate_tensors!(tensors)
  ensure_initialized!

  # Single GPU case - no-op
  return tensors if @gpu_ids.size == 1

  # Use Ring AllReduce for multi-GPU
  ring_all_reduce(tensors, op, stream)
end
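As a rough model of what AllReduce computes, the sketch below reduces plain Ruby arrays element-wise and hands every participant the same result. It assumes each op in `REDUCTION_OPS` has its usual meaning; `REFERENCE_REDUCERS` and `reference_all_reduce` are hypothetical names, and the private `ring_all_reduce` is not reproduced here.

```ruby
# Reference semantics only: one Ruby array per GPU stands in for a tensor.
# The lambdas are assumed meanings for the ops listed in REDUCTION_OPS.
REFERENCE_REDUCERS = {
  sum:  ->(values) { values.sum },
  prod: ->(values) { values.inject(:*) },
  min:  ->(values) { values.min },
  max:  ->(values) { values.max },
  avg:  ->(values) { values.sum.to_f / values.size }
}.freeze

def reference_all_reduce(per_gpu, op: :sum)
  reducer = REFERENCE_REDUCERS.fetch(op) do
    raise ArgumentError, "unknown op: #{op.inspect}"
  end
  return per_gpu if per_gpu.size == 1   # single participant: nothing to combine

  # Reduce position by position across participants.
  reduced = per_gpu.transpose.map { |column| reducer.call(column) }
  # Every participant receives its own copy of the reduced result.
  per_gpu.map { reduced.dup }
end

p reference_all_reduce([[1, 2], [3, 4], [5, 6]])
p reference_all_reduce([[1, 2], [3, 4], [5, 6]], op: :max)
```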
#all_reduce_async(tensors, op: :sum, stream:) ⇒ Array<NvArray>
Async AllReduce - requires explicit synchronization
# File 'lib/nvruby/collective/communicator.rb', line 86
def all_reduce_async(tensors, op: :sum, stream:)
  raise ArgumentError, "Stream required for async operation" unless stream

  all_reduce(tensors, op: op, stream: stream)
end
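Because `stream:` is a required keyword, there are two distinct failure paths: Ruby rejects a call that omits it, and the explicit guard rejects a call that passes `nil`. The sketch below shows both with a hypothetical method of the same keyword shape (`async_style` and `error_message` are illustrative names only).

```ruby
# Hypothetical method with the same keyword signature as all_reduce_async.
def async_style(op: :sum, stream:)
  raise ArgumentError, "Stream required for async operation" unless stream

  [op, stream]
end

# Returns the ArgumentError message raised by the block, or nil if none.
def error_message
  yield
  nil
rescue ArgumentError => e
  e.message
end

p error_message { async_style }               # Ruby's own missing-keyword error
p error_message { async_style(stream: nil) }  # message from the explicit guard
p error_message { async_style(stream: :s0) }  # nil, no error raised
```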
#all_to_all(send_buffers, recv_buffers, chunk_size:, stream: nil) ⇒ void
This method returns an undefined value.
AllToAll - full exchange between all GPUs. Each GPU sends N chunks (one to each GPU) and receives N chunks (one from each GPU).
# File 'lib/nvruby/collective/communicator.rb', line 196
def all_to_all(send_buffers, recv_buffers, chunk_size:, stream: nil)
  ensure_initialized!

  n = @gpu_ids.size
  return if n == 1

  streams = stream ? [stream] * n : create_null_streams(n)

  # Phase 1: Copy local data (GPU[i] → GPU[i])
  n.times do |rank|
    gpu_id = @gpu_ids[rank]
    CUDA::RuntimeAPI.cudaSetDevice(gpu_id)
    stream_ptr = get_stream_ptr(streams[rank])

    CUDA::RuntimeAPI.cudaMemcpyAsync(
      recv_buffers[rank][rank],
      send_buffers[rank][rank],
      chunk_size,
      CUDA::RuntimeAPI::MEMCPY_DEVICE_TO_DEVICE,
      stream_ptr
    )
  end

  # Phase 2: N-1 rounds of pairwise exchange
  (n - 1).times do |round|
    n.times do |rank|
      gpu_id = @gpu_ids[rank]

      # Calculate partner for this round (rotation pattern)
      partner = (rank + round + 1) % n
      partner_gpu = @gpu_ids[partner]

      stream_ptr = get_stream_ptr(streams[rank])

      # Send to partner
      transport = @transport_selector.select_transport(gpu_id, partner_gpu)

      if transport.is_a?(Transport::P2PTransport)
        transport.copy_async(
          recv_buffers[partner][rank],  # Partner receives from me
          send_buffers[rank][partner],  # I send to partner
          chunk_size,
          stream_ptr
        )
      end
    end

    # Synchronize after each round
    synchronize_all_streams!(streams)
  end
end
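The two phases above can be replayed on plain Ruby arrays to check that the rotation pattern fills every receive slot exactly once. This is a simulation of the index arithmetic only: `simulate_all_to_all` is a hypothetical helper, it performs no CUDA calls, and unlike the method above it has no single-GPU early return.

```ruby
# Simulation of the schedule only; strings stand in for device chunks.
def simulate_all_to_all(send_buffers)
  n = send_buffers.size
  recv_buffers = Array.new(n) { Array.new(n) }

  # Phase 1: each rank keeps its own chunk.
  n.times { |rank| recv_buffers[rank][rank] = send_buffers[rank][rank] }

  # Phase 2: in round r, rank i sends to partner (i + r + 1) % n.
  (n - 1).times do |round|
    n.times do |rank|
      partner = (rank + round + 1) % n
      recv_buffers[partner][rank] = send_buffers[rank][partner]
    end
  end

  recv_buffers
end

send = [%w[a0 a1 a2], %w[b0 b1 b2], %w[c0 c1 c2]]
p simulate_all_to_all(send)
```

With this indexing the receive side is the transpose of the send side: `recv_buffers[j][i]` ends up holding `send_buffers[i][j]`.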
#barrier ⇒ void
This method returns an undefined value.
Barrier synchronization across all GPUs
# File 'lib/nvruby/collective/communicator.rb', line 337
def barrier
  ensure_initialized!
  @device_manager.synchronize_all!
end
#broadcast(tensor, root: 0, stream: nil) ⇒ Array<NvArray>
Broadcast tensor from root GPU to all GPUs
# File 'lib/nvruby/collective/communicator.rb', line 97
def broadcast(tensor, root: 0, stream: nil)
  ensure_initialized!
  validate_gpu_index!(root)

  return [tensor] if @gpu_ids.size == 1

  # TODO: Implement tree broadcast algorithm
  # For now, use simple fan-out from root
  simple_broadcast(tensor, root, stream)
end
#destroy! ⇒ void
This method returns an undefined value.
Clean up all resources
# File 'lib/nvruby/collective/communicator.rb', line 364
def destroy!
  @transport_selector.destroy!
  @device_manager.destroy!
  @initialized = false
end
#initialize! ⇒ self
Initialize the communicator (detect topology, enable P2P, etc.)
# File 'lib/nvruby/collective/communicator.rb', line 52
def initialize!
  return self if @initialized

  @device_manager.initialize!
  @device_manager.enable_all_p2p_access!
  @transport_selector.initialize!
  @ring_order = @transport_selector.optimal_ring_order

  @initialized = true
  self
end
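The early `return self if @initialized` makes `initialize!` idempotent and chainable. A minimal sketch of that guard pattern, with a hypothetical class (`LazySetup`) and a counter in place of the real device and transport setup:

```ruby
# Hypothetical class illustrating the idempotent-initialize guard.
class LazySetup
  attr_reader :setup_calls

  def initialize
    @initialized = false
    @setup_calls = 0
  end

  def initialize!
    return self if @initialized   # second and later calls do nothing

    @setup_calls += 1             # stands in for the expensive setup work
    @initialized = true
    self                          # returning self allows chaining
  end

  def ready?
    @initialized
  end
end

obj = LazySetup.new.initialize!.initialize!
p obj.setup_calls   # 1
p obj.ready?        # true
```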
#inspect ⇒ String
Returns Detailed inspection.
# File 'lib/nvruby/collective/communicator.rb', line 377
def inspect
  "#<Ignis::Collective::Communicator " \
    "gpu_ids=#{@gpu_ids} " \
    "rank=#{@rank}/#{@world_size} " \
    "initialized=#{@initialized}>"
end
#performance_summary ⇒ Hash
Get performance summary
# File 'lib/nvruby/collective/communicator.rb', line 358
def performance_summary
  @transport_selector.performance_summary
end
#ready? ⇒ Boolean
Check if communicator is ready
# File 'lib/nvruby/collective/communicator.rb', line 344
def ready?
  @initialized &&
    @device_manager.ready? &&
    @transport_selector.ready?
end
#recv(buffer, src_rank:, size:, stream: nil) ⇒ void
This method returns an undefined value.
Point-to-point receive (no-op, actual receive happens in send_recv)
# File 'lib/nvruby/collective/communicator.rb', line 327
def recv(buffer, src_rank:, size:, stream: nil)
  ensure_initialized!
  validate_gpu_index!(src_rank)

  # Actual data transfer happens via send_recv from sender side
  # This just marks the receive buffer as ready
end
#reduce(tensors, root: 0, op: :sum, stream: nil) ⇒ NvArray
Reduce tensors to root GPU
# File 'lib/nvruby/collective/communicator.rb', line 114
def reduce(tensors, root: 0, op: :sum, stream: nil)
  validate_operation!(op)
  validate_tensors!(tensors)
  ensure_initialized!
  validate_gpu_index!(root)

  return tensors[0] if @gpu_ids.size == 1

  # TODO: Implement tree reduce algorithm
  simple_reduce(tensors, root, op, stream)
end
#reduce_scatter(tensors, op: :sum, stream: nil) ⇒ Array<FFI::Pointer>
ReduceScatter - reduce and scatter result
# File 'lib/nvruby/collective/communicator.rb', line 145
def reduce_scatter(tensors, op: :sum, stream: nil)
  validate_operation!(op)
  validate_tensors!(tensors)
  ensure_initialized!

  return tensors if @gpu_ids.size == 1

  ring = Algorithms::Ring.new(
    ring_order: @ring_order,
    transport_selector: @transport_selector
  )

  buffers = tensors.map { |t| device_buffer(t) }
  sizes = tensors.map { |t| byte_size_of(t) }
  dtype = if tensors[0].respond_to?(:dtype)
            tensors[0].dtype
          else
            :float32
          end

  # Calculate chunk size
  total_size = sizes[0]
  chunk_size = ring.calculate_chunk_size(total_size)

  # Allocate result buffers
  result_buffers = @gpu_ids.map do |gpu_id|
    allocate_buffer_on_device(gpu_id, chunk_size)
  end

  streams = stream ? [stream] * @gpu_ids.size : create_null_streams(@gpu_ids.size)

  ring.reduce_scatter(
    buffers: buffers,
    result_buffers: result_buffers,
    sizes: sizes,
    dtype: dtype,
    op: op,
    streams: streams
  )

  result_buffers
end
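`Algorithms::Ring#reduce_scatter` is not shown on this page, so the following is a sketch of the textbook ring reduce-scatter for `:sum`, simulated on Ruby arrays. Which chunk each rank ends up owning is an assumption of this sketch and may differ from the library; `ring_reduce_scatter_sum` is a hypothetical name.

```ruby
# Textbook ring reduce-scatter (sum), simulated in plain Ruby.
# per_rank_chunks[rank][chunk] is an Array of numbers; each of the n ranks
# holds n chunks of equal length.
def ring_reduce_scatter_sum(per_rank_chunks)
  n = per_rank_chunks.size
  state = per_rank_chunks.map { |chunks| chunks.map(&:dup) }

  (n - 1).times do |step|
    # Snapshot what every rank sends this step, so all sends are "simultaneous".
    outgoing = Array.new(n) do |rank|
      idx = (rank - step) % n
      [idx, state[rank][idx].dup]
    end

    # Each rank passes its chunk to the next rank in the ring, which accumulates it.
    n.times do |rank|
      dest = (rank + 1) % n
      idx, payload = outgoing[rank]
      state[dest][idx] = state[dest][idx].zip(payload).map { |a, b| a + b }
    end
  end

  # In this formulation rank r finishes holding the fully reduced chunk (r + 1) % n.
  Array.new(n) { |rank| state[rank][(rank + 1) % n] }
end

input = [
  [[1], [10], [100]],
  [[2], [20], [200]],
  [[3], [30], [300]]
]
p ring_reduce_scatter_sum(input)   # [[60], [600], [6]]
```

Each rank sends and receives one chunk per step, which is why the method above allocates a result buffer of only `chunk_size` bytes per GPU rather than the full tensor size.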
#send(tensor, dest_rank:, size: nil, stream: nil) ⇒ void
This method returns an undefined value.
Point-to-point send from current rank to destination
# File 'lib/nvruby/collective/communicator.rb', line 254
def send(tensor, dest_rank:, size: nil, stream: nil)
  ensure_initialized!
  validate_gpu_index!(dest_rank)

  src_rank = 0 # Default sender is rank 0
  src_gpu = @gpu_ids[src_rank]
  dst_gpu = @gpu_ids[dest_rank]

  return if src_rank == dest_rank

  buffer = device_buffer(tensor)
  byte_size = size || byte_size_of(tensor)

  transport = @transport_selector.select_transport(src_gpu, dst_gpu)
  stream_ptr = stream ? get_stream_ptr(stream) : FFI::Pointer::NULL

  if transport.is_a?(Transport::P2PTransport)
    # P2P copy requires destination buffer
    # Assumes tensor has been pre-allocated on dest
    raise ArgumentError, "P2P send requires pre-allocated recv buffer on dest"
  end
end
#send_recv(buffer, src_rank:, dst_buffer:, dst_rank:, size:, stream: nil) ⇒ void
This method returns an undefined value.
Point-to-point send from specific source rank
# File 'lib/nvruby/collective/communicator.rb', line 285
def send_recv(buffer, src_rank:, dst_buffer:, dst_rank:, size:, stream: nil)
  ensure_initialized!
  validate_gpu_index!(src_rank)
  validate_gpu_index!(dst_rank)

  return if src_rank == dst_rank

  src_gpu = @gpu_ids[src_rank]
  dst_gpu = @gpu_ids[dst_rank]

  transport = @transport_selector.select_transport(src_gpu, dst_gpu)
  stream_ptr = stream ? get_stream_ptr(stream) : FFI::Pointer::NULL

  if transport.is_a?(Transport::P2PTransport)
    # Set source device context
    CUDA::RuntimeAPI.cudaSetDevice(src_gpu)
    transport.copy_async(dst_buffer, buffer, size, stream_ptr)
  elsif transport.is_a?(Transport::IPCTransport)
    # For IPC, export/import handles
    handle = transport.export_handle(buffer)
    CUDA::RuntimeAPI.cudaSetDevice(dst_gpu)
    mapped = transport.import_handle(handle)

    # Copy from mapped to destination
    CUDA::RuntimeAPI.cudaMemcpyAsync(
      dst_buffer,
      mapped,
      size,
      CUDA::RuntimeAPI::MEMCPY_DEVICE_TO_DEVICE,
      stream_ptr
    )

    transport.close_imported_handle(mapped)
  end
end
#to_s ⇒ String
Returns Human-readable description.
# File 'lib/nvruby/collective/communicator.rb', line 371
def to_s
  status = @initialized ? "ready" : "uninitialized"
  "Communicator[#{@gpu_ids.size} GPUs, #{status}]"
end
#topology ⇒ Topology::Matrix
Get the topology matrix
# File 'lib/nvruby/collective/communicator.rb', line 352
def topology
  @device_manager.topology&.matrix
end