Class: Legion::LLM::Router::HealthTracker

Inherits:
Object
  • Object
show all
Includes:
Legion::Logging::Helper
Defined in:
lib/legion/llm/router/health_tracker.rb

Constant Summary collapse

OPEN_PENALTY =
-50
LATENCY_THRESHOLD_MS =
5000
LATENCY_PENALTY_STEP =
-10

Instance Method Summary collapse

Constructor Details

#initialize(window_seconds: 300, failure_threshold: 3, cooldown_seconds: 60) ⇒ HealthTracker

Returns a new instance of HealthTracker.



14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/legion/llm/router/health_tracker.rb', line 14

# Builds a tracker with empty state and installs the default signal handlers.
#
# @param window_seconds [Integer] stored as @window_seconds
#   (presumably the look-back window for samples — confirm against the handlers)
# @param failure_threshold [Integer] stored as @failure_threshold
#   (presumably failures needed to open a circuit — confirm against the handlers)
# @param cooldown_seconds [Integer] stored as @cooldown_seconds
#   (presumably how long a circuit stays open — confirm against the handlers)
def initialize(window_seconds: 300, failure_threshold: 3, cooldown_seconds: 60)
  # Reentrant lock used by the public methods that touch the hashes below.
  @mutex = Monitor.new

  # Tunables, kept verbatim from the caller.
  @cooldown_seconds  = cooldown_seconds
  @failure_threshold = failure_threshold
  @window_seconds    = window_seconds

  # Per-key state plus the signal => handler registry, all starting empty.
  @handlers       = {}
  @circuits       = {}
  @latency_window = {}
  @denied_models  = {}

  register_default_handlers
end

Instance Method Details

#adjustment(provider, instance: nil, offering_id: nil) ⇒ Object

Returns total priority adjustment for a provider. Combines circuit-breaker penalty and latency penalty. When instance: is given, returns that specific instance’s adjustment. When nil, returns the average across all known instances so one bad node penalizes the provider proportionally instead of globally.



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/legion/llm/router/health_tracker.rb', line 76

# Total priority adjustment for a provider: circuit_adjustment plus
# latency_adjustment for the resolved health key.
#
# @param provider [Object] provider identifier
# @param instance [Object, nil] when given, only that instance's key is scored
# @param offering_id [Object, nil] optional offering used for the fallback key
# @return [Numeric] the instance's adjustment, the rounded mean across all known
#   instances, or the provider/offering-level adjustment when no instances are known
def adjustment(provider, instance: nil, offering_id: nil)
  penalty = ->(key) { circuit_adjustment(key) + latency_adjustment(key) }
  return penalty.call(instance_key(provider, instance)) if instance

  instance_keys = known_instances(provider)
  unless instance_keys.empty?
    # Average so a single unhealthy instance penalizes the provider proportionally.
    total = instance_keys.sum { |key| penalty.call(key) }
    return (total.to_f / instance_keys.size).round
  end

  # Backward compat: no instances known, so score the offering-level key,
  # falling back to the bare provider when only the provider is tracked.
  lookup = health_key(provider, offering_id)
  lookup = provider if offering_id && !tracked?(lookup) && tracked?(provider)
  penalty.call(lookup)
end

#circuit_state(provider, instance: nil, offering_id: nil) ⇒ Object

Returns :closed, :open, or :half_open. When instance: is given, returns that specific instance’s state. When nil, returns the worst state across all known instances.



98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/legion/llm/router/health_tracker.rb', line 98

# Circuit state for a provider, resolved through the same key rules as the
# other per-provider lookups in this class.
#
# @param provider [Object] provider identifier
# @param instance [Object, nil] when given, only that instance's key is consulted
# @param offering_id [Object, nil] optional offering used for the fallback key
# @return [Object] whatever circuit_state_for_key / worst_circuit_state return
#   (documented by the class as :closed, :open, or :half_open)
def circuit_state(provider, instance: nil, offering_id: nil)
  if instance
    return circuit_state_for_key(instance_key(provider, instance))
  end

  # With known instances, the provider reports the worst state among them.
  instance_keys = known_instances(provider)
  return worst_circuit_state(instance_keys) unless instance_keys.empty?

  # Backward compat: fall back to the offering-level key, or to the bare
  # provider when the offering key is untracked but the provider is tracked.
  lookup = health_key(provider, offering_id)
  prefer_provider = offering_id && !tracked?(lookup) && tracked?(provider)
  circuit_state_for_key(prefer_provider ? provider : lookup)
end

#clear_denied(provider: nil, instance: nil) ⇒ Object

Clear denied models for a provider (or all if no args).



138
139
140
141
142
143
144
145
146
147
# File 'lib/legion/llm/router/health_tracker.rb', line 138

# Removes denied-model records under the tracker's lock.
#
# @param provider [Object, nil] when nil, every denied-model record is cleared
# @param instance [Object, nil] with a provider, narrows the removal to that
#   provider/instance key; ignored when provider is nil
# @return [Hash, nil] the removed per-model hash (nil when the key was absent),
#   or the emptied internal hash when everything is cleared
def clear_denied(provider: nil, instance: nil)
  # No provider means "wipe everything", regardless of instance.
  return @mutex.synchronize { @denied_models.clear } unless provider

  @mutex.synchronize do
    scope = instance ? instance_key(provider, instance) : provider.to_s
    @denied_models.delete(scope)
  end
end

#denied_models ⇒ Object

List all denied models (for diagnostics).



133
134
135
# File 'lib/legion/llm/router/health_tracker.rb', line 133

# Snapshot of every denied model, for diagnostics.
#
# The copy is taken under the lock and reaches down to the individual
# { reason:, at: } records, so callers can inspect or mutate the result
# without touching the tracker's own state (a plain #dup would share the
# nested per-provider and per-model hashes).
#
# @return [Hash{String => Hash{String => Hash}}] key => model => record
def denied_models
  @mutex.synchronize do
    @denied_models.transform_values do |models|
      models.transform_values(&:dup)
    end
  end
end

#deny_model(provider:, model:, instance: nil, reason: nil) ⇒ Object

Record that a model is denied for a provider+instance (e.g. AccessDenied). Excluded from routing until restart or explicit clear.



115
116
117
118
119
120
121
122
# File 'lib/legion/llm/router/health_tracker.rb', line 115

# Records a model as denied under the provider key, or under the
# provider/instance key when instance is given, then logs a warning.
#
# @param provider [Object] provider identifier
# @param model [Object] model identifier; stored under its #to_s form
# @param instance [Object, nil] optional instance narrowing the key
# @param reason [Object, nil] stored with the record and included in the log line
# @return [Object] whatever log.warn returns
def deny_model(provider:, model:, instance: nil, reason: nil)
  scope = instance ? instance_key(provider, instance) : provider.to_s

  @mutex.synchronize do
    models = (@denied_models[scope] ||= {})
    models[model.to_s] = { reason: reason, at: Time.now }
  end

  log.warn("[llm][health_tracker] action=model_denied provider=#{scope} model=#{model} reason=#{reason}")
end

#model_denied?(provider:, model:, instance: nil) ⇒ Boolean

Check if a model is denied for a provider+instance.

Returns:

  • (Boolean)


125
126
127
128
129
130
# File 'lib/legion/llm/router/health_tracker.rb', line 125

# Whether a denial record exists for the model under the provider key
# (or the provider/instance key when instance is given).
#
# @param provider [Object] provider identifier
# @param model [Object] model identifier; compared by its #to_s form
# @param instance [Object, nil] optional instance narrowing the key
# @return [Boolean] true when a non-nil record is stored
def model_denied?(provider:, model:, instance: nil)
  scope      = instance ? instance_key(provider, instance) : provider.to_s
  model_name = model.to_s
  record     = @mutex.synchronize { @denied_models.dig(scope, model_name) }
  !record.nil?
end

#register_handler(signal, &block) ⇒ Object

Register a custom handler for a signal type.



29
30
31
# File 'lib/legion/llm/router/health_tracker.rb', line 29

# Installs the given block as the handler for a signal, replacing any
# handler previously stored under the same symbol.
#
# @param signal [#to_sym] signal name
# @yield the handler body; stored as-is (nil when no block is passed)
# @return [Proc, nil] the stored block
def register_handler(signal, &block)
  @handlers.store(signal.to_sym, block)
end

#report(provider:, signal:, value:, instance: nil, metadata: {}, offering_id: nil) ⇒ Object

Thread-safe signal intake. Dispatches to the registered handler if one exists. When instance: is given, tracks under “provider/instance”. When instance: is nil, tracks under “provider” (backward compat) or broadcasts to all known instances of that provider.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/legion/llm/router/health_tracker.rb', line 37

# Signal intake: looks up the handler registered for the signal and invokes
# it under the tracker's lock with a payload from build_payload.
#
# Routing of the payload key:
# * instance given       -> the provider/instance key
# * no known instances   -> the provider- or offering-level health key
# * known instances      -> one handler call per known instance key
#
# @param provider [Object] provider identifier
# @param signal [#to_sym] signal name used to find the handler
# @param value [Object] signal value passed through to the payload
# @param instance [Object, nil] optional instance the signal applies to
# @param metadata [Hash] extra data passed through to the payload
# @param offering_id [Object, nil] optional offering passed through to the payload
# @return [Object, nil] nil when no handler is registered; the handler's result
#   for a single dispatch; the list of instance keys after a broadcast
def report(provider:, signal:, value:, instance: nil, metadata: {}, offering_id: nil)
  signal_name = signal.to_sym
  callback    = @handlers[signal_name]
  return nil unless callback

  log.debug "[llm][health_tracker] action=signal_received provider=#{provider} instance=#{instance || 'all'} signal=#{signal_name} value=#{value}"

  # Only :instance and :key differ between the dispatch paths below.
  payload_for = lambda do |target_instance, target_key|
    build_payload(provider: provider, instance: target_instance,
                  key: target_key, offering_id: offering_id,
                  signal: signal_name, value: value, metadata: metadata)
  end

  if instance
    event = payload_for.call(instance, instance_key(provider, instance))
    return @mutex.synchronize { callback.call(event) }
  end

  fanout_keys = known_instances(provider)
  if fanout_keys.empty?
    event = payload_for.call(nil, health_key(provider, offering_id))
    return @mutex.synchronize { callback.call(event) }
  end

  # Broadcast: every known instance receives the signal within one lock hold.
  @mutex.synchronize do
    fanout_keys.each { |inst_key| callback.call(payload_for.call(nil, inst_key)) }
  end
end

#reset(provider, instance: nil, offering_id: nil) ⇒ Object

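Clears circuit and latency data for a single health key: the provider/instance key when instance: is given, otherwise the provider- or offering-level key.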



150
151
152
153
154
155
156
# File 'lib/legion/llm/router/health_tracker.rb', line 150

# Drops the circuit and latency entries stored under one health key.
#
# @param provider [Object] provider identifier
# @param instance [Object, nil] when given, the provider/instance key is cleared
# @param offering_id [Object, nil] used for the key only when instance is nil
# @return [Object, nil] the removed latency entry, or nil when none was stored
def reset(provider, instance: nil, offering_id: nil)
  target = if instance
             instance_key(provider, instance)
           else
             health_key(provider, offering_id)
           end

  @mutex.synchronize do
    @circuits.delete(target)
    @latency_window.delete(target)
  end
end

#reset_all ⇒ Object

Clears all state.



159
160
161
162
163
164
165
# File 'lib/legion/llm/router/health_tracker.rb', line 159

# Empties the circuit, latency, and denied-model stores in one lock hold.
# Registered handlers are left in place.
#
# @return [Hash] the (now empty) denied-model store
def reset_all
  @mutex.synchronize do
    [@circuits, @latency_window].each(&:clear)
    @denied_models.clear
  end
end