Module: Legion::Extensions::Llm::Gateway::Helpers::ReplyDispatcher
- Defined in:
- lib/legion/extensions/llm/gateway/helpers/reply_dispatcher.rb
Class Method Summary collapse
- .cancel_consumer ⇒ Object
- .deregister(correlation_id) ⇒ Object
-
.ensure_consumer ⇒ Object
private.
- .handle_delivery(raw_payload, properties = {}) ⇒ Object
- .log_warn(msg) ⇒ Object
- .parse_payload(raw) ⇒ Object
- .pending_count ⇒ Object
- .register(correlation_id) ⇒ Object
- .reset! ⇒ Object
- .transport_available? ⇒ Boolean
Class Method Details
.cancel_consumer ⇒ Object
73 74 75 76 77 78 |
# File 'lib/legion/extensions/llm/gateway/helpers/reply_dispatcher.rb', line 73 def cancel_consumer @consumer&.cancel # rubocop:disable ThreadSafety/ClassInstanceVariable @consumer = nil # rubocop:disable ThreadSafety/ClassInstanceVariable rescue StandardError => e Legion::Logging.warn("ReplyDispatcher: cancel failed: #{e.}") if defined?(Legion::Logging) # rubocop:disable Legion/HelperMigration/DirectLogging, Legion/HelperMigration/LoggingGuard end |
.deregister(correlation_id) ⇒ Object
24 25 26 |
# File 'lib/legion/extensions/llm/gateway/helpers/reply_dispatcher.rb', line 24 def deregister(correlation_id) @pending.delete(correlation_id) # rubocop:disable ThreadSafety/ClassInstanceVariable end |
.ensure_consumer ⇒ Object
private
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/legion/extensions/llm/gateway/helpers/reply_dispatcher.rb', line 54 def ensure_consumer @mutex.synchronize do return if @consumer return unless transport_available? queue_name = Rpc.agent_queue_name return unless queue_name channel = Legion::Transport.connection.create_channel queue = channel.queue(queue_name, auto_delete: true, durable: false) @consumer = queue.subscribe(manual_ack: false) do |_delivery, properties, body| props = { correlation_id: properties.correlation_id } handle_delivery(body, props) end end rescue StandardError => e Legion::Logging.warn("ReplyDispatcher: consumer setup failed: #{e.}") if defined?(Legion::Logging) # rubocop:disable Legion/HelperMigration/DirectLogging, Legion/HelperMigration/LoggingGuard end |
.handle_delivery(raw_payload, properties = {}) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/legion/extensions/llm/gateway/helpers/reply_dispatcher.rb', line 28 def handle_delivery(raw_payload, properties = {}) payload = parse_payload(raw_payload) cid = properties[:correlation_id] || payload[:correlation_id] return unless cid future = @pending.delete(cid) # rubocop:disable ThreadSafety/ClassInstanceVariable return unless future future.fulfill(payload.merge(success: true)) rescue StandardError => e Legion::Logging.warn("ReplyDispatcher: handle_delivery failed: #{e.}") if defined?(Legion::Logging) # rubocop:disable Legion/HelperMigration/DirectLogging, Legion/HelperMigration/LoggingGuard end |
.log_warn(msg) ⇒ Object
99 100 101 |
# File 'lib/legion/extensions/llm/gateway/helpers/reply_dispatcher.rb', line 99 def log_warn(msg) Legion::Logging.warn(msg) if defined?(Legion::Logging) # rubocop:disable Legion/HelperMigration/DirectLogging, Legion/HelperMigration/LoggingGuard end |
.parse_payload(raw) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/legion/extensions/llm/gateway/helpers/reply_dispatcher.rb', line 86 def parse_payload(raw) return raw if raw.is_a?(Hash) if defined?(Legion::JSON) Legion::JSON.load(raw) # rubocop:disable Legion/HelperMigration/DirectJson else require 'json' ::JSON.parse(raw, symbolize_names: true) end rescue StandardError => _e {} end |
.pending_count ⇒ Object
41 42 43 |
# File 'lib/legion/extensions/llm/gateway/helpers/reply_dispatcher.rb', line 41 def pending_count @pending.size # rubocop:disable ThreadSafety/ClassInstanceVariable end |
.register(correlation_id) ⇒ Object
17 18 19 20 21 22 |
# File 'lib/legion/extensions/llm/gateway/helpers/reply_dispatcher.rb', line 17 def register(correlation_id) future = Concurrent::Promises.resolvable_future @pending[correlation_id] = future # rubocop:disable ThreadSafety/ClassInstanceVariable ensure_consumer future end |
.reset! ⇒ Object
45 46 47 48 49 50 |
# File 'lib/legion/extensions/llm/gateway/helpers/reply_dispatcher.rb', line 45 def reset! @mutex.synchronize do cancel_consumer @pending = Concurrent::Map.new end end |
.transport_available? ⇒ Boolean
80 81 82 83 84 |
# File 'lib/legion/extensions/llm/gateway/helpers/reply_dispatcher.rb', line 80 def transport_available? defined?(Legion::Transport) && # rubocop:disable Legion/HelperMigration/DefinedTransportGuard Legion::Transport.respond_to?(:connection) && Legion::Transport.connection end |