Module: Legion::LLM::Fleet::ReplyDispatcher
- Extended by:
- Legion::Logging::Helper
- Defined in:
- lib/legion/llm/fleet/reply_dispatcher.rb
Class Method Summary collapse
- .accepted_protocol?(payload) ⇒ Boolean
- .accepted_type?(type) ⇒ Boolean
- .agent_queue_name ⇒ Object
- .cancel_consumer ⇒ Object
- .correlation_id_for(payload, properties) ⇒ Object
- .current_agent_queue_name ⇒ Object
- .delivery_value(key, payload, properties) ⇒ Object
- .deregister(correlation_id) ⇒ Object
- .ensure_consumer ⇒ Object
- .expected_delivery?(expected, payload, properties) ⇒ Boolean
- .handle_delivery(raw_payload, properties = {}) ⇒ Object
- .normalize_error(payload, correlation_id: nil) ⇒ Object
- .normalize_hash(hash) ⇒ Object
- .normalize_value(value) ⇒ Object
- .parse_payload(raw) ⇒ Object
- .pending_count ⇒ Object
- .register(correlation_id, expected: {}) ⇒ Object
- .reply_queue_expires_ms ⇒ Object
- .reply_queue_prefix ⇒ Object
- .reset! ⇒ Object
- .transport_available? ⇒ Boolean
Class Method Details
.accepted_protocol?(payload) ⇒ Boolean
127 128 129 130 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 127 def accepted_protocol?(payload) payload[:protocol_version] == ::Legion::Extensions::Llm::Fleet::Protocol::VERSION || payload[:protocol_version] == ::Legion::Extensions::Llm::Fleet::Protocol::VERSION.to_s end |
.accepted_type?(type) ⇒ Boolean
120 121 122 123 124 125 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 120 def accepted_type?(type) [ ::Legion::Extensions::Llm::Fleet::Protocol::RESPONSE_TYPE, ::Legion::Extensions::Llm::Fleet::Protocol::ERROR_TYPE ].include?(type.to_s) end |
.agent_queue_name ⇒ Object
61 62 63 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 61 def agent_queue_name @mutex.synchronize { current_agent_queue_name } end |
.cancel_consumer ⇒ Object
98 99 100 101 102 103 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 98 def cancel_consumer @consumer&.cancel @consumer = nil rescue StandardError => e handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.cancel_consumer') end |
.correlation_id_for(payload, properties) ⇒ Object
151 152 153 154 155 156 157 158 159 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 151 def correlation_id_for(payload, properties) payload_cid = payload[:correlation_id] return nil if payload_cid.to_s.empty? property_cid = properties[:correlation_id] return nil if property_cid && property_cid.to_s != payload_cid.to_s payload_cid end |
.current_agent_queue_name ⇒ Object
194 195 196 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 194 def current_agent_queue_name @current_agent_queue_name ||= "#{reply_queue_prefix}.#{SecureRandom.hex(8)}" end |
.delivery_value(key, payload, properties) ⇒ Object
161 162 163 164 165 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 161 def delivery_value(key, payload, properties) return properties[key] if key.to_sym == :type && properties.is_a?(Hash) && properties.key?(key) payload[key] if payload.is_a?(Hash) end |
.deregister(correlation_id) ⇒ Object
31 32 33 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 31 def deregister(correlation_id) @pending.delete(correlation_id) end |
.ensure_consumer ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 77 def ensure_consumer @mutex.synchronize do return if @consumer return unless transport_available? channel = Legion::Transport.connection.create_channel queue = channel.queue(current_agent_queue_name, auto_delete: true, durable: false, expires: reply_queue_expires_ms) @consumer = queue.subscribe(manual_ack: false) do |_delivery, properties, body| props = { correlation_id: properties.correlation_id, type: properties.type } handle_delivery(body, props) end end rescue StandardError => e handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.ensure_consumer') raise end |
.expected_delivery?(expected, payload, properties) ⇒ Boolean
142 143 144 145 146 147 148 149 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 142 def expected_delivery?(expected, payload, properties) expected.all? do |key, expected_value| next true if expected_value.nil? actual = key.to_sym == :correlation_id ? payload[:correlation_id] : delivery_value(key, payload, properties) actual && actual.to_s == expected_value.to_s end end |
.handle_delivery(raw_payload, properties = {}) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 35 def handle_delivery(raw_payload, properties = {}) payload = normalize_hash(parse_payload(raw_payload)) properties = normalize_hash(properties) return unless accepted_type?(delivery_value(:type, payload, properties)) return unless accepted_protocol?(payload) return unless payload[:operation] cid = correlation_id_for(payload, properties) return unless cid entry = @pending[cid] return unless entry return unless expected_delivery?(entry[:expected], payload, properties) future = @pending.delete(cid)&.[](:future) return unless future if delivery_value(:type, payload, properties) == ::Legion::Extensions::Llm::Fleet::Protocol::ERROR_TYPE future.fulfill(normalize_error(payload, correlation_id: cid)) else future.fulfill(payload) end rescue StandardError => e handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.handle_delivery') end |
.normalize_error(payload, correlation_id: nil) ⇒ Object
132 133 134 135 136 137 138 139 140 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 132 def normalize_error(payload, correlation_id: nil) code = payload[:code] || payload.dig(:error, :code) || payload[:message] || 'fleet_error' { success: false, error: code, correlation_id: payload[:correlation_id] || correlation_id, message_context: payload[:message_context] || {} } end |
.normalize_hash(hash) ⇒ Object
167 168 169 170 171 172 173 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 167 def normalize_hash(hash) return {} unless hash.respond_to?(:each) hash.each_with_object({}) do |(key, value), result| result[key.respond_to?(:to_sym) ? key.to_sym : key] = normalize_value(value) end end |
.normalize_value(value) ⇒ Object
175 176 177 178 179 180 181 182 183 184 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 175 def normalize_value(value) case value when Hash normalize_hash(value) when Array value.map { |entry| normalize_value(entry) } else value end end |
.parse_payload(raw) ⇒ Object
111 112 113 114 115 116 117 118 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 111 def parse_payload(raw) return raw if raw.is_a?(Hash) Legion::JSON.load(raw) rescue StandardError => e handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.parse_payload') {} end |
.pending_count ⇒ Object
65 66 67 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 65 def pending_count @pending.size end |
.register(correlation_id, expected: {}) ⇒ Object
21 22 23 24 25 26 27 28 29 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 21 def register(correlation_id, expected: {}) future = Concurrent::Promises.resolvable_future @pending[correlation_id] = { future: future, expected: normalize_hash(expected || {}) } ensure_consumer future rescue StandardError @pending.delete(correlation_id) raise end |
.reply_queue_expires_ms ⇒ Object
190 191 192 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 190 def reply_queue_expires_ms Legion::LLM::Settings.value(:fleet, :dispatch, :reply_queue_expires_ms, default: 60_000) end |
.reply_queue_prefix ⇒ Object
186 187 188 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 186 def reply_queue_prefix Legion::LLM::Settings.value(:fleet, :dispatch, :reply_queue_prefix, default: 'llm.fleet.reply') end |
.reset! ⇒ Object
69 70 71 72 73 74 75 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 69 def reset! @mutex.synchronize do cancel_consumer @pending = Concurrent::Map.new @current_agent_queue_name = nil end end |