Module: Ignis::Collective::ArrayOps
Defined in: lib/nvruby/collective/array_ops.rb
Overview
NvArray Integration for Collective Operations
Provides tensor-aware wrappers around collective primitives that work directly with NvArray tensors.
Class Method Summary
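- .all_gather(tensors, comm:, stream: nil) ⇒ Array<NvArray>
  AllGather: each GPU contributes one tensor, and every GPU receives all contributions concatenated along the first axis.
- .all_reduce(tensors, op: :sum, comm:, stream: nil) ⇒ Array<NvArray>
  AllReduce on NvArray tensors: combines the tensors element-wise with op (default :sum) and leaves the result on every GPU.
- .broadcast(tensor, root: 0, comm:, stream: nil) ⇒ Array<NvArray>
  Broadcast an NvArray from the root GPU to all GPUs.
- .reduce(tensors, root: 0, op: :sum, comm:, stream: nil) ⇒ NvArray
  Reduce the tensors from all GPUs into a single result on the root GPU.
- .reduce_scatter(tensors, op: :sum, comm:, stream: nil) ⇒ Array<NvArray>
  ReduceScatter: reduce the tensors element-wise, then split the result along the first axis so that each GPU receives one chunk.
- .scatter(tensor, root: 0, comm:, stream: nil) ⇒ Array<NvArray>
  Scatter: split a tensor along its first axis and send one chunk to each GPU.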
Class Method Details
.all_gather(tensors, comm:, stream: nil) ⇒ Array<NvArray>
AllGather: each GPU contributes one tensor, and every GPU receives all contributions concatenated along the first axis.
# File 'lib/nvruby/collective/array_ops.rb', line 75

def all_gather(tensors, comm:, stream: nil)
  validate_tensors!(tensors)

  # Calculate total size after gathering
  n_gpus = tensors.size
  chunk_shape = tensors[0].shape.dup
  gathered_shape = chunk_shape.dup
  gathered_shape[0] = chunk_shape[0] * n_gpus

  # Concatenate along first axis
  gathered_pointers = comm.all_gather(tensors, stream: stream)

  # Wrap results in NvArrays
  gathered_pointers.map.with_index do |ptr_or_arr, i|
    if ptr_or_arr.is_a?(Array)
      # Already NvArrays
      ptr_or_arr[i]
    else
      create_nvarray_from_ptr(
        ptr: ptr_or_arr,
        shape: gathered_shape,
        dtype: tensors[0].dtype,
        device: comm.gpu_ids[i]
      )
    end
  end
end
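The shape arithmetic in all_gather (the first axis grows by a factor of n_gpus) can be checked with a small CPU-only model. This is a sketch, not the library's implementation: it assumes each tensor is a 2-D array of rows and that ranks are ordered as in comm.gpu_ids, and the helpers gathered_shape and all_gather_model are hypothetical names introduced only for illustration.

```ruby
# Hypothetical CPU-only model of AllGather semantics (no GPUs, no NCCL).
# Each "rank" holds a 2-D tensor as an array of rows.

# Shape of the gathered tensor: first axis multiplied by the rank count.
def gathered_shape(chunk_shape, n_ranks)
  shape = chunk_shape.dup
  shape[0] = chunk_shape[0] * n_ranks
  shape
end

# Concatenate every rank's rows in rank order, then hand each rank
# its own independent copy of the result.
def all_gather_model(per_rank_rows)
  gathered = per_rank_rows.flatten(1)
  per_rank_rows.map { gathered.map(&:dup) }
end

inputs = [
  [[1, 2]], # rank 0 contributes one row, shape [1, 2]
  [[3, 4]], # rank 1
  [[5, 6]]  # rank 2
]
outputs = all_gather_model(inputs)
p gathered_shape([1, 2], inputs.size) # => [3, 2]
p outputs[1]                          # => [[1, 2], [3, 4], [5, 6]]
```

Each rank receives its own copy of the concatenated rows, mirroring the fact that the wrapper returns one NvArray per GPU.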
.all_reduce(tensors, op: :sum, comm:, stream: nil) ⇒ Array<NvArray>
AllReduce on NvArray tensors: combines the tensors element-wise with op (default :sum) and leaves the result on every GPU.
# File 'lib/nvruby/collective/array_ops.rb', line 18

def all_reduce(tensors, op: :sum, comm:, stream: nil)
  validate_tensors!(tensors)
  validate_devices!(tensors, comm)
  comm.all_reduce(tensors, op: op, stream: stream)
end
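Since all_reduce delegates straight to comm.all_reduce, the wrapper itself does not show what the operation computes. The sketch below models standard AllReduce semantics on plain Ruby arrays, assuming flat, equal-length inputs. all_reduce_model is a hypothetical helper, and the ops other than :sum (:prod, :max, :min) are assumptions modeled on NCCL's reduction operators, not confirmed options of this wrapper.

```ruby
# Hypothetical CPU-only model of AllReduce on flat arrays.
# Every rank contributes an equal-length array; every rank receives
# the element-wise reduction of all contributions.

# Assumed op set, modeled on NCCL's sum/prod/max/min reductions.
REDUCE_OPS = {
  sum:  ->(a, b) { a + b },
  prod: ->(a, b) { a * b },
  max:  ->(a, b) { [a, b].max },
  min:  ->(a, b) { [a, b].min }
}.freeze

def all_reduce_model(per_rank, op: :sum)
  fn = REDUCE_OPS.fetch(op) { raise ArgumentError, "unsupported op: #{op}" }
  lengths = per_rank.map(&:size).uniq
  raise ArgumentError, "all ranks must contribute the same length" unless lengths.size == 1

  # transpose groups the i-th element of every rank together
  reduced = per_rank.transpose.map { |column| column.reduce(&fn) }
  per_rank.map { reduced.dup } # every rank ends up with the same values
end

p all_reduce_model([[1, 2], [10, 20], [100, 200]])   # => [[111, 222], [111, 222], [111, 222]]
p all_reduce_model([[1, 5], [4, 2]], op: :max).first # => [4, 5]
```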
.broadcast(tensor, root: 0, comm:, stream: nil) ⇒ Array<NvArray>
Broadcast an NvArray from the root GPU to all GPUs.
# File 'lib/nvruby/collective/array_ops.rb', line 32

def broadcast(tensor, root: 0, comm:, stream: nil)
  validate_single_tensor!(tensor)

  # Comm.broadcast returns FFI::Pointers, wrap in NvArrays
  pointers = comm.broadcast(tensor, root: root, stream: stream)
  pointers.map.with_index do |ptr, i|
    if i == root
      tensor
    else
      create_nvarray_from_ptr(
        ptr: ptr,
        shape: tensor.shape,
        dtype: tensor.dtype,
        device: comm.gpu_ids[i]
      )
    end
  end
end
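The broadcast wrapper returns the caller's own tensor in the root slot and freshly wrapped buffers in every other slot. A minimal sketch of that return shape, using plain arrays as stand-ins for per-GPU buffers (broadcast_model is a hypothetical helper, not part of the library):

```ruby
# Hypothetical model of the broadcast wrapper's return value:
# slot `root` is the caller's original object, every other slot is a
# new object with the same contents (standing in for another GPU's buffer).
def broadcast_model(tensor, n_ranks, root: 0)
  raise ArgumentError, "root #{root} out of range" unless (0...n_ranks).cover?(root)

  Array.new(n_ranks) { |i| i == root ? tensor : tensor.dup }
end

source = [1.0, 2.0, 3.0]
result = broadcast_model(source, 4, root: 2)
p result.all? { |t| t == source }                           # => true
p result.each_index.select { |i| result[i].equal?(source) } # => [2]
```

Because the root entry is the same object as the input, mutating it also mutates the caller's tensor; the other entries are independent.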
.reduce(tensors, root: 0, op: :sum, comm:, stream: nil) ⇒ NvArray
Reduce the tensors from all GPUs into a single result on the root GPU.
# File 'lib/nvruby/collective/array_ops.rb', line 60

def reduce(tensors, root: 0, op: :sum, comm:, stream: nil)
  validate_tensors!(tensors)
  result = comm.reduce(tensors, root: root, op: op, stream: stream)

  # result is already the root tensor or pointer
  result.is_a?(NvArray) ? result : tensors[root]
end
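reduce returns tensors[root] whenever comm.reduce does not hand back an NvArray, which suggests, though the source does not confirm it, that the reduction is written into the root's input buffer in place. The sketch below models that reading with :sum only; reduce_model! is a hypothetical helper, and plain arrays stand in for device buffers.

```ruby
# Hypothetical model of Reduce under the in-place reading above:
# the element-wise sum of all contributions overwrites the root
# rank's buffer; the other ranks' buffers are left untouched.
def reduce_model!(per_rank, root: 0)
  raise ArgumentError, "root #{root} out of range" unless (0...per_rank.size).cover?(root)

  sums = per_rank.transpose.map(&:sum)
  per_rank[root].replace(sums) # in-place update of the root buffer
  per_rank[root]               # mirrors the wrapper's `tensors[root]` fallback
end

buffers = [[1, 2], [3, 4], [5, 6]]
out = reduce_model!(buffers, root: 1)
p out     # => [9, 12]
p buffers # => [[1, 2], [9, 12], [5, 6]]
```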
.reduce_scatter(tensors, op: :sum, comm:, stream: nil) ⇒ Array<NvArray>
ReduceScatter: reduce the tensors element-wise, then split the result along the first axis so that each GPU receives one chunk.
# File 'lib/nvruby/collective/array_ops.rb', line 109

def reduce_scatter(tensors, op: :sum, comm:, stream: nil)
  validate_tensors!(tensors)

  n_gpus = tensors.size
  full_shape = tensors[0].shape.dup
  chunk_shape = full_shape.dup
  chunk_shape[0] = full_shape[0] / n_gpus

  chunk_pointers = comm.reduce_scatter(tensors, op: op, stream: stream)
  chunk_pointers.map.with_index do |ptr, i|
    create_nvarray_from_ptr(
      ptr: ptr,
      shape: chunk_shape,
      dtype: tensors[0].dtype,
      device: comm.gpu_ids[i]
    )
  end
end
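reduce_scatter computes chunk_shape[0] with integer division, so a first axis that is not a multiple of the GPU count would silently drop the remainder rows (for example, 5 / 2 gives 2). This CPU-only sketch models ReduceScatter with :sum and rejects such inputs up front; reduce_scatter_model is a hypothetical helper, and 2-D arrays of rows stand in for tensors.

```ruby
# Hypothetical CPU-only model of ReduceScatter on 2-D tensors stored as
# arrays of rows: sum the tensors element-wise, then split the result
# along the first axis so rank i receives the i-th block of rows.
def reduce_scatter_model(per_rank_rows)
  n = per_rank_rows.size
  rows = per_rank_rows.first.size
  unless rows.positive? && (rows % n).zero?
    raise ArgumentError, "first axis (#{rows}) must be a positive multiple of #{n} ranks"
  end

  # Group matching rows across ranks, then sum matching values within each group.
  summed = per_rank_rows.transpose.map { |row_group| row_group.transpose.map(&:sum) }
  summed.each_slice(rows / n).to_a # chunk_shape[0] = full_shape[0] / n
end

ranks = [
  [[1, 1], [2, 2]],   # rank 0, shape [2, 2]
  [[10, 10], [20, 20]] # rank 1
]
p reduce_scatter_model(ranks) # => [[[11, 11]], [[22, 22]]]
```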
.scatter(tensor, root: 0, comm:, stream: nil) ⇒ Array<NvArray>
Scatter: split a tensor along its first axis and send one chunk to each GPU.
# File 'lib/nvruby/collective/array_ops.rb', line 136

def scatter(tensor, root: 0, comm:, stream: nil)
  validate_single_tensor!(tensor)

  n_gpus = comm.gpu_ids.size
  full_shape = tensor.shape.dup
  chunk_shape = full_shape.dup
  chunk_shape[0] = full_shape[0] / n_gpus
  chunk_size = tensor.nbytes / n_gpus

  chunks = []
  n_gpus.times do |i|
    # Rank i owns bytes [i * chunk_size, (i + 1) * chunk_size) of the source,
    # including the root (the root does not always own the first chunk)
    offset = i * chunk_size
    src_ptr = FFI::Pointer.new(:uint8, tensor.device_ptr.address + offset)

    if i == root
      # Root keeps its own chunk as a view into the source tensor (no copy)
      chunk_ptr = src_ptr
    else
      # Allocate on the destination GPU and send the chunk from root
      chunk_ptr = allocate_on_device(comm.gpu_ids[i], chunk_size)
      comm.send_recv(
        src_ptr,
        src_rank: root,
        dst_buffer: chunk_ptr,
        dst_rank: i,
        size: chunk_size,
        stream: stream
      )
    end

    chunks << create_nvarray_from_ptr(
      ptr: chunk_ptr,
      shape: chunk_shape,
      dtype: tensor.dtype,
      device: comm.gpu_ids[i]
    )
  end
  chunks
end
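In standard scatter semantics, rank i receives the contiguous byte range starting at i * chunk_size of the source buffer, regardless of which rank is the root. A sketch of that offset arithmetic, using a binary Ruby String as a stand-in for device memory (scatter_model is a hypothetical helper, not part of the library):

```ruby
# Hypothetical model of scatter's byte-offset arithmetic: rank i gets
# the slice [i * chunk_size, (i + 1) * chunk_size) of the source buffer.
def scatter_model(buffer, n_ranks)
  unless (buffer.bytesize % n_ranks).zero?
    raise ArgumentError, "#{buffer.bytesize} bytes not divisible by #{n_ranks} ranks"
  end

  chunk_size = buffer.bytesize / n_ranks
  Array.new(n_ranks) { |i| buffer.byteslice(i * chunk_size, chunk_size) }
end

# Four little-endian float32 values (16 bytes) over two ranks: 8 bytes each.
src = [1.0, 2.0, 3.0, 4.0].pack("e*")
chunks = scatter_model(src, 2)
p chunks.map { |c| c.unpack("e*") } # => [[1.0, 2.0], [3.0, 4.0]]
```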