Module: Legion::LLM::Fleet::ReplyDispatcher
- Extended by:
- Legion::Logging::Helper
- Defined in:
- lib/legion/llm/fleet/reply_dispatcher.rb
Class Method Summary
- .agent_queue_name ⇒ Object
- .cancel_consumer ⇒ Object
- .deregister(correlation_id) ⇒ Object
- .ensure_consumer ⇒ Object
- .fulfill_nack(correlation_id) ⇒ Object
- .fulfill_return(correlation_id) ⇒ Object
- .handle_delivery(raw_payload, properties = {}) ⇒ Object
- .normalize_error(payload) ⇒ Object
- .parse_payload(raw) ⇒ Object
- .pending_count ⇒ Object
- .register(correlation_id) ⇒ Object
- .reset! ⇒ Object
- .transport_available? ⇒ Boolean
Class Method Details
.agent_queue_name ⇒ Object
67 68 69 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 67

# Name of the reply queue used by this dispatcher.
#
# The name is generated on first use as "llm.fleet.reply." followed by
# 16 random hex characters, then cached in @agent_queue_name so later
# calls return the same value.
#
# @return [String] the memoized queue name
def agent_queue_name
  return @agent_queue_name if @agent_queue_name

  @agent_queue_name = "llm.fleet.reply.#{SecureRandom.hex(8)}"
end
.cancel_consumer ⇒ Object
101 102 103 104 105 106 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 101

# Cancels the active reply consumer, if any, and forgets it.
#
# The consumer reference is cleared in an `ensure` clause so that a failing
# `cancel` call cannot leave a stale handle behind; a stale handle would make
# ensure_consumer return early and never re-subscribe.
#
# Failures are reported through handle_exception at :warn level and are not
# re-raised.
#
# @return [nil] on success; the value of handle_exception when cancel raised
def cancel_consumer
  @consumer&.cancel
  nil
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.cancel_consumer')
ensure
  # Always drop the reference, even when cancel raised.
  @consumer = nil
end
.deregister(correlation_id) ⇒ Object
25 26 27 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 25

# Drops the pending entry stored under the given correlation id.
#
# @param correlation_id [Object] key used when the entry was registered
# @return [Object, nil] whatever @pending.delete returns (presumably the
#   removed future, or nil when the id is unknown)
def deregister(correlation_id)
  registry = @pending
  registry.delete(correlation_id)
end
.ensure_consumer ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 82

# Subscribes to the reply queue unless a consumer already exists.
#
# Runs under @mutex. Does nothing when @consumer is already set or when
# transport_available? is false. Otherwise it creates a channel on
# Legion::Transport.connection, declares the queue named by
# agent_queue_name (auto_delete: true, durable: false) and subscribes with
# manual_ack: false. Each delivery is forwarded to handle_delivery together
# with the delivery's correlation_id and type.
#
# Any StandardError is reported through handle_exception at :warn level and
# is not re-raised.
#
# @return [Object, nil] the new consumer, or nil when nothing was done
def ensure_consumer
  @mutex.synchronize do
    next if @consumer || !transport_available?

    reply_queue = Legion::Transport.connection
                                   .create_channel
                                   .queue(agent_queue_name, auto_delete: true, durable: false)

    @consumer = reply_queue.subscribe(manual_ack: false) do |_delivery, metadata, body|
      handle_delivery(body, { correlation_id: metadata.correlation_id, type: metadata.type })
    end
  end
rescue StandardError => e
  handle_exception(e, level: :warn)
end
.fulfill_nack(correlation_id) ⇒ Object
58 59 60 61 62 63 64 65 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 58

# Resolves the pending future for a correlation id with a failure result
# whose error is 'fleet_backpressure', removing it from @pending.
# (Presumably called when a publish is nacked; confirm against the caller.)
#
# Unknown ids are ignored. Any StandardError is reported through
# handle_exception at :warn level and is not re-raised.
#
# @param correlation_id [Object] key the future was registered under
# @return [Object, nil] result of fulfilling the future, or nil when no
#   future was pending
def fulfill_nack(correlation_id)
  waiting = @pending.delete(correlation_id)
  waiting&.fulfill({ success: false, error: 'fleet_backpressure' })
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.fulfill_nack')
end
.fulfill_return(correlation_id) ⇒ Object
49 50 51 52 53 54 55 56 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 49

# Resolves the pending future for a correlation id with a failure result
# whose error is 'no_fleet_queue', removing it from @pending.
# (Presumably called when the broker returns an unroutable message;
# confirm against the caller.)
#
# Unknown ids are ignored. Any StandardError is reported through
# handle_exception at :warn level and is not re-raised.
#
# @param correlation_id [Object] key the future was registered under
# @return [Object, nil] result of fulfilling the future, or nil when no
#   future was pending
def fulfill_return(correlation_id)
  waiting = @pending.delete(correlation_id)
  waiting&.fulfill({ success: false, error: 'no_fleet_queue' })
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.fulfill_return')
end
.handle_delivery(raw_payload, properties = {}) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 29

# Routes one reply message to the future waiting on its correlation id.
#
# The correlation id is taken from the message properties first and from the
# parsed payload second. The matching future is removed from @pending and
# fulfilled:
# * type 'llm.fleet.error'    -> fulfilled with normalize_error(...)
# * anything else (including the legacy protocol with no type) ->
#   fulfilled with the parsed payload as-is
#
# A parsed payload that is not a Hash (for example a JSON array or scalar)
# is treated as having no fields for the id lookup and for error
# normalization. Without this, the lookup or normalize_error would raise
# after the future had already been removed from @pending, so the waiter
# would never be resolved.
#
# Messages with no correlation id, or with an id nobody is waiting on, are
# dropped. Any StandardError is reported through handle_exception at :warn
# level and is not re-raised.
#
# @param raw_payload [String, Hash] message body, passed to parse_payload
# @param properties [Hash] message metadata; :correlation_id and :type are read
# @return [Object, nil] result of fulfilling the future, or nil when dropped
def handle_delivery(raw_payload, properties = {})
  payload = parse_payload(raw_payload)
  # Key lookups below need a Hash; fall back to an empty one otherwise.
  fields = payload.is_a?(Hash) ? payload : {}

  cid = properties[:correlation_id] || fields[:correlation_id]
  return unless cid

  future = @pending.delete(cid)
  return unless future

  # Type-aware dispatch (new protocol) with fallback to legacy (no type)
  case properties[:type]
  when 'llm.fleet.error'
    future.fulfill(normalize_error(fields))
  else
    # 'llm.fleet.response' or legacy (no type)
    future.fulfill(payload)
  end
rescue StandardError => e
  handle_exception(e, level: :warn, operation: 'llm.fleet.reply_dispatcher.handle_delivery')
end
.normalize_error(payload) ⇒ Object
128 129 130 131 132 133 134 135 136 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 128

# Converts an error reply payload into a uniform failure result.
#
# The :error entry of the payload (an empty Hash when missing) is reduced to
# a single value: for a Hash, its :code, else its :message, else
# 'fleet_error'; for anything else, its string form.
#
# @param payload [Hash] parsed reply; :error and :message_context are read
# @return [Hash] with keys :success (always false), :error,
#   :message_context (empty Hash when absent) and :raw_error (the
#   untouched :error entry)
def normalize_error(payload)
  raw = payload[:error] || {}

  summary =
    case raw
    when Hash then raw[:code] || raw[:message] || 'fleet_error'
    else raw.to_s
    end

  {
    success: false,
    error: summary,
    message_context: payload[:message_context] || {},
    raw_error: raw
  }
end
.parse_payload(raw) ⇒ Object
114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 114

# Decodes a raw reply body.
#
# A Hash is returned untouched. Anything else is decoded with
# Legion::JSON.load when that constant is defined, otherwise with the
# standard library's JSON.parse using symbolized keys.
#
# Decoding failures are reported through handle_exception at :debug level
# and produce an empty Hash instead of raising.
#
# @param raw [String, Hash] message body
# @return [Object] the decoded value, or {} when decoding failed
def parse_payload(raw)
  return raw if raw.is_a?(Hash)
  return Legion::JSON.load(raw) if defined?(Legion::JSON)

  # Loaded lazily: only needed when Legion::JSON is unavailable.
  require 'json'
  ::JSON.parse(raw, symbolize_names: true)
rescue StandardError => e
  handle_exception(e, level: :debug)
  {}
end
.pending_count ⇒ Object
71 72 73 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 71

# Number of entries currently held in @pending.
#
# @return [Integer] size reported by @pending
def pending_count
  registry = @pending
  registry.size
end
.register(correlation_id) ⇒ Object
18 19 20 21 22 23 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 18

# Creates a resolvable future for a correlation id and starts listening.
#
# The future is stored in @pending under the id before ensure_consumer is
# called, so it is already registered when the consumer is set up.
#
# @param correlation_id [Object] id the reply is expected to carry
# @return [Object] the new future from Concurrent::Promises.resolvable_future
def register(correlation_id)
  Concurrent::Promises.resolvable_future.tap do |awaited|
    @pending[correlation_id] = awaited
    ensure_consumer
  end
end
.reset! ⇒ Object
75 76 77 78 79 80 |
# File 'lib/legion/llm/fleet/reply_dispatcher.rb', line 75

# Returns the dispatcher to an empty state.
#
# While holding @mutex, cancels the current consumer via cancel_consumer
# and then replaces @pending with a new, empty Concurrent::Map. Entries in
# the old map are discarded as-is.
#
# @return [Concurrent::Map] the new, empty pending map
def reset!
  @mutex.synchronize do
    cancel_consumer
    @pending = Concurrent::Map.new
  end
end