Class: Legion::Extensions::Ollama::Actor::ModelWorker

Inherits: Actors::Subscription
  Object
  Actors::Subscription
  Legion::Extensions::Ollama::Actor::ModelWorker

Defined in:
  lib/legion/extensions/ollama/actors/model_worker.rb
Overview
Subscription actor that listens on a model-scoped queue and forwards inbound LLM request messages to Runners::Fleet#handle_request.
One instance is created per (request_type, model) entry in settings:

  legion:
    ollama:
      fleet:
        consumer_priority: 10
        subscriptions:
          - type: embed
            model: nomic-embed-text
          - type: chat
            model: "qwen3.5:27b"
The queue name and routing key both follow the shared fleet lane schemas:

  llm.fleet.embed.<model>
  llm.fleet.inference.<model>.ctx<context_window>

where the ctx suffix is appended only when an inference context window is known.
Instance Attribute Summary

- #context_window ⇒ Object (readonly): the value of attribute context_window.
- #model_name ⇒ Object (readonly): the value of attribute model_name.
- #request_type ⇒ Object (readonly): the value of attribute request_type.
Class Method Summary
- .fallback_queue_options(settings) ⇒ Object
- .queue_class_for(request_type:, model:, context_window: nil, queue_config: {}) ⇒ Object
Instance Method Summary

- #consumer_ack_timeout_ms ⇒ Object
- #consumer_priority ⇒ Object: consumer priority from settings.
- #delivery_limit ⇒ Object
- #initialize(request_type:, model:, context_window: nil) ⇒ ModelWorker (constructor): a new instance of ModelWorker.
- #message_ttl_ms ⇒ Object
- #prefetch ⇒ Object: prefetch(1) is required for consumer priority to work correctly; without it, a high-priority consumer can hold multiple messages while lower-priority consumers sit idle.
- #process_message(payload, metadata, delivery_info) ⇒ Object: enriches every inbound message with the worker's own request_type and model so Runners::Fleet#handle_request always has them, even if the sender omitted them.
- #queue ⇒ Object: returns a queue CLASS (not an instance) bound to the llm.fleet exchange with the routing key for this worker's model offering lane.
- #queue_expires_ms ⇒ Object
- #queue_max_length ⇒ Object
- #routing_key ⇒ Object
- #runner_class ⇒ Object
- #runner_function ⇒ Object
- #subscribe_options ⇒ Object: subscribe options include the x-priority argument so RabbitMQ can honour consumer priority when dispatching to competing consumers.
- #use_runner? ⇒ Boolean: bypasses Legion::Runner and calls the runner module directly, so no task record is needed in the database for every LLM inference hop.
Constructor Details
#initialize(request_type:, model:, context_window: nil) ⇒ ModelWorker
Returns a new instance of ModelWorker.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 29

def initialize(request_type:, model:, context_window: nil, **)
  @request_type = request_type.to_s
  @model_name = model.to_s
  @context_window = context_window&.to_i
  super(**)
end
Instance Attribute Details
#context_window ⇒ Object (readonly)
Returns the value of attribute context_window.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 27

def context_window
  @context_window
end
#model_name ⇒ Object (readonly)
Returns the value of attribute model_name.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 27

def model_name
  @model_name
end
#request_type ⇒ Object (readonly)
Returns the value of attribute request_type.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 27

def request_type
  @request_type
end
Class Method Details
.fallback_queue_options(settings) ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 114

def self.fallback_queue_options(settings)
  {
    durable: true,
    auto_delete: false,
    arguments: {
      'x-queue-type' => 'quorum',
      'x-queue-leader-locator' => 'balanced',
      'x-expires' => settings.fetch(:queue_expires_ms),
      'x-message-ttl' => settings.fetch(:message_ttl_ms),
      'x-overflow' => 'reject-publish',
      'x-max-length' => settings.fetch(:queue_max_length),
      'x-delivery-limit' => settings.fetch(:delivery_limit),
      'x-consumer-timeout' => settings.fetch(:consumer_ack_timeout_ms)
    }
  }
end
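As a rough standalone illustration of what these fallbacks resolve to, the method body can be reproduced outside the gem with made-up setting values (the real method is a class method on ModelWorker):

```ruby
# Standalone replica of ModelWorker.fallback_queue_options, for illustration
# only; the settings values below are the per-worker defaults documented
# in this class, not values pulled from a live Legion install.
def fallback_queue_options(settings)
  {
    durable: true,
    auto_delete: false,
    arguments: {
      'x-queue-type' => 'quorum',
      'x-queue-leader-locator' => 'balanced',
      'x-expires' => settings.fetch(:queue_expires_ms),
      'x-message-ttl' => settings.fetch(:message_ttl_ms),
      'x-overflow' => 'reject-publish',
      'x-max-length' => settings.fetch(:queue_max_length),
      'x-delivery-limit' => settings.fetch(:delivery_limit),
      'x-consumer-timeout' => settings.fetch(:consumer_ack_timeout_ms)
    }
  }
end

opts = fallback_queue_options(
  queue_expires_ms: 60_000,
  message_ttl_ms: 120_000,
  queue_max_length: 100,
  delivery_limit: 3,
  consumer_ack_timeout_ms: 300_000
)
opts[:arguments]['x-queue-type'] # => "quorum"
```

The `reject-publish` overflow plus `x-max-length` means publishers get a nack once the lane backs up past 100 messages, rather than silently dropping old requests.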
.queue_class_for(request_type:, model:, context_window: nil, queue_config: {}) ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 106

def self.queue_class_for(request_type:, model:, context_window: nil, queue_config: {})
  worker = allocate
  worker.instance_variable_set(:@request_type, request_type.to_s)
  worker.instance_variable_set(:@model_name, model.to_s)
  worker.instance_variable_set(:@context_window, context_window&.to_i)
  worker.send(:build_queue_class, queue_config)
end
Instance Method Details
#consumer_ack_timeout_ms ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 83

def consumer_ack_timeout_ms
  setting_value(fleet_settings, :consumer_ack_timeout_ms) || 300_000
end
#consumer_priority ⇒ Object
Consumer priority from settings. Tells RabbitMQ to prefer this consumer over lower-priority ones on the same queue when multiple consumers are idle. Standard scale: GPU server = 10, Mac Studio = 5, developer laptop = 1. Defaults to 0 (equal priority) if not configured.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 63

def consumer_priority
  setting_value(fleet_settings, :consumer_priority) || 0
end
#delivery_limit ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 79

def delivery_limit
  setting_value(fleet_settings, :delivery_limit) || 3
end
#message_ttl_ms ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 71

def message_ttl_ms
  setting_value(fleet_settings, :message_ttl_ms) || 120_000
end
#prefetch ⇒ Object
prefetch(1) is required for consumer priority to work correctly: without it, a high-priority consumer can hold multiple messages while lower-priority consumers sit idle. With prefetch=1, each consumer completes one message before RabbitMQ delivers the next, and priority determines which idle consumer gets it.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 55

def prefetch
  1
end
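A toy model of that dispatch rule can make the interaction concrete. This is not RabbitMQ's implementation, just the observable behaviour with prefetch = 1: a consumer is either busy (holding its one unacked message) or idle, and each new message goes to the highest-priority idle consumer. Consumer names and priorities below follow the standard scale documented under #consumer_priority:

```ruby
# Toy dispatch model: with prefetch = 1, each consumer holds at most one
# unacked message, so "busy" is a simple boolean. RabbitMQ offers each new
# message to the highest-priority idle consumer on the queue.
Consumer = Struct.new(:name, :priority, :busy)

def dispatch(consumers)
  idle = consumers.reject(&:busy)
  return nil if idle.empty? # all consumers busy: message waits in the queue
  chosen = idle.max_by(&:priority)
  chosen.busy = true
  chosen.name
end

gpu    = Consumer.new('gpu-server', 10, false)
laptop = Consumer.new('laptop', 1, false)
fleet  = [gpu, laptop]

dispatch(fleet) # => "gpu-server" (highest-priority idle consumer wins)
dispatch(fleet) # => "laptop" (gpu-server is busy with its one message)
```

With a higher prefetch the GPU server would be offered several messages up front and the laptop would only see overflow, which defeats the "fastest idle node first" intent.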
#process_message(payload, metadata, delivery_info) ⇒ Object
Enrich every inbound message with the worker’s own request_type and model so Runners::Fleet#handle_request always has them, even if the sender omitted them. Also defaults message_context to {} if absent.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 140

def process_message(payload, metadata, delivery_info)
  msg = super
  msg[:request_type] ||= @request_type
  msg[:model] ||= @model_name
  msg[:message_context] ||= {}
  msg
end
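A minimal sketch of the enrichment step, using a hypothetical standalone `enrich` helper in place of the actor method (the real method gets `msg` from `super` rather than as an argument):

```ruby
# Hypothetical standalone version of the enrichment in #process_message:
# fields the sender omitted are filled in from the worker's own identity,
# so the downstream runner never has to guess which lane a message came from.
def enrich(msg, request_type:, model:)
  msg[:request_type] ||= request_type
  msg[:model] ||= model
  msg[:message_context] ||= {}
  msg
end

msg = enrich({ prompt: 'hello' }, request_type: 'chat', model: 'qwen3.5:27b')
msg[:model]           # => "qwen3.5:27b"
msg[:message_context] # => {}

# Sender-supplied values win over the worker's defaults:
enrich({ model: 'other' }, request_type: 'chat', model: 'qwen3.5:27b')[:model] # => "other"
```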
#queue ⇒ Object
Returns a queue CLASS (not instance) bound to the llm.fleet exchange with the routing key for this worker’s model offering lane. The Subscription base class calls queue.new in initialize, so this must return a class, not an instance.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 102

def queue
  @queue ||= build_queue_class
end
#queue_expires_ms ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 67

def queue_expires_ms
  setting_value(fleet_settings, :queue_expires_ms) || 60_000
end
#queue_max_length ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 75

def queue_max_length
  setting_value(fleet_settings, :queue_max_length) || 100
end
#routing_key ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 131

def routing_key
  parts = ['llm.fleet', lane_kind, sanitized_model]
  parts << "ctx#{@context_window}" if lane_kind == 'inference' && @context_window
  parts.join('.')
end
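A standalone sketch of the lane naming scheme; the `routing_key` function below takes the lane kind and an already-sanitized model name as arguments, whereas the real method derives both from the worker's own state via `lane_kind` and `sanitized_model` (whose exact sanitization rules are internal to the gem, so the model names here are assumed to pass through unchanged):

```ruby
# Illustrative reimplementation of the lane routing-key scheme. The ctx
# suffix is appended only for inference lanes with a known context window.
def routing_key(lane_kind, sanitized_model, context_window = nil)
  parts = ['llm.fleet', lane_kind, sanitized_model]
  parts << "ctx#{context_window}" if lane_kind == 'inference' && context_window
  parts.join('.')
end

routing_key('embed', 'nomic-embed-text')        # => "llm.fleet.embed.nomic-embed-text"
routing_key('inference', 'some-model', 8192)    # => "llm.fleet.inference.some-model.ctx8192"
routing_key('inference', 'some-model')          # => "llm.fleet.inference.some-model"
```

Because the context window is baked into the key, two hosts serving the same model with different context limits land on different queues rather than competing for requests one of them cannot fulfil.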
#runner_class ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 36

def runner_class
  Legion::Extensions::Ollama::Runners::Fleet
end
#runner_function ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 40

def runner_function
  'handle_request'
end
#subscribe_options ⇒ Object
Subscribe options include x-priority argument so RabbitMQ can honour consumer priority when dispatching to competing consumers.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 89

def subscribe_options
  base = begin
    super
  rescue NoMethodError
    {}
  end
  base.merge(arguments: { 'x-priority' => consumer_priority })
end
#use_runner? ⇒ Boolean
Bypass Legion::Runner — call the runner module directly so we don’t need a task record in the database for every LLM inference hop.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 46

def use_runner?
  false
end