Class: Ignis::Collective::CommunicatorHealer

Inherits:
Object
  • Object
show all
Defined in:
lib/nvruby/collective/communicator_healer.rb

Overview

Communicator healing for dynamic reconfiguration on GPU failure. Inspired by Universal Checkpointing (USENIX ATC 2025) patterns.

Enables recovery without full restart:

1. Detect failed GPUs via HealthMonitor
2. Exclude from active set
3. Rebuild topology and transports
4. Invalidate stale CUDA Graphs
5. Resume operations with reduced GPU count

Examples:

Usage with communicator

healer = CommunicatorHealer.new(communicator)
monitor.on_failure { |gpu| healer.heal!([gpu]) }

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(communicator) ⇒ CommunicatorHealer

Create healer for a communicator

Parameters:



42
43
44
45
46
47
48
49
50
# File 'lib/nvruby/collective/communicator_healer.rb', line 42

# Build a healer bound to +communicator+.
#
# Every GPU reported by the communicator starts out active; there are no
# recorded failures, no heal history, and no registered callbacks or CUDA
# Graphs.
#
# @param communicator [#gpu_ids] communicator whose GPU set is managed
def initialize(communicator)
  @communicator = communicator

  # Device bookkeeping. The ID list is copied so later changes to the active
  # set never touch the array owned by the communicator.
  @active_devices = communicator.gpu_ids.dup
  @failed_devices = []

  # Heal statistics.
  @heal_count = 0
  @heal_history = []

  # Registries: per-event callback lists and tracked CUDA Graphs.
  @callbacks = { pre_heal: [], post_heal: [] }
  @cuda_graph_cache = []
end

Instance Attribute Details

#active_devices ⇒ Array<Integer> (readonly)

Returns the currently active GPU IDs.

Returns:

  • (Array<Integer>)

    Currently active GPU IDs



28
29
30
# File 'lib/nvruby/collective/communicator_healer.rb', line 28

# Reader for the currently active GPU IDs.
#
# Hands back the array stored in @active_devices itself, not a copy.
#
# @return [Array<Integer>] currently active GPU IDs
def active_devices
  @active_devices
end

#communicator ⇒ Communicator (readonly)

Returns the parent communicator.

Returns:



25
26
27
# File 'lib/nvruby/collective/communicator_healer.rb', line 25

# Reader for the communicator this healer was created for.
#
# @return [Communicator] parent communicator
def communicator
  @communicator
end

#failed_devices ⇒ Array<Integer> (readonly)

Returns the failed GPU IDs.

Returns:

  • (Array<Integer>)

    Failed GPU IDs



31
32
33
# File 'lib/nvruby/collective/communicator_healer.rb', line 31

# Reader for the GPU IDs recorded as failed.
#
# Hands back the array stored in @failed_devices itself, not a copy.
#
# @return [Array<Integer>] failed GPU IDs
def failed_devices
  @failed_devices
end

#heal_count ⇒ Integer (readonly)

Returns the total number of heal operations performed.

Returns:

  • (Integer)

    Total heal operations performed



34
35
36
# File 'lib/nvruby/collective/communicator_healer.rb', line 34

# Reader for the heal-operation counter.
#
# @return [Integer] total heal operations performed
def heal_count
  @heal_count
end

#heal_history ⇒ Array<Hash> (readonly)

Returns the heal history.

Returns:

  • (Array<Hash>)

    Heal history



37
38
39
# File 'lib/nvruby/collective/communicator_healer.rb', line 37

# Reader for the recorded heal history.
#
# Hands back the array stored in @heal_history itself, not a copy.
#
# @return [Array<Hash>] heal history
def heal_history
  @heal_history
end

Instance Method Details

#degraded? ⇒ Boolean

Check if any GPUs have failed

Returns:

  • (Boolean)

    True if degraded



145
146
147
# File 'lib/nvruby/collective/communicator_healer.rb', line 145

# Check whether any GPU has been recorded as failed.
#
# @return [Boolean] true if @failed_devices holds at least one ID
def degraded?
  @failed_devices.any?
end

#heal!(failed_gpu_ids) ⇒ Boolean

Perform healing operation - exclude failed GPUs and rebuild

Parameters:

  • failed_gpu_ids (Array<Integer>)

    GPUs to exclude

Returns:

  • (Boolean)

    True if healing succeeded



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/nvruby/collective/communicator_healer.rb', line 56

# Exclude failed GPUs from the active set and rebuild the communicator
# around the survivors.
#
# Steps, in order: run the pre-heal notification, move the given IDs from the
# active list to the failed list, check the survivor count against
# +minimum_gpu_count+, invalidate CUDA Graphs, rebuild topology and
# transports, and update the communicator state. The outcome is then recorded
# and announced exactly once, as +:success+ or +:failed+.
#
# @param failed_gpu_ids [Array<Integer>, Integer, nil] GPUs to exclude; a
#   single ID or +nil+ is coerced with +Array()+
# @return [Boolean] +true+ when healing succeeded or there was nothing to do
# @raise [HealingError] when fewer than +minimum_gpu_count+ GPUs remain
# @raise [StandardError] any error from a rebuild step, re-raised after the
#   failure has been recorded; the device lists are NOT rolled back
def heal!(failed_gpu_ids)
  gpu_ids = Array(failed_gpu_ids)
  return true if gpu_ids.empty?

  notify_pre_heal(gpu_ids)

  begin
    # 1. Record failed devices
    @failed_devices |= gpu_ids
    @active_devices -= gpu_ids

    # 2. Validate we have enough GPUs left
    if @active_devices.size < minimum_gpu_count
      raise HealingError, "Too few GPUs remaining: #{@active_devices.size}"
    end

    # 3. Invalidate CUDA Graphs (they reference old topology)
    invalidate_cuda_graphs!

    # 4. Rebuild topology for survivors
    rebuild_topology!

    # 5. Rebuild transports
    rebuild_transports!

    # 6. Update communicator state
    update_communicator_state!
  rescue StandardError => e
    record_heal(gpu_ids, :failed, e.message)
    notify_post_heal(gpu_ids, :failed)
    raise
  end

  # 7. Report success outside the rescued region: an error raised while
  # recording or by a post-heal callback must not be turned into a second,
  # contradictory :failed record and notification for a heal that succeeded.
  record_heal(gpu_ids, :success)
  notify_post_heal(gpu_ids, :success)

  true
end

#health_summary ⇒ Hash

Get health summary

Returns:

  • (Hash)

    Health statistics



151
152
153
154
155
156
157
158
159
160
# File 'lib/nvruby/collective/communicator_healer.rb', line 151

# Snapshot of the healer's current health statistics.
#
# The device lists in the result are copies, so callers may modify them
# without affecting the healer.
#
# @return [Hash] with keys +:active_count+, +:failed_count+,
#   +:active_devices+, +:failed_devices+, +:heal_count+ and +:degraded+
def health_summary
  active = @active_devices
  failed = @failed_devices

  {
    active_count: active.size,
    failed_count: failed.size,
    active_devices: active.dup,
    failed_devices: failed.dup,
    heal_count: @heal_count,
    degraded: degraded?
  }
end

#on_post_heal {|failed_gpu_ids, status| ... } ⇒ Object

Register post-heal callback

Yields:

  • (failed_gpu_ids, status)

    Called after healing



170
171
172
# File 'lib/nvruby/collective/communicator_healer.rb', line 170

# Register a post-heal callback.
#
# The block is appended to the +:post_heal+ callback list. A call without a
# block is rejected right away instead of storing +nil+ in that list.
#
# @yield [failed_gpu_ids, status] called after healing
# @raise [ArgumentError] when no block is given
# @return [Array<Proc>] the list of registered post-heal callbacks
def on_post_heal(&block)
  raise ArgumentError, "on_post_heal requires a block" unless block

  @callbacks[:post_heal] << block
end

#on_pre_heal {|failed_gpu_ids| ... } ⇒ Object

Register pre-heal callback

Yields:

  • (failed_gpu_ids)

    Called before healing



164
165
166
# File 'lib/nvruby/collective/communicator_healer.rb', line 164

# Register a pre-heal callback.
#
# The block is appended to the +:pre_heal+ callback list. A call without a
# block is rejected right away instead of storing +nil+ in that list.
#
# @yield [failed_gpu_ids] called before healing
# @raise [ArgumentError] when no block is given
# @return [Array<Proc>] the list of registered pre-heal callbacks
def on_pre_heal(&block)
  raise ArgumentError, "on_pre_heal requires a block" unless block

  @callbacks[:pre_heal] << block
end

#recover!(gpu_id) ⇒ Boolean

Attempt to recover a failed GPU

Parameters:

  • gpu_id (Integer)

    GPU to recover

Returns:

  • (Boolean)

    True if recovery succeeded



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/nvruby/collective/communicator_healer.rb', line 99

# Attempt to bring a previously failed GPU back into the active set.
#
# The GPU must currently be listed as failed and must pass
# +test_gpu_health+. It is then moved back to the (sorted) active list,
# topology and transports are rebuilt, the communicator state is updated, and
# the recovery is recorded.
#
# Any StandardError is swallowed and reported as +false+. In that case both
# device lists are restored to what they were on entry, so a +false+ result
# never leaves the GPU marked active.
# NOTE(review): only the lists are restored; whatever the rebuild helpers
# already changed before raising is not undone here.
#
# @param gpu_id [Integer] GPU to recover
# @return [Boolean] +true+ if recovery succeeded, +false+ otherwise
def recover!(gpu_id)
  # Snapshot first so the rescue below can always restore a consistent view.
  previous_failed = @failed_devices
  previous_active = @active_devices

  return false unless previous_failed.include?(gpu_id)

  # Test if GPU is responsive
  return false unless test_gpu_health(gpu_id)

  # Reintegrate. New arrays are built (rather than mutating in place) so the
  # snapshots stay intact for rollback; `|` also keeps the ID from being
  # listed twice.
  @failed_devices = previous_failed - [gpu_id]
  @active_devices = (previous_active | [gpu_id]).sort

  # Rebuild topology with recovered GPU
  rebuild_topology!
  rebuild_transports!
  update_communicator_state!

  record_heal([gpu_id], :recovered)
  true
rescue StandardError
  @failed_devices = previous_failed
  @active_devices = previous_active
  false
end

#register_cuda_graph(graph) ⇒ void

This method returns an undefined value.

Register CUDA Graph for invalidation on heal

Parameters:

  • graph (CUDA::Graph, FFI::Pointer)

    Graph to track



125
126
127
# File 'lib/nvruby/collective/communicator_healer.rb', line 125

# Track a CUDA Graph so it can be invalidated on heal.
#
# A graph that is already tracked (compared with ==) is not added again.
#
# @param graph [CUDA::Graph, FFI::Pointer] graph to track
# @return [void]
def register_cuda_graph(graph)
  return if @cuda_graph_cache.include?(graph)

  @cuda_graph_cache << graph
end

#to_s ⇒ String

Returns a human-readable status.

Returns:

  • (String)

    Human-readable status



175
176
177
178
# File 'lib/nvruby/collective/communicator_healer.rb', line 175

# Human-readable one-line status.
#
# Shows how many of the communicator's GPUs are still active and whether the
# healer is degraded or healthy.
#
# @return [String] e.g. "CommunicatorHealer[3/4 active, degraded]"
def to_s
  health =
    if degraded?
      "degraded"
    else
      "healthy"
    end
  active = @active_devices.size
  total = @communicator.gpu_ids.size

  "CommunicatorHealer[#{active}/#{total} active, #{health}]"
end

#unregister_cuda_graph(graph) ⇒ void

This method returns an undefined value.

Unregister CUDA Graph

Parameters:

  • graph (CUDA::Graph, FFI::Pointer)

    Graph to untrack



133
134
135
# File 'lib/nvruby/collective/communicator_healer.rb', line 133

# Stop tracking a CUDA Graph.
#
# Removes every entry equal to +graph+ from @cuda_graph_cache; a graph that
# was never registered is simply ignored.
#
# @param graph [CUDA::Graph, FFI::Pointer] graph to untrack
# @return [void]
def unregister_cuda_graph(graph)
  @cuda_graph_cache.delete(graph)
end

#world_size ⇒ Integer

Get current world size (active GPUs)

Returns:

  • (Integer)

    Number of active GPUs



139
140
141
# File 'lib/nvruby/collective/communicator_healer.rb', line 139

# Current world size, i.e. the number of active GPUs.
#
# @return [Integer] number of entries in @active_devices
def world_size
  @active_devices.size
end