Class: Ignis::Collective::Algorithms::Ring
- Inherits: Object
- Defined in: lib/nvruby/collective/algorithms/ring.rb
Overview
Ring AllReduce algorithm implementation
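The Ring algorithm performs AllReduce in 2*(N-1) steps, where N is the number of GPUs:
- Scatter-Reduce phase: N-1 steps; in each step every GPU sends one chunk to its ring successor and receives a chunk from its predecessor, reducing it into its local copy.
- AllGather phase: N-1 steps; in each step every GPU forwards a fully reduced chunk to its successor and receives another from its predecessor.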
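The two phases can be checked with a small host-side simulation. This is a sketch under stated assumptions, not the class's code: GPU buffers become plain Ruby arrays, the reduction op is fixed to addition, and the ring indexing (position r sends chunk (r - s) mod N at scatter-reduce step s) is the standard one, chosen to agree with reduce_scatter's note that position r ends the phase holding chunk (r + 1) mod N. The private scatter_reduce! and all_gather! helpers are not shown on this page, and the name ring_all_reduce is hypothetical.

```ruby
# Host-side sketch of Ring AllReduce on plain Ruby arrays (no CUDA, no
# transports). data[r] is the full input vector of ring position r; the
# reduction op is fixed to addition for illustration.
def ring_all_reduce(data)
  n = data.size
  bufs = data.map(&:dup)
  return bufs if n == 1

  # Even element-wise chunking, mirroring the element math of chunk_layout.
  base, rem = bufs[0].size.divmod(n)
  ranges = []
  off = 0
  n.times do |k|
    cnt = base + (k < rem ? 1 : 0)
    ranges << (off...(off + cnt))
    off += cnt
  end

  # Phase 1: scatter-reduce. At step s, position r sends chunk (r - s) mod n
  # to its successor, which adds it into its own copy of that chunk.
  # All payloads are copied before any update, modelling concurrent streams.
  (n - 1).times do |s|
    msgs = (0...n).map { |r| c = (r - s) % n; [(r + 1) % n, c, bufs[r][ranges[c]]] }
    msgs.each do |dst, c, payload|
      ranges[c].each_with_index { |i, j| bufs[dst][i] += payload[j] }
    end
  end

  # Phase 2: all-gather. Position r now owns reduced chunk (r + 1) mod n and
  # at step s forwards chunk (r + 1 - s) mod n, overwriting the successor's copy.
  (n - 1).times do |s|
    msgs = (0...n).map { |r| c = (r + 1 - s) % n; [(r + 1) % n, c, bufs[r][ranges[c]]] }
    msgs.each { |dst, c, payload| bufs[dst][ranges[c]] = payload }
  end
  bufs
end
```

Every position ends with the same element-wise sum, including rings where some chunks are empty because the vector has fewer elements than there are GPUs.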
Bandwidth complexity: 2 * (N-1)/N * data_size bytes sent per GPU (asymptotically optimal)
Latency complexity: 2 * (N-1) * alpha, where alpha is the per-message latency (linear in N)
Best for: Large messages (>1MB) where bandwidth dominates latency
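The complexity figures above plug into the usual alpha-beta cost model. The sketch below assumes alpha is a per-step latency and beta is the time per byte on one ring link (1 / link bandwidth); both are inputs you would measure for your interconnect, and the function name is hypothetical.

```ruby
# Alpha-beta cost model for Ring AllReduce, a back-of-the-envelope sketch.
#   alpha: per-step latency in seconds (assumed input)
#   beta:  seconds per byte over one ring link (assumed input)
def ring_allreduce_cost(n, data_bytes, alpha:, beta:)
  return 0.0 if n <= 1 # single GPU: no communication

  latency_term = 2 * (n - 1) * alpha
  bandwidth_term = 2.0 * (n - 1) / n * data_bytes * beta
  latency_term + bandwidth_term
end
```

The latency term grows linearly in N while the bandwidth term approaches 2 * data_size * beta, which is why the ring pays off once messages are large enough for the second term to dominate.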
Defined Under Namespace
Classes: ChunkInfo
Instance Attribute Summary
- #n_gpus ⇒ Integer (readonly)
  Number of participants.
- #ring_order ⇒ Array<Integer> (readonly)
  Ring order (GPU IDs in ring sequence).
- #transport_selector ⇒ TransportSelector (readonly)
  Transport selector for GPU pairs.
Instance Method Summary
- #all_gather_standalone(send_buffers:, recv_buffers:, send_sizes:, streams:) ⇒ void
  Perform Ring AllGather: gather all chunks to all GPUs.
- #all_reduce(buffers:, sizes:, dtype:, op:, streams:) ⇒ void
  Perform Ring AllReduce.
- #calculate_chunk_size(total_size) ⇒ Integer
  Largest chunk size in bytes (ceil division), used by callers only to allocate result buffers big enough to hold any single chunk.
- #chunk_layout(total_bytes, elem_size) ⇒ Array<Array(Integer,Integer,Integer,Integer)>
  Even element-wise chunk layout (NCCL-style).
- #initialize(ring_order:, transport_selector:) ⇒ Ring (constructor)
  A new instance of Ring.
- #reduce_scatter(buffers:, result_buffers:, sizes:, dtype:, op:, streams:) ⇒ void
  Perform Ring ReduceScatter: reduce and scatter the result.
- #scatter_reduce_only(buffers:, sizes:, dtype:, op:, streams:) ⇒ Object
  Perform only the scatter-reduce phase (for testing/benchmarking).
Constructor Details
#initialize(ring_order:, transport_selector:) ⇒ Ring
Returns a new instance of Ring.
```ruby
# File 'lib/nvruby/collective/algorithms/ring.rb', line 33

def initialize(ring_order:, transport_selector:)
  @ring_order = ring_order.dup.freeze
  @n_gpus = ring_order.size
  @transport_selector = transport_selector
  @chunk_counts = {}
end
```
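The constructor only stores and freezes ring_order; neighbors are derived per call as next_rank = (rank + 1) % N. A hypothetical helper listing the GPU pairs handed to the transport selector is a quick way to see which physical links a given ring_order will use. The pairing is taken from all_gather_standalone; that the private helpers behind all_reduce walk the ring the same way is an assumption.

```ruby
# Sketch: the (sender GPU, receiver GPU) pairs a Ring built from ring_order
# will pass to select_transport. Position r always sends to position
# (r + 1) % N, as next_rank/next_gpu are computed in all_gather_standalone.
def ring_links(ring_order)
  n = ring_order.size
  return [] if n <= 1 # single-GPU calls return before any transfer

  (0...n).map { |r| [ring_order[r], ring_order[(r + 1) % n]] }
end
```

For ring_order [0, 2, 3, 1] this yields the links 0→2, 2→3, 3→1 and 1→0; positions in the ring, not GPU IDs, decide who sends to whom.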
Instance Attribute Details
#n_gpus ⇒ Integer (readonly)
Returns the number of participants.

```ruby
# File 'lib/nvruby/collective/algorithms/ring.rb', line 26

def n_gpus
  @n_gpus
end
```
#ring_order ⇒ Array<Integer> (readonly)
Returns the ring order (GPU IDs in ring sequence).

```ruby
# File 'lib/nvruby/collective/algorithms/ring.rb', line 23

def ring_order
  @ring_order
end
```
#transport_selector ⇒ TransportSelector (readonly)
Returns the transport selector for GPU pairs.

```ruby
# File 'lib/nvruby/collective/algorithms/ring.rb', line 29

def transport_selector
  @transport_selector
end
```
Instance Method Details
#all_gather_standalone(send_buffers:, recv_buffers:, send_sizes:, streams:) ⇒ void
This method returns an undefined value.
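Perform Ring AllGather: gather all chunks to all GPUs.
Each GPU starts with one chunk of data. After AllGather, every GPU holds all N chunks concatenated in ring order: the chunk from ring position r sits at byte offset r * chunk_size in each receive buffer. Only send_sizes[0] is read, so all chunks are assumed to have the same size.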
```ruby
# File 'lib/nvruby/collective/algorithms/ring.rb', line 94

def all_gather_standalone(send_buffers:, recv_buffers:, send_sizes:, streams:)
  validate_inputs_gather!(send_buffers, recv_buffers, streams)
  return if @n_gpus == 1

  chunk_size = send_sizes[0]

  # Copy each GPU's local chunk to its position in the result buffer
  @n_gpus.times do |rank|
    gpu_id = @ring_order[rank]
    CUDA::RuntimeAPI.cudaSetDevice(gpu_id)

    dst_offset = rank * chunk_size
    stream_ptr = get_stream_ptr(streams[rank])

    # Copy local data to correct position
    CUDA::RuntimeAPI.cudaMemcpyAsync(
      ptr_offset(recv_buffers[rank], dst_offset),
      send_buffers[rank],
      chunk_size,
      CUDA::RuntimeAPI::MEMCPY_DEVICE_TO_DEVICE,
      stream_ptr
    )
  end
  synchronize_all_streams!(streams)

  # N-1 ring steps to propagate all chunks
  (@n_gpus - 1).times do |step|
    @n_gpus.times do |rank|
      gpu_id = @ring_order[rank]

      # Chunk to forward: our own at step 0, otherwise the one received in the previous step
      send_chunk_id = (rank - step + @n_gpus) % @n_gpus
      send_offset = send_chunk_id * chunk_size

      next_rank = (rank + 1) % @n_gpus
      next_gpu = @ring_order[next_rank]

      transport = @transport_selector.select_transport(gpu_id, next_gpu)
      stream_ptr = get_stream_ptr(streams[rank])

      src_ptr = ptr_offset(recv_buffers[rank], send_offset)
      dst_ptr = ptr_offset(recv_buffers[next_rank], send_offset)

      move!(transport, dst_ptr, src_ptr, chunk_size, stream_ptr)
    end
    synchronize_all_streams!(streams)
  end
end
```
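A host-side sketch of the same schedule on Ruby arrays shows that forwarding chunk (rank - step) mod N for N-1 steps leaves every position with all chunks. Assumptions: arrays stand in for device buffers and transports, all chunks have equal length (as the method assumes by reading only send_sizes[0]), and ring_all_gather is a hypothetical name. Unlike the method above, which returns before the local copy when N == 1, the sketch always performs that copy.

```ruby
# Host-side sketch of all_gather_standalone's schedule on Ruby arrays.
# send[r] is the local chunk of ring position r; all chunks are the same length.
def ring_all_gather(send)
  n = send.size
  len = send[0].size
  recv = Array.new(n) { Array.new(n * len) }

  # Local copy: position r's chunk lands at offset r * len of its own buffer.
  n.times { |r| recv[r][r * len, len] = send[r] }

  (n - 1).times do |step|
    # Copy every outgoing chunk first, modelling concurrent streams per step.
    msgs = (0...n).map do |r|
      id = (r - step) % n # own chunk at step 0, then the one just received
      [(r + 1) % n, id, recv[r][id * len, len]]
    end
    msgs.each { |dst, id, chunk| recv[dst][id * len, len] = chunk }
  end
  recv
end
```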
#all_reduce(buffers:, sizes:, dtype:, op:, streams:) ⇒ void
This method returns an undefined value.
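Perform Ring AllReduce in place: on return, each entry of buffers holds the element-wise reduction (op) of all N inputs. Temporary receive buffers are allocated for the call and freed in an ensure block.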
```ruby
# File 'lib/nvruby/collective/algorithms/ring.rb', line 48

def all_reduce(buffers:, sizes:, dtype:, op:, streams:)
  validate_inputs!(buffers, sizes, streams)
  return if @n_gpus == 1 # Single GPU - no-op

  # Even element-wise chunk layout (handles non-divisible sizes without
  # overrunning the buffer on the last chunk).
  total_size = sizes[0]
  layout = chunk_layout(total_size, dtype_elem_size(dtype))

  # Allocate temp buffers for receive
  recv_buffers = allocate_recv_buffers(total_size)

  begin
    # Phase 1: Scatter-Reduce
    scatter_reduce!(buffers, recv_buffers, layout, dtype, op, streams)

    # Phase 2: AllGather
    all_gather!(buffers, recv_buffers, layout, streams)
  ensure
    free_recv_buffers(recv_buffers)
  end
end
```
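The two private helpers called above are not shown on this page, so the per-step chunk indices below are an assumption: the standard ring schedule, chosen to agree with reduce_scatter's note that position r ends scatter-reduce holding the fully reduced chunk (r + 1) % N. The hypothetical helper tabulates, for each of the 2*(N-1) steps, which chunk each position sends to its successor and which it receives from its predecessor.

```ruby
# Sketch: per-step chunk movement for Ring AllReduce on n ring positions.
# Each move is position r -> (r + 1) % n; indices follow the assumed
# standard ring schedule described in the lead-in.
def ring_allreduce_schedule(n)
  schedule = []
  (n - 1).times do |s|
    moves = (0...n).map { |r| { rank: r, send: (r - s) % n, recv: (r - 1 - s) % n } }
    schedule << [:scatter_reduce, s, moves]
  end
  (n - 1).times do |s|
    moves = (0...n).map { |r| { rank: r, send: (r + 1 - s) % n, recv: (r - s) % n } }
    schedule << [:all_gather, s, moves]
  end
  schedule
end
```

The last scatter-reduce step delivers to each position the chunk it then owns, and the all-gather phase starts by forwarding exactly that chunk.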
#calculate_chunk_size(total_size) ⇒ Integer
Largest chunk size in bytes (ceil division), used by callers only to allocate result buffers big enough to hold any single chunk.
```ruby
# File 'lib/nvruby/collective/algorithms/ring.rb', line 205

def calculate_chunk_size(total_size)
  (total_size + @n_gpus - 1) / @n_gpus
end
```
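calculate_chunk_size rounds in bytes, while chunk_layout rounds in elements. The two agree when the element count divides evenly or elements are one byte, but otherwise the byte-level ceil can be smaller than the largest chunk chunk_layout produces: 5 four-byte elements (20 bytes) on 2 GPUs give 10 bytes from calculate_chunk_size, while chunk_layout's first chunk holds 3 elements, or 12 bytes. The standalone ports below (hypothetical names, ring size passed explicitly instead of read from @n_gpus) make the comparison easy to check.

```ruby
# Standalone ports of the two sizing rules, with the ring size n passed in
# explicitly (the real methods read @n_gpus). Names are hypothetical.
def byte_ceil_chunk(total_size, n)
  (total_size + n - 1) / n # calculate_chunk_size: ceil(total_size / n) bytes
end

def largest_layout_chunk(total_bytes, elem_size, n)
  total_elems = total_bytes / elem_size
  # chunk_layout gives the first (total_elems % n) chunks one extra element,
  # so its largest chunk holds ceil(total_elems / n) elements.
  ((total_elems + n - 1) / n) * elem_size
end
```

A caller that sizes reduce_scatter's result_buffers from calculate_chunk_size may want to compare against chunk_layout when elem_size > 1, since reduce_scatter copies the layout's n_bytes into each result buffer.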
#chunk_layout(total_bytes, elem_size) ⇒ Array<Array(Integer,Integer,Integer,Integer)>
Even element-wise chunk layout (NCCL-style). Distributes elements as evenly as possible across the N chunks so that they tile the whole buffer with no overrun, even when n_elements % N != 0; the previous ceil-rounded byte chunking read and wrote past the end of the buffer on the last chunk. Each entry is [byte_offset, n_bytes, elem_offset, n_elems].
```ruby
# File 'lib/nvruby/collective/algorithms/ring.rb', line 215

def chunk_layout(total_bytes, elem_size)
  total_elems = total_bytes / elem_size
  base = total_elems / @n_gpus
  rem = total_elems % @n_gpus
  off_e = 0
  Array.new(@n_gpus) do |k|
    n_e = base + (k < rem ? 1 : 0)
    entry = [off_e * elem_size, n_e * elem_size, off_e, n_e]
    off_e += n_e
    entry
  end
end
```
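To see the tiling, here is a standalone port of chunk_layout that takes the ring size as an explicit argument (the real method reads @n_gpus). With 40 bytes of 4-byte elements on 3 GPUs, a naive ceil-rounded byte chunk of 14 bytes would place the last chunk at [28, 42), past the 40-byte buffer, and would split elements; the element-wise layout instead yields chunks of 4, 3 and 3 elements.

```ruby
# Standalone port of chunk_layout with the ring size passed in explicitly.
# Each entry is [byte_offset, n_bytes, elem_offset, n_elems].
def chunk_layout(total_bytes, elem_size, n_gpus)
  total_elems = total_bytes / elem_size
  base, rem = total_elems.divmod(n_gpus)
  off_e = 0
  Array.new(n_gpus) do |k|
    n_e = base + (k < rem ? 1 : 0) # first `rem` chunks get one extra element
    entry = [off_e * elem_size, n_e * elem_size, off_e, n_e]
    off_e += n_e
    entry
  end
end
```

When there are fewer elements than GPUs, trailing chunks are empty (n_bytes == 0), which is why reduce_scatter skips chunks with zero bytes.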
#reduce_scatter(buffers:, result_buffers:, sizes:, dtype:, op:, streams:) ⇒ void
This method returns an undefined value.
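Perform Ring ReduceScatter: reduce, then scatter the result.
Each GPU starts with a full buffer. After ReduceScatter, each GPU holds one chunk (roughly 1/N) of the reduced result, a different chunk on each GPU: ring position r receives chunk (r + 1) % N of the chunk_layout partition in result_buffers[r].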
```ruby
# File 'lib/nvruby/collective/algorithms/ring.rb', line 159

def reduce_scatter(buffers:, result_buffers:, sizes:, dtype:, op:, streams:)
  validate_inputs!(buffers, sizes, streams)
  return if @n_gpus == 1

  total_size = sizes[0]
  layout = chunk_layout(total_size, dtype_elem_size(dtype))

  # Allocate temp buffers
  temp_buffers = allocate_recv_buffers(total_size)

  begin
    # Scatter-Reduce phase only (same as first half of AllReduce)
    scatter_reduce!(buffers, temp_buffers, layout, dtype, op, streams)

    # Copy each GPU's final chunk to result buffer
    @n_gpus.times do |rank|
      gpu_id = @ring_order[rank]
      CUDA::RuntimeAPI.cudaSetDevice(gpu_id)

      # After scatter-reduce, GPU[rank] has fully reduced chunk[(rank+1) % N]
      final_chunk_id = (rank + 1) % @n_gpus
      src_offset, n_bytes, = layout[final_chunk_id]
      next if n_bytes.zero?

      stream_ptr = get_stream_ptr(streams[rank])

      CUDA::RuntimeAPI.cudaMemcpyAsync(
        result_buffers[rank],
        ptr_offset(buffers[rank], src_offset),
        n_bytes,
        CUDA::RuntimeAPI::MEMCPY_DEVICE_TO_DEVICE,
        stream_ptr
      )
    end
    synchronize_all_streams!(streams)
  ensure
    free_recv_buffers(temp_buffers)
  end
end
```
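The contract "position r receives chunk (r + 1) % N" can be checked on the host. This sketch replaces device buffers with Ruby arrays, fixes the op to addition, and assumes the standard ring indexing for the unshown scatter_reduce! helper; ring_reduce_scatter is a hypothetical name.

```ruby
# Host-side sketch of reduce_scatter's contract on Ruby arrays (op = +).
# After scatter-reduce, ring position r holds the fully reduced chunk
# (r + 1) % n, which is what ends up in result[r].
def ring_reduce_scatter(data)
  n = data.size
  bufs = data.map(&:dup)

  # Even element-wise chunking, as in chunk_layout.
  base, rem = bufs[0].size.divmod(n)
  ranges = []
  off = 0
  n.times do |k|
    cnt = base + (k < rem ? 1 : 0)
    ranges << (off...(off + cnt))
    off += cnt
  end

  # Scatter-reduce: position r sends chunk (r - s) mod n to its successor.
  (n - 1).times do |s|
    msgs = (0...n).map { |r| c = (r - s) % n; [(r + 1) % n, c, bufs[r][ranges[c]]] }
    msgs.each do |dst, c, payload|
      ranges[c].each_with_index { |i, j| bufs[dst][i] += payload[j] }
    end
  end

  # Each position keeps only the chunk it fully reduced.
  (0...n).map { |r| bufs[r][ranges[(r + 1) % n]] }
end
```

Because of the even layout, result chunks differ in length by at most one element: with 4 elements on 3 GPUs, position 2 receives the two-element chunk 0.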
#scatter_reduce_only(buffers:, sizes:, dtype:, op:, streams:) ⇒ Object
Perform only the scatter-reduce phase (for testing/benchmarking)
```ruby
# File 'lib/nvruby/collective/algorithms/ring.rb', line 73

def scatter_reduce_only(buffers:, sizes:, dtype:, op:, streams:)
  layout = chunk_layout(sizes[0], dtype_elem_size(dtype))
  recv_buffers = allocate_recv_buffers(sizes[0])
  begin
    scatter_reduce!(buffers, recv_buffers, layout, dtype, op, streams)
  ensure
    free_recv_buffers(recv_buffers)
  end
end
```
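Since scatter_reduce_only exists for benchmarking, one way to turn a timing into a bandwidth figure is sketched below. Assumptions: with even chunking each GPU sends N-1 chunks, about (N-1)/N of the buffer (half of the 2*(N-1)/N * data_size quoted for the full AllReduce), and the call is assumed to have synchronized its streams before returning, which cannot be confirmed here because scatter_reduce! is not shown. The helper names are hypothetical.

```ruby
# Approximate bytes each GPU sends during scatter-reduce: N-1 chunks of ~1/N each.
def scatter_reduce_bytes_per_gpu(total_bytes, n)
  (n - 1) * total_bytes / n.to_f
end

# Effective per-GPU send bandwidth in bytes/second for a measured duration.
def effective_bandwidth(total_bytes, n, seconds)
  scatter_reduce_bytes_per_gpu(total_bytes, n) / seconds
end

# Times a block with a monotonic clock and returns mean seconds per call.
def time_block(iterations)
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  iterations.times { yield }
  (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start) / iterations
end
```

For example, time_block(10) { ring.scatter_reduce_only(buffers: bufs, sizes: sizes, dtype: dtype, op: op, streams: streams) } averages ten calls, where ring, bufs, sizes, dtype, op and streams are set up by the caller.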