Class: Ignis::Collective::Algorithms::Pipeliner
- Inherits: Object
  - Object
  - Ignis::Collective::Algorithms::Pipeliner
- Defined in: lib/nvruby/collective/algorithms/pipeliner.rb
Overview
Message Pipelining for Overlapping Communication and Computation
Splits large messages into chunks that can be processed concurrently:
- While the GPU processes chunk N, transfer chunk N+1
- Hides transfer latency behind compute
Optimal chunk size balances:
- Too small: overhead dominates
- Too large: no overlap opportunity
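The trade-off above can be illustrated with a rough cost model. The sketch below is not part of Pipeliner: the method name `pipelined_time`, the fixed per-chunk overhead, and the throughput figures are all assumptions chosen for illustration, and chunks are assumed to be equally sized.

```ruby
# Hypothetical cost model (not library API): estimated wall time for a
# two-stage transfer/compute pipeline over equally sized chunks.
def pipelined_time(total_bytes:, chunk_bytes:, transfer_bytes_per_s:,
                   compute_bytes_per_s:, per_chunk_overhead_s:)
  n_chunks = (total_bytes + chunk_bytes - 1) / chunk_bytes # ceiling division
  transfer = chunk_bytes / transfer_bytes_per_s.to_f + per_chunk_overhead_s
  compute  = chunk_bytes / compute_bytes_per_s.to_f + per_chunk_overhead_s
  # The first transfer fills the pipeline, the last compute drains it, and
  # every step in between costs as much as the slower of the two stages.
  transfer + (n_chunks - 1) * [transfer, compute].max + compute
end

# Assumed figures: 64 MiB message, 25 GB/s link, 50 GB/s reduce throughput,
# 10 microseconds of launch/sync overhead per chunk per stage.
model = {
  total_bytes: 64 * 1024 * 1024,
  transfer_bytes_per_s: 25e9,
  compute_bytes_per_s: 50e9,
  per_chunk_overhead_s: 10e-6
}

[4 * 1024, 2 * 1024 * 1024, 64 * 1024 * 1024].each do |chunk_bytes|
  seconds = pipelined_time(chunk_bytes: chunk_bytes, **model)
  puts format("chunk=%9d bytes  estimated=%.6f s", chunk_bytes, seconds)
end
```

Under these assumed numbers the 4 KiB case is dominated by per-chunk overhead and the 64 MiB case degenerates to a single transfer followed by a single compute, so the mid-sized chunk comes out fastest.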
Defined Under Namespace
Classes: PipelineStage
Constant Summary
Pipeline states:
- STATE_PENDING = :pending
- STATE_TRANSFERRING = :transferring
- STATE_COMPUTING = :computing
- STATE_COMPLETE = :complete
Default chunk size, 512 KiB (good for PCIe 4.0):
- DEFAULT_CHUNK_SIZE = 512 * 1024
Instance Attribute Summary
- #chunk_size ⇒ Integer (readonly)
  Chunk size in bytes.
- #num_stages ⇒ Integer (readonly)
  Number of pipeline stages (double-buffering = 2).
Class Method Summary
- .optimal_chunk_size(bandwidth_gbps:, compute_gflops:, flops_per_element: 1.0, element_size: 4) ⇒ Integer
  Calculate optimal chunk size based on bandwidth and compute speed.
Instance Method Summary
- #execute_pipelined(buffers:, total_size:, dtype:, op:, transfer_fn:, reduce_fn:, streams: nil) ⇒ void
  Execute a pipelined reduction operation.
- #execute_pipelined_all_reduce(ring:, buffers:, total_size:, dtype:, op:, gpu_streams: nil) ⇒ void
  Execute a pipelined AllReduce with overlap.
- #initialize(chunk_size: DEFAULT_CHUNK_SIZE, num_stages: 2) ⇒ Pipeliner (constructor)
  A new instance of Pipeliner.
Constructor Details
#initialize(chunk_size: DEFAULT_CHUNK_SIZE, num_stages: 2) ⇒ Pipeliner
Returns a new instance of Pipeliner.
# File 'lib/nvruby/collective/algorithms/pipeliner.rb', line 36

def initialize(chunk_size: DEFAULT_CHUNK_SIZE, num_stages: 2)
  @chunk_size = chunk_size
  @num_stages = num_stages
  @streams = {}
end
Instance Attribute Details
#chunk_size ⇒ Integer (readonly)
Returns the chunk size in bytes.

# File 'lib/nvruby/collective/algorithms/pipeliner.rb', line 29

def chunk_size
  @chunk_size
end
#num_stages ⇒ Integer (readonly)
Returns the number of pipeline stages (double-buffering = 2).

# File 'lib/nvruby/collective/algorithms/pipeliner.rb', line 32

def num_stages
  @num_stages
end
Class Method Details
.optimal_chunk_size(bandwidth_gbps:, compute_gflops:, flops_per_element: 1.0, element_size: 4) ⇒ Integer
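Calculate a chunk size from link bandwidth. As the source listing for this method shows, the current implementation sizes each chunk for roughly 1 ms of transfer time (bandwidth_gbps is converted as GB/s, that is, gigabytes per second) and clamps the result to the range 64 KiB to 4 MiB. compute_gflops, flops_per_element, and element_size are accepted, but the returned value does not depend on them.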
# File 'lib/nvruby/collective/algorithms/pipeliner.rb', line 49

def self.optimal_chunk_size(bandwidth_gbps:, compute_gflops:, flops_per_element: 1.0, element_size: 4)
  # Balance: transfer_time = compute_time
  # chunk_size / bandwidth = (chunk_size / element_size) * flops_per_element / compute_throughput

  # Convert GB/s to bytes/s
  bandwidth_bps = bandwidth_gbps * 1e9
  # Convert GFLOPS to FLOPS
  compute_flops = compute_gflops * 1e9

  # chunk_size / bandwidth = (chunk_size * flops_per_element) / (element_size * compute_flops)
  # Solving: chunk_size = bandwidth * element_size * compute_flops / (compute_flops + bandwidth * flops_per_element)

  # Simplified: aim for transfer time ~ compute time
  # chunk ~= bandwidth * target_latency_seconds
  target_latency = 0.001 # 1ms pipeline stage
  optimal = (bandwidth_bps * target_latency).to_i

  # Clamp to reasonable range
  [[optimal, 64 * 1024].max, 4 * 1024 * 1024].min
end
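A standalone sketch may help explain why the method falls back to a latency target. In the balance equation from the comments, chunk_size appears linearly on both sides and cancels, so the transfer-to-compute ratio is the same for every chunk size and cannot by itself pick one. The helper names `transfer_to_compute_ratio` and `chunk_size_sketch` below are hypothetical and are not part of the library; the second one restates the bandwidth-only heuristic so it can be run without the gem.

```ruby
# Hypothetical helper (not library API): per-chunk transfer time divided by
# per-chunk compute time, following the balance equation in the comments.
def transfer_to_compute_ratio(chunk_bytes:, bandwidth_gbps:, compute_gflops:,
                              flops_per_element: 1.0, element_size: 4)
  transfer_s = chunk_bytes / (bandwidth_gbps * 1e9)
  compute_s  = (chunk_bytes / element_size.to_f) * flops_per_element /
               (compute_gflops * 1e9)
  transfer_s / compute_s # chunk_bytes cancels out of this quotient
end

# Hypothetical restatement of the heuristic: bytes moved in target_latency
# seconds, clamped to the range 64 KiB..4 MiB.
def chunk_size_sketch(bandwidth_gbps:, target_latency: 0.001)
  optimal = (bandwidth_gbps * 1e9 * target_latency).to_i
  optimal.clamp(64 * 1024, 4 * 1024 * 1024)
end

# The ratio is (up to rounding) identical for a 64 KiB and a 4 MiB chunk.
[64 * 1024, 4 * 1024 * 1024].each do |bytes|
  ratio = transfer_to_compute_ratio(chunk_bytes: bytes, bandwidth_gbps: 25,
                                    compute_gflops: 10_000)
  puts "chunk=#{bytes} ratio=#{ratio.round(3)}"
end

puts chunk_size_sketch(bandwidth_gbps: 0.05) # clamped up to 65536
puts chunk_size_sketch(bandwidth_gbps: 1)    # 1000000, inside the range
puts chunk_size_sketch(bandwidth_gbps: 25)   # clamped down to 4194304
```

One consequence of the clamp: any link faster than about 4.2 GB/s hits the 4 MiB ceiling, so for typical PCIe 4.0 or NVLink figures the heuristic returns the same value regardless of the exact bandwidth.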
Instance Method Details
#execute_pipelined(buffers:, total_size:, dtype:, op:, transfer_fn:, reduce_fn:, streams: nil) ⇒ void
This method returns an undefined value.
Execute a pipelined reduction operation
# File 'lib/nvruby/collective/algorithms/pipeliner.rb', line 80

def execute_pipelined(buffers:, total_size:, dtype:, op:, transfer_fn:, reduce_fn:, streams: nil)
  # Calculate chunks
  n_chunks = (total_size + @chunk_size - 1) / @chunk_size

  # Create streams if not provided
  streams ||= create_streams(@num_stages, buffers[0])

  # Initialize pipeline stages
  stages = n_chunks.times.map do |i|
    offset = i * @chunk_size
    size = [total_size - offset, @chunk_size].min
    PipelineStage.new(chunk_id: i, offset: offset, size: size, state: STATE_PENDING)
  end

  # Pipeline execution
  active_stages = []
  pending_stages = stages.dup

  while pending_stages.any? || active_stages.any?
    # Start new transfers up to num_stages
    while active_stages.size < @num_stages && pending_stages.any?
      stage = pending_stages.shift
      stream = streams[stage.chunk_id % @num_stages]

      # Start transfer
      transfer_fn.call(stage.offset, stage.size, stream)
      stage.state = STATE_TRANSFERRING
      active_stages << stage
    end

    # Check for completed transfers, start compute
    active_stages.each do |stage|
      if stage.state == STATE_TRANSFERRING
        stream = streams[stage.chunk_id % @num_stages]
        # Sync to ensure transfer complete
        sync_stream(stream)

        # Start compute
        reduce_fn.call(stage.offset, stage.size, stream)
        stage.state = STATE_COMPUTING
      end
    end

    # Check for completed compute
    active_stages.reject! do |stage|
      if stage.state == STATE_COMPUTING
        stream = streams[stage.chunk_id % @num_stages]
        sync_stream(stream)
        stage.state = STATE_COMPLETE
        true
      else
        false
      end
    end
  end
end
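To make the chunking and the callback order concrete, here is a CPU-only sketch that mirrors the arithmetic of the listing without any CUDA streams. `StageSketch`, `plan_stages`, and `call_order` are hypothetical names introduced for illustration; the fields of the real PipelineStage class are inferred only from the keyword arguments used above, and the call order reflects one reading of the loop (every stage started in an iteration is synced, reduced, and retired in that same iteration).

```ruby
# Hypothetical stand-in for PipelineStage, with the stream index added.
StageSketch = Struct.new(:chunk_id, :offset, :size, :stream_idx, keyword_init: true)

# Split total_size bytes into chunks and assign each to a stream round-robin.
def plan_stages(total_size:, chunk_size:, num_stages:)
  n_chunks = (total_size + chunk_size - 1) / chunk_size # ceiling division
  Array.new(n_chunks) do |i|
    offset = i * chunk_size
    StageSketch.new(chunk_id: i, offset: offset,
                    size: [total_size - offset, chunk_size].min, # last chunk may be short
                    stream_idx: i % num_stages)
  end
end

# Condensed trace of the order in which transfer_fn and reduce_fn are called.
def call_order(n_chunks:, num_stages:)
  log = []
  pending = (0...n_chunks).to_a
  until pending.empty?
    batch = pending.shift(num_stages)           # start up to num_stages transfers
    batch.each { |id| log << "transfer #{id}" }
    batch.each { |id| log << "reduce #{id}" }   # each runs after its stream is synced
  end
  log
end

plan_stages(total_size: 1_200_000, chunk_size: 512 * 1024, num_stages: 2).each do |s|
  puts "chunk #{s.chunk_id}: offset=#{s.offset} size=#{s.size} stream=#{s.stream_idx}"
end
puts call_order(n_chunks: 3, num_stages: 2).join(", ")
```

With three chunks and two stages the trace is transfer 0, transfer 1, reduce 0, reduce 1, transfer 2, reduce 2, so the overlap happens within each batch of num_stages chunks, not across batches.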
#execute_pipelined_all_reduce(ring:, buffers:, total_size:, dtype:, op:, gpu_streams: nil) ⇒ void
This method returns an undefined value.
Execute a pipelined AllReduce with overlap
# File 'lib/nvruby/collective/algorithms/pipeliner.rb', line 147

def execute_pipelined_all_reduce(ring:, buffers:, total_size:, dtype:, op:, gpu_streams: nil)
  n_gpus = ring.n_gpus
  n_chunks = (total_size + @chunk_size - 1) / @chunk_size

  # Create streams: 2 per GPU for double-buffering
  gpu_streams ||= n_gpus.times.map do |rank|
    CUDA::RuntimeAPI.cudaSetDevice(ring.ring_order[rank])
    create_streams_for_device(@num_stages)
  end

  # Process in pipeline fashion
  n_chunks.times do |chunk_id|
    chunk_offset = chunk_id * @chunk_size
    chunk_size = [total_size - chunk_offset, @chunk_size].min
    stage_idx = chunk_id % @num_stages

    # Get streams for this stage
    stage_streams = gpu_streams.map { |streams| streams[stage_idx] }

    # Create offset buffers
    chunk_buffers = buffers.map { |buf| ptr_offset(buf, chunk_offset) }

    # Execute ring all-reduce on this chunk
    ring.all_reduce(
      buffers: chunk_buffers,
      sizes: [chunk_size] * n_gpus,
      dtype: dtype,
      op: op,
      streams: stage_streams
    )
  end

  # Final sync
  gpu_streams.flatten.each { |s| sync_stream(s) }
end
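The chunk-by-chunk structure of this method can be sketched on the CPU, with plain Ruby arrays standing in for device buffers. Everything here is an assumption made for illustration: `chunked_all_reduce_sum!` is a hypothetical name, the reduction is fixed to an elementwise sum, chunk sizes are counted in elements, not bytes, and the internals of `ring.all_reduce` are not modelled at all.

```ruby
# CPU-only illustration (not library API): reduce equal-length arrays chunk by
# chunk, writing the elementwise sum back into every "rank". Returns the stage
# index that each chunk would use, mirroring chunk_id % num_stages above.
def chunked_all_reduce_sum!(buffers, chunk_elems:, num_stages: 2)
  total = buffers.first.length
  n_chunks = (total + chunk_elems - 1) / chunk_elems # ceiling division
  stage_log = []

  n_chunks.times do |chunk_id|
    offset = chunk_id * chunk_elems
    len = [total - offset, chunk_elems].min # last chunk may be short
    stage_log << chunk_id % num_stages

    # Sum this slice across all ranks, then copy the result to every rank.
    reduced = buffers.map { |buf| buf[offset, len] }.transpose.map(&:sum)
    buffers.each { |buf| buf[offset, len] = reduced }
  end

  stage_log
end

ranks = [
  [1, 2, 3, 4, 5],
  [10, 20, 30, 40, 50],
  [100, 200, 300, 400, 500]
]
stages_used = chunked_all_reduce_sum!(ranks, chunk_elems: 2)
puts ranks.first.inspect
puts stages_used.inspect
```

Because each chunk covers a disjoint slice, processing chunks independently gives the same result as reducing the whole buffer at once, which is what lets consecutive chunks alternate between stage streams.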