Module: Legion::Extensions::Llm::Gateway::Runners::FleetHandler

Defined in:
lib/legion/extensions/llm/gateway/runners/fleet_handler.rb

Class Method Summary collapse

Class Method Details

.build_response(correlation_id, response) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
# File 'lib/legion/extensions/llm/gateway/runners/fleet_handler.rb', line 72

# Assembles the reply hash for a fleet request: the raw response plus the
# token counts and provider/model metadata read from it.
#
# @param correlation_id [Object] stored under +:correlation_id+ unchanged
# @param response [Object] LLM response; stored as-is under +:response+
# @return [Hash] keys +:correlation_id+, +:response+, +:input_tokens+,
#   +:output_tokens+, +:thinking_tokens+, +:provider+, +:model_id+
def build_response(correlation_id, response)
  reply = { correlation_id: correlation_id, response: response }

  # Token counters share their name between the reply key and the response reader.
  %i[input_tokens output_tokens thinking_tokens].each do |counter|
    reply[counter] = extract_token(response, counter)
  end

  reply[:provider] = extract_field(response, :provider)
  # The response exposes +model+; the reply publishes it as +model_id+.
  reply[:model_id] = extract_field(response, :model)
  reply
end

.call_chat(payload) ⇒ Object



48
49
50
51
52
53
54
55
56
57
# File 'lib/legion/extensions/llm/gateway/runners/fleet_handler.rb', line 48

# Sends a chat request to Legion::LLM.
#
# When +payload[:messages]+ is an Array with more than one entry, the whole
# array is passed as +messages:+. Otherwise only the +:content+ of the first
# entry (or nil) is passed as +message:+.
#
# @param payload [Hash] reads +:model+ and +:messages+
# @return [Object] whatever Legion::LLM.chat returns
def call_chat(payload)
  history = payload[:messages]
  conversation = history.is_a?(Array) && history.size > 1
  prompt = conversation ? { messages: history } : { message: history&.dig(0, :content) }

  Legion::LLM.chat(model: payload[:model], **prompt, # rubocop:disable Legion/HelperMigration/DirectLlm
                   caller: { extension: 'lex-llm-gateway', operation: 'fleet' })
end

.call_embed(payload) ⇒ Object



66
67
68
69
70
# File 'lib/legion/extensions/llm/gateway/runners/fleet_handler.rb', line 66

# Sends an embedding request to Legion::LLM.
#
# The text to embed is +payload[:text]+ when truthy; otherwise the
# +:content+ of the first entry in +payload[:messages]+ (nil if absent).
#
# @param payload [Hash] reads +:model+, +:text+ and +:messages+
# @return [Object] whatever Legion::LLM.embed returns
def call_embed(payload)
  input = payload[:text]
  input ||= payload.dig(:messages, 0, :content)

  Legion::LLM.embed(model: payload[:model], text: input, # rubocop:disable Legion/HelperMigration/DirectLlm
                    caller: { extension: 'lex-llm-gateway', operation: 'fleet' })
end

.call_local_llm(payload) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/legion/extensions/llm/gateway/runners/fleet_handler.rb', line 35

# Routes a fleet payload to the matching LLM call based on
# +payload[:request_type]+ (compared as a string).
#
# 'structured' goes to call_structured, 'embed' to call_embed, and anything
# else (including a missing request type) to call_chat.
#
# @param payload [Hash] reads +:request_type+; passed through to the handler
# @return [Object] the handler's result, or +{ error: 'llm_not_available' }+
#   when Legion::LLM is not defined
def call_local_llm(payload)
  return { error: 'llm_not_available' } unless defined?(Legion::LLM)

  kind = payload[:request_type]&.to_s
  if kind == 'structured'
    call_structured(payload)
  elsif kind == 'embed'
    call_embed(payload)
  else
    call_chat(payload)
  end
end

.call_structured(payload) ⇒ Object



59
60
61
62
63
64
# File 'lib/legion/extensions/llm/gateway/runners/fleet_handler.rb', line 59

# Sends a structured-output request to Legion::LLM.
#
# @param payload [Hash] reads +:model+, +:messages+ and +:schema+
# @return [Object] whatever Legion::LLM.structured returns
def call_structured(payload)
  request = {
    model:    payload[:model],
    messages: payload[:messages],
    schema:   payload[:schema],
    caller:   { extension: 'lex-llm-gateway', operation: 'fleet' }
  }

  Legion::LLM.structured(**request) # rubocop:disable Legion/HelperMigration/DirectLlm
end

.extract_field(response, field) ⇒ Object



116
117
118
119
120
# File 'lib/legion/extensions/llm/gateway/runners/fleet_handler.rb', line 116

# Reads +field+ from +response+ if the response exposes a public method of
# that name.
#
# @param response [Object] any object
# @param field [Symbol] method name to call
# @return [Object, nil] the method's result, or nil when +response+ does not
#   respond to +field+
def extract_field(response, field)
  response.respond_to?(field) ? response.public_send(field) : nil
end

.extract_token(response, field) ⇒ Object



110
111
112
113
114
# File 'lib/legion/extensions/llm/gateway/runners/fleet_handler.rb', line 110

# Reads a token counter from +response+ and coerces it with +to_i+.
#
# @param response [Object] any object
# @param field [Symbol] method name to call
# @return [Integer] the counter, or 0 when +response+ does not respond to
#   +field+ (a nil counter also becomes 0 via +to_i+)
def extract_token(response, field)
  response.respond_to?(field) ? response.public_send(field).to_i : 0
end

.handle_fleet_request(payload) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/legion/extensions/llm/gateway/runners/fleet_handler.rb', line 11

# Entry point for a fleet request: checks the signed token when auth is
# required, runs the LLM call, and publishes the result when the payload
# names a reply queue.
#
# @param payload [Hash] reads +:signed_token+, +:reply_to+ and
#   +:correlation_id+; the whole hash is forwarded to call_local_llm
# @return [Hash] +{ success: false, error: 'invalid_token' }+ when auth is
#   required and the token is rejected; otherwise the build_response hash
def handle_fleet_request(payload)
  authorized = !require_auth? || valid_token?(payload[:signed_token])

  outcome = if authorized
              llm_result = call_local_llm(payload)
              build_response(payload[:correlation_id], llm_result)
            else
              { success: false, error: 'invalid_token' }
            end

  # Both the rejection and the real response go back to the requester.
  publish_reply(payload[:reply_to], payload[:correlation_id], outcome) if payload[:reply_to]
  outcome
end

.log_warn(msg) ⇒ Object



106
107
108
# File 'lib/legion/extensions/llm/gateway/runners/fleet_handler.rb', line 106

# Emits a warning through Legion::Logging when that constant is defined;
# otherwise does nothing.
#
# @param msg [Object] message handed to Legion::Logging.warn
# @return [Object, nil] the logger's return value, or nil when logging is
#   unavailable
def log_warn(msg)
  return unless defined?(Legion::Logging) # rubocop:disable Legion/HelperMigration/LoggingGuard

  Legion::Logging.warn(msg) # rubocop:disable Legion/HelperMigration/DirectLogging
end

.publish_reply(reply_to, correlation_id, response_hash) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/legion/extensions/llm/gateway/runners/fleet_handler.rb', line 84

# Publishes +response_hash+ as JSON on the transport's default exchange,
# routed to +reply_to+.
#
# Does nothing when Legion::Transport is not defined. The body is serialized
# with Legion::JSON when that is defined, otherwise with the stdlib JSON
# generator. Publishing is best-effort: any StandardError raised while
# serializing, opening the channel, publishing or closing is logged as a
# warning (when Legion::Logging is defined) and swallowed.
#
# @param reply_to [Object] routing key for the reply message
# @param correlation_id [Object] sent as the message's +correlation_id+
# @param response_hash [Hash] body to serialize
# @return [Object] incidental; the callers in this file ignore it
def publish_reply(reply_to, correlation_id, response_hash)
  return unless defined?(Legion::Transport) # rubocop:disable Legion/HelperMigration/DefinedTransportGuard

  payload = if defined?(Legion::JSON)
              Legion::JSON.dump(response_hash) # rubocop:disable Legion/HelperMigration/DirectJson
            else
              # Lazy fallback so the stdlib is only loaded when Legion::JSON is absent.
              require 'json'
              ::JSON.generate(response_hash)
            end

  channel = Legion::Transport.connection.create_channel
  begin
    channel.default_exchange.publish(
      payload,
      routing_key:    reply_to,
      correlation_id: correlation_id,
      content_type:   'application/json'
    )
  ensure
    # Close even when publish raises; otherwise every failed reply leaks an
    # open channel on the shared connection.
    channel.close
  end
rescue StandardError => e
  Legion::Logging.warn("FleetHandler: publish_reply failed: #{e.message}") if defined?(Legion::Logging) # rubocop:disable Legion/HelperMigration/DirectLogging, Legion/HelperMigration/LoggingGuard
end

.require_auth? ⇒ Boolean

Returns:

  • (Boolean)


25
26
27
# File 'lib/legion/extensions/llm/gateway/runners/fleet_handler.rb', line 25

# Reports whether fleet requests must carry a valid signed token.
# Pure delegation: the answer comes from Fleet.require_auth?.
#
# @return [Object] whatever Fleet.require_auth? returns (used as a boolean)
def require_auth?
  Fleet.require_auth?
end

.valid_token?(token) ⇒ Boolean

Returns:

  • (Boolean)


29
30
31
32
33
# File 'lib/legion/extensions/llm/gateway/runners/fleet_handler.rb', line 29

# Decides whether +token+ is acceptable.
#
# A missing token is accepted outright when auth is not required. In every
# other case the token is valid exactly when Helpers::Auth.validate_token
# returns something other than nil.
#
# @param token [Object, nil] signed token taken from the request payload
# @return [Boolean]
def valid_token?(token)
  anonymous_allowed = token.nil? && !require_auth?

  anonymous_allowed || !Helpers::Auth.validate_token(token).nil?
end