Module: Legion::LLM::Fleet::ReplyDispatcher
- Extended by:
- Legion::Logging::Helper
- Defined in:
- lib/legion/llm/fleet/reply_dispatcher.rb
Class Method Summary
- .agent_queue_name ⇒ Object
- .cancel_consumer ⇒ Object
- .delivery_value(key, payload, properties) ⇒ Object
- .deregister(correlation_id) ⇒ Object
- .ensure_consumer ⇒ Object
- .expected_delivery?(expected, payload, properties) ⇒ Boolean
- .handle_delivery(raw_payload, properties = {}) ⇒ Object
- .normalize_error(payload, correlation_id: nil) ⇒ Object
- .normalized_error_code(error) ⇒ Object
- .parse_payload(raw) ⇒ Object
- .payload_value(payload, key) ⇒ Object
- .pending_count ⇒ Object
- .register(correlation_id, expected: {}) ⇒ Object
- .reset! ⇒ Object
- .transport_available? ⇒ Boolean
Class Method Details
.agent_queue_name ⇒ Object
52 53 54 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 52

# Name of the reply queue used by this dispatcher.
#
# The name is generated on first use as "llm.fleet.reply." followed by
# 16 random hex characters, then cached in @agent_queue_name so every later
# call returns the same value.
#
# @return [String] the cached queue name
def agent_queue_name
  return @agent_queue_name if @agent_queue_name

  @agent_queue_name = "llm.fleet.reply.#{SecureRandom.hex(8)}"
end
.cancel_consumer ⇒ Object
86 87 88 89 90 91 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 86

# Cancels the current reply consumer (if any) and forgets it.
#
# The reference is cleared in an `ensure` clause so that a consumer whose
# `cancel` raises is still dropped. Otherwise `ensure_consumer`, which returns
# early whenever @consumer is set, would keep seeing the stale object and
# never subscribe again.
#
# Any StandardError raised by `cancel` is reported through `handle_exception`
# at :warn level instead of propagating.
#
# @return [nil, Object] nil on success; the result of `handle_exception`
#   when `cancel` raised
def cancel_consumer
  @consumer&.cancel
  nil
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.cancel_consumer')
ensure
  # Runs on both the success and the failure path.
  @consumer = nil
end
.delivery_value(key, payload, properties) ⇒ Object
141 142 143 144 145 146 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 141

# Looks up +key+ for a delivery, preferring message properties over the body.
#
# Properties are checked under +key+ and then under +key.to_s+. A property
# that is present but nil is treated as absent: the consumer block in
# `ensure_consumer` always builds a properties hash containing both
# :correlation_id and :type, so an unset AMQP property would otherwise shadow
# the value carried in the payload.
#
# @param key [Symbol, String] field to look up
# @param payload [Object] parsed message body; only consulted via `payload_value`
# @param properties [Hash, Object] delivery properties; ignored unless a Hash
# @return [Object, nil] first non-nil property value, else the payload value
def delivery_value(key, payload, properties)
  if properties.is_a?(Hash)
    [key, key.to_s].each do |candidate|
      # fetch avoids picking up a Hash default value for a missing key
      value = properties.fetch(candidate, nil)
      return value unless value.nil?
    end
  end

  payload_value(payload, key)
end
.deregister(correlation_id) ⇒ Object
25 26 27 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 25

# Removes the pending entry stored under +correlation_id+, if there is one.
#
# @param correlation_id [Object] key previously passed to `register`
# @return [Object, nil] whatever @pending.delete returns for that key
def deregister(correlation_id)
  @pending.delete(correlation_id)
end
.ensure_consumer ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 67

# Subscribes to the reply queue once, under @mutex.
#
# Does nothing when a consumer is already stored or when
# `transport_available?` is false. Otherwise it opens a channel on
# Legion::Transport.connection, declares the queue named by `agent_queue_name`
# (auto_delete: true, durable: false) and subscribes with manual_ack: false.
# Each delivery is forwarded to `handle_delivery` together with the message's
# correlation_id and type.
#
# Any StandardError is reported through `handle_exception` at :warn level
# rather than raised.
#
# @return [Object, nil] the new consumer, nil when nothing was done, or the
#   result of `handle_exception` on failure
def ensure_consumer
  @mutex.synchronize do
    next if @consumer || !transport_available?

    reply_queue = Legion::Transport.connection
                                   .create_channel
                                   .queue(agent_queue_name, auto_delete: true, durable: false)

    @consumer = reply_queue.subscribe(manual_ack: false) do |_delivery, metadata, body|
      handle_delivery(body, { correlation_id: metadata.correlation_id, type: metadata.type })
    end
  end
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.ensure_consumer')
end
.expected_delivery?(expected, payload, properties) ⇒ Boolean
130 131 132 133 134 135 136 137 138 139 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 130

# Checks a delivery against the expectations recorded at registration.
#
# Every non-nil expected value must equal, after `to_s` on both sides, the
# value `delivery_value` finds for the same key. Expectations whose value is
# nil are skipped. A delivery value of nil never matches, but `false` is a
# real value and is compared like any other (it used to be rejected by a
# truthiness check, so an expectation of `false` could never be satisfied).
#
# @param expected [Hash, Object] expectations; anything but a Hash accepts all
# @param payload [Object] parsed message body
# @param properties [Object] delivery properties
# @return [Boolean] true when every expectation is met
def expected_delivery?(expected, payload, properties)
  return true unless expected.is_a?(Hash)

  expected.all? do |key, expected_value|
    next true if expected_value.nil?

    actual = delivery_value(key, payload, properties)
    !actual.nil? && actual.to_s == expected_value.to_s
  end
end
.handle_delivery(raw_payload, properties = {}) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 29

# Routes one reply message to the future registered for its correlation id.
#
# The delivery is ignored when it has no correlation id, when nothing is
# pending for that id, or when it fails `expected_delivery?`. Otherwise the
# pending entry is removed and its future fulfilled: with `normalize_error`
# output for type 'llm.fleet.error', or with the parsed payload for any other
# type (including none).
#
# Any StandardError is reported through `handle_exception` at :warn level
# rather than raised.
#
# @param raw_payload [Hash, String, Object] message body, passed to `parse_payload`
# @param properties [Hash] delivery properties (correlation_id / type)
# @return [Object, nil] result of `fulfill`, nil when the delivery is ignored,
#   or the result of `handle_exception` on failure
def handle_delivery(raw_payload, properties = {})
  payload = parse_payload(raw_payload)
  cid = delivery_value(:correlation_id, payload, properties)
  return unless cid

  entry = @pending[cid]
  return unless entry
  return unless expected_delivery?(entry[:expected], payload, properties)

  # The entry may have been removed since the lookup above (e.g. by
  # `deregister` or another delivery for the same id). Treat that as
  # "nothing pending" instead of dereferencing nil.
  claimed = @pending.delete(cid)
  return unless claimed

  future = claimed[:future]

  # Type-aware dispatch (new protocol) with fallback to legacy (no type)
  case delivery_value(:type, payload, properties)
  when 'llm.fleet.error'
    future.fulfill(normalize_error(payload, correlation_id: cid))
  else
    # 'llm.fleet.response' or legacy (no type)
    future.fulfill(payload)
  end
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.handle_delivery')
end
.normalize_error(payload, correlation_id: nil) ⇒ Object
113 114 115 116 117 118 119 120 121 122 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 113

# Builds the failure hash handed to a future for an error reply.
#
# Fields are read from +payload+ via `payload_value`. A missing :error or
# :message_context defaults to an empty hash, and the +correlation_id+
# argument is used only when the payload carries none.
#
# @param payload [Object] parsed error message body
# @param correlation_id [Object, nil] fallback correlation id
# @return [Hash] keys :success (always false), :error, :correlation_id,
#   :message_context and :raw_error
def normalize_error(payload, correlation_id: nil)
  raw = payload_value(payload, :error) || {}
  code = normalized_error_code(raw)
  resolved_id = payload_value(payload, :correlation_id) || correlation_id
  context = payload_value(payload, :message_context) || {}

  {
    success: false,
    error: code,
    correlation_id: resolved_id,
    message_context: context,
    raw_error: raw
  }
end
.normalized_error_code(error) ⇒ Object
124 125 126 127 128 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 124

# Reduces an error value to a single code.
#
# For a Hash the :code entry is preferred, then :message, then the literal
# 'fleet_error' (each looked up through `payload_value`). Any other object is
# converted with `to_s`.
#
# @param error [Hash, Object] error value taken from a reply payload
# @return [Object] the code; a String for non-Hash input or when the fallback is used
def normalized_error_code(error)
  if error.is_a?(Hash)
    payload_value(error, :code) || payload_value(error, :message) || 'fleet_error'
  else
    error.to_s
  end
end
.parse_payload(raw) ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 99

# Turns a raw message body into a Ruby structure.
#
# A Hash is returned untouched. Anything else is decoded with Legion::JSON
# when that constant is defined, otherwise with the standard JSON library
# (loaded on demand) using symbolized keys. A decoding failure is reported
# through `handle_exception` at :warn level and yields an empty Hash.
#
# @param raw [Hash, String, Object] message body
# @return [Object] the decoded value, or {} when decoding raised
def parse_payload(raw)
  if raw.is_a?(Hash)
    raw
  elsif defined?(Legion::JSON)
    Legion::JSON.load(raw)
  else
    require 'json'
    ::JSON.parse(raw, symbolize_names: true)
  end
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.parse_payload')
  {}
end
.payload_value(payload, key) ⇒ Object
148 149 150 151 152 153 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 148

# Reads +key+ from +payload+, accepting either the key itself or its string form.
#
# Presence is decided with `key?`, so a key that is present with a nil value
# returns nil without falling through to the string form.
#
# @param payload [Hash, Object] parsed message body; non-Hash values yield nil
# @param key [Symbol, String] field to read
# @return [Object, nil] the stored value, or nil when neither key is present
def payload_value(payload, key)
  return nil unless payload.is_a?(Hash)

  [key, key.to_s].each do |candidate|
    return payload[candidate] if payload.key?(candidate)
  end

  nil
end
.pending_count ⇒ Object
56 57 58 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 56

# Number of entries currently held in @pending.
#
# @return [Integer] size of the pending map
def pending_count
  @pending.size
end
.register(correlation_id, expected: {}) ⇒ Object
18 19 20 21 22 23 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 18

# Registers interest in the reply for +correlation_id+.
#
# Creates a resolvable future, stores it in @pending under +correlation_id+
# together with the expectations (nil is stored as an empty hash), and then
# calls `ensure_consumer`.
#
# @param correlation_id [Object] key under which the entry is stored
# @param expected [Hash, nil] values later checked by `expected_delivery?`
# @return [Object] the future created by Concurrent::Promises.resolvable_future
def register(correlation_id, expected: {})
  Concurrent::Promises.resolvable_future.tap do |future|
    @pending[correlation_id] = { future: future, expected: expected || {} }
    ensure_consumer
  end
end
.reset! ⇒ Object
60 61 62 63 64 65 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 60

# Drops the consumer and all pending entries while holding @mutex.
#
# The consumer is cancelled via `cancel_consumer` and @pending is replaced
# with a fresh Concurrent::Map. Futures held in the discarded map are not
# resolved by this method.
#
# @return [Object] the value of the synchronized block (the new pending map)
def reset!
  @mutex.synchronize do
    cancel_consumer
    @pending = Concurrent::Map.new
  end
end