Class: Legion::Extensions::Ollama::Actor::ModelWorker
- Inherits: Actors::Subscription
  - Object
  - Actors::Subscription
  - Legion::Extensions::Ollama::Actor::ModelWorker
- Defined in: lib/legion/extensions/ollama/actors/model_worker.rb
Overview
Subscription actor that listens on a model-scoped queue and forwards inbound LLM request messages to Runners::Fleet#handle_request.
One instance is created per (request_type, model) entry in settings:
legion:
  ollama:
    fleet:
      consumer_priority: 10
      subscriptions:
        - type: embed
          model: nomic-embed-text
        - type: chat
          model: "qwen3.5:27b"
The queue name and routing key both follow the schema:
llm.request.ollama.<type>.<model>
where model colons are converted to dots (AMQP topic word separator).
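For illustration, the naming schema can be sketched in plain Ruby (routing_key_for is a hypothetical helper shown here for clarity, not part of the class):

```ruby
# Hypothetical helper illustrating the llm.request.ollama.<type>.<model>
# schema described above; the actual worker derives its key internally.
def routing_key_for(type, model)
  # Colons clash with AMQP topic syntax, so "qwen3.5:27b" becomes "qwen3.5.27b"
  "llm.request.ollama.#{type}.#{model.tr(':', '.')}"
end

routing_key_for('embed', 'nomic-embed-text') # => "llm.request.ollama.embed.nomic-embed-text"
routing_key_for('chat', 'qwen3.5:27b')       # => "llm.request.ollama.chat.qwen3.5.27b"
```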
Instance Attribute Summary collapse
- #model_name ⇒ Object readonly
  Returns the value of attribute model_name.
- #request_type ⇒ Object readonly
  Returns the value of attribute request_type.
Instance Method Summary collapse
- #consumer_priority ⇒ Object
  Consumer priority from settings.
- #initialize(request_type:, model:) ⇒ ModelWorker constructor
  A new instance of ModelWorker.
- #prefetch ⇒ Object
  prefetch(1) is required for consumer priority to work correctly: without it, a high-priority consumer can hold multiple messages while lower-priority consumers sit idle.
- #process_message(payload, metadata, delivery_info) ⇒ Object
  Enrich every inbound message with the worker’s own request_type and model so Runners::Fleet#handle_request always has them, even if the sender omitted them.
- #queue ⇒ Object
  Returns a queue CLASS (not instance) bound to the llm.request exchange with the routing key for this worker’s (type, model) pair.
- #runner_class ⇒ Object
- #runner_function ⇒ Object
- #subscribe_options ⇒ Object
  Subscribe options include the x-priority argument so RabbitMQ can honour consumer priority when dispatching to competing consumers.
- #use_runner? ⇒ Boolean
  Bypass Legion::Runner — call the runner module directly so we don’t need a task record in the database for every LLM inference hop.
Constructor Details
#initialize(request_type:, model:) ⇒ ModelWorker
Returns a new instance of ModelWorker.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 28

def initialize(request_type:, model:, **)
  @request_type = request_type.to_s
  @model_name = model.to_s
  super(**)
end
Instance Attribute Details
#model_name ⇒ Object (readonly)
Returns the value of attribute model_name.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 26

def model_name
  @model_name
end
#request_type ⇒ Object (readonly)
Returns the value of attribute request_type.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 26

def request_type
  @request_type
end
Instance Method Details
#consumer_priority ⇒ Object
Consumer priority from settings. Tells RabbitMQ to prefer this consumer over lower-priority ones on the same queue when multiple consumers are idle. Standard scale: GPU server = 10, Mac Studio = 5, developer laptop = 1. Defaults to 0 (equal priority) if not configured.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 61

def consumer_priority
  return 0 unless defined?(Legion::Settings)

  Legion::Settings.dig(:ollama, :fleet, :consumer_priority) || 0
end
#prefetch ⇒ Object
prefetch(1) is required for consumer priority to work correctly: without it, a high-priority consumer can hold multiple messages while lower-priority consumers sit idle. With prefetch=1, each consumer completes one message before RabbitMQ delivers the next, and priority determines which idle consumer gets it.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 53

def prefetch
  1
end
#process_message(payload, metadata, delivery_info) ⇒ Object
Enrich every inbound message with the worker’s own request_type and model so Runners::Fleet#handle_request always has them, even if the sender omitted them. Also defaults message_context to {} if absent.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 89

def process_message(payload, metadata, delivery_info)
  msg = super
  msg[:request_type] ||= @request_type
  msg[:model] ||= @model_name
  msg[:message_context] ||= {}
  msg
end
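The enrichment step can be demonstrated standalone (a sketch of the behavior only; enrich is a hypothetical helper, not a method of this class):

```ruby
# Hypothetical standalone version of the enrichment described above: fill in
# request_type, model, and message_context only when the sender omitted them.
def enrich(msg, request_type:, model:)
  msg[:request_type] ||= request_type
  msg[:model] ||= model
  msg[:message_context] ||= {}
  msg
end

msg = enrich({ prompt: 'hi' }, request_type: 'chat', model: 'qwen3.5:27b')
# msg now also carries :request_type, :model, and an empty :message_context;
# values already present in the payload are left untouched.
```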
#queue ⇒ Object
Returns a queue CLASS (not instance) bound to the llm.request exchange with the routing key for this worker’s (type, model) pair. The Subscription base class calls queue.new in initialize, so this must return a class, not an instance.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 82

def queue
  @queue ||= build_queue_class
end
#runner_class ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 34

def runner_class
  Legion::Extensions::Ollama::Runners::Fleet
end
#runner_function ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 38

def runner_function
  'handle_request'
end
#subscribe_options ⇒ Object
Subscribe options include the x-priority argument so RabbitMQ can honour consumer priority when dispatching to competing consumers.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 69

def subscribe_options
  base = begin
    super
  rescue NoMethodError
    {}
  end
  base.merge(arguments: { 'x-priority' => consumer_priority })
end
#use_runner? ⇒ Boolean
Bypass Legion::Runner — call the runner module directly so we don’t need a task record in the database for every LLM inference hop.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 44

def use_runner?
  false
end