Class: Legion::Extensions::Ollama::Actor::EndpointPuller

Inherits:
Actors::Every
  • Object
show all
Defined in:
lib/legion/extensions/ollama/actors/endpoint_puller.rb

Overview

Polls configured fleet queues with basic_get so endpoint machines choose when they are ready for work instead of holding prefetched messages.

Instance Method Summary

Instance Method Details

#action ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 41

# One polling tick of the actor.
#
# Does nothing unless #enabled?. Otherwise it walks the lanes in
# #ordered_subscriptions order. For each lane it:
# - skips the lane if it is still inside its empty-lane backoff window;
# - drains the lane;
# - if nothing was pulled, records the lane as empty so later ticks skip it
#   until the backoff expires.
#
# The monotonic clock is sampled once per tick, so every lane's backoff is
# judged against the same instant.
def action
  return unless enabled?

  tick_started_at = monotonic_time
  ordered_subscriptions.each do |lane|
    next if lane_backed_off?(lane, tick_started_at)

    pulled_count = drain_lane(lane)
    next unless pulled_count.zero?

    mark_lane_empty(lane)
  end
end

#check_subtask? ⇒ Boolean

Returns:

  • (Boolean)


22
23
24
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 22

# Actor hook: this puller never checks for subtasks after running.
#
# @return [Boolean] always false
def check_subtask?
  false
end

#context_limit(subscription) ⇒ Object



186
187
188
189
190
191
192
193
194
195
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 186

# Context-window size configured for a subscription.
#
# The first truthy value among these keys wins, in priority order:
# :max_context_size, :context_window, :max_input_tokens, :context, :ctx.
# Keys are read through #setting_value, so both string and symbol keys work.
#
# @param subscription [Hash] subscription settings
# @return [Integer, Float] the limit coerced with Kernel#Integer, or
#   Float::INFINITY when nothing is configured or the value is not an integer
def context_limit(subscription)
  raw = %i[max_context_size context_window max_input_tokens context ctx]
        .lazy
        .map { |key| setting_value(subscription, key) }
        .find { |value| value }
  # "Not configured" is the common case; answer it directly instead of
  # relying on Integer(Float::INFINITY) raising FloatDomainError.
  return Float::INFINITY if raw.nil?

  Integer(raw)
rescue ArgumentError, TypeError, FloatDomainError
  # Unparseable config (e.g. "big", true, NaN) means "no known limit".
  Float::INFINITY
end

#decode_payload(payload, metadata) ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 107

# Turns a raw AMQP payload into a message Hash.
#
# The payload is first decrypted according to metadata.content_encoding:
# - 'encrypted/cs': Legion::Crypt.decrypt, using the :iv header;
# - 'encrypted/pk': Legion::Crypt.decrypt_from_keypair, using the
#   :public_key header;
# - anything else: used as-is.
#
# The result is then parsed with Legion::JSON.load when content_type is
# 'application/json'. Any other payload is wrapped as { value: decoded }.
#
# @param payload [String] raw message body
# @param metadata [Object, nil] message properties; may be nil
# @return [Hash]
def decode_payload(payload, metadata)
  decoded = case metadata&.content_encoding
            when 'encrypted/cs'
              Legion::Crypt.decrypt(payload, metadata_header(metadata, :iv))
            when 'encrypted/pk'
              Legion::Crypt.decrypt_from_keypair(metadata_header(metadata, :public_key), payload)
            else
              payload
            end

  return { value: decoded } unless metadata&.content_type == 'application/json'

  parsed = Legion::JSON.load(decoded)
  # process_payload merges headers into the result and assigns keys on it,
  # so a JSON array/scalar must still come back as a Hash.
  parsed.is_a?(Hash) ? parsed : { value: parsed }
end

#drain_lane(subscription) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 60

# Pulls messages from one lane until pull_one returns false or the per-lane
# cap is reached.
#
# @param subscription [Hash] normalized subscription (:type, :model, ...)
# @return [Integer] number of pull_one calls that returned true
def drain_lane(subscription)
  queue = queue_for(subscription)
  # The cap is loop-invariant: read the settings once per drain instead of on
  # every iteration. A non-positive cap means "no cap".
  cap = max_consecutive_pulls_per_lane
  pulls = 0

  loop do
    break if cap.positive? && pulls >= cap
    break unless pull_one(queue, subscription)

    pulls += 1
  end
  pulls
end

#embed_type?(type) ⇒ Boolean

Returns:

  • (Boolean)


202
203
204
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 202

# Whether a subscription type string names an embedding workload.
#
# Matching is exact and case-sensitive against 'embed', 'embedding' and
# 'embeddings'. Symbols and other types do not match.
#
# @param type [String]
# @return [Boolean]
def embed_type?(type)
  case type
  when 'embed', 'embedding', 'embeddings' then true
  else false
  end
end

#empty_lane_backoff_seconds ⇒ Object



159
160
161
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 159

# How long, in seconds, a lane that yielded nothing is skipped.
#
# Reads :empty_lane_backoff_ms from the endpoint settings (default 250 ms)
# and converts it to float seconds.
#
# @return [Float]
def empty_lane_backoff_seconds
  backoff_ms = setting_value(endpoint_settings, :empty_lane_backoff_ms) || 250
  backoff_ms.to_f / 1000
end

#enabled? ⇒ Boolean

Returns:

  • (Boolean)


30
31
32
33
34
35
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 30

# Whether this actor should poll at all.
#
# All three conditions must hold:
# - the fleet scheduler is :basic_get;
# - the endpoint is explicitly enabled;
# - at least one subscription is configured.
#
# Any error while evaluating these is reported through handle_exception at
# :warn level and treated as "disabled".
#
# @return [Boolean]
def enabled?
  return false unless fleet_scheduler == :basic_get
  return false unless endpoint_enabled?

  subscriptions.any?
rescue StandardError => e
  handle_exception(e, level: :warn, handled: true)
  false
end

#endpoint_enabled? ⇒ Boolean

Returns:

  • (Boolean)


151
152
153
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 151

# Whether the endpoint settings explicitly enable this machine.
#
# Only the literal `true` counts; truthy look-alikes such as the string
# "true" or 1 leave the endpoint disabled.
#
# @return [Boolean]
def endpoint_enabled?
  enabled_flag = setting_value(endpoint_settings, :enabled)
  enabled_flag == true
end

#endpoint_settings ⇒ Object



147
148
149
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 147

# The fleet.endpoint section of the extension settings.
#
# @return [Hash, Object] the configured section, or an empty Hash when the
#   section is missing (or configured as nil/false)
def endpoint_settings
  configured = nested_setting(settings, :fleet, :endpoint)
  configured || {}
end

#finite_context_limit(subscription) ⇒ Object



197
198
199
200
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 197

# The subscription's context limit, or nil when it is unbounded.
#
# @param subscription [Hash]
# @return [Integer, nil]
def finite_context_limit(subscription)
  limit = context_limit(subscription)
  limit if limit.finite?
end

#fleet_scheduler ⇒ Object



182
183
184
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 182

# The configured fleet scheduling mode, as a Symbol.
#
# Reads fleet.scheduler from settings and defaults to :basic_get when it is
# unset (nil/false).
#
# @return [Symbol]
def fleet_scheduler
  configured = nested_setting(settings, :fleet, :scheduler)
  return :basic_get unless configured

  configured.to_sym
end

#generate_task? ⇒ Boolean

Returns:

  • (Boolean)


26
27
28
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 26

# Actor hook: this puller never generates a task record for its ticks.
#
# @return [Boolean] always false
def generate_task?
  false
end

#lane_backed_off?(subscription, now) ⇒ Boolean

Returns:

  • (Boolean)


163
164
165
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 163

# Whether a lane is still inside its empty-lane backoff window.
#
# @param subscription [Hash] lane subscription
# @param now [Float] monotonic clock reading to compare against
# @return [Boolean] true while the recorded retry time is later than +now+;
#   lanes never marked empty are treated as retryable from time 0
def lane_backed_off?(subscription, now)
  @empty_lanes ||= {}
  retry_at = @empty_lanes.fetch(lane_key(subscription), 0)
  now < retry_at
end

#lane_key(subscription) ⇒ Object



171
172
173
174
175
176
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 171

# Stable identifier for a lane, used to key queues and backoff state.
#
# Format is "type:model", with ":ctx<limit>" appended when the
# subscription has a finite context limit.
#
# @param subscription [Hash]
# @return [String]
def lane_key(subscription)
  base = "#{subscription[:type]}:#{subscription[:model]}"
  limit = context_limit(subscription)
  return base unless limit.finite?

  "#{base}:ctx#{limit}"
end

#mark_lane_empty(subscription) ⇒ Object



167
168
169
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 167

# Records that a lane was found empty so it is skipped until the backoff
# expires.
#
# @param subscription [Hash]
# @return [Float] the monotonic time at which the lane becomes pollable again
def mark_lane_empty(subscription)
  @empty_lanes ||= {}
  key = lane_key(subscription)
  retry_at = monotonic_time + empty_lane_backoff_seconds
  @empty_lanes[key] = retry_at
end

#max_consecutive_pulls_per_lane ⇒ Object



155
156
157
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 155

# Maximum number of messages drained from one lane per tick.
#
# Reads :max_consecutive_pulls_per_lane from the endpoint settings and
# coerces it with Kernel#Integer, so string values from configuration
# (e.g. "10") work. Unset or unparseable values yield 0. drain_lane treats
# any non-positive value as "no cap".
#
# @return [Integer]
def max_consecutive_pulls_per_lane
  Integer(setting_value(endpoint_settings, :max_consecutive_pulls_per_lane) || 0)
rescue ArgumentError, TypeError, FloatDomainError
  0
end

#metadata_header(metadata, key) ⇒ Object



206
207
208
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 206

# Reads one header from message metadata.
#
# Lookup goes through #setting_value, so string and symbol header names are
# both accepted.
#
# @param metadata [Object, nil] message properties; may be nil or have nil headers
# @param key [Symbol, String] header name
# @return [Object, nil] header value, or nil when absent
def metadata_header(metadata, key)
  setting_value(metadata&.headers || {}, key)
end

#monotonic_time ⇒ Object



178
179
180
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 178

# Current monotonic clock reading in float seconds.
#
# Used for lane backoff bookkeeping. Unlike wall-clock time, it is not
# affected by system clock adjustments.
#
# @return [Float]
def monotonic_time
  Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_second)
end

#nested_setting(hash, *keys) ⇒ Object



210
211
212
213
214
215
216
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 210

# Walks a path of keys through nested settings hashes.
#
# Each step uses #setting_value, so string and symbol keys both match. If
# an intermediate value is not hash-like (does not respond to #key?), the
# walk stops and returns nil.
#
# @param hash [Hash, Object] root settings
# @param keys [Array<Symbol, String>] path to follow
# @return [Object, nil] the value at the path, or +hash+ itself when no keys are given
def nested_setting(hash, *keys)
  node = hash
  keys.each do |key|
    return nil unless node.respond_to?(:key?)

    node = setting_value(node, key)
  end
  node
end

#ordered_subscriptionsObject



53
54
55
56
57
58
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 53

# Subscriptions in polling order.
#
# Sort keys, in order:
# 1. embedding lanes first;
# 2. ascending context limit (unbounded lanes last);
# 3. model name.
#
# @return [Array<Hash>]
def ordered_subscriptions
  subscriptions.sort_by do |subscription|
    kind_rank = embed_type?(subscription[:type].to_s) ? 0 : 1
    [kind_rank, context_limit(subscription), subscription[:model].to_s]
  end
end

#process_payload(payload, metadata, delivery_info, subscription) ⇒ Object



97
98
99
100
101
102
103
104
105
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 97

# Builds the keyword hash handed to Runners::Fleet.handle_request.
#
# Steps:
# 1. Decode the body with #decode_payload.
# 2. Merge message headers on top, with keys symbolized; headers override
#    same-named body keys.
# 3. Record the delivery's routing key when it exposes one.
# 4. Fill :request_type and :model from the lane subscription when the
#    message does not already carry them.
# 5. Ensure :message_context is present.
#
# @param payload [String] raw message body
# @param metadata [Object, nil] message properties; may be nil
# @param delivery_info [Object] delivery information from the pop
# @param subscription [Hash] normalized lane subscription
# @return [Hash]
def process_payload(payload, metadata, delivery_info, subscription)
  message = decode_payload(payload, metadata)
  message = message.merge(metadata.headers.transform_keys(&:to_sym)) if metadata&.headers
  message[:routing_key] = delivery_info.routing_key if delivery_info.respond_to?(:routing_key)
  message[:request_type] ||= subscription[:type].to_s
  message[:model] ||= subscription[:model].to_s
  message[:message_context] ||= {}
  message
end

#pull_one(queue, subscription) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 73

# Pops at most one message from +queue+ (manual ack) and hands it to the
# fleet runner.
#
# @param queue [Object] lane queue from queue_for; must respond to
#   pop(manual_ack:), acknowledge(tag) and reject(tag, requeue:)
# @param subscription [Hash] normalized lane subscription
# @return [Boolean]
#   true: a message was taken off the queue and settled exactly once
#     (acknowledged on success, rejected without requeue on failure);
#   false: the queue was empty or the queue/channel itself failed, so
#     drain_lane stops.
def pull_one(queue, subscription)
  delivery_info, metadata, payload = queue.pop(manual_ack: true)
  return false unless delivery_info

  begin
    message = process_payload(payload, metadata, delivery_info, subscription)
    Legion::Extensions::Ollama::Runners::Fleet.handle_request(**message)
  rescue StandardError => e
    # Processing failed: report it, reject the delivery without requeue, and
    # keep draining the lane.
    handle_exception(e, lex: lex_name, routing_key: delivery_info.routing_key)
    queue.reject(delivery_info.delivery_tag, requeue: false)
    return true
  end

  queue.acknowledge(delivery_info.delivery_tag)
  true
rescue StandardError => e
  # The queue itself failed (pop, acknowledge or reject raised). Stop the
  # lane: returning true here would make an uncapped drain spin forever on a
  # broken channel, and rejecting after a failed ack would settle twice.
  handle_exception(e, lex: lex_name, routing_key: delivery_info&.routing_key)
  false
end

#queue_config ⇒ Object



137
138
139
140
141
142
143
144
145
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 137

# Optional fleet-level queue tuning values passed to ModelWorker.
#
# Each key is read from the fleet section of settings under the same name.
# Settings that are unset (nil) are omitted from the result.
#
# @return [Hash{Symbol => Object}]
def queue_config
  %i[queue_expires_ms message_ttl_ms queue_max_length delivery_limit consumer_ack_timeout_ms]
    .to_h { |option| [option, nested_setting(settings, :fleet, option)] }
    .compact
end

#queue_for(subscription) ⇒ Object



87
88
89
90
91
92
93
94
95
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 87

# Memoized queue object for a lane, keyed by #lane_key.
#
# The first request for a lane builds the queue class via
# ModelWorker.queue_class_for and instantiates it. Later requests reuse that
# instance.
#
# @param subscription [Hash]
# @return [Object] queue instance
def queue_for(subscription)
  @queues ||= {}
  key = lane_key(subscription)
  @queues[key] ||= begin
    queue_class = ModelWorker.queue_class_for(
      request_type:   subscription[:type],
      model:          subscription[:model],
      context_window: finite_context_limit(subscription),
      queue_config:   queue_config
    )
    queue_class.new
  end
end

#runner_class ⇒ Object



10
11
12
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 10

# Actor hook: the class that owns the runner function, which is this
# actor's own class.
#
# @return [Class]
def runner_class
  self.class
end

#runner_function ⇒ Object



14
15
16
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 14

# Actor hook: name of the method invoked on each tick.
#
# @return [String] 'action'
def runner_function
  'action'
end

#setting_value(hash, key) ⇒ Object



218
219
220
221
222
223
224
225
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 218

# Reads +key+ from a settings hash, accepting string or symbol keys.
#
# The string form of the key is tried first; if it is present, its value
# wins even when that value is nil.
#
# @param hash [Hash, Object] settings; non-hash-like values yield nil
# @param key [Symbol, String]
# @return [Object, nil]
def setting_value(hash, key)
  return nil unless hash.respond_to?(:key?)

  [key.to_s, key].each do |candidate|
    return hash[candidate] if hash.key?(candidate)
  end
  nil
end

#subscriptions ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 123

# Configured lane subscriptions, normalized to symbol keys.
#
# The value of settings[:subscriptions] is used only when it is an Array.
# Entries that are not Hashes, or that lack :type or :model, are dropped.
#
# @return [Array<Hash{Symbol => Object}>]
def subscriptions
  configured = setting_value(settings, :subscriptions)
  return [] unless configured.is_a?(Array)

  configured
    .select { |entry| entry.is_a?(Hash) }
    .map { |entry| entry.transform_keys(&:to_sym) }
    .select { |entry| entry[:type] && entry[:model] }
end

#time ⇒ Object



37
38
39
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 37

# Actor interval in float seconds.
#
# Reads :idle_backoff_ms from the endpoint settings (default 1000 ms).
# NOTE(review): presumably consumed by Actors::Every as the tick period —
# confirm against the base class.
#
# @return [Float]
def time
  idle_ms = setting_value(endpoint_settings, :idle_backoff_ms) || 1_000
  idle_ms.to_f / 1000
end

#use_runner? ⇒ Boolean

Returns:

  • (Boolean)


18
19
20
# File 'lib/legion/extensions/ollama/actors/endpoint_puller.rb', line 18

# Actor hook: ticks call #action on this actor directly rather than going
# through a separate runner.
#
# @return [Boolean] always false
def use_runner?
  false
end