Class: Legion::Extensions::Ollama::Actors::ModelWorker

Inherits:
Actors::Subscription
  • Object
Defined in:
lib/legion/extensions/ollama/actors/model_worker.rb

Overview

Fleet actor that listens on a model-scoped queue and forwards inbound LLM request messages to Runners::Fleet#handle_request. Endpoint workers default to explicit basic_get polling so a local one-model-at-a-time device does not reserve messages from every lane. Set legion.ollama.fleet.scheduler to :subscription for GPU/datacenter workers that should use RabbitMQ consumer priority and prefetch.

One instance is created per (request_type, model) entry in settings:

legion:
  ollama:
    fleet:
      consumer_priority: 10
    subscriptions:
      - type: embed
        model: nomic-embed-text
      - type: chat
        model: "qwen3.5:27b"

Queue names and routing keys follow the shared fleet lane schema:

llm.fleet.embed.<model-slug>
llm.fleet.inference.<model-slug>.ctx<context-window>

or, when explicitly enabled, exact offering lanes:

llm.fleet.offering.<instance>.<model-slug>.<operation>
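The authoritative slug rules live in the shared fleet lane code; as a rough sketch, assuming the slug is the lowercased model name with runs of non-alphanumerics collapsed to hyphens (the `model_slug` helper below is hypothetical):

```ruby
# Hypothetical slug helper -- the real rules live in the shared fleet
# lane code. Assumes lowercase + non-alphanumerics collapsed to '-'.
def model_slug(model)
  model.downcase.gsub(/[^a-z0-9]+/, '-')
end

model_slug('qwen3.5:27b')                            # "qwen3-5-27b"
"llm.fleet.embed.#{model_slug('nomic-embed-text')}"  # "llm.fleet.embed.nomic-embed-text"
```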

Constant Summary

POLLING_SCHEDULERS = %i[basic_get poll polling].freeze
SUBSCRIPTION_SCHEDULERS = %i[subscribe subscription basic_consume consumer].freeze
POLL_LOCK = Mutex.new
REGISTRY_HEARTBEAT_INTERVAL = 30.0

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(request_type:, model:, context_window: nil, lane_style: :shared, offering_instance_id: nil) ⇒ ModelWorker

Returns a new instance of ModelWorker.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 39

def initialize(request_type:, model:, context_window: nil, lane_style: :shared,
               offering_instance_id: nil, **)
  @request_type = request_type.to_s
  @model_name = model.to_s
  @context_window = normalize_context_window(context_window)
  @lane_style = lane_style.to_s
  @offering_instance_id = offering_instance_id&.to_s
  @polling = false
  super(**)
end

Instance Attribute Details

#context_window ⇒ Object (readonly)

Returns the value of attribute context_window.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 37

def context_window
  @context_window
end

#model_name ⇒ Object (readonly)

Returns the value of attribute model_name.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 37

def model_name
  @model_name
end

#offering_instance_id ⇒ Object (readonly)

Returns the value of attribute offering_instance_id.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 37

def offering_instance_id
  @offering_instance_id
end

#request_type ⇒ Object (readonly)

Returns the value of attribute request_type.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 37

def request_type
  @request_type
end

Class Method Details

.fallback_queue_options(settings) ⇒ Object



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 203

def self.fallback_queue_options(settings)
  {
    durable:     true,
    auto_delete: false,
    arguments:   {
      'x-queue-type'           => 'quorum',
      'x-queue-leader-locator' => 'balanced',
      'x-expires'              => settings.fetch(:queue_expires_ms),
      'x-message-ttl'          => settings.fetch(:message_ttl_ms),
      'x-overflow'             => 'reject-publish',
      'x-max-length'           => settings.fetch(:queue_max_length),
      'x-delivery-limit'       => settings.fetch(:delivery_limit),
      'x-consumer-timeout'     => settings.fetch(:consumer_ack_timeout_ms)
    }
  }
end

.queue_class_for(request_type:, model:, context_window: nil, queue_config: {}, lane_style: :shared, offering_instance_id: nil) ⇒ Object



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 192

def self.queue_class_for(request_type:, model:, context_window: nil, queue_config: {},
                         lane_style: :shared, offering_instance_id: nil)
  worker = allocate
  worker.instance_variable_set(:@request_type, request_type.to_s)
  worker.instance_variable_set(:@model_name, model.to_s)
  worker.instance_variable_set(:@context_window, context_window&.to_i)
  worker.instance_variable_set(:@lane_style, lane_style.to_s)
  worker.instance_variable_set(:@offering_instance_id, offering_instance_id&.to_s)
  worker.send(:build_queue_class, queue_config)
end
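The `allocate` + `instance_variable_set` pattern here builds a worker shell without running `#initialize`, which would call into the `Subscription` constructor. A minimal standalone sketch of the same pattern (the `LaneNamer` class is purely illustrative):

```ruby
# Standalone illustration of the allocate pattern: build an instance
# without invoking #initialize, then seed the ivars it would have set.
class LaneNamer
  def initialize(*)
    raise 'does broker setup -- not wanted here'
  end

  def lane_key
    "llm.fleet.#{@request_type}.#{@model_name}"
  end
end

namer = LaneNamer.allocate   # skips #initialize entirely
namer.instance_variable_set(:@request_type, 'embed')
namer.instance_variable_set(:@model_name, 'nomic-embed-text')
namer.lane_key               # "llm.fleet.embed.nomic-embed-text"
```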

Instance Method Details

#activate ⇒ Object



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 122

def activate
  result = if endpoint_polling?
             @polling = true
             @poll_task = async.run_basic_get_loop
             log.info "[ModelWorker] activated polling lane #{lane_key}" if defined?(log)
             @poll_task
           else
             super
           end
  publish_registry_event_async(:available)
  start_registry_heartbeat
  result
rescue StandardError => e
  publish_registry_event_async(:degraded, error: e)
  handle_exception(e, level: :fatal)
end

#cancel ⇒ Object



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 139

def cancel
  @polling = false
  stop_registry_heartbeat
  publish_registry_event_async(:unavailable)
  return true unless instance_variable_defined?(:@consumer) && @consumer

  super
end

#consumer_ack_timeout_ms ⇒ Object



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 97

def consumer_ack_timeout_ms
  setting_value(fleet_settings, :consumer_ack_timeout_ms) || 300_000
end

#consumer_priority ⇒ Object

Consumer priority from settings. Tells RabbitMQ to prefer this consumer over lower-priority ones on the same queue when multiple consumers are idle. Standard scale: GPU server = 10, Mac Studio = 5, developer laptop = 1. Defaults to 0 (equal priority) if not configured.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 77

def consumer_priority
  setting_value(fleet_settings, :consumer_priority) || 0
end

#delivery_limit ⇒ Object



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 93

def delivery_limit
  setting_value(fleet_settings, :delivery_limit) || 3
end

#endpoint_polling? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 148

def endpoint_polling?
  scheduler = fleet_scheduler
  return true if POLLING_SCHEDULERS.include?(scheduler)
  return false if SUBSCRIPTION_SCHEDULERS.include?(scheduler)

  nested_setting(settings, :fleet, :endpoint, :enabled) == true
rescue StandardError
  false
end
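The resolution order can be summarised as a pure function: an explicit scheduler setting wins, and only then does the endpoint flag apply. A sketch mirroring the method body, with `endpoint_enabled` standing in for the nested `fleet.endpoint.enabled` lookup:

```ruby
POLLING     = %i[basic_get poll polling].freeze
SUBSCRIBING = %i[subscribe subscription basic_consume consumer].freeze

# Mirrors #endpoint_polling?: explicit scheduler wins, then the
# endpoint flag, defaulting to subscription mode.
def polling_mode?(scheduler, endpoint_enabled: false)
  return true  if POLLING.include?(scheduler)
  return false if SUBSCRIBING.include?(scheduler)

  endpoint_enabled == true
end

polling_mode?(:basic_get)                             # true
polling_mode?(:subscription, endpoint_enabled: true)  # false
polling_mode?(:unset, endpoint_enabled: true)         # true
```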

#lane_key ⇒ Object Also known as: routing_key



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 158

def lane_key
  @lane_key ||= offering_lane? ? offering_lane_key : shared_lane_key
end

#message_ttl_ms ⇒ Object



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 85

def message_ttl_ms
  setting_value(fleet_settings, :message_ttl_ms) || 120_000
end

#prefetch ⇒ Object

prefetch(1) is required for consumer priority to work correctly: without it, a high-priority consumer can hold multiple messages while lower-priority consumers sit idle. With prefetch=1, each consumer completes one message before RabbitMQ delivers the next, and priority determines which idle consumer gets it.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 69

def prefetch
  1
end

#prepare ⇒ Object



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 112

def prepare
  return super unless endpoint_polling?

  @queue = queue.new
  @polling = true
  log.info "[ModelWorker] prepared polling lane #{lane_key}" if defined?(log)
rescue StandardError => e
  handle_exception(e, level: :fatal)
end

#process_message(payload, metadata, delivery_info) ⇒ Object

Enrich every inbound message with the worker’s own request_type and model so Runners::Fleet#handle_request always has them, even if the sender omitted them. Also defaults message_context to {} if absent.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 223

def process_message(payload, metadata, delivery_info)
  msg = super
  msg[:request_type] ||= @request_type
  msg[:model] ||= @model_name
  msg[:message_context] ||= {}
  msg
end
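For example, a payload that omits the routing fields gains the worker's own values, while sender-supplied fields win; a sketch of the `||=` defaults above for a worker configured as type `chat`, model `qwen3.5:27b`:

```ruby
# What enrichment does to a sparse message: worker-owned values fill
# the gaps without overwriting anything the sender supplied.
msg = { prompt: 'hello' }             # sender omitted routing fields
msg[:request_type]    ||= 'chat'
msg[:model]           ||= 'qwen3.5:27b'
msg[:message_context] ||= {}
msg  # { prompt: 'hello', request_type: 'chat', model: 'qwen3.5:27b', message_context: {} }

explicit = { model: 'nomic-embed-text' }
explicit[:model] ||= 'qwen3.5:27b'    # no-op: sender already set it
explicit[:model]                      # "nomic-embed-text"
```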

#pull_one_message ⇒ Object



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 172

def pull_one_message
  delivery_info, metadata, payload = @queue.pop(manual_ack: manual_ack)
  return false unless delivery_info

  handle_delivery(delivery_info, metadata, payload)
  true
rescue StandardError => e
  handle_exception(e)
  reject_or_retry(delivery_info, metadata, payload) if manual_ack && delivery_info
  true
end

#queue ⇒ Object

Returns a queue CLASS (not instance) bound to the llm.fleet exchange with the routing key for this worker’s model lane. The Subscription base class calls queue.new in initialize, so this must return a class, not an instance.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 188

def queue
  @queue ||= build_queue_class
end

#queue_expires_ms ⇒ Object



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 81

def queue_expires_ms
  setting_value(fleet_settings, :queue_expires_ms) || 60_000
end

#queue_max_length ⇒ Object



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 89

def queue_max_length
  setting_value(fleet_settings, :queue_max_length) || 100
end

#run_basic_get_loop ⇒ Object



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 163

def run_basic_get_loop
  consecutive_pulls = 0
  while @polling && !shutting_down?
    pulled = POLL_LOCK.synchronize { pull_one_message }
    consecutive_pulls = pulled ? consecutive_pulls + 1 : 0
    sleep(pulled ? post_pull_backoff(consecutive_pulls) : empty_lane_backoff)
  end
end
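`post_pull_backoff` and `empty_lane_backoff` are not shown in this file; a plausible shape, purely illustrative and not the actual implementation, is a short bounded delay that grows with consecutive pulls plus a longer idle delay:

```ruby
# Hypothetical backoff helpers -- illustrative only. After a pull,
# back off a little more per consecutive message (capped) so one
# worker does not hog the lane; when the lane is empty, sleep longer.
def post_pull_backoff(consecutive_pulls)
  [0.05 * consecutive_pulls, 1.0].min
end

def empty_lane_backoff
  0.5
end

post_pull_backoff(1)   # 0.05
post_pull_backoff(50)  # 1.0 (capped)
```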

#runner_class ⇒ Object



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 50

def runner_class
  Legion::Extensions::Ollama::Runners::Fleet
end

#runner_function ⇒ Object



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 54

def runner_function
  'handle_request'
end

#subscribe_options ⇒ Object

Subscribe options include x-priority argument so RabbitMQ can honour consumer priority when dispatching to competing consumers.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 103

def subscribe_options
  base = begin
    super
  rescue NoMethodError
    {}
  end
  base.merge(arguments: { 'x-priority' => consumer_priority })
end
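The effect is a plain hash merge; for a worker whose `consumer_priority` is 10 (the base options below are hypothetical placeholders for whatever the superclass returns):

```ruby
# Hypothetical base options from the Subscription superclass; only
# the merged-in consumer arguments are specific to this class.
base   = { manual_ack: true, block: false }
merged = base.merge(arguments: { 'x-priority' => 10 })
merged  # { manual_ack: true, block: false, arguments: { 'x-priority' => 10 } }
```

Note that `Hash#merge` replaces any existing `:arguments` key wholesale rather than deep-merging into it.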

#use_runner? ⇒ Boolean

Bypass Legion::Runner — call the runner module directly so we don’t need a task record in the database for every LLM inference hop.

Returns:

  • (Boolean)


# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 60

def use_runner?
  false
end