Module: Legion::Extensions::Llm::Gateway::Helpers::ReplyDispatcher

Defined in:
lib/legion/extensions/llm/gateway/helpers/reply_dispatcher.rb

Class Method Summary collapse

Class Method Details

.cancel_consumer ⇒ Object



73
74
75
76
77
78
# File 'lib/legion/extensions/llm/gateway/helpers/reply_dispatcher.rb', line 73

# Cancels the current consumer (if any) and forgets it.
#
# The reference is cleared in +ensure+ so that a failing +cancel+ cannot leave
# a stale consumer behind; ensure_consumer skips subscribing whenever
# @consumer is set, so a stale reference would block any later re-subscribe.
#
# Errors raised by +cancel+ are logged as warnings (when Legion::Logging is
# defined) and not re-raised.
#
# @return [Object, nil] nil on success; on failure, whatever the warning call
#   returns (nil when Legion::Logging is not defined)
def cancel_consumer
  @consumer&.cancel # rubocop:disable ThreadSafety/ClassInstanceVariable
  nil
rescue StandardError => e
  Legion::Logging.warn("ReplyDispatcher: cancel failed: #{e.message}") if defined?(Legion::Logging) # rubocop:disable Legion/HelperMigration/DirectLogging, Legion/HelperMigration/LoggingGuard
ensure
  # Always drop the reference, even when cancel (or the logging above) raised.
  @consumer = nil # rubocop:disable ThreadSafety/ClassInstanceVariable
end

.deregister(correlation_id) ⇒ Object



24
25
26
# File 'lib/legion/extensions/llm/gateway/helpers/reply_dispatcher.rb', line 24

# Removes the entry stored under +correlation_id+ from the pending registry.
#
# @param correlation_id [Object] key previously used to store an entry in @pending
# @return [Object, nil] the removed entry, as returned by @pending.delete
def deregister(correlation_id)
  removed = @pending.delete(correlation_id) # rubocop:disable ThreadSafety/ClassInstanceVariable
  removed
end

.ensure_consumer ⇒ Object

private



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/legion/extensions/llm/gateway/helpers/reply_dispatcher.rb', line 54

# Subscribes to the agent reply queue once, under @mutex.
#
# Does nothing when a consumer is already stored in @consumer, when
# transport_available? is falsy, or when Rpc.agent_queue_name is falsy.
# Otherwise it opens a channel on Legion::Transport.connection, declares the
# queue with auto_delete: true / durable: false, and subscribes with
# manual_ack: false. Each delivery is forwarded to handle_delivery together
# with its correlation id.
#
# Any StandardError raised during setup is logged as a warning (when
# Legion::Logging is defined) and not re-raised.
#
# @return [Object, nil] the stored consumer when a subscription was made,
#   nil when skipped; on failure, whatever the warning call returns
def ensure_consumer
  @mutex.synchronize do
    # Skip when already subscribed or when there is no transport to use.
    next if @consumer || !transport_available?

    reply_queue_name = Rpc.agent_queue_name
    next unless reply_queue_name

    reply_queue = Legion::Transport.connection
                                   .create_channel
                                   .queue(reply_queue_name, auto_delete: true, durable: false)
    @consumer = reply_queue.subscribe(manual_ack: false) do |_delivery_info, metadata, message_body|
      handle_delivery(message_body, { correlation_id: metadata.correlation_id })
    end
  end
rescue StandardError => e
  Legion::Logging.warn("ReplyDispatcher: consumer setup failed: #{e.message}") if defined?(Legion::Logging) # rubocop:disable Legion/HelperMigration/DirectLogging, Legion/HelperMigration/LoggingGuard
end

.handle_delivery(raw_payload, properties = {}) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/legion/extensions/llm/gateway/helpers/reply_dispatcher.rb', line 28

# Routes an incoming reply to the future registered for its correlation id.
#
# The correlation id is read from +properties[:correlation_id]+ first and
# from the parsed payload's +:correlation_id+ key otherwise. Deliveries with
# no correlation id, or whose id has no entry in @pending, are ignored.
# The matching entry is removed from @pending before it is fulfilled.
#
# Any StandardError is logged as a warning (when Legion::Logging is defined)
# and not re-raised.
#
# @param raw_payload [Hash, Object] reply body; handed to parse_payload
# @param properties [Hash] delivery metadata; only :correlation_id is read
# @return [Object, nil] result of +future.fulfill+, or nil when the delivery
#   is ignored
def handle_delivery(raw_payload, properties = {})
  payload = parse_payload(raw_payload)
  # A body that parses to something other than a Hash (e.g. a JSON array) is
  # handled like an empty payload. Otherwise the future would be removed from
  # @pending below and then left unresolved when #merge raised.
  payload = {} unless payload.is_a?(Hash)

  cid = properties[:correlation_id] || payload[:correlation_id]
  return unless cid

  future = @pending.delete(cid) # rubocop:disable ThreadSafety/ClassInstanceVariable
  return unless future

  # NOTE(review): merge overwrites any :success key already present in the
  # payload — confirm that is intended.
  future.fulfill(payload.merge(success: true))
rescue StandardError => e
  Legion::Logging.warn("ReplyDispatcher: handle_delivery failed: #{e.message}") if defined?(Legion::Logging) # rubocop:disable Legion/HelperMigration/DirectLogging, Legion/HelperMigration/LoggingGuard
end

.log_warn(msg) ⇒ Object



99
100
101
# File 'lib/legion/extensions/llm/gateway/helpers/reply_dispatcher.rb', line 99

# Emits +msg+ as a warning through Legion::Logging when that constant is
# defined; otherwise does nothing.
#
# @param msg [Object] message handed to Legion::Logging.warn
# @return [Object, nil] whatever Legion::Logging.warn returns, or nil when
#   Legion::Logging is not defined
def log_warn(msg)
  # rubocop:disable Legion/HelperMigration/DirectLogging, Legion/HelperMigration/LoggingGuard
  return unless defined?(Legion::Logging)

  Legion::Logging.warn(msg)
  # rubocop:enable Legion/HelperMigration/DirectLogging, Legion/HelperMigration/LoggingGuard
end

.parse_payload(raw) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/legion/extensions/llm/gateway/helpers/reply_dispatcher.rb', line 86

# Turns a raw reply body into a Ruby object.
#
# A Hash is returned as-is. Anything else is decoded with Legion::JSON.load
# when that constant is defined, or with the standard library JSON parser
# (symbolized keys) otherwise. Any StandardError during decoding yields an
# empty Hash.
#
# @param raw [Hash, Object] reply body
# @return [Object] +raw+ itself, the decoded value, or {} on failure
def parse_payload(raw)
  if raw.is_a?(Hash)
    raw
  elsif defined?(Legion::JSON)
    Legion::JSON.load(raw) # rubocop:disable Legion/HelperMigration/DirectJson
  else
    # Loaded lazily: only needed when Legion::JSON is absent.
    require 'json'
    ::JSON.parse(raw, symbolize_names: true)
  end
rescue StandardError
  {}
end

.pending_count ⇒ Object



41
42
43
# File 'lib/legion/extensions/llm/gateway/helpers/reply_dispatcher.rb', line 41

# Reports how many entries are currently held in the pending registry.
#
# @return [Integer] value of @pending.size
def pending_count
  registry = @pending # rubocop:disable ThreadSafety/ClassInstanceVariable
  registry.size
end

.register(correlation_id) ⇒ Object



17
18
19
20
21
22
# File 'lib/legion/extensions/llm/gateway/helpers/reply_dispatcher.rb', line 17

# Creates a resolvable future for +correlation_id+, stores it in the pending
# registry, and then calls ensure_consumer.
#
# @param correlation_id [Object] key under which the future is stored in @pending
# @return [Object] the newly created Concurrent::Promises resolvable future
def register(correlation_id)
  Concurrent::Promises.resolvable_future.tap do |reply_future|
    @pending[correlation_id] = reply_future # rubocop:disable ThreadSafety/ClassInstanceVariable
    ensure_consumer
  end
end

.reset! ⇒ Object



45
46
47
48
49
50
# File 'lib/legion/extensions/llm/gateway/helpers/reply_dispatcher.rb', line 45

# Returns the dispatcher to an empty state while holding @mutex: cancels the
# consumer via cancel_consumer, then replaces the pending registry with a new
# Concurrent::Map. Entries in the old registry are discarded; this method does
# not resolve them.
#
# @return [Concurrent::Map] the new, empty pending registry
def reset!
  @mutex.synchronize do
    cancel_consumer
    fresh_registry = Concurrent::Map.new
    @pending = fresh_registry
  end
end

.transport_available? ⇒ Boolean

Returns:

  • (Boolean)


80
81
82
83
84
# File 'lib/legion/extensions/llm/gateway/helpers/reply_dispatcher.rb', line 80

# Tells whether a transport connection can be obtained.
#
# The result is truthy/falsy rather than strictly true/false: nil when
# Legion::Transport is not defined, false when it does not respond to
# +connection+, and otherwise whatever Legion::Transport.connection returns.
#
# @return [Object, false, nil] the connection, or a falsy value
def transport_available?
  return unless defined?(Legion::Transport) # rubocop:disable Legion/HelperMigration/DefinedTransportGuard
  return false unless Legion::Transport.respond_to?(:connection)

  Legion::Transport.connection
end