Class: Legion::Extensions::Ollama::Actor::ModelWorker
- Inherits: Actors::Subscription
  - Object
  - Actors::Subscription
  - Legion::Extensions::Ollama::Actor::ModelWorker
- Defined in: lib/legion/extensions/ollama/actors/model_worker.rb
Overview
Fleet actor that listens on a model-scoped queue and forwards inbound LLM request messages to Runners::Fleet#handle_request. Endpoint workers default to explicit basic_get polling so a local one-model-at-a-time device does not reserve messages from every lane. Set legion.ollama.fleet.scheduler to :subscription for GPU/datacenter workers that should use RabbitMQ consumer priority and prefetch.
One instance is created per (request_type, model) entry in settings:
legion:
  ollama:
    fleet:
      consumer_priority: 10
      subscriptions:
        - type: embed
          model: nomic-embed-text
        - type: chat
          model: "qwen3.5:27b"
Queue names and routing keys follow the shared fleet lane schema:
llm.fleet.embed.<model-slug>
llm.fleet.inference.<model-slug>.ctx<context-window>
or, when explicitly enabled, exact offering lanes:
llm.fleet.offering.<instance>.<model-slug>.<operation>
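For the sample subscriptions above, the shared lanes would look roughly like the following; the exact slug produced for a tag such as "qwen3.5:27b" and the ctx suffix value are illustrative, since the slugging rules are defined elsewhere in the fleet code:
  llm.fleet.embed.nomic-embed-text
  llm.fleet.inference.qwen3-5-27b.ctx32768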
Constant Summary
- POLLING_SCHEDULERS = %i[basic_get poll polling].freeze
- SUBSCRIPTION_SCHEDULERS = %i[subscribe subscription basic_consume consumer].freeze
- POLL_LOCK = Mutex.new
- REGISTRY_HEARTBEAT_INTERVAL = 30.0
Instance Attribute Summary
- #context_window ⇒ Object (readonly)
  Returns the value of attribute context_window.
- #model_name ⇒ Object (readonly)
  Returns the value of attribute model_name.
- #offering_instance_id ⇒ Object (readonly)
  Returns the value of attribute offering_instance_id.
- #request_type ⇒ Object (readonly)
  Returns the value of attribute request_type.
Class Method Summary
- .fallback_queue_options(settings) ⇒ Object
- .queue_class_for(request_type:, model:, context_window: nil, queue_config: {}, lane_style: :shared, offering_instance_id: nil) ⇒ Object
Instance Method Summary
- #activate ⇒ Object
- #cancel ⇒ Object
- #consumer_ack_timeout_ms ⇒ Object
- #consumer_priority ⇒ Object
  Consumer priority from settings.
- #delivery_limit ⇒ Object
- #endpoint_polling? ⇒ Boolean
- #initialize(request_type:, model:, context_window: nil, lane_style: :shared, offering_instance_id: nil) ⇒ ModelWorker (constructor)
  A new instance of ModelWorker.
- #lane_key ⇒ Object (also: #routing_key)
- #message_ttl_ms ⇒ Object
- #prefetch ⇒ Object
  prefetch(1) is required for consumer priority to work correctly: without it, a high-priority consumer can hold multiple messages while lower-priority consumers sit idle.
- #prepare ⇒ Object
- #process_message(payload, metadata, delivery_info) ⇒ Object
  Enrich every inbound message with the worker’s own request_type and model so Runners::Fleet#handle_request always has them, even if the sender omitted them.
- #pull_one_message ⇒ Object
- #queue ⇒ Object
  Returns a queue CLASS (not instance) bound to the llm.fleet exchange with the routing key for this worker’s model lane.
- #queue_expires_ms ⇒ Object
- #queue_max_length ⇒ Object
- #run_basic_get_loop ⇒ Object
- #runner_class ⇒ Object
- #runner_function ⇒ Object
- #subscribe_options ⇒ Object
  Subscribe options include an x-priority argument so RabbitMQ can honour consumer priority when dispatching to competing consumers.
- #use_runner? ⇒ Boolean
  Bypass Legion::Runner — call the runner module directly so we don’t need a task record in the database for every LLM inference hop.
Constructor Details
#initialize(request_type:, model:, context_window: nil, lane_style: :shared, offering_instance_id: nil) ⇒ ModelWorker
Returns a new instance of ModelWorker.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 39

def initialize(request_type:, model:, context_window: nil, lane_style: :shared,
               offering_instance_id: nil, **)
  @request_type = request_type.to_s
  @model_name = model.to_s
  @context_window = normalize_context_window(context_window)
  @lane_style = lane_style.to_s
  @offering_instance_id = offering_instance_id&.to_s
  @polling = false
  super(**)
end
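A minimal construction sketch using the documented signature. In practice the fleet creates one worker per configured subscription entry, so direct construction like this is illustrative only; note that request_type and model are coerced to strings and the context window is normalized:

worker = Legion::Extensions::Ollama::Actor::ModelWorker.new(
  request_type: :chat,
  model: 'qwen3.5:27b',
  context_window: 32_768
)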
Instance Attribute Details
#context_window ⇒ Object (readonly)
Returns the value of attribute context_window.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 37

def context_window
  @context_window
end
#model_name ⇒ Object (readonly)
Returns the value of attribute model_name.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 37

def model_name
  @model_name
end
#offering_instance_id ⇒ Object (readonly)
Returns the value of attribute offering_instance_id.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 37

def offering_instance_id
  @offering_instance_id
end
#request_type ⇒ Object (readonly)
Returns the value of attribute request_type.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 37

def request_type
  @request_type
end
Class Method Details
.fallback_queue_options(settings) ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 203

def self.fallback_queue_options(settings)
  {
    durable: true,
    auto_delete: false,
    arguments: {
      'x-queue-type' => 'quorum',
      'x-queue-leader-locator' => 'balanced',
      'x-expires' => settings.fetch(:queue_expires_ms),
      'x-message-ttl' => settings.fetch(:message_ttl_ms),
      'x-overflow' => 'reject-publish',
      'x-max-length' => settings.fetch(:queue_max_length),
      'x-delivery-limit' => settings.fetch(:delivery_limit),
      'x-consumer-timeout' => settings.fetch(:consumer_ack_timeout_ms)
    }
  }
end
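A sketch of calling it with a plain hash built from the documented per-setting defaults; the result comment is hand-written from the method body above, not captured output:

ModelWorker.fallback_queue_options({
  queue_expires_ms: 60_000,
  message_ttl_ms: 120_000,
  queue_max_length: 100,
  delivery_limit: 3,
  consumer_ack_timeout_ms: 300_000
})
# => durable quorum-queue options with arguments such as 'x-expires' => 60000,
#    'x-message-ttl' => 120000, 'x-max-length' => 100, 'x-delivery-limit' => 3,
#    'x-consumer-timeout' => 300000 and 'x-overflow' => 'reject-publish'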
.queue_class_for(request_type:, model:, context_window: nil, queue_config: {}, lane_style: :shared, offering_instance_id: nil) ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 192

def self.queue_class_for(request_type:, model:, context_window: nil, queue_config: {},
                         lane_style: :shared, offering_instance_id: nil)
  worker = allocate
  worker.instance_variable_set(:@request_type, request_type.to_s)
  worker.instance_variable_set(:@model_name, model.to_s)
  worker.instance_variable_set(:@context_window, context_window&.to_i)
  worker.instance_variable_set(:@lane_style, lane_style.to_s)
  worker.instance_variable_set(:@offering_instance_id, offering_instance_id&.to_s)
  worker.send(:build_queue_class, queue_config)
end
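A sketch of obtaining a lane's queue class without spinning up the actor itself, using only the documented signature (any queue_config contents would come from the caller and are omitted here):

queue_class = Legion::Extensions::Ollama::Actor::ModelWorker.queue_class_for(
  request_type: :embed,
  model: 'nomic-embed-text'
)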
Instance Method Details
#activate ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 122

def activate
  result = if endpoint_polling?
             @polling = true
             @poll_task = async.run_basic_get_loop
             log.info "[ModelWorker] activated polling lane #{lane_key}" if defined?(log)
             @poll_task
           else
             super
           end
  publish_registry_event_async(:available)
  start_registry_heartbeat
  result
rescue StandardError => e
  publish_registry_event_async(:degraded, error: e)
  handle_exception(e, level: :fatal)
end
#cancel ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 139

def cancel
  @polling = false
  stop_registry_heartbeat
  publish_registry_event_async(:unavailable)
  return true unless instance_variable_defined?(:@consumer) && @consumer

  super
end
#consumer_ack_timeout_ms ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 97

def consumer_ack_timeout_ms
  setting_value(fleet_settings, :consumer_ack_timeout_ms) || 300_000
end
#consumer_priority ⇒ Object
Consumer priority from settings. Tells RabbitMQ to prefer this consumer over lower-priority ones on the same queue when multiple consumers are idle. Standard scale: GPU server = 10, Mac Studio = 5, developer laptop = 1. Defaults to 0 (equal priority) if not configured.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 77

def consumer_priority
  setting_value(fleet_settings, :consumer_priority) || 0
end
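Following the scale above, a GPU node's settings might look like this; consumer_priority under legion.ollama.fleet is the key shown in the overview, and the comment values mirror the documented scale:

legion:
  ollama:
    fleet:
      consumer_priority: 10   # GPU server; 5 for a Mac Studio, 1 for a developer laptop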
#delivery_limit ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 93

def delivery_limit
  setting_value(fleet_settings, :delivery_limit) || 3
end
#endpoint_polling? ⇒ Boolean
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 148

def endpoint_polling?
  scheduler = fleet_scheduler
  return true if POLLING_SCHEDULERS.include?(scheduler)
  return false if SUBSCRIPTION_SCHEDULERS.include?(scheduler)

  nested_setting(settings, :fleet, :endpoint, :enabled) == true
rescue StandardError
  false
end
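A sketch of the settings this predicate consults, using the legion.ollama.fleet namespace from the overview; whether the loader hands the scheduler value back as a symbol or a string is an assumption here:

legion:
  ollama:
    fleet:
      scheduler: subscription   # subscribe/subscription/basic_consume/consumer use consumers;
                                # basic_get/poll/polling force the polling loop
      endpoint:
        enabled: true           # consulted only when no recognised scheduler value is set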
#lane_key ⇒ Object Also known as: routing_key
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 158

def lane_key
  @lane_key ||= offering_lane? ? offering_lane_key : shared_lane_key
end
#message_ttl_ms ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 85

def message_ttl_ms
  setting_value(fleet_settings, :message_ttl_ms) || 120_000
end
#prefetch ⇒ Object
prefetch(1) is required for consumer priority to work correctly: without it, a high-priority consumer can hold multiple messages while lower-priority consumers sit idle. With prefetch=1, each consumer completes one message before RabbitMQ delivers the next, and priority determines which idle consumer gets it.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 69

def prefetch
  1
end
#prepare ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 112

def prepare
  return super unless endpoint_polling?

  @queue = queue.new
  @polling = true
  log.info "[ModelWorker] prepared polling lane #{lane_key}" if defined?(log)
rescue StandardError => e
  handle_exception(e, level: :fatal)
end
#process_message(payload, metadata, delivery_info) ⇒ Object
Enrich every inbound message with the worker’s own request_type and model so Runners::Fleet#handle_request always has them, even if the sender omitted them. Also defaults message_context to {} if absent.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 223

def process_message(payload, metadata, delivery_info)
  msg = super
  msg[:request_type] ||= @request_type
  msg[:model] ||= @model_name
  msg[:message_context] ||= {}
  msg
end
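A sketch of the enrichment for a chat worker on the qwen3.5:27b lane; that super returns the decoded payload as a symbol-keyed hash is an assumption, not confirmed by this page:

# inbound payload (sender omitted the lane fields):
#   { prompt: 'hello' }
# after process_message:
#   { prompt: 'hello', request_type: 'chat', model: 'qwen3.5:27b', message_context: {} }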
#pull_one_message ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 172

def pull_one_message
  delivery_info, metadata, payload = @queue.pop(manual_ack: manual_ack)
  return false unless delivery_info

  handle_delivery(delivery_info, metadata, payload)
  true
rescue StandardError => e
  handle_exception(e)
  reject_or_retry(delivery_info, metadata, payload) if manual_ack && delivery_info
  true
end
#queue ⇒ Object
Returns a queue CLASS (not instance) bound to the llm.fleet exchange with the routing key for this worker’s model lane. The Subscription base class calls queue.new in initialize, so this must return a class, not an instance.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 188

def queue
  @queue ||= build_queue_class
end
#queue_expires_ms ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 81

def queue_expires_ms
  setting_value(fleet_settings, :queue_expires_ms) || 60_000
end
#queue_max_length ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 89

def queue_max_length
  setting_value(fleet_settings, :queue_max_length) || 100
end
#run_basic_get_loop ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 163

def run_basic_get_loop
  consecutive_pulls = 0
  while @polling && !shutting_down?
    pulled = POLL_LOCK.synchronize { pull_one_message }
    consecutive_pulls = pulled ? consecutive_pulls + 1 : 0
    sleep(pulled ? post_pull_backoff(consecutive_pulls) : empty_lane_backoff)
  end
end
#runner_class ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 50

def runner_class
  Legion::Extensions::Ollama::Runners::Fleet
end
#runner_function ⇒ Object
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 54

def runner_function
  'handle_request'
end
#subscribe_options ⇒ Object
Subscribe options include an x-priority argument so RabbitMQ can honour consumer priority when dispatching to competing consumers.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 103

def subscribe_options
  base = begin
    super
  rescue NoMethodError
    {}
  end
  base.merge(arguments: { 'x-priority' => consumer_priority })
end
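A sketch of the merged options, assuming the base class contributes no subscribe options of its own (not confirmed here) and consumer_priority is configured as 10:

worker.subscribe_options
# => { arguments: { 'x-priority' => 10 } }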
#use_runner? ⇒ Boolean
Bypass Legion::Runner — call the runner module directly so we don’t need a task record in the database for every LLM inference hop.
# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 60

def use_runner?
  false
end