Module: Legion::LLM::Fleet::ReplyDispatcher

Extended by:
Legion::Logging::Helper
Defined in:
lib/legion/llm/fleet/reply_dispatcher.rb

Class Method Summary collapse

Class Method Details

.accepted_protocol?(payload) ⇒ Boolean

Returns:

  • (Boolean)


128
129
130
131
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 128

def accepted_protocol?(payload)
  payload[:protocol_version] == ::Legion::Extensions::Llm::Fleet::Protocol::VERSION ||
    payload[:protocol_version] == ::Legion::Extensions::Llm::Fleet::Protocol::VERSION.to_s
end

.accepted_type?(type) ⇒ Boolean

Returns:

  • (Boolean)


121
122
123
124
125
126
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 121

def accepted_type?(type)
  [
    ::Legion::Extensions::Llm::Fleet::Protocol::RESPONSE_TYPE,
    ::Legion::Extensions::Llm::Fleet::Protocol::ERROR_TYPE
  ].include?(type.to_s)
end

.agent_queue_nameObject



62
63
64
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 62

def agent_queue_name
  @mutex.synchronize { current_agent_queue_name }
end

.cancel_consumerObject



99
100
101
102
103
104
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 99

def cancel_consumer
  @consumer&.cancel
  @consumer = nil
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.cancel_consumer')
end

.correlation_id_for(payload, properties) ⇒ Object



152
153
154
155
156
157
158
159
160
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 152

def correlation_id_for(payload, properties)
  payload_cid = payload[:correlation_id]
  return nil if payload_cid.to_s.empty?

  property_cid = properties[:correlation_id]
  return nil if property_cid && property_cid.to_s != payload_cid.to_s

  payload_cid
end

.current_agent_queue_nameObject



195
196
197
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 195

def current_agent_queue_name
  @current_agent_queue_name ||= "#{reply_queue_prefix}.#{SecureRandom.hex(8)}"
end

.delivery_value(key, payload, properties) ⇒ Object



162
163
164
165
166
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 162

def delivery_value(key, payload, properties)
  return properties[key] if key.to_sym == :type && properties.is_a?(Hash) && properties.key?(key)

  payload[key] if payload.is_a?(Hash)
end

.deregister(correlation_id) ⇒ Object



32
33
34
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 32

def deregister(correlation_id)
  @pending.delete(correlation_id)
end

.ensure_consumerObject



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 78

def ensure_consumer
  @mutex.synchronize do
    return if @consumer
    return unless transport_available?

    channel = Legion::Transport.connection.create_channel
    queue = channel.queue(current_agent_queue_name, auto_delete: true, durable: false,
                                            expires: reply_queue_expires_ms)
    @consumer = queue.subscribe(manual_ack: false) do |_delivery, properties, body|
      props = {
        correlation_id: properties.correlation_id,
        type:           properties.type
      }
      handle_delivery(body, props)
    end
  end
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.ensure_consumer')
  raise
end

.expected_delivery?(expected, payload, properties) ⇒ Boolean

Returns:

  • (Boolean)


143
144
145
146
147
148
149
150
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 143

def expected_delivery?(expected, payload, properties)
  expected.all? do |key, expected_value|
    next true if expected_value.nil?

    actual = key.to_sym == :correlation_id ? payload[:correlation_id] : delivery_value(key, payload, properties)
    actual && actual.to_s == expected_value.to_s
  end
end

.handle_delivery(raw_payload, properties = {}) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 36

def handle_delivery(raw_payload, properties = {})
  payload = normalize_hash(parse_payload(raw_payload))
  properties = normalize_hash(properties)
  return unless accepted_type?(delivery_value(:type, payload, properties))
  return unless accepted_protocol?(payload)
  return unless payload[:operation]

  cid = correlation_id_for(payload, properties)
  return unless cid

  entry = @pending[cid]
  return unless entry
  return unless expected_delivery?(entry[:expected], payload, properties)

  future = @pending.delete(cid)&.[](:future)
  return unless future

  if delivery_value(:type, payload, properties) == ::Legion::Extensions::Llm::Fleet::Protocol::ERROR_TYPE
    future.fulfill(normalize_error(payload, correlation_id: cid))
  else
    future.fulfill(payload)
  end
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.handle_delivery')
end

.normalize_error(payload, correlation_id: nil) ⇒ Object



133
134
135
136
137
138
139
140
141
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 133

def normalize_error(payload, correlation_id: nil)
  code = payload[:code] || payload.dig(:error, :code) || payload[:message] || 'fleet_error'
  {
    success:         false,
    error:           code,
    correlation_id:  payload[:correlation_id] || correlation_id,
    message_context: payload[:message_context] || {}
  }
end

.normalize_hash(hash) ⇒ Object



168
169
170
171
172
173
174
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 168

def normalize_hash(hash)
  return {} unless hash.respond_to?(:each)

  hash.each_with_object({}) do |(key, value), result|
    result[key.respond_to?(:to_sym) ? key.to_sym : key] = normalize_value(value)
  end
end

.normalize_value(value) ⇒ Object



176
177
178
179
180
181
182
183
184
185
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 176

def normalize_value(value)
  case value
  when Hash
    normalize_hash(value)
  when Array
    value.map { |entry| normalize_value(entry) }
  else
    value
  end
end

.parse_payload(raw) ⇒ Object



112
113
114
115
116
117
118
119
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 112

def parse_payload(raw)
  return raw if raw.is_a?(Hash)

  Legion::JSON.load(raw)
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.parse_payload')
  {}
end

.pending_countObject



66
67
68
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 66

def pending_count
  @pending.size
end

.register(correlation_id, expected: {}) ⇒ Object



21
22
23
24
25
26
27
28
29
30
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 21

def register(correlation_id, expected: {})
  future = Concurrent::Promises.resolvable_future
  @pending[correlation_id] = { future: future, expected: normalize_hash(expected || {}) }
  ensure_consumer
  future
rescue StandardError => e
  log.debug "[llm][fleet][reply_dispatcher] action=register_cleanup correlation_id=#{correlation_id} error=#{e.class} message=#{e.message}"
  @pending.delete(correlation_id)
  raise
end

.reply_queue_expires_msObject



191
192
193
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 191

def reply_queue_expires_ms
  Legion::Settings[:llm][:fleet][:dispatch][:reply_queue_expires_ms]
end

.reply_queue_prefixObject



187
188
189
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 187

def reply_queue_prefix
  Legion::Settings[:llm][:fleet][:dispatch][:reply_queue_prefix]
end

.reset!Object



70
71
72
73
74
75
76
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 70

def reset!
  @mutex.synchronize do
    cancel_consumer
    @pending = Concurrent::Map.new
    @current_agent_queue_name = nil
  end
end

.transport_available?Boolean

Returns:

  • (Boolean)


106
107
108
109
110
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 106

def transport_available?
  defined?(Legion::Transport) &&
    Legion::Transport.respond_to?(:connection) &&
    Legion::Transport.connection
end