Class: Legion::Extensions::Ollama::Actor::ModelWorker

Inherits:
Actors::Subscription
  • Object
Defined in:
lib/legion/extensions/ollama/actors/model_worker.rb

Overview

Subscription actor that listens on a model-scoped queue and forwards inbound LLM request messages to Runners::Fleet#handle_request.

One instance is created per (request_type, model) entry in settings:

legion:
  ollama:
    fleet:
      consumer_priority: 10
    subscriptions:
      - type: embed
        model: nomic-embed-text
      - type: chat
        model: "qwen3.5:27b"

The queue name and routing key both follow the shared fleet lane schema:

llm.fleet.embed.<model>
llm.fleet.inference.<model>
llm.fleet.inference.<model>.ctx<context_window>

The ctx<context_window> segment is appended only when an inference context window is known.
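The lane naming can be sketched as a standalone function. This is illustrative only, not the actual implementation: the private `sanitized_model` helper is not shown here, so the sanitization rule (non-alphanumeric characters replaced with underscores) is an assumption.

```ruby
# Illustrative sketch of the fleet lane schema. The sanitization rule
# for model names is assumed, not taken from the source.
def lane_name(kind, model, context_window = nil)
  parts = ['llm.fleet', kind, model.gsub(/[^a-zA-Z0-9]/, '_')]
  parts << "ctx#{context_window}" if kind == 'inference' && context_window
  parts.join('.')
end

lane_name('embed', 'nomic-embed-text')        # => "llm.fleet.embed.nomic_embed_text"
lane_name('inference', 'qwen3.5:27b', 32_768) # => "llm.fleet.inference.qwen3_5_27b.ctx32768"
```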

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(request_type:, model:, context_window: nil) ⇒ ModelWorker

Returns a new instance of ModelWorker.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 29

def initialize(request_type:, model:, context_window: nil, **)
  @request_type    = request_type.to_s
  @model_name      = model.to_s
  @context_window  = context_window&.to_i
  super(**)
end

Instance Attribute Details

#context_windowObject (readonly)

Returns the value of attribute context_window.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 27

def context_window
  @context_window
end

#model_nameObject (readonly)

Returns the value of attribute model_name.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 27

def model_name
  @model_name
end

#request_typeObject (readonly)

Returns the value of attribute request_type.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 27

def request_type
  @request_type
end

Class Method Details

.fallback_queue_options(settings) ⇒ Object



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 114

def self.fallback_queue_options(settings)
  {
    durable:     true,
    auto_delete: false,
    arguments:   {
      'x-queue-type'           => 'quorum',
      'x-queue-leader-locator' => 'balanced',
      'x-expires'              => settings.fetch(:queue_expires_ms),
      'x-message-ttl'          => settings.fetch(:message_ttl_ms),
      'x-overflow'             => 'reject-publish',
      'x-max-length'           => settings.fetch(:queue_max_length),
      'x-delivery-limit'       => settings.fetch(:delivery_limit),
      'x-consumer-timeout'     => settings.fetch(:consumer_ack_timeout_ms)
    }
  }
end
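Note the use of Hash#fetch with no default throughout the arguments hash: a missing setting raises KeyError immediately rather than silently declaring a queue with a nil argument. A minimal illustration of that design choice:

```ruby
# Hash#fetch with no default fails loudly when a key is absent,
# so a queue is never declared with a nil argument value.
settings = { queue_expires_ms: 60_000 }

settings.fetch(:queue_expires_ms)  # => 60000

begin
  settings.fetch(:message_ttl_ms)  # key absent
rescue KeyError => e
  e.message                        # names the missing key
end
```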

.queue_class_for(request_type:, model:, context_window: nil, queue_config: {}) ⇒ Object



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 106

def self.queue_class_for(request_type:, model:, context_window: nil, queue_config: {})
  worker = allocate
  worker.instance_variable_set(:@request_type, request_type.to_s)
  worker.instance_variable_set(:@model_name, model.to_s)
  worker.instance_variable_set(:@context_window, context_window&.to_i)
  worker.send(:build_queue_class, queue_config)
end
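The allocate-then-set pattern used here can be shown in isolation. Greeter below is a hypothetical class, not part of this extension; the point is that Object#allocate builds an instance without running #initialize, after which just enough state is injected for a single method call.

```ruby
# Generic illustration of the allocate pattern used by .queue_class_for:
# skip #initialize, set only the ivars one method needs.
class Greeter
  def initialize(name)
    @name = name
  end

  def greeting
    "hello #{@name}"
  end
end

g = Greeter.allocate                       # no #initialize call
g.instance_variable_set(:@name, 'fleet')
g.greeting                                 # => "hello fleet"
```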

Instance Method Details

#consumer_ack_timeout_msObject



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 83

def consumer_ack_timeout_ms
  setting_value(fleet_settings, :consumer_ack_timeout_ms) || 300_000
end

#consumer_priorityObject

Consumer priority from settings. Tells RabbitMQ to prefer this consumer over lower-priority ones on the same queue when multiple consumers are idle. Standard scale: GPU server = 10, Mac Studio = 5, developer laptop = 1. Defaults to 0 (equal priority) if not configured.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 63

def consumer_priority
  setting_value(fleet_settings, :consumer_priority) || 0
end
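Following the priority scale above, a GPU host's settings might look like this (keys matching the YAML shown in the overview):

```yaml
legion:
  ollama:
    fleet:
      consumer_priority: 10   # GPU server; Mac Studio: 5, developer laptop: 1
```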

#delivery_limitObject



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 79

def delivery_limit
  setting_value(fleet_settings, :delivery_limit) || 3
end

#message_ttl_msObject



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 71

def message_ttl_ms
  setting_value(fleet_settings, :message_ttl_ms) || 120_000
end

#prefetchObject

prefetch(1) is required for consumer priority to work correctly: without it, a high-priority consumer can hold multiple messages while lower-priority consumers sit idle. With prefetch=1, each consumer completes one message before RabbitMQ delivers the next, and priority determines which idle consumer gets it.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 55

def prefetch
  1
end
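The interplay described above can be modelled with a toy dispatcher (this is a simulation, not RabbitMQ): under prefetch = 1 each consumer holds at most one message, so every delivery goes to the highest-priority consumer with a free slot, and work spills to lower-priority consumers only when the faster ones are busy.

```ruby
# Toy model of priority dispatch under prefetch = 1. Each delivery
# goes to the highest-priority idle consumer; when all are busy the
# message stays queued on the broker.
consumers = [
  { name: 'gpu',    priority: 10, busy: false },
  { name: 'laptop', priority: 1,  busy: false }
]

assignments = %w[m1 m2 m3].map do |msg|
  target = consumers.reject { |c| c[:busy] }.max_by { |c| c[:priority] }
  next [msg, :queued] if target.nil?
  target[:busy] = true
  [msg, target[:name]]
end
# => [["m1", "gpu"], ["m2", "laptop"], ["m3", :queued]]
```

With a higher prefetch, the gpu consumer would have taken m1 and m2 while the laptop sat idle, which is exactly the hogging the prose warns about.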

#process_message(payload, metadata, delivery_info) ⇒ Object

Enrich every inbound message with the worker’s own request_type and model so Runners::Fleet#handle_request always has them, even if the sender omitted them. Also defaults message_context to {} if absent.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 140

def process_message(payload, metadata, delivery_info)
  msg = super
  msg[:request_type]    ||= @request_type
  msg[:model]           ||= @model_name
  msg[:message_context] ||= {}
  msg
end
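The enrichment step can be sketched on its own: worker-held defaults fill in any keys the sender omitted, without overwriting keys the sender did supply. The `enrich` helper below is illustrative, not part of the class.

```ruby
# Standalone sketch of the enrichment in #process_message: ||= only
# assigns when the key is nil or absent, so sender-supplied values win.
def enrich(msg, request_type:, model:)
  msg[:request_type]    ||= request_type
  msg[:model]           ||= model
  msg[:message_context] ||= {}
  msg
end

enrich({ prompt: 'hi' }, request_type: 'chat', model: 'qwen3.5:27b')
# => { prompt: "hi", request_type: "chat", model: "qwen3.5:27b", message_context: {} }
```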

#queueObject

Returns a queue CLASS (not instance) bound to the llm.fleet exchange with the routing key for this worker’s model offering lane. The Subscription base class calls queue.new in initialize, so this must return a class, not an instance.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 102

def queue
  @queue ||= build_queue_class
end

#queue_expires_msObject



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 67

def queue_expires_ms
  setting_value(fleet_settings, :queue_expires_ms) || 60_000
end

#queue_max_lengthObject



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 75

def queue_max_length
  setting_value(fleet_settings, :queue_max_length) || 100
end

#routing_keyObject



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 131

def routing_key
  parts = ['llm.fleet', lane_kind, sanitized_model]
  parts << "ctx#{@context_window}" if lane_kind == 'inference' && @context_window
  parts.join('.')
end

#runner_classObject



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 36

def runner_class
  Legion::Extensions::Ollama::Runners::Fleet
end

#runner_functionObject



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 40

def runner_function
  'handle_request'
end

#subscribe_optionsObject

Subscribe options include x-priority argument so RabbitMQ can honour consumer priority when dispatching to competing consumers.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 89

def subscribe_options
  base = begin
    super
  rescue NoMethodError
    {}
  end
  base.merge(arguments: { 'x-priority' => consumer_priority })
end

#use_runner?Boolean

Bypass Legion::Runner — call the runner module directly so we don’t need a task record in the database for every LLM inference hop.

Returns:

  • (Boolean)


# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 46

def use_runner?
  false
end