Module: Legion::LLM::Fleet::ReplyDispatcher

Extended by:
Legion::Logging::Helper
Defined in:
lib/legion/llm/fleet/reply_dispatcher.rb

Class Method Summary collapse

Class Method Details

.accepted_protocol?(payload) ⇒ Boolean

Returns:

  • (Boolean)


127
128
129
130
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 127

def accepted_protocol?(payload)
  payload[:protocol_version] == ::Legion::Extensions::Llm::Fleet::Protocol::VERSION ||
    payload[:protocol_version] == ::Legion::Extensions::Llm::Fleet::Protocol::VERSION.to_s
end

.accepted_type?(type) ⇒ Boolean

Returns:

  • (Boolean)


120
121
122
123
124
125
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 120

def accepted_type?(type)
  [
    ::Legion::Extensions::Llm::Fleet::Protocol::RESPONSE_TYPE,
    ::Legion::Extensions::Llm::Fleet::Protocol::ERROR_TYPE
  ].include?(type.to_s)
end

.agent_queue_nameObject



61
62
63
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 61

def agent_queue_name
  @mutex.synchronize { current_agent_queue_name }
end

.cancel_consumerObject



98
99
100
101
102
103
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 98

def cancel_consumer
  @consumer&.cancel
  @consumer = nil
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.cancel_consumer')
end

.correlation_id_for(payload, properties) ⇒ Object



151
152
153
154
155
156
157
158
159
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 151

def correlation_id_for(payload, properties)
  payload_cid = payload[:correlation_id]
  return nil if payload_cid.to_s.empty?

  property_cid = properties[:correlation_id]
  return nil if property_cid && property_cid.to_s != payload_cid.to_s

  payload_cid
end

.current_agent_queue_nameObject



194
195
196
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 194

def current_agent_queue_name
  @current_agent_queue_name ||= "#{reply_queue_prefix}.#{SecureRandom.hex(8)}"
end

.delivery_value(key, payload, properties) ⇒ Object



161
162
163
164
165
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 161

def delivery_value(key, payload, properties)
  return properties[key] if key.to_sym == :type && properties.is_a?(Hash) && properties.key?(key)

  payload[key] if payload.is_a?(Hash)
end

.deregister(correlation_id) ⇒ Object



31
32
33
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 31

def deregister(correlation_id)
  @pending.delete(correlation_id)
end

.ensure_consumerObject



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 77

def ensure_consumer
  @mutex.synchronize do
    return if @consumer
    return unless transport_available?

    channel = Legion::Transport.connection.create_channel
    queue = channel.queue(current_agent_queue_name, auto_delete: true, durable: false,
                                            expires: reply_queue_expires_ms)
    @consumer = queue.subscribe(manual_ack: false) do |_delivery, properties, body|
      props = {
        correlation_id: properties.correlation_id,
        type:           properties.type
      }
      handle_delivery(body, props)
    end
  end
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.ensure_consumer')
  raise
end

.expected_delivery?(expected, payload, properties) ⇒ Boolean

Returns:

  • (Boolean)


142
143
144
145
146
147
148
149
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 142

def expected_delivery?(expected, payload, properties)
  expected.all? do |key, expected_value|
    next true if expected_value.nil?

    actual = key.to_sym == :correlation_id ? payload[:correlation_id] : delivery_value(key, payload, properties)
    actual && actual.to_s == expected_value.to_s
  end
end

.handle_delivery(raw_payload, properties = {}) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 35

def handle_delivery(raw_payload, properties = {})
  payload = normalize_hash(parse_payload(raw_payload))
  properties = normalize_hash(properties)
  return unless accepted_type?(delivery_value(:type, payload, properties))
  return unless accepted_protocol?(payload)
  return unless payload[:operation]

  cid = correlation_id_for(payload, properties)
  return unless cid

  entry = @pending[cid]
  return unless entry
  return unless expected_delivery?(entry[:expected], payload, properties)

  future = @pending.delete(cid)&.[](:future)
  return unless future

  if delivery_value(:type, payload, properties) == ::Legion::Extensions::Llm::Fleet::Protocol::ERROR_TYPE
    future.fulfill(normalize_error(payload, correlation_id: cid))
  else
    future.fulfill(payload)
  end
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.handle_delivery')
end

.normalize_error(payload, correlation_id: nil) ⇒ Object



132
133
134
135
136
137
138
139
140
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 132

def normalize_error(payload, correlation_id: nil)
  code = payload[:code] || payload.dig(:error, :code) || payload[:message] || 'fleet_error'
  {
    success:         false,
    error:           code,
    correlation_id:  payload[:correlation_id] || correlation_id,
    message_context: payload[:message_context] || {}
  }
end

.normalize_hash(hash) ⇒ Object



167
168
169
170
171
172
173
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 167

def normalize_hash(hash)
  return {} unless hash.respond_to?(:each)

  hash.each_with_object({}) do |(key, value), result|
    result[key.respond_to?(:to_sym) ? key.to_sym : key] = normalize_value(value)
  end
end

.normalize_value(value) ⇒ Object



175
176
177
178
179
180
181
182
183
184
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 175

def normalize_value(value)
  case value
  when Hash
    normalize_hash(value)
  when Array
    value.map { |entry| normalize_value(entry) }
  else
    value
  end
end

.parse_payload(raw) ⇒ Object



111
112
113
114
115
116
117
118
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 111

def parse_payload(raw)
  return raw if raw.is_a?(Hash)

  Legion::JSON.load(raw)
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.parse_payload')
  {}
end

.pending_countObject



65
66
67
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 65

def pending_count
  @pending.size
end

.register(correlation_id, expected: {}) ⇒ Object



21
22
23
24
25
26
27
28
29
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 21

def register(correlation_id, expected: {})
  future = Concurrent::Promises.resolvable_future
  @pending[correlation_id] = { future: future, expected: normalize_hash(expected || {}) }
  ensure_consumer
  future
rescue StandardError
  @pending.delete(correlation_id)
  raise
end

.reply_queue_expires_msObject



190
191
192
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 190

def reply_queue_expires_ms
  Legion::LLM::Settings.value(:fleet, :dispatch, :reply_queue_expires_ms, default: 60_000)
end

.reply_queue_prefixObject



186
187
188
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 186

def reply_queue_prefix
  Legion::LLM::Settings.value(:fleet, :dispatch, :reply_queue_prefix, default: 'llm.fleet.reply')
end

.reset!Object



69
70
71
72
73
74
75
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 69

def reset!
  @mutex.synchronize do
    cancel_consumer
    @pending = Concurrent::Map.new
    @current_agent_queue_name = nil
  end
end

.transport_available?Boolean

Returns:

  • (Boolean)


105
106
107
108
109
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 105

def transport_available?
  defined?(Legion::Transport) &&
    Legion::Transport.respond_to?(:connection) &&
    Legion::Transport.connection
end