Module: Legion::LLM::Fleet::ReplyDispatcher

Extended by:
Legion::Logging::Helper
Defined in:
lib/legion/llm/fleet/reply_dispatcher.rb

Class Method Summary collapse

Class Method Details

.agent_queue_name ⇒ Object



67
68
69
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 67

# Returns the reply queue name, generating it on first use.
#
# The name is "llm.fleet.reply." followed by the hex string produced by
# SecureRandom.hex(8). Once generated it is cached in @agent_queue_name and
# every later call returns that same value.
#
# @return [String] the cached queue name
def agent_queue_name
  return @agent_queue_name if @agent_queue_name

  @agent_queue_name = "llm.fleet.reply.#{SecureRandom.hex(8)}"
end

.cancel_consumer ⇒ Object



101
102
103
104
105
106
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 101

# Cancels the active reply consumer, if any, and forgets it.
#
# The reference in @consumer is cleared *before* the cancel call. A cancel
# that raises therefore cannot leave a stale consumer behind; a stale
# reference would make ensure_consumer return early and never re-subscribe.
# Errors raised by the cancel are passed to handle_exception at :warn level
# and are not re-raised.
#
# @return [nil] when the consumer was cancelled or none was active; otherwise
#   whatever handle_exception returns
def cancel_consumer
  consumer = @consumer
  @consumer = nil
  consumer&.cancel
  nil
rescue StandardError => e
  handle_exception(e, level: :warn)
end

.deregister(correlation_id) ⇒ Object



25
26
27
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 25

# Removes the entry stored under +correlation_id+ from @pending.
#
# @param correlation_id [Object] key used when the entry was stored
# @return [Object, nil] whatever @pending.delete returns for that key
def deregister(correlation_id)
  registry = @pending
  registry.delete(correlation_id)
end

.ensure_consumer ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 82

# Subscribes to the reply queue unless a consumer is already stored in @consumer.
#
# Runs under @mutex. Does nothing when @consumer is set or when
# transport_available? is falsey. Otherwise it opens a channel on
# Legion::Transport.connection, declares the queue named by agent_queue_name
# (auto_delete, non-durable) and subscribes with manual_ack: false. Each
# delivery is forwarded to handle_delivery together with the message's
# correlation_id and type.
#
# If declaring the queue or subscribing raises, the freshly opened channel is
# closed (best effort) before the error is reported, so a failed attempt does
# not leave an unused channel open. Every error is passed to handle_exception
# at :warn level rather than re-raised.
#
# @return [Object, nil] the new consumer on success, nil when nothing was
#   started, or whatever handle_exception returns after a failure
def ensure_consumer
  @mutex.synchronize do
    return if @consumer
    return unless transport_available?

    channel = Legion::Transport.connection.create_channel
    begin
      queue = channel.queue(agent_queue_name, auto_delete: true, durable: false)
      @consumer = queue.subscribe(manual_ack: false) do |_delivery, properties, body|
        props = {
          correlation_id: properties.correlation_id,
          type:           properties.type
        }
        handle_delivery(body, props)
      end
    rescue StandardError => subscribe_error
      begin
        # NOTE(review): assumes the channel object exposes #close — confirm
        # against the transport in use; guarded so other objects are left alone.
        channel.close if channel.respond_to?(:close)
      rescue StandardError => close_error
        # The channel may already be closed; do not mask the original failure.
        handle_exception(close_error, level: :debug)
      end
      raise subscribe_error
    end
  end
rescue StandardError => e
  handle_exception(e, level: :warn)
end

.fulfill_nack(correlation_id) ⇒ Object



58
59
60
61
62
63
64
65
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 58

# Resolves the pending future for +correlation_id+ with a back-pressure failure.
#
# The future is removed from @pending first; when none is registered under
# that id the method does nothing. Any error is passed to handle_exception at
# :warn level instead of being re-raised.
#
# @param correlation_id [Object] key the future was stored under
# @return [Object, nil] the result of the fulfill call, or nil when no future
#   was pending
def fulfill_nack(correlation_id)
  if (waiting = @pending.delete(correlation_id))
    waiting.fulfill({ success: false, error: 'fleet_backpressure' })
  end
rescue StandardError => error
  handle_exception(error, level: :warn, operation: 'llm.fleet.reply_dispatcher.fulfill_nack')
end

.fulfill_return(correlation_id) ⇒ Object



49
50
51
52
53
54
55
56
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 49

# Resolves the pending future for +correlation_id+ with a "no_fleet_queue" failure.
#
# The future is removed from @pending first; when none is registered under
# that id the method does nothing. Any error is passed to handle_exception at
# :warn level instead of being re-raised.
#
# @param correlation_id [Object] key the future was stored under
# @return [Object, nil] the result of the fulfill call, or nil when no future
#   was pending
def fulfill_return(correlation_id)
  if (waiting = @pending.delete(correlation_id))
    waiting.fulfill({ success: false, error: 'no_fleet_queue' })
  end
rescue StandardError => error
  handle_exception(error, level: :warn, operation: 'llm.fleet.reply_dispatcher.fulfill_return')
end

.handle_delivery(raw_payload, properties = {}) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 29

# Routes one reply message to the future waiting on its correlation id.
#
# The correlation id is taken from +properties+ first and from the parsed
# payload otherwise. The matching future is removed from @pending and
# fulfilled: with normalize_error(payload) when the message type is
# 'llm.fleet.error', and with the parsed payload for any other type
# (including no type at all, the legacy form). Messages without a correlation
# id or without a pending future are ignored. Any error is passed to
# handle_exception at :warn level instead of being re-raised.
#
# @param raw_payload [Object] message body, handed to parse_payload
# @param properties [Hash] message properties; :correlation_id and :type are read
# @return [Object, nil] the result of the fulfill call, or nil when the
#   message was ignored
def handle_delivery(raw_payload, properties = {})
  decoded = parse_payload(raw_payload)
  correlation = properties[:correlation_id] || decoded[:correlation_id]
  waiter = correlation && @pending.delete(correlation)
  return unless waiter

  # Only the explicit error type is reshaped; responses and untyped legacy
  # replies are delivered as parsed.
  outcome = properties[:type] == 'llm.fleet.error' ? normalize_error(decoded) : decoded
  waiter.fulfill(outcome)
rescue StandardError => error
  handle_exception(error, level: :warn)
end

.normalize_error(payload) ⇒ Object



128
129
130
131
132
133
134
135
136
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 128

# Converts a fleet error payload into the failure hash given to waiting futures.
#
# The :error entry of +payload+ (or an empty hash when absent) is summarized
# into a single value: for a Hash, its :code, else its :message, else the
# literal 'fleet_error'; for anything else, its to_s. The untouched error
# value is also returned under :raw_error.
#
# @param payload [Hash] parsed message body; :error and :message_context are read
# @return [Hash] with keys :success (always false), :error, :message_context
#   (defaults to {}), and :raw_error
def normalize_error(payload)
  raw = payload[:error] || {}
  summary =
    if raw.is_a?(Hash)
      raw[:code] || raw[:message] || 'fleet_error'
    else
      raw.to_s
    end
  context = payload[:message_context] || {}

  { success: false, error: summary, message_context: context, raw_error: raw }
end

.parse_payload(raw) ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 114

# Turns a raw message body into a parsed payload.
#
# A Hash is returned unchanged. Anything else is decoded with
# Legion::JSON.load when that constant is defined, and otherwise with the
# standard library's JSON.parse using symbolized keys (the json library is
# required lazily, only on this fallback path). A decoding failure is passed
# to handle_exception at :debug level and an empty hash is returned.
#
# @param raw [Hash, String, Object] message body
# @return [Object] the given Hash, the decoded value, or {} after a failure
def parse_payload(raw)
  return raw if raw.is_a?(Hash)
  return Legion::JSON.load(raw) if defined?(Legion::JSON)

  require 'json'
  ::JSON.parse(raw, symbolize_names: true)
rescue StandardError => error
  handle_exception(error, level: :debug)
  {}
end

.pending_count ⇒ Object



71
72
73
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 71

# Reports how many entries are currently stored in @pending.
#
# @return [Integer] the value of @pending.size
def pending_count
  registry = @pending
  registry.size
end

.register(correlation_id) ⇒ Object



18
19
20
21
22
23
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 18

# Creates a resolvable future for +correlation_id+ and starts listening for replies.
#
# The future is stored in @pending before ensure_consumer is called, so it is
# already registered by the time a consumer may be started.
#
# @param correlation_id [Object] key the future is stored under in @pending
# @return [Object] the future from Concurrent::Promises.resolvable_future
def register(correlation_id)
  Concurrent::Promises.resolvable_future.tap do |future|
    @pending[correlation_id] = future
    ensure_consumer
  end
end

.reset! ⇒ Object



75
76
77
78
79
80
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 75

# Cancels the consumer and replaces @pending with an empty Concurrent::Map.
#
# Both steps run while holding @mutex. Entries held by the previous @pending
# are discarded along with it; this method does not fulfill them.
#
# @return [Concurrent::Map] the new, empty map now stored in @pending
def reset!
  @mutex.synchronize do
    cancel_consumer
    fresh_registry = Concurrent::Map.new
    @pending = fresh_registry
  end
end

.transport_available? ⇒ Boolean

Returns:

  • (Boolean)


108
109
110
111
112
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 108

# Tells whether a transport connection can be obtained.
#
# Truthy only when Legion::Transport is defined, responds to :connection, and
# that call returns a truthy value. The result is the connection object
# itself rather than a strict boolean, so callers should rely on truthiness.
#
# @return [Object, false, nil] the connection when available; nil when
#   Legion::Transport is undefined or the connection is nil; false when
#   Legion::Transport does not respond to :connection
def transport_available?
  return unless defined?(Legion::Transport)
  return false unless Legion::Transport.respond_to?(:connection)

  Legion::Transport.connection
end