Class: Ignis::Collective::CommunicatorHealer
- Inherits: Object
  - Object
  - Ignis::Collective::CommunicatorHealer
- Defined in: lib/nvruby/collective/communicator_healer.rb
Overview
Communicator healing for dynamic reconfiguration on GPU failure. Inspired by Universal Checkpointing (USENIX ATC 2025) patterns.
Enables recovery without full restart:
1. Detect failed GPUs via HealthMonitor
2. Exclude from active set
3. Rebuild topology and transports
4. Invalidate stale CUDA Graphs
5. Resume operations with reduced GPU count
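The steps above can be sketched with a small stand-in object. This is an illustrative sketch, not the library's implementation: `SketchHealer`, its `graphs` placeholders, and its `rebuilt_for` field are hypothetical names, and the topology, transport, and CUDA Graph steps are reduced to bookkeeping so the example runs without GPUs.

```ruby
# Hypothetical stand-in that mirrors only the bookkeeping of the five steps.
class SketchHealer
  attr_reader :active, :failed, :graphs, :rebuilt_for

  def initialize(gpu_ids)
    @active = gpu_ids.dup
    @failed = []
    @graphs = [:graph_a, :graph_b] # placeholders for captured CUDA Graphs
    @rebuilt_for = nil
  end

  def heal(failed_ids)
    @failed |= failed_ids        # step 1: detected failures are passed in
    @active -= failed_ids        # step 2: exclude them from the active set
    @rebuilt_for = @active.dup   # step 3: topology/transports rebuilt for survivors
    @graphs.clear                # step 4: stale graphs are dropped
    @active.size                 # step 5: resume with the reduced world size
  end
end

healer = SketchHealer.new([0, 1, 2, 3])
new_world_size = healer.heal([2])
puts "active=#{healer.active.inspect} world_size=#{new_world_size}"
```

In this sketch the failed IDs are handed in directly; in the real class they would presumably come from the HealthMonitor mentioned in step 1.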
Instance Attribute Summary
- #active_devices ⇒ Array<Integer> (readonly)
  Currently active GPU IDs.
- #communicator ⇒ Communicator (readonly)
  Parent communicator.
- #failed_devices ⇒ Array<Integer> (readonly)
  Failed GPU IDs.
- #heal_count ⇒ Integer (readonly)
  Total heal operations performed.
- #heal_history ⇒ Array<Hash> (readonly)
  Heal history.
Instance Method Summary
- #degraded? ⇒ Boolean
  Check if any GPUs have failed.
- #heal!(failed_gpu_ids) ⇒ Boolean
  Perform healing operation: exclude failed GPUs and rebuild.
- #health_summary ⇒ Hash
  Get health summary.
- #initialize(communicator) ⇒ CommunicatorHealer (constructor)
  Create healer for a communicator.
- #on_post_heal {|failed_gpu_ids, status| ... } ⇒ Object
  Register post-heal callback.
- #on_pre_heal {|failed_gpu_ids| ... } ⇒ Object
  Register pre-heal callback.
- #recover!(gpu_id) ⇒ Boolean
  Attempt to recover a failed GPU.
- #register_cuda_graph(graph) ⇒ void
  Register CUDA Graph for invalidation on heal.
- #to_s ⇒ String
  Human-readable status.
- #unregister_cuda_graph(graph) ⇒ void
  Unregister CUDA Graph.
- #world_size ⇒ Integer
  Get current world size (active GPUs).
Constructor Details
#initialize(communicator) ⇒ CommunicatorHealer
Create healer for a communicator
    # File 'lib/nvruby/collective/communicator_healer.rb', line 42
    def initialize(communicator)
      @communicator = communicator
      @active_devices = communicator.gpu_ids.dup
      @failed_devices = []
      @heal_count = 0
      @heal_history = []
      @callbacks = { pre_heal: [], post_heal: [] }
      @cuda_graph_cache = []
    end
Instance Attribute Details
#active_devices ⇒ Array<Integer> (readonly)
Returns the currently active GPU IDs.
    # File 'lib/nvruby/collective/communicator_healer.rb', line 28
    def active_devices
      @active_devices
    end
#communicator ⇒ Communicator (readonly)
Returns the parent communicator.
    # File 'lib/nvruby/collective/communicator_healer.rb', line 25
    def communicator
      @communicator
    end
#failed_devices ⇒ Array<Integer> (readonly)
Returns the failed GPU IDs.
    # File 'lib/nvruby/collective/communicator_healer.rb', line 31
    def failed_devices
      @failed_devices
    end
#heal_count ⇒ Integer (readonly)
Returns the total number of heal operations performed.
    # File 'lib/nvruby/collective/communicator_healer.rb', line 34
    def heal_count
      @heal_count
    end
#heal_history ⇒ Array<Hash> (readonly)
Returns the heal history.
    # File 'lib/nvruby/collective/communicator_healer.rb', line 37
    def heal_history
      @heal_history
    end
Instance Method Details
#degraded? ⇒ Boolean
Check if any GPUs have failed
    # File 'lib/nvruby/collective/communicator_healer.rb', line 145
    def degraded?
      @failed_devices.any?
    end
#heal!(failed_gpu_ids) ⇒ Boolean
Perform healing operation - exclude failed GPUs and rebuild
    # File 'lib/nvruby/collective/communicator_healer.rb', line 56
    def heal!(failed_gpu_ids)
      return true if failed_gpu_ids.empty?

      notify_pre_heal(failed_gpu_ids)

      begin
        # 1. Record failed devices
        @failed_devices |= failed_gpu_ids
        @active_devices -= failed_gpu_ids

        # 2. Validate we have enough GPUs left
        if @active_devices.size < minimum_gpu_count
          raise HealingError, "Too few GPUs remaining: #{@active_devices.size}"
        end

        # 3. Invalidate CUDA Graphs (they reference old topology)
        invalidate_cuda_graphs!

        # 4. Rebuild topology for survivors
        rebuild_topology!

        # 5. Rebuild transports
        rebuild_transports!

        # 6. Update communicator state
        update_communicator_state!

        # 7. Record success
        record_heal(failed_gpu_ids, :success)
        notify_post_heal(failed_gpu_ids, :success)

        true
      rescue StandardError => e
        record_heal(failed_gpu_ids, :failed, e.message)
        notify_post_heal(failed_gpu_ids, :failed)
        raise
      end
    end
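Because `heal!` updates `@failed_devices` and `@active_devices` before it validates the remaining count, and re-raises after recording the failure, callers will likely want to rescue the error and inspect the state afterwards. The sketch below reproduces only that control flow in a stand-in class. `MiniHealer`, the top-level `HealingError`, the history hash keys, and the minimum of 2 GPUs are assumptions for illustration; the real `minimum_gpu_count` and `record_heal` are not shown on this page.

```ruby
class HealingError < StandardError; end

# Hypothetical stand-in reproducing only the control flow of heal!.
class MiniHealer
  attr_reader :active_devices, :failed_devices, :heal_history

  MINIMUM_GPU_COUNT = 2 # assumed value; the real threshold is not documented here

  def initialize(gpu_ids)
    @active_devices = gpu_ids.dup
    @failed_devices = []
    @heal_history = []
  end

  def heal!(failed_gpu_ids)
    return true if failed_gpu_ids.empty?

    @failed_devices |= failed_gpu_ids # set union: repeated IDs are not duplicated
    @active_devices -= failed_gpu_ids
    if @active_devices.size < MINIMUM_GPU_COUNT
      raise HealingError, "Too few GPUs remaining: #{@active_devices.size}"
    end
    @heal_history << { gpus: failed_gpu_ids, status: :success }
    true
  rescue StandardError => e
    @heal_history << { gpus: failed_gpu_ids, status: :failed, error: e.message }
    raise
  end
end

healer = MiniHealer.new([0, 1, 2])
healer.heal!([2]) # succeeds, two GPUs remain
begin
  healer.heal!([1]) # leaves one GPU, below the assumed minimum
rescue HealingError => e
  warn "heal failed: #{e.message}"
end
puts healer.active_devices.inspect # the failed attempt still removed GPU 1
```

Note that the failed attempt is not rolled back: GPU 1 stays in the failed set even though the heal raised, which matches the ordering in the listing above.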
#health_summary ⇒ Hash
Get health summary
    # File 'lib/nvruby/collective/communicator_healer.rb', line 151
    def health_summary
      {
        active_count: @active_devices.size,
        failed_count: @failed_devices.size,
        active_devices: @active_devices.dup,
        failed_devices: @failed_devices.dup,
        heal_count: @heal_count,
        degraded: degraded?
      }
    end
#on_post_heal {|failed_gpu_ids, status| ... } ⇒ Object
Register post-heal callback
    # File 'lib/nvruby/collective/communicator_healer.rb', line 170
    def on_post_heal(&block)
      @callbacks[:post_heal] << block
    end
#on_pre_heal {|failed_gpu_ids| ... } ⇒ Object
Register pre-heal callback
    # File 'lib/nvruby/collective/communicator_healer.rb', line 164
    def on_pre_heal(&block)
      @callbacks[:pre_heal] << block
    end
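A minimal sketch of how the two callback registration methods might be used together. The registration bodies match the listings above; `CallbackSketch` and `simulate_heal` are hypothetical, and the way the stored blocks are invoked is an assumption, since `notify_pre_heal` and `notify_post_heal` are not shown on this page.

```ruby
# Hypothetical stand-in: registration mirrors the documented methods; the
# notify step is an assumption about how the stored blocks get invoked.
class CallbackSketch
  def initialize
    @callbacks = { pre_heal: [], post_heal: [] }
  end

  def on_pre_heal(&block)
    @callbacks[:pre_heal] << block
  end

  def on_post_heal(&block)
    @callbacks[:post_heal] << block
  end

  # Simulates one heal cycle by firing both callback lists in order.
  def simulate_heal(failed_gpu_ids, status)
    @callbacks[:pre_heal].each { |cb| cb.call(failed_gpu_ids) }
    @callbacks[:post_heal].each { |cb| cb.call(failed_gpu_ids, status) }
  end
end

events = []
healer = CallbackSketch.new
healer.on_pre_heal { |ids| events << "pausing work, losing GPUs #{ids.inspect}" }
healer.on_post_heal { |ids, status| events << "heal of #{ids.inspect} finished: #{status}" }
healer.simulate_heal([3], :success)
puts events
```

Since `heal!` calls the post-heal notification with `:failed` before re-raising, a post-heal block should be prepared to handle both `:success` and `:failed`.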
#recover!(gpu_id) ⇒ Boolean
Attempt to recover a failed GPU
    # File 'lib/nvruby/collective/communicator_healer.rb', line 99
    def recover!(gpu_id)
      return false unless @failed_devices.include?(gpu_id)

      # Test if GPU is responsive
      return false unless test_gpu_health(gpu_id)

      # Reintegrate
      @failed_devices.delete(gpu_id)
      @active_devices << gpu_id
      @active_devices.sort!

      # Rebuild topology with recovered GPU
      rebuild_topology!
      rebuild_transports!
      update_communicator_state!

      record_heal([gpu_id], :recovered)
      true
    rescue StandardError
      false
    end
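Unlike `heal!`, `recover!` returns `false` instead of raising, so a caller can poll it. The sketch below shows one possible bounded-retry wrapper. `try_recover` and `FlakyHealer` are hypothetical and not part of the library; the stand-in simply reports a healthy GPU on the second probe so the example runs without hardware.

```ruby
# Hypothetical helper: retries recover! a bounded number of times and returns
# the number of attempts used, or nil if the GPU never came back.
def try_recover(healer, gpu_id, attempts: 3, delay: 0.0)
  attempts.times do |i|
    return i + 1 if healer.recover!(gpu_id)
    sleep(delay)
  end
  nil
end

# Stand-in healer whose GPU becomes healthy on the second probe (assumption).
class FlakyHealer
  attr_reader :active_devices, :failed_devices

  def initialize
    @active_devices = [0, 1]
    @failed_devices = [2]
    @probes = 0
  end

  def recover!(gpu_id)
    return false unless @failed_devices.include?(gpu_id)

    @probes += 1
    return false if @probes < 2 # simulated health check failure

    @failed_devices.delete(gpu_id)
    @active_devices << gpu_id
    @active_devices.sort!
    true
  end
end

healer = FlakyHealer.new
used = try_recover(healer, 2)
puts "recovered after #{used} attempts; active=#{healer.active_devices.inspect}"
```

One thing to keep in mind with the real method: a `false` return does not distinguish between "GPU was not marked failed", "health test failed", and "rebuild raised an error", so a wrapper like this cannot tell which case it is retrying.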
#register_cuda_graph(graph) ⇒ void
This method returns an undefined value.
Register CUDA Graph for invalidation on heal
    # File 'lib/nvruby/collective/communicator_healer.rb', line 125
    def register_cuda_graph(graph)
      @cuda_graph_cache << graph unless @cuda_graph_cache.include?(graph)
    end
#to_s ⇒ String
Returns a human-readable status.
    # File 'lib/nvruby/collective/communicator_healer.rb', line 175
    def to_s
      status = degraded? ? "degraded" : "healthy"
      "CommunicatorHealer[#{@active_devices.size}/#{@communicator.gpu_ids.size} active, #{status}]"
    end
#unregister_cuda_graph(graph) ⇒ void
This method returns an undefined value.
Unregister CUDA Graph
    # File 'lib/nvruby/collective/communicator_healer.rb', line 133
    def unregister_cuda_graph(graph)
      @cuda_graph_cache.delete(graph)
    end
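The register/unregister pair behaves like a small de-duplicating list. The sketch below copies the two method bodies into a stand-in class to show that behavior. `GraphRegistrySketch` and `GraphHandle` are hypothetical; `GraphHandle` is only a placeholder for whatever object represents a captured CUDA Graph.

```ruby
# Placeholder for a captured CUDA Graph object (hypothetical).
GraphHandle = Struct.new(:name)

# Hypothetical stand-in holding only the graph cache logic.
class GraphRegistrySketch
  attr_reader :cuda_graph_cache

  def initialize
    @cuda_graph_cache = []
  end

  def register_cuda_graph(graph)
    @cuda_graph_cache << graph unless @cuda_graph_cache.include?(graph)
  end

  def unregister_cuda_graph(graph)
    @cuda_graph_cache.delete(graph)
  end
end

registry = GraphRegistrySketch.new
forward = GraphHandle.new("forward")
registry.register_cuda_graph(forward)
registry.register_cuda_graph(forward) # second call is a no-op
puts registry.cuda_graph_cache.size
registry.unregister_cuda_graph(forward)
puts registry.cuda_graph_cache.empty?
```

Both `Array#include?` and `Array#delete` compare with `==`, so de-duplication depends on how the graph object defines equality. With the `Struct` placeholder used here, two separately created handles with the same name would count as the same graph.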
#world_size ⇒ Integer
Get current world size (active GPUs)
    # File 'lib/nvruby/collective/communicator_healer.rb', line 139
    def world_size
      @active_devices.size
    end