Class: Ignis::Collective::Communicator

Inherits:
Object
  • Object
show all
Defined in:
lib/nvruby/collective/communicator.rb

Overview

Primary user-facing abstraction for collective operations. Provides AllReduce, Broadcast, Reduce, and other collective primitives.

Constant Summary collapse

REDUCTION_OPS =

Reduction operations

[:sum, :prod, :min, :max, :avg].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(gpu_ids:, rank: 0, world_size: 1) ⇒ Communicator

Create a new communicator for the specified GPUs

Parameters:

  • gpu_ids (Array<Integer>)

    GPU device IDs to include

  • rank (Integer) (defaults to: 0)

    Rank of this process (default 0 for single-process)

  • world_size (Integer) (defaults to: 1)

    Total ranks (default 1 for single-process)



37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/nvruby/collective/communicator.rb', line 37

# Build a communicator over the given GPUs. Topology detection, P2P
# enablement and ring ordering are deferred to #initialize!.
#
# @param gpu_ids [Array<Integer>] GPU device IDs to include; a frozen copy is kept
# @param rank [Integer] rank of this process (0 for single-process use)
# @param world_size [Integer] total number of ranks (1 for single-process use)
def initialize(gpu_ids:, rank: 0, world_size: 1)
  @gpu_ids = gpu_ids.dup.freeze
  @rank, @world_size = rank, world_size

  validate_gpu_ids!

  # Both collaborators share the same frozen ID list.
  @device_manager = DeviceManager.new(device_ids: @gpu_ids)
  @transport_selector = TransportSelector.new(@gpu_ids)

  # Lifecycle state; #initialize! fills in the ring order and flips the flag.
  @ring_order = nil
  @initialized = false
end

Instance Attribute Details

#device_manager ⇒ DeviceManager (readonly)

Returns Device manager.

Returns:



22
23
24
# File 'lib/nvruby/collective/communicator.rb', line 22

# Device manager for this communicator's GPUs (used for P2P enablement,
# synchronization and topology lookup).
#
# @return [DeviceManager]
def device_manager; @device_manager; end

#gpu_ids ⇒ Array<Integer> (readonly)

Returns GPU device IDs in this communicator.

Returns:

  • (Array<Integer>)

    GPU device IDs in this communicator



19
20
21
# File 'lib/nvruby/collective/communicator.rb', line 19

# GPU device IDs in this communicator, as a frozen copy of the list passed
# to the constructor.
#
# @return [Array<Integer>]
def gpu_ids; @gpu_ids; end

#rank ⇒ Integer (readonly)

Returns Rank of this communicator (for multi-process).

Returns:

  • (Integer)

    Rank of this communicator (for multi-process)



28
29
30
# File 'lib/nvruby/collective/communicator.rb', line 28

# Rank of this communicator's process (0 unless given at construction).
#
# @return [Integer]
def rank; @rank; end

#transport_selector ⇒ TransportSelector (readonly)

Returns Transport selector.

Returns:



25
26
27
# File 'lib/nvruby/collective/communicator.rb', line 25

# Selector that picks the transport used between each pair of GPUs.
#
# @return [TransportSelector]
def transport_selector; @transport_selector; end

#world_size ⇒ Integer (readonly)

Returns Total number of ranks.

Returns:

  • (Integer)

    Total number of ranks



31
32
33
# File 'lib/nvruby/collective/communicator.rb', line 31

# Total number of ranks (1 unless given at construction).
#
# @return [Integer]
def world_size; @world_size; end

Instance Method Details

#all_gather(tensors, stream: nil) ⇒ Array<Array<NvArray>>

AllGather - gather tensors from all GPUs to all GPUs

Parameters:

  • tensors (Array<NvArray>)

    One tensor per GPU (each may be a different size)

  • stream (CUDA::Stream, nil) (defaults to: nil)

    Optional CUDA stream

Returns:

  • (Array<Array<NvArray>>)

    Gathered tensors on each GPU



130
131
132
133
134
135
136
137
138
# File 'lib/nvruby/collective/communicator.rb', line 130

# AllGather: gather the tensors from all GPUs onto all GPUs.
#
# @param tensors [Array<NvArray>] one tensor per GPU (sizes may differ)
# @param stream [CUDA::Stream, nil] optional CUDA stream
# @return [Array<Array<NvArray>>] gathered tensors per GPU; with a single GPU
#   this is simply +[tensors]+
def all_gather(tensors, stream: nil)
  validate_tensors!(tensors)
  ensure_initialized!

  if @gpu_ids.size == 1
    [tensors]
  else
    # TODO: Implement ring all-gather
    simple_all_gather(tensors, stream)
  end
end

#all_reduce(tensors, op: :sum, stream: nil) ⇒ Array<NvArray>

Perform AllReduce operation - reduce and distribute result to all GPUs

Parameters:

  • tensors (Array<NvArray>)

    One tensor per GPU

  • op (Symbol) (defaults to: :sum)

    Reduction operation (:sum, :prod, :min, :max)

  • stream (CUDA::Stream, nil) (defaults to: nil)

    Optional CUDA stream

Returns:

  • (Array<NvArray>)

    Reduced tensors (same references as input)



69
70
71
72
73
74
75
76
77
78
79
# File 'lib/nvruby/collective/communicator.rb', line 69

# AllReduce: reduce the per-GPU tensors and distribute the result to all GPUs.
#
# @param tensors [Array<NvArray>] one tensor per GPU
# @param op [Symbol] reduction operation (checked by validate_operation!)
# @param stream [CUDA::Stream, nil] optional CUDA stream
# @return [Array<NvArray>] with a single GPU, the input array untouched;
#   otherwise the ring all-reduce result (documented as the input references)
def all_reduce(tensors, op: :sum, stream: nil)
  validate_operation!(op)
  validate_tensors!(tensors)
  ensure_initialized!

  if @gpu_ids.size == 1
    tensors # nothing to reduce across
  else
    ring_all_reduce(tensors, op, stream)
  end
end

#all_reduce_async(tensors, op: :sum, stream:) ⇒ Array<NvArray>

Async AllReduce - requires explicit synchronization

Parameters:

  • tensors (Array<NvArray>)

    One tensor per GPU

  • op (Symbol) (defaults to: :sum)

    Reduction operation

  • stream (CUDA::Stream)

    CUDA stream for async execution

Returns:

  • (Array<NvArray>)

    Tensors (result available after sync)

Raises:

  • (ArgumentError)


86
87
88
89
90
# File 'lib/nvruby/collective/communicator.rb', line 86

# AllReduce variant that insists on an explicit stream so the caller owns
# synchronization. It delegates to #all_reduce with that stream.
#
# @param tensors [Array<NvArray>] one tensor per GPU
# @param op [Symbol] reduction operation
# @param stream [CUDA::Stream] CUDA stream for async execution (required)
# @return [Array<NvArray>] tensors; results are available after the stream syncs
# @raise [ArgumentError] when +stream+ is nil or false
def all_reduce_async(tensors, op: :sum, stream:)
  if stream
    all_reduce(tensors, op: op, stream: stream)
  else
    raise ArgumentError, "Stream required for async operation"
  end
end

#all_to_all(send_buffers, recv_buffers, chunk_size:, stream: nil) ⇒ void

This method returns an undefined value.

AllToAll - full exchange between all GPUs. Each GPU sends N chunks (one to each GPU) and receives N chunks (one from each GPU).

Parameters:

  • send_buffers (Array<Array<FFI::Pointer>>)

    N×N array: send_buffers[src][dst] is the chunk GPU src sends to GPU dst

  • recv_buffers (Array<Array<FFI::Pointer>>)

    N×N array: recv_buffers[dst][src] is where GPU dst receives the chunk from GPU src

  • chunk_size (Integer)

    Size of each chunk in bytes

  • stream (CUDA::Stream, nil) (defaults to: nil)

    Optional CUDA stream



196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# File 'lib/nvruby/collective/communicator.rb', line 196

# AllToAll: full exchange between all GPUs. Each GPU sends one chunk to every
# GPU (itself included) and receives one chunk from every GPU.
#
# Buffers are indexed by GPU position in +gpu_ids+:
#   send_buffers[src][dst] - chunk that GPU +src+ sends to GPU +dst+
#   recv_buffers[dst][src] - where GPU +dst+ stores the chunk from GPU +src+
#
# The call blocks: streams are synchronized after every exchange round, and
# after the local copy when there is only one GPU.
#
# @param send_buffers [Array<Array<FFI::Pointer>>] N×N send buffers
# @param recv_buffers [Array<Array<FFI::Pointer>>] N×N receive buffers
# @param chunk_size [Integer] size of each chunk in bytes
# @param stream [CUDA::Stream, nil] optional CUDA stream (reused for every GPU)
# @return [void]
def all_to_all(send_buffers, recv_buffers, chunk_size:, stream: nil)
  ensure_initialized!

  n = @gpu_ids.size
  streams = stream ? [stream] * n : create_null_streams(n)

  # Phase 1: each GPU keeps its own chunk (GPU[i] -> GPU[i]).
  n.times do |rank|
    CUDA::RuntimeAPI.cudaSetDevice(@gpu_ids[rank])
    stream_ptr = get_stream_ptr(streams[rank])

    CUDA::RuntimeAPI.cudaMemcpyAsync(
      recv_buffers[rank][rank],
      send_buffers[rank][rank],
      chunk_size,
      CUDA::RuntimeAPI::MEMCPY_DEVICE_TO_DEVICE,
      stream_ptr
    )
  end

  # A single GPU still needs its local copy. No exchange rounds follow, so
  # flush it here to keep the method blocking like the multi-GPU path.
  if n == 1
    synchronize_all_streams!(streams)
    return
  end

  # Phase 2: N-1 rounds of pairwise exchange. In round r, GPU i sends to
  # GPU (i + r + 1) % N, so every ordered pair (i, j != i) is covered once.
  (n - 1).times do |round|
    n.times do |rank|
      gpu_id = @gpu_ids[rank]
      partner = (rank + round + 1) % n
      partner_gpu = @gpu_ids[partner]

      # Issue the copy from the sender's device, as send_recv does. Without
      # this, the current device is whatever the previous iteration left.
      CUDA::RuntimeAPI.cudaSetDevice(gpu_id)
      stream_ptr = get_stream_ptr(streams[rank])

      dst = recv_buffers[partner][rank] # partner receives from me
      src = send_buffers[rank][partner] # I send to partner
      transport = @transport_selector.select_transport(gpu_id, partner_gpu)

      if transport.is_a?(Transport::P2PTransport)
        transport.copy_async(dst, src, chunk_size, stream_ptr)
      else
        # No dedicated path for this transport: do a plain device-to-device
        # copy instead of silently dropping the chunk. Under unified virtual
        # addressing, cudaMemcpy* copies between devices directly.
        CUDA::RuntimeAPI.cudaMemcpyAsync(
          dst,
          src,
          chunk_size,
          CUDA::RuntimeAPI::MEMCPY_DEVICE_TO_DEVICE,
          stream_ptr
        )
      end
    end

    # Synchronize after each round
    synchronize_all_streams!(streams)
  end
end

#barrier ⇒ void

This method returns an undefined value.

Barrier synchronization across all GPUs



337
338
339
340
# File 'lib/nvruby/collective/communicator.rb', line 337

# Barrier across all GPUs in this communicator. It ensures the communicator
# is initialized, then delegates to the device manager's synchronize_all!.
#
# @return [void]
def barrier
  ensure_initialized!
  manager = @device_manager
  manager.synchronize_all!
end

#broadcast(tensor, root: 0, stream: nil) ⇒ Array<NvArray>

Broadcast tensor from root GPU to all GPUs

Parameters:

  • tensor (NvArray)

    Source tensor on root GPU

  • root (Integer) (defaults to: 0)

    Root GPU index (default 0)

  • stream (CUDA::Stream, nil) (defaults to: nil)

    Optional CUDA stream

Returns:

  • (Array<NvArray>)

    Tensors on all GPUs with broadcasted data



97
98
99
100
101
102
103
104
105
106
# File 'lib/nvruby/collective/communicator.rb', line 97

# Broadcast a tensor from the root GPU to every GPU.
#
# @param tensor [NvArray] source tensor on the root GPU
# @param root [Integer] index of the root GPU in gpu_ids (default 0)
# @param stream [CUDA::Stream, nil] optional CUDA stream
# @return [Array<NvArray>] tensors on all GPUs; with one GPU just +[tensor]+
def broadcast(tensor, root: 0, stream: nil)
  ensure_initialized!
  validate_gpu_index!(root)

  if @gpu_ids.size == 1
    [tensor]
  else
    # TODO: Implement tree broadcast algorithm; fan out from root for now.
    simple_broadcast(tensor, root, stream)
  end
end

#destroy! ⇒ void

This method returns an undefined value.

Clean up all resources



364
365
366
367
368
# File 'lib/nvruby/collective/communicator.rb', line 364

# Release resources: the transport selector is torn down first, then the
# device manager. Afterwards the communicator is marked uninitialized.
#
# @return [void]
def destroy!
  [@transport_selector, @device_manager].each(&:destroy!)
  @initialized = false
end

#initialize! ⇒ self

Initialize the communicator (detect topology, enable P2P, etc.)

Returns:

  • (self)


52
53
54
55
56
57
58
59
60
61
62
# File 'lib/nvruby/collective/communicator.rb', line 52

# Initialize the communicator: set up devices, enable P2P access between
# all of them, initialize the transport selector and cache its optimal ring
# order. Calling it again after success does nothing.
#
# @return [self]
def initialize!
  unless @initialized
    @device_manager.initialize!
    @device_manager.enable_all_p2p_access!
    @transport_selector.initialize!
    @ring_order = @transport_selector.optimal_ring_order
    @initialized = true
  end

  self
end

#inspect ⇒ String

Returns Detailed inspection.

Returns:

  • (String)

    Detailed inspection



377
378
379
380
381
382
# File 'lib/nvruby/collective/communicator.rb', line 377

# Detailed inspection string showing the GPU IDs, rank/world size and the
# initialization flag.
#
# @return [String]
def inspect
  fields = [
    "gpu_ids=#{@gpu_ids}",
    "rank=#{@rank}/#{@world_size}",
    "initialized=#{@initialized}"
  ]
  "#<Ignis::Collective::Communicator #{fields.join(' ')}>"
end

#performance_summary ⇒ Hash

Get performance summary

Returns:

  • (Hash)

    Performance statistics



358
359
360
# File 'lib/nvruby/collective/communicator.rb', line 358

# Performance statistics, delegated as-is to the transport selector.
#
# @return [Hash]
def performance_summary
  selector = @transport_selector
  selector.performance_summary
end

#ready? ⇒ Boolean

Check if communicator is ready

Returns:

  • (Boolean)

    True if initialized



344
345
346
347
348
# File 'lib/nvruby/collective/communicator.rb', line 344

# Whether the communicator is usable. It must have been initialized, and
# both the device manager and the transport selector must report ready.
#
# @return [Boolean]
def ready?
  return @initialized unless @initialized

  @device_manager.ready? && @transport_selector.ready?
end

#recv(buffer, src_rank:, size:, stream: nil) ⇒ void

This method returns an undefined value.

Point-to-point receive. It performs no transfer itself and only runs a barrier; the actual copy happens in send_recv.

Parameters:

  • buffer (FFI::Pointer)

    Buffer to receive into

  • src_rank (Integer)

    Source rank

  • size (Integer)

    Expected size in bytes

  • stream (CUDA::Stream, nil) (defaults to: nil)

    Optional CUDA stream



327
328
329
330
331
332
333
# File 'lib/nvruby/collective/communicator.rb', line 327

# Point-to-point receive. It moves no data itself: the copy is performed by
# #send_recv on the sender side. After validating +src_rank+ it only runs a
# #barrier. +buffer+, +size+ and +stream+ are currently unused.
#
# @param buffer [FFI::Pointer] buffer to receive into
# @param src_rank [Integer] source rank (index in gpu_ids)
# @param size [Integer] expected size in bytes
# @param stream [CUDA::Stream, nil] optional CUDA stream
# @return [void]
def recv(buffer, src_rank:, size:, stream: nil)
  ensure_initialized!
  validate_gpu_index!(src_rank)

  barrier
end

#reduce(tensors, root: 0, op: :sum, stream: nil) ⇒ NvArray

Reduce tensors to root GPU

Parameters:

  • tensors (Array<NvArray>)

    One tensor per GPU

  • root (Integer) (defaults to: 0)

    Root GPU index

  • op (Symbol) (defaults to: :sum)

    Reduction operation

  • stream (CUDA::Stream, nil) (defaults to: nil)

    Optional CUDA stream

Returns:

  • (NvArray)

    Reduced tensor on root GPU



114
115
116
117
118
119
120
121
122
123
124
# File 'lib/nvruby/collective/communicator.rb', line 114

# Reduce the per-GPU tensors onto the root GPU.
#
# @param tensors [Array<NvArray>] one tensor per GPU
# @param root [Integer] index of the root GPU in gpu_ids
# @param op [Symbol] reduction operation
# @param stream [CUDA::Stream, nil] optional CUDA stream
# @return [NvArray] reduced tensor on the root; with one GPU, the first tensor
def reduce(tensors, root: 0, op: :sum, stream: nil)
  validate_operation!(op)
  validate_tensors!(tensors)
  ensure_initialized!
  validate_gpu_index!(root)

  if @gpu_ids.size == 1
    tensors[0]
  else
    # TODO: Implement tree reduce algorithm
    simple_reduce(tensors, root, op, stream)
  end
end

#reduce_scatter(tensors, op: :sum, stream: nil) ⇒ Array<FFI::Pointer>

ReduceScatter - reduce and scatter result

Parameters:

  • tensors (Array<NvArray>)

    One tensor per GPU

  • op (Symbol) (defaults to: :sum)

    Reduction operation

  • stream (CUDA::Stream, nil) (defaults to: nil)

    Optional CUDA stream

Returns:

  • (Array<FFI::Pointer>)

    Scattered reduced chunks (chunk size = total_size / N)



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/nvruby/collective/communicator.rb', line 145

# ReduceScatter: reduce the per-GPU tensors and leave one chunk of the result
# on each GPU, using the ring algorithm over the cached ring order.
#
# Chunking is based on the byte size of the first tensor. Result buffers are
# freshly allocated on each GPU and returned to the caller.
#
# @param tensors [Array<NvArray>] one tensor per GPU
# @param op [Symbol] reduction operation
# @param stream [CUDA::Stream, nil] optional CUDA stream (reused for every GPU)
# @return [Array<FFI::Pointer>] one reduced chunk per GPU. NOTE: with a single
#   GPU the input +tensors+ array is returned unchanged instead.
def reduce_scatter(tensors, op: :sum, stream: nil)
  validate_operation!(op)
  validate_tensors!(tensors)
  ensure_initialized!

  gpu_count = @gpu_ids.size
  return tensors if gpu_count == 1

  ring = Algorithms::Ring.new(ring_order: @ring_order, transport_selector: @transport_selector)

  input_buffers = tensors.map { |tensor| device_buffer(tensor) }
  byte_sizes = tensors.map { |tensor| byte_size_of(tensor) }

  # Tensors without a dtype are treated as float32.
  lead = tensors[0]
  dtype = lead.respond_to?(:dtype) ? lead.dtype : :float32

  chunk_size = ring.calculate_chunk_size(byte_sizes[0])
  result_buffers = @gpu_ids.map { |gpu_id| allocate_buffer_on_device(gpu_id, chunk_size) }
  streams = stream ? Array.new(gpu_count, stream) : create_null_streams(gpu_count)

  ring.reduce_scatter(
    buffers: input_buffers,
    result_buffers: result_buffers,
    sizes: byte_sizes,
    dtype: dtype,
    op: op,
    streams: streams
  )

  result_buffers
end

#send(tensor, dest_rank:, size: nil, stream: nil) ⇒ void

This method returns an undefined value.

Point-to-point send from current rank to destination

Parameters:

  • tensor (NvArray, FFI::Pointer)

    Data to send

  • dest_rank (Integer)

    Destination rank (index in gpu_ids)

  • size (Integer, nil) (defaults to: nil)

    Size in bytes (inferred from tensor if nil)

  • stream (CUDA::Stream, nil) (defaults to: nil)

    Optional CUDA stream



254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
# File 'lib/nvruby/collective/communicator.rb', line 254

# Point-to-point send from GPU index 0 to +dest_rank+.
#
# A device-to-device copy needs a destination buffer, which this signature
# does not take. The only send it can complete is therefore the trivial one
# (dest_rank == 0, a no-op). For any other destination it raises rather than
# silently transferring nothing. Use #send_recv, which takes an explicit
# destination buffer. +tensor+, +size+ and +stream+ are currently unused.
#
# NOTE: defining #send shadows Object#send on Communicator instances. Use
# #public_send or #__send__ for dynamic dispatch on this class.
#
# @param tensor [NvArray, FFI::Pointer] data to send
# @param dest_rank [Integer] destination rank (index in gpu_ids)
# @param size [Integer, nil] size in bytes (inferred from tensor if nil)
# @param stream [CUDA::Stream, nil] optional CUDA stream
# @return [void]
# @raise [ArgumentError] whenever dest_rank is not the source (index 0)
def send(tensor, dest_rank:, size: nil, stream: nil)
  ensure_initialized!
  validate_gpu_index!(dest_rank)

  src_rank = 0 # Default sender is rank 0
  return if src_rank == dest_rank

  src_gpu = @gpu_ids[src_rank]
  dst_gpu = @gpu_ids[dest_rank]
  transport = @transport_selector.select_transport(src_gpu, dst_gpu)

  if transport.is_a?(Transport::P2PTransport)
    raise ArgumentError, "P2P send requires pre-allocated recv buffer on dest"
  end

  raise ArgumentError, "send requires a destination buffer; use send_recv instead"
end

#send_recv(buffer, src_rank:, dst_buffer:, dst_rank:, size:, stream: nil) ⇒ void

This method returns an undefined value.

Point-to-point send from specific source rank

Parameters:

  • buffer (FFI::Pointer)

    Source buffer on src_rank GPU

  • src_rank (Integer)

    Source rank

  • dst_buffer (FFI::Pointer)

    Destination buffer on dst_rank GPU

  • dst_rank (Integer)

    Destination rank

  • size (Integer)

    Size in bytes

  • stream (CUDA::Stream, nil) (defaults to: nil)

    Optional CUDA stream



285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
# File 'lib/nvruby/collective/communicator.rb', line 285

# Point-to-point copy of +size+ bytes from +buffer+ on GPU +src_rank+ to
# +dst_buffer+ on GPU +dst_rank+ (both indices into gpu_ids). Nothing is
# done when source and destination are the same.
#
# Transport handling:
# * P2PTransport: copy_async issued with the source device current.
# * IPCTransport: the source buffer is exported and imported on the
#   destination device, copied, and the mapping is closed only after the
#   copy has completed. The IPC path is therefore blocking.
# * anything else: a plain device-to-device cudaMemcpyAsync from the source
#   device, which copies across devices under unified virtual addressing.
#
# @param buffer [FFI::Pointer] source buffer on the src_rank GPU
# @param src_rank [Integer] source rank
# @param dst_buffer [FFI::Pointer] destination buffer on the dst_rank GPU
# @param dst_rank [Integer] destination rank
# @param size [Integer] size in bytes
# @param stream [CUDA::Stream, nil] optional CUDA stream (NULL stream if nil)
# @return [void]
def send_recv(buffer, src_rank:, dst_buffer:, dst_rank:, size:, stream: nil)
  ensure_initialized!
  validate_gpu_index!(src_rank)
  validate_gpu_index!(dst_rank)

  return if src_rank == dst_rank

  src_gpu = @gpu_ids[src_rank]
  dst_gpu = @gpu_ids[dst_rank]

  transport = @transport_selector.select_transport(src_gpu, dst_gpu)
  stream_ptr = stream ? get_stream_ptr(stream) : FFI::Pointer::NULL

  if transport.is_a?(Transport::P2PTransport)
    # Set source device context
    CUDA::RuntimeAPI.cudaSetDevice(src_gpu)
    transport.copy_async(dst_buffer, buffer, size, stream_ptr)
  elsif transport.is_a?(Transport::IPCTransport)
    # For IPC, export/import handles
    handle = transport.export_handle(buffer)
    CUDA::RuntimeAPI.cudaSetDevice(dst_gpu)
    mapped = transport.import_handle(handle)

    begin
      # Copy from mapped to destination
      CUDA::RuntimeAPI.cudaMemcpyAsync(
        dst_buffer,
        mapped,
        size,
        CUDA::RuntimeAPI::MEMCPY_DEVICE_TO_DEVICE,
        stream_ptr
      )
      # The copy is asynchronous and reads from +mapped+. Wait for it before
      # unmapping. NOTE(review): assumes synchronize_all! waits for
      # outstanding work on every managed device - confirm.
      @device_manager.synchronize_all!
    ensure
      # Close under the device the handle was imported on.
      CUDA::RuntimeAPI.cudaSetDevice(dst_gpu)
      transport.close_imported_handle(mapped)
    end
  else
    # No dedicated path for this transport: do a plain device-to-device copy
    # instead of silently skipping the transfer.
    CUDA::RuntimeAPI.cudaSetDevice(src_gpu)
    CUDA::RuntimeAPI.cudaMemcpyAsync(
      dst_buffer,
      buffer,
      size,
      CUDA::RuntimeAPI::MEMCPY_DEVICE_TO_DEVICE,
      stream_ptr
    )
  end
end

#to_s ⇒ String

Returns Human-readable description.

Returns:

  • (String)

    Human-readable description



371
372
373
374
# File 'lib/nvruby/collective/communicator.rb', line 371

# Human-readable summary: GPU count and whether #initialize! has run.
#
# @return [String]
def to_s
  status =
    if @initialized
      "ready"
    else
      "uninitialized"
    end
  "Communicator[#{@gpu_ids.size} GPUs, #{status}]"
end

#topology ⇒ Topology::Matrix

Get the topology matrix

Returns:



352
353
354
# File 'lib/nvruby/collective/communicator.rb', line 352

# Topology matrix reported by the device manager, or nil when the device
# manager has no topology.
#
# @return [Topology::Matrix, nil]
def topology
  topo = @device_manager.topology
  return nil if topo.nil?

  topo.matrix
end