Class: Ignis::Collective::ResilientTransport
- Inherits: Object
- Defined in: lib/nvruby/collective/resilient_transport.rb
Overview
Resilient transport wrapper for transfers from a source GPU to a destination GPU, adding retries, fallback along a transport chain, and a circuit breaker. Inspired by RapidsMPF’s three-phase protocol and error-handling patterns.
Constant Summary
- MAX_RETRIES = 3
  Maximum attempts on one transport before falling back to the next.
- RETRY_DELAYS = [0.1, 0.5, 1.0].freeze
  Delays between attempts, in seconds (growing backoff).
- FALLBACK_CHAIN = [:p2p, :ipc, :host_staged].freeze
  Transport fallback chain, ordered from highest to lowest performance.
- CIRCUIT_BREAKER_THRESHOLD = 3
  Number of failures before a transport is marked unhealthy.
- CIRCUIT_BREAKER_RESET = 60.0
  Time in seconds before an open circuit breaker resets.
- RECOVERABLE_ERRORS = [702, 705, 999].freeze
  CUDA error codes that indicate a transport failure: 702 (CUDA_ERROR_LAUNCH_TIMEOUT), 705 (CUDA_ERROR_PEER_ACCESS_NOT_ENABLED), 999 (CUDA_ERROR_UNKNOWN).
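The retry constants combine in send_async: after a failed attempt it sleeps `RETRY_DELAYS[[attempt - 1, RETRY_DELAYS.size - 1].min]`, but only while `attempt < MAX_RETRIES`. Replaying that expression shows the resulting schedule for one transport; `retry_delay_after` is a hypothetical helper for illustration, not part of the class.

```ruby
# Values copied from the constants documented above.
MAX_RETRIES = 3
RETRY_DELAYS = [0.1, 0.5, 1.0].freeze

# Hypothetical helper: delay slept after failed attempt number `attempt`
# (1-based), using the same clamped index as send_async. Returns nil when
# no sleep happens, i.e. after the final attempt on a transport.
def retry_delay_after(attempt)
  return nil unless attempt < MAX_RETRIES

  RETRY_DELAYS[[attempt - 1, RETRY_DELAYS.size - 1].min]
end

schedule = (1..MAX_RETRIES).map { |a| retry_delay_after(a) }
p schedule # => [0.1, 0.5, nil]
```

With the default values only the 0.1 s and 0.5 s delays are ever used; the 1.0 s entry would apply only if MAX_RETRIES were raised to 4 or more.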
Instance Attribute Summary
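- #active_transport ⇒ Transport::Base (readonly)
  Transport instance currently used for transfers.
- #current_transport_type ⇒ Symbol (readonly)
  Type of the active transport (for example :p2p); nil before initialization.
- #dst_device ⇒ Integer (readonly)
  Destination GPU device ID.
- #health_status ⇒ Hash (readonly)
  Per-transport failure records ({ failures:, last_failure: }) used by the circuit breaker.
- #src_device ⇒ Integer (readonly)
  Source GPU device ID.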
Instance Method Summary
- #destroy! ⇒ void
  Clean up resources.
- #estimated_bandwidth ⇒ Float
  Get the estimated bandwidth of the active transport (0.0 if none).
- #force_fallback! ⇒ Boolean
  Force fallback to the next transport in the chain.
- #healthy? ⇒ Boolean
  Check whether the active transport is ready and its circuit breaker is closed.
- #initialize(src_device:, dst_device:, topology:, preferred_transport: nil) ⇒ ResilientTransport (constructor)
  Create a resilient transport wrapper.
- #initialize! ⇒ void
  Select the initial transport (idempotent).
- #ready? ⇒ Boolean
  Check whether the wrapper is initialized and its active transport is ready.
- #reset_health! ⇒ void
  Reset all circuit breakers.
- #send_async(src_ptr, dst_ptr, size, stream = nil) ⇒ Boolean
  Send data, retrying and falling back on failure.
- #synchronize(_stream = nil) ⇒ void
  Wait for transfer completion on the active transport.
- #to_s ⇒ String
  Human-readable description.
Constructor Details
#initialize(src_device:, dst_device:, topology:, preferred_transport: nil) ⇒ ResilientTransport
Create a resilient transport wrapper. No transport is selected until #initialize! runs.
```ruby
# File 'lib/nvruby/collective/resilient_transport.rb', line 64
def initialize(src_device:, dst_device:, topology:, preferred_transport: nil)
  @src_device = src_device
  @dst_device = dst_device
  @topology = topology
  @preferred_transport = preferred_transport
  @transports = {}
  @health_status = Hash.new { |h, k| h[k] = { failures: 0, last_failure: nil } }
  @current_transport_type = nil
  @active_transport = nil
  @initialized = false
end
```
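Because `@health_status` is built with a default block, a transport's record is created on first access, so failure-recording code never has to set it up. A standalone demonstration in plain Ruby, independent of the class:

```ruby
# Same construction as @health_status in the constructor: a missing key is
# created on first access with a zeroed failure record.
health_status = Hash.new { |h, k| h[k] = { failures: 0, last_failure: nil } }

# Recording a failure needs no explicit setup for the transport key.
health_status[:p2p][:failures] += 1
health_status[:p2p][:last_failure] = Time.now

# Caveat: even a plain read inserts the key, because the default block assigns.
ipc_failures = health_status[:ipc][:failures]

p health_status.keys # => [:p2p, :ipc]
p ipc_failures       # => 0
```

To inspect a transport without inserting a record, use `health_status.key?(type)` or `health_status.fetch(type, nil)`.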
Instance Attribute Details
#active_transport ⇒ Transport::Base (readonly)
Returns the active transport.
```ruby
# File 'lib/nvruby/collective/resilient_transport.rb', line 53
def active_transport
  @active_transport
end
```
#current_transport_type ⇒ Symbol (readonly)
Returns the current transport type.
```ruby
# File 'lib/nvruby/collective/resilient_transport.rb', line 50
def current_transport_type
  @current_transport_type
end
```
#dst_device ⇒ Integer (readonly)
Returns the destination GPU device ID.
```ruby
# File 'lib/nvruby/collective/resilient_transport.rb', line 47
def dst_device
  @dst_device
end
```
#health_status ⇒ Hash (readonly)
Returns the transport health status: a Hash mapping each transport type to a { failures:, last_failure: } record.
```ruby
# File 'lib/nvruby/collective/resilient_transport.rb', line 56
def health_status
  @health_status
end
```
#src_device ⇒ Integer (readonly)
Returns the source GPU device ID.
```ruby
# File 'lib/nvruby/collective/resilient_transport.rb', line 44
def src_device
  @src_device
end
```
Instance Method Details
#destroy! ⇒ void
This method returns an undefined value.
Clean up resources: call destroy! on every transport, drop them, and mark the wrapper uninitialized.
```ruby
# File 'lib/nvruby/collective/resilient_transport.rb', line 171
def destroy!
  @transports.each_value(&:destroy!)
  @transports.clear
  @active_transport = nil
  @initialized = false
end
```
#estimated_bandwidth ⇒ Float
Get the estimated bandwidth of the active transport, or 0.0 when no transport is active.
```ruby
# File 'lib/nvruby/collective/resilient_transport.rb', line 153
def estimated_bandwidth
  @active_transport&.estimated_bandwidth || 0.0
end
```
#force_fallback! ⇒ Boolean
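Force a fallback to the next transport in FALLBACK_CHAIN by delegating to the private try_fallback!. Returns whether a fallback transport was found.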
```ruby
# File 'lib/nvruby/collective/resilient_transport.rb', line 159
def force_fallback!
  try_fallback!
end
```
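The body of try_fallback! is not shown on this page. One plausible selection rule, sketched under assumptions: walk FALLBACK_CHAIN forward from the current type and take the first transport that is available and whose circuit is not open. `next_fallback` and its `available:` and `open_circuits:` parameters are hypothetical; the real method also activates the chosen transport and returns a Boolean.

```ruby
FALLBACK_CHAIN = [:p2p, :ipc, :host_staged].freeze

# Hypothetical stand-in for try_fallback!'s selection step. Only moves
# toward lower-performance transports; returns the chosen type, or nil
# when the chain is exhausted or `current` is not in the chain.
def next_fallback(current, available:, open_circuits: [])
  start = FALLBACK_CHAIN.index(current)
  return nil if start.nil?

  FALLBACK_CHAIN.drop(start + 1).find do |type|
    available.include?(type) && !open_circuits.include?(type)
  end
end

p next_fallback(:p2p, available: [:ipc, :host_staged])                        # => :ipc
p next_fallback(:p2p, available: [:ipc, :host_staged], open_circuits: [:ipc]) # => :host_staged
p next_fallback(:host_staged, available: [:p2p, :ipc])                        # => nil
```

Moving only forward guarantees that repeated fallbacks terminate, which matters because send_async recurses after each successful fallback.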
#healthy? ⇒ Boolean
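Check whether the active transport is ready and its circuit breaker is not open. Returns nil (falsy) when no transport is active.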
```ruby
# File 'lib/nvruby/collective/resilient_transport.rb', line 141
def healthy?
  @active_transport&.ready? && !circuit_open?(@current_transport_type)
end
```
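healthy? depends on the private circuit_open?, which is not documented here. A minimal sketch of how the two circuit-breaker constants could combine with the { failures:, last_failure: } records in health_status: the circuit opens once a transport reaches CIRCUIT_BREAKER_THRESHOLD failures and closes again after CIRCUIT_BREAKER_RESET seconds without a new failure. This `circuit_open?` and its explicit status and `now:` arguments are assumptions for illustration; the real method may differ, for example by adding a half-open state.

```ruby
CIRCUIT_BREAKER_THRESHOLD = 3
CIRCUIT_BREAKER_RESET = 60.0 # seconds

# Hypothetical reconstruction based only on the documented constants and
# record shape. fetch(type, nil) avoids inserting an empty record.
def circuit_open?(health_status, type, now: Time.now)
  record = health_status.fetch(type, nil)
  return false if record.nil? || record[:failures] < CIRCUIT_BREAKER_THRESHOLD
  return false if record[:last_failure].nil?

  # Time - Time yields elapsed seconds as a Float.
  (now - record[:last_failure]) < CIRCUIT_BREAKER_RESET
end

t0 = Time.at(1_000)
status = { p2p: { failures: 3, last_failure: t0 } }
p circuit_open?(status, :p2p, now: t0 + 10) # => true  (within reset window)
p circuit_open?(status, :p2p, now: t0 + 61) # => false (window elapsed)
p circuit_open?(status, :ipc, now: t0)      # => false (no failures recorded)
```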
#initialize! ⇒ void
This method returns an undefined value.
Initialize transports by selecting the initial transport. Idempotent: later calls return immediately.
```ruby
# File 'lib/nvruby/collective/resilient_transport.rb', line 79
def initialize!
  return if @initialized

  select_initial_transport!
  @initialized = true
end
```
#ready? ⇒ Boolean
Check whether the wrapper has been initialized and its active transport is ready.
```ruby
# File 'lib/nvruby/collective/resilient_transport.rb', line 147
def ready?
  @initialized && @active_transport&.ready?
end
```
#reset_health! ⇒ void
This method returns an undefined value.
Reset all circuit breakers by clearing the failure records of every transport.
```ruby
# File 'lib/nvruby/collective/resilient_transport.rb', line 165
def reset_health!
  @health_status.clear
end
```
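reset_health! relies on Hash#clear removing every record while keeping the hash's default block, so each transport starts again from a zeroed record. A plain-Ruby check of that behavior:

```ruby
health_status = Hash.new { |h, k| h[k] = { failures: 0, last_failure: nil } }
health_status[:p2p][:failures] = 3 # e.g. the p2p circuit has tripped

# What reset_health! does: drop all records; the default block survives.
health_status.clear
p health_status.empty?            # => true
p health_status.default_proc.nil? # => false
p health_status[:p2p][:failures]  # => 0 (fresh record created on access)
```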
#send_async(src_ptr, dst_ptr, size, stream = nil) ⇒ Boolean
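Send data with retry and fallback. The active transport gets up to MAX_RETRIES attempts, with a RETRY_DELAYS pause between them; a success resets that transport's circuit breaker and returns the underlying transport call's result. If every attempt fails, the call falls back to the next transport and starts over, raising TransportError once no fallback remains.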
```ruby
# File 'lib/nvruby/collective/resilient_transport.rb', line 93
def send_async(src_ptr, dst_ptr, size, stream = nil)
  ensure_initialized!
  attempt = 0
  last_error = nil

  while attempt < MAX_RETRIES
    begin
      # Transports expose either copy_async(dst, src, size, stream) (P2P) or
      # send_async(buffer, size, stream) (base); neither accepts this
      # method's 4-argument form, so dispatch on what the transport supports.
      result = if @active_transport.respond_to?(:copy_async)
                 @active_transport.copy_async(dst_ptr, src_ptr, size, stream)
               else
                 @active_transport.send_async(src_ptr, size, stream)
               end
      reset_circuit_breaker!(@current_transport_type)
      return result
    rescue StandardError => e
      last_error = e
      record_failure!(@current_transport_type, e)
      attempt += 1
      if attempt < MAX_RETRIES
        sleep(RETRY_DELAYS[[attempt - 1, RETRY_DELAYS.size - 1].min])
      end
    end
  end

  # All retries failed: try the next transport in the chain
  if try_fallback!
    send_async(src_ptr, dst_ptr, size, stream)
  else
    raise TransportError, "All transports failed: #{last_error&.message}"
  end
end
```
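The retry-then-fallback control flow can be exercised without a GPU. The sketch below is a simplified, iterative model of send_async under stated assumptions: `FlakyTransport`, `send_with_fallback`, and the local `TransportError` are hypothetical, the delays are zeroed so the demo runs instantly, and circuit breakers and failure recording are left out. Iterating over the chain plays the role of the recursive send_async call after try_fallback!.

```ruby
# Hypothetical stand-in for the library's TransportError.
class TransportError < StandardError; end

# Stub transport: raises on its first `failures` calls, then succeeds.
class FlakyTransport
  attr_reader :calls

  def initialize(failures)
    @failures = failures
    @calls = 0
  end

  # Same argument order as the P2P copy_async(dst, src, size, stream).
  def copy_async(_dst, _src, _size, _stream)
    @calls += 1
    raise "transient failure" if @calls <= @failures

    true
  end
end

# Simplified model: up to max_retries attempts per transport, sleeping
# between attempts, then moving down the chain in order.
def send_with_fallback(chain, max_retries: 3, delays: [0.0, 0.0])
  last_error = nil
  chain.each do |name, transport|
    max_retries.times do |i|
      begin
        return [name, transport.copy_async(:dst, :src, 1024, nil)]
      rescue StandardError => e
        last_error = e
        # Same clamped index as send_async; no sleep after the last attempt.
        sleep(delays[[i, delays.size - 1].min]) if i < max_retries - 1
      end
    end
  end
  raise TransportError, "All transports failed: #{last_error&.message}"
end

chain = {
  p2p: FlakyTransport.new(5),        # fails every attempt it gets
  ipc: FlakyTransport.new(1),        # fails once, then succeeds
  host_staged: FlakyTransport.new(0) # never reached
}
result = send_with_fallback(chain)
p result # => [:ipc, true]
p [chain[:p2p].calls, chain[:ipc].calls, chain[:host_staged].calls] # => [3, 2, 0]
```

Because each fallback restarts the attempt counter, the worst case is MAX_RETRIES attempts per transport, i.e. nine attempts with the default constants, assuming try_fallback! only moves forward through FALLBACK_CHAIN and every transport is available.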
#synchronize(_stream = nil) ⇒ void
This method returns an undefined value.
Synchronize transfer completion by calling synchronize! on the active transport. The stream argument is currently ignored.
```ruby
# File 'lib/nvruby/collective/resilient_transport.rb', line 134
def synchronize(_stream = nil)
  # Transports define synchronize! (no args), not synchronize(stream).
  @active_transport&.synchronize!
end
```
#to_s ⇒ String
Returns a human-readable description, for example "ResilientTransport[0→1]: p2p (healthy)".
```ruby
# File 'lib/nvruby/collective/resilient_transport.rb', line 179
def to_s
  status = healthy? ? "healthy" : "degraded"
  "ResilientTransport[#{@src_device}→#{@dst_device}]: " \
    "#{@current_transport_type} (#{status})"
end
```