Module: Legion::LLM::Fleet::ReplyDispatcher

Extended by:
Legion::Logging::Helper
Defined in:
lib/legion/llm/fleet/reply_dispatcher.rb

Class Method Summary collapse

Class Method Details

.agent_queue_name ⇒ Object



52
53
54
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 52

# Name of the reply queue used by this dispatcher.
#
# The name is built once from a random hex suffix and cached in
# +@agent_queue_name+, so every later call returns the same string.
#
# @return [String] "llm.fleet.reply." followed by 16 hex characters
def agent_queue_name
  return @agent_queue_name if @agent_queue_name

  @agent_queue_name = "llm.fleet.reply.#{SecureRandom.hex(8)}"
end

.cancel_consumer ⇒ Object



86
87
88
89
90
91
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 86

# Cancels the current consumer, if there is one, and forgets it.
#
# The reference in +@consumer+ is cleared in an +ensure+ clause so it is dropped
# even when +cancel+ raises. Keeping a handle whose cancellation failed would
# make +ensure_consumer+ believe a subscription is still in place and skip
# re-subscribing.
#
# Errors raised by +cancel+ are reported through +handle_exception+ at +:warn+
# level and are not re-raised.
#
# @return [nil, Object] +nil+ on success; otherwise whatever +handle_exception+ returns
def cancel_consumer
  @consumer&.cancel
  nil
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.cancel_consumer')
ensure
  @consumer = nil
end

.delivery_value(key, payload, properties) ⇒ Object



141
142
143
144
145
146
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 141

# Looks up +key+ for a delivery, preferring message properties over the payload.
#
# Properties are consulted under +key+ as given and then under +key.to_s+. A
# property that is present but +nil+ is treated as missing: the subscription
# block in +ensure_consumer+ always builds a hash containing +:correlation_id+
# and +:type+, even when the message metadata left them unset, and those +nil+
# entries must not hide a value carried in the payload body.
#
# @param key [Symbol, String] name of the value to look up
# @param payload [Object] parsed message body, passed on to +payload_value+
# @param properties [Object] message properties; ignored unless it is a Hash
# @return [Object, nil] the first non-nil property value, else the payload value
def delivery_value(key, payload, properties)
  if properties.is_a?(Hash)
    [key, key.to_s].each do |candidate|
      next unless properties.key?(candidate)

      value = properties[candidate]
      return value unless value.nil?
    end
  end

  payload_value(payload, key)
end

.deregister(correlation_id) ⇒ Object



25
26
27
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 25

# Removes the pending entry stored under +correlation_id+.
#
# @param correlation_id [Object] key the entry was stored under in +@pending+
# @return [Object, nil] whatever +@pending.delete+ returns for that key
def deregister(correlation_id)
  @pending.delete(correlation_id)
end

.ensure_consumer ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 67

# Subscribes to the reply queue unless a consumer is already stored.
#
# Runs under +@mutex+. Does nothing when +@consumer+ is set or when
# +transport_available?+ is falsy. Otherwise it opens a channel on
# +Legion::Transport.connection+, declares the queue named by
# +agent_queue_name+ (auto-delete, non-durable), and subscribes without manual
# acks. Each message is forwarded to +handle_delivery+ with its body and a hash
# of the metadata's +correlation_id+ and +type+.
#
# Any StandardError is reported through +handle_exception+ at +:warn+ level and
# is not re-raised.
#
# @return [Object, nil] the new consumer when one was created, +nil+ when skipped
def ensure_consumer
  @mutex.synchronize do
    next if @consumer
    next unless transport_available?

    reply_queue = Legion::Transport.connection
                                   .create_channel
                                   .queue(agent_queue_name, auto_delete: true, durable: false)

    @consumer = reply_queue.subscribe(manual_ack: false) do |_delivery, metadata, body|
      handle_delivery(body, { correlation_id: metadata.correlation_id, type: metadata.type })
    end
  end
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.ensure_consumer')
end

.expected_delivery?(expected, payload, properties) ⇒ Boolean

Returns:

  • (Boolean)


130
131
132
133
134
135
136
137
138
139
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 130

# Checks a delivery against the expectations recorded at registration.
#
# Anything other than a Hash for +expected+ counts as "no expectations".
# Entries whose expected value is +nil+ are skipped. Every other entry must have
# a truthy actual value (looked up with +delivery_value+) whose +to_s+ equals
# the expected value's +to_s+.
#
# @param expected [Hash, Object] expected key/value pairs
# @param payload [Object] parsed message body
# @param properties [Object] message properties
# @return [Boolean] +true+ when every non-nil expectation matches
def expected_delivery?(expected, payload, properties)
  return true unless expected.is_a?(Hash)

  expected.each_pair do |key, wanted|
    next if wanted.nil?

    actual = delivery_value(key, payload, properties)
    return false unless actual && actual.to_s == wanted.to_s
  end

  true
end

.handle_delivery(raw_payload, properties = {}) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 29

# Routes one reply message to the future registered for its correlation id.
#
# Steps: parse the body, resolve the correlation id (properties first, then
# payload), look up the pending entry, verify it against the entry's
# expectations, then remove the entry and fulfill its future. Messages with no
# correlation id, no pending entry, or a failed expectation check are ignored
# and leave the registry untouched.
#
# The entry is claimed with +delete+ and the result is checked. If it was
# removed between the lookup and the delete (for example by a concurrent
# +deregister+), the delivery is dropped quietly instead of raising on +nil+.
#
# A delivery whose type is 'llm.fleet.error' fulfills the future with the
# result of +normalize_error+. Any other type, or no type, fulfills it with the
# parsed payload. Any StandardError is reported through +handle_exception+ at
# +:warn+ level and is not re-raised.
#
# @param raw_payload [Object] message body, handed to +parse_payload+
# @param properties [Hash] message properties (e.g. +:correlation_id+, +:type+)
# @return [Object, nil] +nil+ when the delivery is ignored; otherwise the result
#   of fulfilling the future or of +handle_exception+
def handle_delivery(raw_payload, properties = {})
  payload = parse_payload(raw_payload)
  cid = delivery_value(:correlation_id, payload, properties)
  return unless cid

  entry = @pending[cid]
  return unless entry
  return unless expected_delivery?(entry[:expected], payload, properties)

  # Claim the entry; whoever gets a non-nil result from delete owns the future.
  claimed = @pending.delete(cid)
  return unless claimed

  future = claimed[:future]

  # Type-aware dispatch (new protocol) with fallback to legacy (no type)
  case delivery_value(:type, payload, properties)
  when 'llm.fleet.error'
    future.fulfill(normalize_error(payload, correlation_id: cid))
  else
    # 'llm.fleet.response' or legacy (no type)
    future.fulfill(payload)
  end
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.handle_delivery')
end

.normalize_error(payload, correlation_id: nil) ⇒ Object



113
114
115
116
117
118
119
120
121
122
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 113

# Builds the failure result delivered for an error reply.
#
# Values are read from +payload+ with +payload_value+. A missing +:error+
# becomes an empty hash, a missing +:correlation_id+ falls back to the keyword
# argument, and a missing +:message_context+ becomes an empty hash.
#
# @param payload [Object] parsed error message body
# @param correlation_id [Object, nil] fallback when the payload carries none
# @return [Hash] with keys +:success+ (always false), +:error+,
#   +:correlation_id+, +:message_context+ and +:raw_error+
def normalize_error(payload, correlation_id: nil)
  raw = payload_value(payload, :error) || {}
  code = normalized_error_code(raw)
  resolved_id = payload_value(payload, :correlation_id) || correlation_id
  context = payload_value(payload, :message_context) || {}

  { success: false, error: code, correlation_id: resolved_id, message_context: context, raw_error: raw }
end

.normalized_error_code(error) ⇒ Object



124
125
126
127
128
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 124

# Reduces an error value to a single code.
#
# A non-Hash error is returned as its +to_s+. For a Hash, the first truthy of
# +:code+ and +:message+ (looked up with +payload_value+) is returned, and
# 'fleet_error' is used when neither is present.
#
# @param error [Hash, Object] error value taken from a reply payload
# @return [Object] the code, message, 'fleet_error', or +error.to_s+
def normalized_error_code(error)
  if error.is_a?(Hash)
    %i[code message].each do |field|
      found = payload_value(error, field)
      return found if found
    end

    'fleet_error'
  else
    error.to_s
  end
end

.parse_payload(raw) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 99

# Turns a raw message body into parsed data.
#
# A Hash is returned untouched. Anything else is decoded with
# +Legion::JSON.load+ when that constant is defined, and otherwise with the
# standard library's JSON parser using symbolized keys. A failure is reported
# through +handle_exception+ at +:warn+ level and yields an empty hash.
#
# @param raw [Hash, String, Object] message body
# @return [Object] the given Hash, the decoded value, or +{}+ on failure
def parse_payload(raw)
  return raw if raw.is_a?(Hash)
  return Legion::JSON.load(raw) if defined?(Legion::JSON)

  # Loaded lazily: only needed when Legion::JSON is not available.
  require 'json'
  ::JSON.parse(raw, symbolize_names: true)
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.parse_payload')
  {}
end

.payload_value(payload, key) ⇒ Object



148
149
150
151
152
153
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 148

# Reads +key+ from a payload hash, accepting either key form.
#
# The key is tried as given first and then as +key.to_s+. Presence is decided
# with +key?+, so a stored +nil+ under the first form is returned as-is and the
# string form is not consulted.
#
# @param payload [Hash, Object] parsed body; anything but a Hash yields +nil+
# @param key [Symbol, String] key to look up
# @return [Object, nil] the stored value, or +nil+ when neither form is present
def payload_value(payload, key)
  return nil unless payload.is_a?(Hash)
  return payload[key] if payload.key?(key)

  string_key = key.to_s
  payload.key?(string_key) ? payload[string_key] : nil
end

.pending_count ⇒ Object



56
57
58
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 56

# Number of entries currently held in +@pending+.
#
# @return [Object] whatever +@pending.size+ reports (presumably an Integer)
def pending_count
  @pending.size
end

.register(correlation_id, expected: {}) ⇒ Object



18
19
20
21
22
23
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 18

# Registers interest in the reply for +correlation_id+.
#
# A new resolvable future is stored in +@pending+ together with the
# expectations (+nil+ is stored as an empty hash). After the entry is stored,
# +ensure_consumer+ is called, and the future is returned to the caller.
#
# @param correlation_id [Object] key the reply will be matched on
# @param expected [Hash, nil] values the reply must carry to be accepted
# @return [Object] the future created by +Concurrent::Promises.resolvable_future+
def register(correlation_id, expected: {})
  Concurrent::Promises.resolvable_future.tap do |future|
    @pending[correlation_id] = { future: future, expected: expected || {} }
    ensure_consumer
  end
end

.reset! ⇒ Object



60
61
62
63
64
65
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 60

# Returns the dispatcher to an empty state.
#
# While holding +@mutex+, cancels the consumer via +cancel_consumer+ and then
# replaces +@pending+ with a new, empty +Concurrent::Map+. Futures held by the
# previous map are not touched here.
#
# @return [Object] the value of the block, i.e. the newly assigned map
def reset!
  @mutex.synchronize do
    # Stop deliveries first, then drop the old registry.
    cancel_consumer
    @pending = Concurrent::Map.new
  end
end

.transport_available? ⇒ Boolean

Returns:

  • (Boolean)


93
94
95
96
97
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 93

# Tells whether a transport connection can be used.
#
# The result is only meaningful as a truthy/falsy value: it is +nil+ when
# +Legion::Transport+ is not defined, +false+ when it does not respond to
# +connection+, and otherwise whatever +Legion::Transport.connection+ returns.
#
# @return [Object, false, nil] truthy when a connection is available
def transport_available?
  return nil unless defined?(Legion::Transport)
  return false unless Legion::Transport.respond_to?(:connection)

  Legion::Transport.connection
end