Class: Ignis::Collective::Algorithms::Pipeliner

Inherits:
Object
Defined in:
lib/nvruby/collective/algorithms/pipeliner.rb

Overview

Message Pipelining for Overlapping Communication and Computation

Splits large messages into chunks that can be processed concurrently:

  • While the GPU processes chunk N, chunk N+1 is transferred

  • Hides transfer latency behind compute

Optimal chunk size balances:

  • Too small: overhead dominates

  • Too large: no overlap opportunity
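
The trade-off above can be illustrated with a rough cost model. This is a sketch under stated assumptions, not part of the library: the `pipelined_time` helper, the fixed per-chunk `overhead`, and every numeric value are hypothetical and chosen only to make the effect visible.

```ruby
# Hypothetical cost model for a two-stage (double-buffered) pipeline.
# All names and numbers are illustrative assumptions, not library API.
def pipelined_time(total_bytes, chunk_bytes, bandwidth:, compute_rate:, overhead:)
  n_chunks = (total_bytes + chunk_bytes - 1) / chunk_bytes  # ceiling division
  transfer = overhead + chunk_bytes / bandwidth.to_f        # seconds per chunk transfer
  compute  = overhead + chunk_bytes / compute_rate.to_f     # seconds per chunk compute
  # The first transfer and the last compute cannot be overlapped;
  # every step in between costs the slower of the two phases.
  transfer + (n_chunks - 1) * [transfer, compute].max + compute
end

TOTAL = 64 * 1024 * 1024
MODEL = { bandwidth: 25e9, compute_rate: 25e9, overhead: 10e-6 }.freeze

# Compare a tiny chunk, a mid-sized chunk, and a single chunk covering the whole message.
[4 * 1024, 512 * 1024, TOTAL].each do |chunk|
  seconds = pipelined_time(TOTAL, chunk, **MODEL)
  puts format("chunk %9d B -> %.3f ms", chunk, seconds * 1000)
end
```

Under these assumed numbers the 4 KiB chunks are slowest because the fixed overhead is paid thousands of times, the single 64 MiB chunk cannot overlap transfer with compute at all, and the 512 KiB chunk finishes fastest.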

Defined Under Namespace

Classes: PipelineStage

Constant Summary

Pipeline states:

STATE_PENDING = :pending
STATE_TRANSFERRING = :transferring
STATE_COMPUTING = :computing
STATE_COMPLETE = :complete

Default chunk size: 512 KiB (good for PCIe 4.0)

DEFAULT_CHUNK_SIZE = 512 * 1024

Constructor Details

#initialize(chunk_size: DEFAULT_CHUNK_SIZE, num_stages: 2) ⇒ Pipeliner

Returns a new instance of Pipeliner.

Parameters:

  • chunk_size (Integer) (defaults to: DEFAULT_CHUNK_SIZE)

    Size of each pipeline chunk

  • num_stages (Integer) (defaults to: 2)

    Number of concurrent stages (default 2)



# File 'lib/nvruby/collective/algorithms/pipeliner.rb', line 36

def initialize(chunk_size: DEFAULT_CHUNK_SIZE, num_stages: 2)
  @chunk_size = chunk_size
  @num_stages = num_stages
  @streams = {}
end

Instance Attribute Details

#chunk_size ⇒ Integer (readonly)

Returns the chunk size in bytes.

Returns:

  • (Integer)

    Chunk size in bytes



# File 'lib/nvruby/collective/algorithms/pipeliner.rb', line 29

def chunk_size
  @chunk_size
end

#num_stages ⇒ Integer (readonly)

Returns the number of pipeline stages (2 = double-buffering).

Returns:

  • (Integer)

    Number of pipeline stages (double-buffering = 2)



# File 'lib/nvruby/collective/algorithms/pipeliner.rb', line 32

def num_stages
  @num_stages
end

Class Method Details

.optimal_chunk_size(bandwidth_gbps:, compute_gflops:, flops_per_element: 1.0, element_size: 4) ⇒ Integer

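Calculate a chunk size from the transfer bandwidth. The current implementation is a simplified heuristic: it sizes the chunk so that one transfer takes about 1 ms, then clamps the result to the range 64 KiB to 4 MiB. The compute-related parameters are accepted but do not affect the result.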

Parameters:

  • bandwidth_gbps (Float)

    Transfer bandwidth in GB/s

  • compute_gflops (Float)

    Compute throughput in GFLOPS

  • flops_per_element (Float) (defaults to: 1.0)

    FLOPs per element in reduction

  • element_size (Integer) (defaults to: 4)

    Bytes per element

Returns:

  • (Integer)

    Optimal chunk size in bytes



# File 'lib/nvruby/collective/algorithms/pipeliner.rb', line 49

def self.optimal_chunk_size(bandwidth_gbps:, compute_gflops:, flops_per_element: 1.0, element_size: 4)
  # Ideal balance: transfer_time == compute_time, where
  #   transfer_time = chunk_size / bandwidth
  #   compute_time  = (chunk_size / element_size) * flops_per_element / compute_flops
  # chunk_size cancels on both sides, so this condition alone does not
  # determine a chunk size. A fixed per-stage latency target is used instead.

  # Convert GB/s to bytes/s
  bandwidth_bps = bandwidth_gbps * 1e9
  # Convert GFLOPS to FLOPS (currently unused by the heuristic below, as are
  # flops_per_element and element_size)
  compute_flops = compute_gflops * 1e9

  # chunk ~= bandwidth * target_latency_seconds
  target_latency = 0.001  # 1ms pipeline stage
  optimal = (bandwidth_bps * target_latency).to_i

  # Clamp to reasonable range: 64 KiB..4 MiB
  [[optimal, 64 * 1024].max, 4 * 1024 * 1024].min
end
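
To make the clamping behaviour concrete, the heuristic in the listing above can be reproduced in a few standalone lines. This is a sketch that mirrors the arithmetic shown in the source; `sketch_chunk_size`, `MIN_CHUNK`, and `MAX_CHUNK` are hypothetical names introduced here and are not part of the class.

```ruby
# Standalone re-statement of the heuristic shown above.
# Names are illustrative assumptions; only the arithmetic comes from the listing.
MIN_CHUNK = 64 * 1024         # lower clamp: 64 KiB
MAX_CHUNK = 4 * 1024 * 1024   # upper clamp: 4 MiB

def sketch_chunk_size(bandwidth_gbps, target_latency: 0.001)
  raw = (bandwidth_gbps * 1e9 * target_latency).to_i  # bytes moved in one stage
  raw.clamp(MIN_CHUNK, MAX_CHUNK)
end

[0.01, 1.0, 25.0].each do |gbps|
  puts "#{gbps} GB/s -> #{sketch_chunk_size(gbps)} bytes"
end
```

A very slow 0.01 GB/s link is raised to the 64 KiB floor, 1 GB/s yields roughly one million bytes and is left unclamped, and 25 GB/s would give about 25 MB, so it is capped at 4 MiB. Fast interconnects therefore always receive the 4 MiB ceiling under this heuristic.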

Instance Method Details

#execute_pipelined(buffers:, total_size:, dtype:, op:, transfer_fn:, reduce_fn:, streams: nil) ⇒ void

This method returns an undefined value.

Execute a pipelined reduction operation

Parameters:

  • buffers (Array<FFI::Pointer>)

    Device buffers

  • total_size (Integer)

    Total buffer size in bytes

  • dtype (Symbol)

    Data type

  • op (Symbol)

    Reduction operation

  • transfer_fn (Proc)

    Transfer function (chunk_offset, chunk_size, stream) -> void

  • reduce_fn (Proc)

    Reduce function (chunk_offset, chunk_size, stream) -> void

  • streams (Array<CUDA::Stream>) (defaults to: nil)

    CUDA streams (one per stage)



# File 'lib/nvruby/collective/algorithms/pipeliner.rb', line 80

def execute_pipelined(buffers:, total_size:, dtype:, op:, transfer_fn:, reduce_fn:, streams: nil)
  # Calculate chunks
  n_chunks = (total_size + @chunk_size - 1) / @chunk_size
  
  # Create streams if not provided
  streams ||= create_streams(@num_stages, buffers[0])
  
  # Initialize pipeline stages
  stages = n_chunks.times.map do |i|
    offset = i * @chunk_size
    size = [total_size - offset, @chunk_size].min
    PipelineStage.new(chunk_id: i, offset: offset, size: size, state: STATE_PENDING)
  end

  # Pipeline execution
  active_stages = []
  pending_stages = stages.dup
  
  while pending_stages.any? || active_stages.any?
    # Start new transfers up to num_stages
    while active_stages.size < @num_stages && pending_stages.any?
      stage = pending_stages.shift
      stream = streams[stage.chunk_id % @num_stages]
      
      # Start transfer
      transfer_fn.call(stage.offset, stage.size, stream)
      stage.state = STATE_TRANSFERRING
      active_stages << stage
    end

    # Wait for each in-flight transfer (blocking sync), then start compute
    active_stages.each do |stage|
      if stage.state == STATE_TRANSFERRING
        stream = streams[stage.chunk_id % @num_stages]
        
        # Sync to ensure transfer complete
        sync_stream(stream)
        
        # Start compute
        reduce_fn.call(stage.offset, stage.size, stream)
        stage.state = STATE_COMPUTING
      end
    end

    # Wait for compute to finish (blocking sync) and retire the stage
    active_stages.reject! do |stage|
      if stage.state == STATE_COMPUTING
        stream = streams[stage.chunk_id % @num_stages]
        sync_stream(stream)
        stage.state = STATE_COMPLETE
        true
      else
        false
      end
    end
  end
end
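
The chunking and stream assignment performed at the top of the listing above can be sketched in isolation. The `chunk_plan` helper and its hash keys are hypothetical, introduced only for illustration; the arithmetic (ceiling division, a shorter final chunk, `chunk_id % num_stages` for stream selection) follows the source.

```ruby
# Illustrative helper, not part of the class: lists the (offset, size)
# pairs that would be passed to transfer_fn / reduce_fn, plus the index
# of the stream each chunk would use.
def chunk_plan(total_size, chunk_size, num_stages)
  n_chunks = (total_size + chunk_size - 1) / chunk_size  # ceiling division
  Array.new(n_chunks) do |i|
    offset = i * chunk_size
    {
      chunk_id: i,
      offset: offset,
      size: [total_size - offset, chunk_size].min,  # last chunk may be short
      stream_index: i % num_stages                  # round-robin over streams
    }
  end
end

chunk_plan(1_300_000, 512 * 1024, 2).each { |stage| p stage }
```

For a 1,300,000-byte buffer with the default 512 KiB chunk size this gives three chunks: two full ones at offsets 0 and 524,288, and a final 251,424-byte chunk at offset 1,048,576. With two stages, chunks 0 and 2 share a stream.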

#execute_pipelined_all_reduce(ring:, buffers:, total_size:, dtype:, op:, gpu_streams: nil) ⇒ void

This method returns an undefined value.

Execute a pipelined AllReduce with overlap

Parameters:

  • ring (Ring)

    Ring algorithm instance

  • buffers (Array<FFI::Pointer>)

    Device buffers

  • total_size (Integer)

    Total buffer size in bytes

  • dtype (Symbol)

    Data type

  • op (Symbol)

    Reduction operation

  • gpu_streams (Array<Array<CUDA::Stream>>) (defaults to: nil)

    Streams per GPU per stage



# File 'lib/nvruby/collective/algorithms/pipeliner.rb', line 147

def execute_pipelined_all_reduce(ring:, buffers:, total_size:, dtype:, op:, gpu_streams: nil)
  n_gpus = ring.n_gpus
  n_chunks = (total_size + @chunk_size - 1) / @chunk_size

  # Create streams: @num_stages per GPU (2 = double-buffering)
  gpu_streams ||= n_gpus.times.map do |rank|
    CUDA::RuntimeAPI.cudaSetDevice(ring.ring_order[rank])
    create_streams_for_device(@num_stages)
  end

  # Process in pipeline fashion
  n_chunks.times do |chunk_id|
    chunk_offset = chunk_id * @chunk_size
    chunk_size = [total_size - chunk_offset, @chunk_size].min

    stage_idx = chunk_id % @num_stages

    # Get streams for this stage
    stage_streams = gpu_streams.map { |streams| streams[stage_idx] }

    # Create offset buffers
    chunk_buffers = buffers.map { |buf| ptr_offset(buf, chunk_offset) }

    # Execute ring all-reduce on this chunk
    ring.all_reduce(
      buffers: chunk_buffers,
      sizes: [chunk_size] * n_gpus,
      dtype: dtype,
      op: op,
      streams: stage_streams
    )
  end

  # Final sync
  gpu_streams.flatten.each { |s| sync_stream(s) }
end
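
The per-chunk stream selection in the listing above (`gpu_streams.map { |streams| streams[stage_idx] }`) can be tried without any GPU. In this sketch, symbols stand in for `CUDA::Stream` objects, and `build_streams` and `streams_for_chunk` are hypothetical helpers that are not part of the class.

```ruby
# Symbols stand in for real stream objects; helper names are illustrative.
def build_streams(n_gpus, num_stages)
  Array.new(n_gpus) do |rank|
    Array.new(num_stages) { |stage| :"gpu#{rank}_stage#{stage}" }
  end
end

# Picks one stream per GPU for the given chunk, as the listing does.
def streams_for_chunk(gpu_streams, chunk_id, num_stages)
  stage_idx = chunk_id % num_stages
  gpu_streams.map { |streams| streams[stage_idx] }
end

gpu_streams = build_streams(2, 2)
3.times do |chunk_id|
  puts "chunk #{chunk_id}: #{streams_for_chunk(gpu_streams, chunk_id, 2).inspect}"
end
```

Consecutive chunks alternate between the two stage streams on every GPU, so chunk 2 reuses the streams of chunk 0. Work queued on the same stream runs in order, while work on the other stage's streams is free to overlap with it.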