Class: Legion::Extensions::Ollama::Actors::ModelWorker

Inherits:
Actors::Subscription
  • Object
Defined in:
lib/legion/extensions/ollama/actors/model_worker.rb

Overview

Subscription actor that listens on a model-scoped queue and forwards inbound LLM request messages to Runners::Fleet#handle_request.

One instance is created per (request_type, model) entry in settings:

legion:
  ollama:
    fleet:
      consumer_priority: 10
    subscriptions:
      - type: embed
        model: nomic-embed-text
      - type: chat
        model: "qwen3.5:27b"

The queue name and routing key both follow the schema:

llm.request.ollama.<type>.<model>

where model colons are converted to dots (AMQP topic word separator).
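
The naming rule can be sketched in plain Ruby (`routing_key_for` is an illustrative helper name, not part of the documented API):

```ruby
# Illustrative sketch of the naming schema described above; `routing_key_for`
# is a hypothetical helper, not a method shown in this doc.
def routing_key_for(request_type, model)
  # ':' is not a word separator in AMQP topic keys, so swap it for '.'
  "llm.request.ollama.#{request_type}.#{model.tr(':', '.')}"
end

routing_key_for('embed', 'nomic-embed-text') # => "llm.request.ollama.embed.nomic-embed-text"
routing_key_for('chat', 'qwen3.5:27b')       # => "llm.request.ollama.chat.qwen3.5.27b"
```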

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(request_type:, model:) ⇒ ModelWorker

Returns a new instance of ModelWorker.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 28

def initialize(request_type:, model:, **)
  @request_type = request_type.to_s
  @model_name   = model.to_s
  super(**)
end

Instance Attribute Details

#model_name ⇒ Object (readonly)

Returns the value of attribute model_name.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 26

def model_name
  @model_name
end

#request_type ⇒ Object (readonly)

Returns the value of attribute request_type.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 26

def request_type
  @request_type
end

Instance Method Details

#consumer_priority ⇒ Object

Consumer priority from settings. Tells RabbitMQ to prefer this consumer over lower-priority ones on the same queue when multiple consumers are idle. Standard scale: GPU server = 10, Mac Studio = 5, developer laptop = 1. Defaults to 0 (equal priority) if not configured.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 61

def consumer_priority
  return 0 unless defined?(Legion::Settings)

  Legion::Settings.dig(:ollama, :fleet, :consumer_priority) || 0
end

#prefetch ⇒ Object

prefetch(1) is required for consumer priority to work correctly: without it, a high-priority consumer can hold multiple messages while lower-priority consumers sit idle. With prefetch=1, each consumer completes one message before RabbitMQ delivers the next, and priority determines which idle consumer gets it.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 53

def prefetch
  1
end

#process_message(payload, metadata, delivery_info) ⇒ Object

Enrich every inbound message with the worker’s own request_type and model so Runners::Fleet#handle_request always has them, even if the sender omitted them. Also defaults message_context to {} if absent.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 89

def process_message(payload, metadata, delivery_info)
  msg = super
  msg[:request_type]    ||= @request_type
  msg[:model]           ||= @model_name
  msg[:message_context] ||= {}
  msg
end

#queue ⇒ Object

Returns a queue CLASS (not an instance) bound to the llm.request exchange with the routing key for this worker’s (type, model) pair. The Subscription base class calls queue.new in its initializer, so this method must return a class, not an instance.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 82

def queue
  @queue ||= build_queue_class
end
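
The class-vs-instance contract can be illustrated with a minimal, self-contained sketch. Note the body of `build_queue_class` below is an assumption for illustration only; the real implementation (and the Legion queue base class it extends) is not shown in this doc, so a plain anonymous class stands in:

```ruby
# Minimal sketch of "return a memoized, dynamically built class".
# Class.new stands in for the real Legion queue base class.
def build_queue_class(request_type, model_name)
  key = "llm.request.ollama.#{request_type}.#{model_name.tr(':', '.')}"
  Class.new do
    define_method(:routing_key) { key }
  end
end

queue_class = build_queue_class('embed', 'nomic-embed-text')
queue_class.is_a?(Class)    # => true, so the base class can call queue.new
queue_class.new.routing_key # => "llm.request.ollama.embed.nomic-embed-text"
```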

#runner_class ⇒ Object



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 34

def runner_class
  Legion::Extensions::Ollama::Runners::Fleet
end

#runner_function ⇒ Object



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 38

def runner_function
  'handle_request'
end

#subscribe_options ⇒ Object

Subscribe options include the x-priority consumer argument so RabbitMQ can honour consumer priority when dispatching to competing consumers.



# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 69

def subscribe_options
  base = begin
    super
  rescue NoMethodError
    {}
  end
  base.merge(arguments: { 'x-priority' => consumer_priority })
end

#use_runner? ⇒ Boolean

Bypasses Legion::Runner and calls the runner module directly, so no task record is needed in the database for every LLM inference hop.

Returns:

  • (Boolean)


# File 'lib/legion/extensions/ollama/actors/model_worker.rb', line 44

def use_runner?
  false
end