Module: Legion::Extensions::Ollama::Runners::Fleet
- Defined in:
- lib/legion/extensions/ollama/runners/fleet.rb
Overview
Fleet runner — handles inbound AMQP LLM request messages and dispatches them to the appropriate Ollama::Client method based on request_type.
Called by Actor::ModelWorker with use_runner? = false.
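A request arriving on the fleet bus must carry at least a model and a request_type, plus the fields the chosen branch consumes. A hypothetical message body (field names inferred from the handle_request signature; the actual wire schema may differ):

```ruby
# Hypothetical fleet request body for a chat call; reply_to and
# correlation_id are optional and only needed when a reply is expected.
request = {
  model: 'llama3',
  request_type: 'chat',
  reply_to: 'legion.llm.replies',
  correlation_id: 'abc-123',
  message_context: {},
  messages: [{ role: 'user', content: 'hi' }]
}
```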
Class Method Summary
- .dispatch(model:, request_type:, **payload) ⇒ Hash
  Dispatch to the correct Ollama client method by request_type.
- .handle_request(model:, request_type: 'chat', reply_to: nil, correlation_id: nil, message_context: {}, **payload) ⇒ Object
  Primary entry point called by the subscription actor.
- .publish_error(reply_to:, correlation_id:, message_context:, model:, request_type:, error:) ⇒ Object
  Publish a fleet error to the caller’s reply_to queue.
- .publish_reply(reply_to:, correlation_id:, message_context:, model:, request_type:, result:, received_at:, returned_at:) ⇒ Object
  Publish a successful fleet response to the caller’s reply_to queue.
Class Method Details
.dispatch(model:, request_type:, **payload) ⇒ Hash
Dispatch to the correct Ollama client method by request_type.
# File 'lib/legion/extensions/ollama/runners/fleet.rb', line 65

def dispatch(model:, request_type:, **payload)
  host = ollama_host
  ollama = Legion::Extensions::Ollama::Client.new(host: host)

  case request_type.to_s
  when 'embed'
    input = payload[:input] || payload[:text]
    ollama.embed(model: model, input: input, **payload.slice(:truncate, :options, :keep_alive, :dimensions))
  when 'generate'
    ollama.generate(model: model, prompt: payload[:prompt], **payload.slice(:images, :format, :options, :system, :keep_alive))
  else
    ollama.chat(model: model, messages: payload[:messages], **payload.slice(:tools, :format, :options, :keep_alive, :think))
  end
rescue StandardError => e
  { result: nil, usage: {}, status: 500, error: e.message }
end
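Each branch forwards only an allow-list of keys from the inbound payload via Hash#slice, so unrelated fields (such as stream flags or chat-only tools on an embed request) never reach the client. A minimal sketch with a hypothetical payload:

```ruby
# Hypothetical fleet payload; only the keys the 'embed' branch
# allow-lists are forwarded to the Ollama client.
payload = { input: 'hello world', options: { temperature: 0 },
            tools: [], stream: false }

# Same slice expression the dispatcher uses for request_type 'embed':
embed_opts = payload.slice(:truncate, :options, :keep_alive, :dimensions)
# :tools and :stream are silently dropped; absent keys are simply omitted.
```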
.handle_request(model:, request_type: 'chat', reply_to: nil, correlation_id: nil, message_context: {}, **payload) ⇒ Object
Primary entry point called by the subscription actor.
# File 'lib/legion/extensions/ollama/runners/fleet.rb', line 21

def handle_request(model:, request_type: 'chat', reply_to: nil, correlation_id: nil, message_context: {}, **payload)
  received_at = Time.now.utc

  if payload[:stream]
    publish_error(
      reply_to: reply_to,
      correlation_id: correlation_id,
      message_context: message_context,
      model: model,
      request_type: request_type,
      error: {
        code: 'unsupported_streaming',
        message: 'Streaming over the fleet AMQP bus is not supported in v1',
        retriable: false,
        category: 'validation',
        provider: 'ollama'
      }
    )
    return { result: nil, status: 422, error: 'unsupported_streaming' }
  end

  result = dispatch(model: model, request_type: request_type, **payload)
  returned_at = Time.now.utc

  if reply_to
    publish_reply(
      reply_to: reply_to,
      correlation_id: correlation_id,
      message_context: message_context,
      model: model,
      request_type: request_type,
      result: result,
      received_at: received_at,
      returned_at: returned_at
    )
  end

  result
end
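The streaming guard rejects any payload carrying stream: true before dispatch. Extracted as a standalone check (hypothetical helper name; the real runner inlines this logic), it looks like:

```ruby
# Hypothetical standalone version of handle_request's streaming guard:
# fleet v1 rejects stream: true payloads with a 422-style error envelope.
def reject_streaming(payload)
  return nil unless payload[:stream]

  { result: nil, status: 422, error: 'unsupported_streaming' }
end

rejected = reject_streaming(stream: true)  # error envelope returned
allowed  = reject_streaming(messages: [])  # nil, request may proceed
```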
.publish_error(reply_to:, correlation_id:, message_context:, model:, request_type:, error:) ⇒ Object
Publish a fleet error to the caller’s reply_to queue. Errors are swallowed so the AMQP ack path is never blocked.
# File 'lib/legion/extensions/ollama/runners/fleet.rb', line 121

def publish_error(reply_to:, correlation_id:, message_context:, model:, request_type:, error:)
  return unless reply_to
  return unless defined?(Legion::Transport)

  Legion::LLM::Fleet::Error.new(
    reply_to: reply_to,
    fleet_correlation_id: correlation_id,
    message_context: message_context,
    provider: 'ollama',
    model: model,
    request_type: request_type,
    app_id: 'lex-ollama',
    error: error,
    worker_node: node_identity
  ).publish
rescue StandardError
  nil
end
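Both publishers share the same fire-and-forget shape: the publish is wrapped in a rescue that discards any StandardError, so a broken broker connection can never raise into the AMQP ack path. A minimal sketch of that pattern (hypothetical helper name):

```ruby
# Minimal sketch of the error-swallowing pattern used by
# publish_error and publish_reply: a failed publish returns nil
# instead of raising into the caller's ack path.
def safe_publish
  yield
  :published
rescue StandardError
  nil
end
```

The trade-off is silent loss of the reply on broker failure; the original request is still acked, which matches the runner's at-most-once reply semantics.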
.publish_reply(reply_to:, correlation_id:, message_context:, model:, request_type:, result:, received_at:, returned_at:) ⇒ Object
Publish a successful fleet response to the caller’s reply_to queue. Errors are swallowed so the AMQP ack path is never blocked by a broken reply.
# File 'lib/legion/extensions/ollama/runners/fleet.rb', line 87

def publish_reply(reply_to:, correlation_id:, message_context:, model:, request_type:, result:, received_at:, returned_at:)
  return unless defined?(Legion::Transport)

  body = result[:result] || {}
  usage = result[:usage] || {}
  status = result[:status] || 200
  latency_ms = ((returned_at - received_at) * 1000).round

  Transport::Messages::LlmResponse.new(
    reply_to: reply_to,
    fleet_correlation_id: correlation_id,
    message_context: message_context,
    provider: 'ollama',
    model: model,
    request_type: request_type,
    app_id: 'lex-ollama',
    **build_response_body(
      request_type: request_type,
      body: body,
      usage: usage,
      status: status,
      model: model,
      latency_ms: latency_ms,
      received_at: received_at,
      returned_at: returned_at
    )
  ).publish
rescue StandardError
  nil
end
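The latency_ms computation relies on Ruby's Time subtraction returning a Float number of seconds, which is then scaled to whole milliseconds. A minimal check of the same expression:

```ruby
received_at = Time.utc(2024, 1, 1, 12, 0, 0)
returned_at = received_at + 1.234 # 1.234 seconds later

# Same expression publish_reply uses to derive latency_ms:
latency_ms = ((returned_at - received_at) * 1000).round
```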