Class: Ignis::Collective::ResilientTransport

Inherits:
Object
Defined in:
lib/nvruby/collective/resilient_transport.rb

Overview

Resilient transport wrapper with retry, fallback, and circuit breaker. Inspired by RapidsMPF’s three-phase protocol and error-handling patterns.

Examples:

Usage

transport = ResilientTransport.new(
  src_device: 0, dst_device: 1,
  topology: topology_detector
)
transport.send_async(src_ptr, dst_ptr, size, stream)

Constant Summary

MAX_RETRIES =

Maximum retry attempts before fallback

3
RETRY_DELAYS =

Delays between successive retries, in seconds (increasing backoff)

[0.1, 0.5, 1.0].freeze
FALLBACK_CHAIN =

Transport fallback chain (highest → lowest performance)

[:p2p, :ipc, :host_staged].freeze
CIRCUIT_BREAKER_THRESHOLD =

Circuit breaker threshold (failures before marking unhealthy)

3
CIRCUIT_BREAKER_RESET =

Circuit breaker reset time (seconds)

60.0
RECOVERABLE_ERRORS =

CUDA error codes that indicate transport failure

[
  702,  # CUDA_ERROR_LAUNCH_TIMEOUT
  705,  # CUDA_ERROR_PEER_ACCESS_NOT_ENABLED
  999,  # CUDA_ERROR_UNKNOWN
].freeze
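
The listing does not show where RECOVERABLE_ERRORS is consulted, so the following is only a sketch of how such a list might be used to classify an error. The `CudaError` class, its `code` reader, the `recoverable?` helper, and `SAMPLE_RECOVERABLE` are all hypothetical names introduced for illustration.

```ruby
# Hypothetical error type carrying a numeric CUDA status code.
class CudaError < StandardError
  attr_reader :code

  def initialize(code, message = "CUDA error #{code}")
    super(message)
    @code = code
  end
end

# A subset of the codes listed above, used here as sample data.
SAMPLE_RECOVERABLE = [702, 999].freeze

# Returns true when the error carries a code that is worth retrying.
# Errors without a code (plain StandardError) are treated as fatal.
def recoverable?(error)
  error.respond_to?(:code) && SAMPLE_RECOVERABLE.include?(error.code)
end

timeout = CudaError.new(702)
puts recoverable?(timeout)
```

A check like this would let a caller retry only on transport-level failures and re-raise everything else immediately.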

Constructor Details

#initialize(src_device:, dst_device:, topology:, preferred_transport: nil) ⇒ ResilientTransport

Create resilient transport wrapper

Parameters:

  • src_device (Integer)

    Source GPU

  • dst_device (Integer)

    Destination GPU

  • topology (Topology::Detector)

    Topology for path detection

  • preferred_transport (Symbol, nil) (defaults to: nil)

    Force specific transport



# File 'lib/nvruby/collective/resilient_transport.rb', line 64

def initialize(src_device:, dst_device:, topology:, preferred_transport: nil)
  @src_device = src_device
  @dst_device = dst_device
  @topology = topology
  @preferred_transport = preferred_transport

  @transports = {}
  @health_status = Hash.new { |h, k| h[k] = { failures: 0, last_failure: nil } }
  @current_transport_type = nil
  @active_transport = nil
  @initialized = false
end
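
The default block passed to `Hash.new` in the constructor means a health record exists for any transport type the first time it is read. A small sketch of that behavior, using only core Ruby (the local variable names are illustrative):

```ruby
health = Hash.new { |h, k| h[k] = { failures: 0, last_failure: nil } }

# The first access creates and stores a fresh record for the key.
health[:p2p][:failures] += 1
health[:p2p][:last_failure] = Time.now

# Each key gets its own record, so :ipc is unaffected by the update above.
ipc_failures = health[:ipc][:failures]

# Reading a missing key stores it, so both keys are now present.
keys = health.keys

# key? does not trigger the default block.
has_host_staged = health.key?(:host_staged)
```

Because reads insert entries, code that only wants to inspect the hash without growing it should use `key?` or `fetch` rather than `[]`.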

Instance Attribute Details

#active_transport ⇒ Transport::Base (readonly)

Returns Active transport.

Returns:

  • (Transport::Base)

    Active transport



# File 'lib/nvruby/collective/resilient_transport.rb', line 53

def active_transport
  @active_transport
end

#current_transport_type ⇒ Symbol (readonly)

Returns Current transport type.

Returns:

  • (Symbol)

    Current transport type



# File 'lib/nvruby/collective/resilient_transport.rb', line 50

def current_transport_type
  @current_transport_type
end

#dst_device ⇒ Integer (readonly)

Returns Destination GPU device ID.

Returns:

  • (Integer)

    Destination GPU device ID



# File 'lib/nvruby/collective/resilient_transport.rb', line 47

def dst_device
  @dst_device
end

#health_status ⇒ Hash (readonly)

Returns Transport health status.

Returns:

  • (Hash)

    Transport health status



# File 'lib/nvruby/collective/resilient_transport.rb', line 56

def health_status
  @health_status
end

#src_device ⇒ Integer (readonly)

Returns Source GPU device ID.

Returns:

  • (Integer)

    Source GPU device ID



# File 'lib/nvruby/collective/resilient_transport.rb', line 44

def src_device
  @src_device
end

Instance Method Details

#destroy! ⇒ void

This method returns an undefined value.

Clean up resources



# File 'lib/nvruby/collective/resilient_transport.rb', line 171

def destroy!
  @transports.each_value(&:destroy!)
  @transports.clear
  @active_transport = nil
  @initialized = false
end

#estimated_bandwidth ⇒ Float

Get estimated bandwidth

Returns:

  • (Float)

    GB/s



# File 'lib/nvruby/collective/resilient_transport.rb', line 153

def estimated_bandwidth
  @active_transport&.estimated_bandwidth || 0.0
end

#force_fallback! ⇒ Boolean

Force fallback to next transport in chain

Returns:

  • (Boolean)

    True if fallback succeeded



# File 'lib/nvruby/collective/resilient_transport.rb', line 159

def force_fallback!
  try_fallback!
end
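
The private `try_fallback!` method is not shown on this page. As a rough sketch of what walking FALLBACK_CHAIN could look like, the hypothetical helper below picks the next transport type after the current one, skipping any types the caller marks as unavailable. The helper name and the `unavailable` parameter are assumptions, not part of the documented API.

```ruby
FALLBACK_CHAIN = [:p2p, :ipc, :host_staged].freeze

# Returns the next transport type after `current`, or nil when the chain
# is exhausted. When `current` is nil or unknown, the search starts from
# the head of the chain. `unavailable` lists types to skip.
def next_transport(current, unavailable = [])
  index = FALLBACK_CHAIN.index(current)
  candidates = index ? FALLBACK_CHAIN.drop(index + 1) : FALLBACK_CHAIN
  candidates.find { |type| !unavailable.include?(type) }
end

puts next_transport(:p2p).inspect
puts next_transport(:host_staged).inspect
```

Returning nil at the end of the chain maps naturally onto a Boolean result like the one `force_fallback!` documents: a fallback succeeds only when a candidate exists.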

#healthy? ⇒ Boolean

Check if transport is healthy

Returns:

  • (Boolean)

    True if healthy



# File 'lib/nvruby/collective/resilient_transport.rb', line 141

def healthy?
  @active_transport&.ready? && !circuit_open?(@current_transport_type)
end
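
`healthy?` relies on a private `circuit_open?` check that this page does not list. The sketch below shows one plausible reading of CIRCUIT_BREAKER_THRESHOLD and CIRCUIT_BREAKER_RESET: the circuit opens after three recorded failures and closes again once 60 seconds have passed since the last one. The `CircuitBreaker` class and its method names are hypothetical, and the `now` parameters exist only so the timing can be exercised without waiting.

```ruby
# Minimal circuit breaker over a per-transport health record.
# A sketch under stated assumptions, not the library's implementation.
class CircuitBreaker
  THRESHOLD = 3       # mirrors CIRCUIT_BREAKER_THRESHOLD
  RESET_AFTER = 60.0  # mirrors CIRCUIT_BREAKER_RESET (seconds)

  def initialize
    @status = Hash.new { |h, k| h[k] = { failures: 0, last_failure: nil } }
  end

  # Count a failure and remember when it happened.
  def record_failure!(type, now = Time.now)
    entry = @status[type]
    entry[:failures] += 1
    entry[:last_failure] = now
  end

  # Forget all failures for a transport, e.g. after a successful send.
  def reset!(type)
    @status.delete(type)
  end

  # Open once the threshold is reached, until RESET_AFTER seconds have
  # elapsed since the most recent failure.
  def open?(type, now = Time.now)
    entry = @status[type]
    return false if entry[:failures] < THRESHOLD

    (now - entry[:last_failure]) < RESET_AFTER
  end
end

breaker = CircuitBreaker.new
3.times { breaker.record_failure!(:p2p) }
puts breaker.open?(:p2p)
```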

#initialize! ⇒ void

This method returns an undefined value.

Initialize transports



# File 'lib/nvruby/collective/resilient_transport.rb', line 79

def initialize!
  return if @initialized

  select_initial_transport!
  @initialized = true
end

#ready? ⇒ Boolean

Check if any transport is available

Returns:

  • (Boolean)

    True if ready



# File 'lib/nvruby/collective/resilient_transport.rb', line 147

def ready?
  @initialized && @active_transport&.ready?
end

#reset_health! ⇒ void

This method returns an undefined value.

Reset all circuit breakers



# File 'lib/nvruby/collective/resilient_transport.rb', line 165

def reset_health!
  @health_status.clear
end

#send_async(src_ptr, dst_ptr, size, stream = nil) ⇒ Boolean

Send data with retry and fallback

Parameters:

  • src_ptr (FFI::Pointer)

    Source buffer

  • dst_ptr (FFI::Pointer)

    Destination buffer

  • size (Integer)

    Bytes to transfer

  • stream (FFI::Pointer, nil) (defaults to: nil)

    CUDA stream

Returns:

  • (Boolean)

    Success status



# File 'lib/nvruby/collective/resilient_transport.rb', line 93

def send_async(src_ptr, dst_ptr, size, stream = nil)
  ensure_initialized!

  attempt = 0
  last_error = nil

  while attempt < MAX_RETRIES
    begin
      # Transports expose either copy_async(dst, src, size, stream) (P2P)
      # or send_async(buffer, size, stream) (base). Neither accepts a
      # 4-arg send_async; calling one raises ArgumentError on every attempt.
      result = if @active_transport.respond_to?(:copy_async)
                 @active_transport.copy_async(dst_ptr, src_ptr, size, stream)
               else
                 @active_transport.send_async(src_ptr, size, stream)
               end
      reset_circuit_breaker!(@current_transport_type)
      return result
    rescue StandardError => e
      last_error = e
      record_failure!(@current_transport_type, e)
      attempt += 1

      if attempt < MAX_RETRIES
        sleep(RETRY_DELAYS[[attempt - 1, RETRY_DELAYS.size - 1].min])
      end
    end
  end

  # All retries failed, try fallback
  if try_fallback!
    send_async(src_ptr, dst_ptr, size, stream)
  else
    raise TransportError, "All transports failed: #{last_error&.message}"
  end
end
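
The retry loop above can be isolated into a small helper to make the delay schedule easier to see. This is a sketch, not the library's code: `with_retries` and its `sleeper` keyword are hypothetical, and the sleeper is injectable only so the delays can be observed without actually waiting.

```ruby
MAX_RETRIES = 3
RETRY_DELAYS = [0.1, 0.5, 1.0].freeze

# Runs the block up to MAX_RETRIES times, pausing between attempts.
# The block receives the zero-based attempt number. The last error is
# re-raised once the attempts are used up.
def with_retries(sleeper: ->(seconds) { sleep(seconds) })
  attempt = 0
  begin
    yield attempt
  rescue StandardError
    attempt += 1
    raise if attempt >= MAX_RETRIES

    # Clamp the index so any extra attempts would reuse the last delay.
    sleeper.call(RETRY_DELAYS[[attempt - 1, RETRY_DELAYS.size - 1].min])
    retry
  end
end

observed = []
outcome = with_retries(sleeper: ->(s) { observed << s }) do |attempt|
  raise "transient failure" if attempt < 2

  :sent
end
```

With three attempts there are only two pauses per transport, so the third entry in RETRY_DELAYS is never reached unless MAX_RETRIES is raised.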

#synchronize(_stream = nil) ⇒ void

This method returns an undefined value.

Synchronize transfer completion

Parameters:

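  • _stream (FFI::Pointer, nil) (defaults to: nil)

    CUDA stream (accepted for interface compatibility but unused; the active transport's synchronize! takes no arguments)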



# File 'lib/nvruby/collective/resilient_transport.rb', line 134

def synchronize(_stream = nil)
  # Transports define synchronize! (no args), not synchronize(stream).
  @active_transport&.synchronize!
end

#to_s ⇒ String

Returns Human-readable description.

Returns:

  • (String)

    Human-readable description



# File 'lib/nvruby/collective/resilient_transport.rb', line 179

def to_s
  status = healthy? ? "healthy" : "degraded"
  "ResilientTransport[#{@src_device}->#{@dst_device}]: " \
    "#{@current_transport_type} (#{status})"
end