Class: Legion::Extensions::Ollama::Actor::EndpointPuller
- Inherits: Actors::Every
  - Object
    - Actors::Every
      - Legion::Extensions::Ollama::Actor::EndpointPuller
- Defined in:
- lib/legion/extensions/ollama/actors/endpoint_puller.rb
Overview
Polls configured fleet queues with basic_get so endpoint machines choose when they are ready for work instead of holding prefetched messages.
Instance Method Summary
- #action ⇒ Object
- #check_subtask? ⇒ Boolean
- #context_limit(subscription) ⇒ Object
- #decode_payload(payload, metadata) ⇒ Object
- #drain_lane(subscription) ⇒ Object
- #embed_type?(type) ⇒ Boolean
- #empty_lane_backoff_seconds ⇒ Object
- #enabled? ⇒ Boolean
- #endpoint_enabled? ⇒ Boolean
- #endpoint_settings ⇒ Object
- #finite_context_limit(subscription) ⇒ Object
- #fleet_scheduler ⇒ Object
- #generate_task? ⇒ Boolean
- #lane_backed_off?(subscription, now) ⇒ Boolean
- #lane_key(subscription) ⇒ Object
- #mark_lane_empty(subscription) ⇒ Object
- #max_consecutive_pulls_per_lane ⇒ Object
- #metadata_header(metadata, key) ⇒ Object
- #monotonic_time ⇒ Object
- #nested_setting(hash, *keys) ⇒ Object
- #ordered_subscriptions ⇒ Object
- #process_payload(payload, metadata, delivery_info, subscription) ⇒ Object
- #pull_one(queue, subscription) ⇒ Object
- #queue_config ⇒ Object
- #queue_for(subscription) ⇒ Object
- #runner_class ⇒ Object
- #runner_function ⇒ Object
- #setting_value(hash, key) ⇒ Object
- #subscriptions ⇒ Object
- #time ⇒ Object
- #use_runner? ⇒ Boolean
Instance Method Details
#action ⇒ Object
# Source: endpoint_puller.rb, line 41
# One polling tick. Visits every subscription lane in priority order and
# skips lanes whose empty-lane backoff has not expired yet. Drains the
# remaining lanes and starts a backoff for any lane that produced nothing.
#
# A single clock reading is taken per tick and used for every backoff check.
#
# @return [nil, Array<Hash>] nil when the puller is disabled, otherwise the
#   result of iterating ordered_subscriptions
def action
  return unless enabled?

  tick_started = monotonic_time
  ordered_subscriptions.each do |lane|
    next if lane_backed_off?(lane, tick_started)

    drained = drain_lane(lane)
    mark_lane_empty(lane) if drained.zero?
  end
end
#check_subtask? ⇒ Boolean
# Source: endpoint_puller.rb, line 22
# Actor hook (presumably consulted by Actors::Every — confirm): this actor
# never requests subtask checking.
#
# @return [false]
def check_subtask?
  false
end
#context_limit(subscription) ⇒ Object
# Source: endpoint_puller.rb, line 186
# Context-window size configured for a subscription. lane_key,
# ordered_subscriptions and finite_context_limit use it to separate and order
# lanes.
#
# The first truthy value among :max_context_size, :context_window,
# :max_input_tokens, :context and :ctx is used:
# - Strings are parsed strictly as base-10 integers, so "010" is 10, not
#   octal 8.
# - Numeric values are truncated to an Integer.
# - Anything missing, non-finite or unparseable means "unbounded".
#
# Parsing goes through Integer(..., exception: false) instead of raising and
# rescuing. The common "no context configured" case is hit several times per
# lane per tick, so it should not allocate exceptions.
#
# @param subscription [Hash] normalized subscription entry
# @return [Integer, Float] the limit, or Float::INFINITY when unbounded
def context_limit(subscription)
  raw = setting_value(subscription, :max_context_size) ||
        setting_value(subscription, :context_window) ||
        setting_value(subscription, :max_input_tokens) ||
        setting_value(subscription, :context) ||
        setting_value(subscription, :ctx)
  return Float::INFINITY unless raw
  return Float::INFINITY if raw.is_a?(Float) && !raw.finite?

  parsed = if raw.is_a?(String)
             Integer(raw, 10, exception: false)
           else
             Integer(raw, exception: false)
           end
  parsed || Float::INFINITY
end
#decode_payload(payload, metadata) ⇒ Object
# Source: endpoint_puller.rb, line 107
# Turns a message payload (as returned by queue.pop) into a Hash-like
# request body.
#
# Decryption depends on metadata.content_encoding:
#   'encrypted/cs' -> Legion::Crypt.decrypt, using the :iv header
#   'encrypted/pk' -> Legion::Crypt.decrypt_from_keypair, using the :public_key header
#   anything else  -> payload is used unchanged
# The decoded text is parsed with Legion::JSON.load when content_type is
# exactly 'application/json'; otherwise it is wrapped as { value: decoded }.
#
# @param payload [String] raw message body
# @param metadata [Object, nil] message properties; nil is tolerated
# @return [Object] parsed JSON, or { value: decoded }
def decode_payload(payload, metadata)
  decoded = case metadata&.content_encoding
            when 'encrypted/cs'
              Legion::Crypt.decrypt(payload, metadata_header(metadata, :iv))
            when 'encrypted/pk'
              Legion::Crypt.decrypt_from_keypair(metadata_header(metadata, :public_key), payload)
            else
              payload
            end

  return Legion::JSON.load(decoded) if metadata&.content_type == 'application/json'

  { value: decoded }
end
#drain_lane(subscription) ⇒ Object
# Source: endpoint_puller.rb, line 60
# Pulls messages from one lane's queue, one at a time, until pull_one reports
# nothing was taken or the per-lane cap is reached.
#
# The cap is read once per drain instead of twice per pulled message; a cap
# that is zero or negative means "no cap".
#
# @param subscription [Hash] the lane to drain
# @return [Integer] how many pull_one calls returned true
def drain_lane(subscription)
  limit = max_consecutive_pulls_per_lane
  capped = limit.positive?
  queue = queue_for(subscription)
  pulls = 0

  until capped && pulls >= limit
    break unless pull_one(queue, subscription)

    pulls += 1
  end
  pulls
end
#embed_type?(type) ⇒ Boolean
# Source: endpoint_puller.rb, line 202
# Whether a subscription type names an embedding workload.
# ordered_subscriptions uses this to visit embedding lanes first.
#
# The type is compared in its string form, so :embed and 'embed' are
# equivalent; nil never matches.
#
# @param type [String, Symbol, nil]
# @return [Boolean]
def embed_type?(type)
  %w[embed embedding embeddings].include?(type.to_s)
end
#empty_lane_backoff_seconds ⇒ Object
# Source: endpoint_puller.rb, line 159
# Seconds a lane is skipped after a drain that produced no messages.
# It is read in milliseconds from fleet.endpoint.empty_lane_backoff_ms
# (default 250 ms) and converted with to_f.
#
# @return [Float]
def empty_lane_backoff_seconds
  backoff_ms = setting_value(endpoint_settings, :empty_lane_backoff_ms) || 250
  backoff_ms.to_f / 1000
end
#enabled? ⇒ Boolean
# Source: endpoint_puller.rb, line 30
# The puller runs only when all three conditions hold:
# - the fleet scheduler is :basic_get;
# - the endpoint is explicitly enabled;
# - at least one valid subscription exists.
# Any StandardError raised while reading settings is reported at :warn level
# and treated as "disabled".
#
# @return [Boolean]
def enabled?
  return false unless fleet_scheduler == :basic_get
  return false unless endpoint_enabled?

  subscriptions.any?
rescue StandardError => e
  handle_exception(e, level: :warn, handled: true)
  false
end
#endpoint_enabled? ⇒ Boolean
# Source: endpoint_puller.rb, line 151
# True only when fleet.endpoint.enabled is the literal boolean true. Other
# truthy values, such as the string "true" or 1, deliberately do not enable
# the endpoint.
#
# @return [Boolean]
def endpoint_enabled?
  flag = setting_value(endpoint_settings, :enabled)
  flag.equal?(true)
end
#endpoint_settings ⇒ Object
# Source: endpoint_puller.rb, line 147
# The fleet.endpoint section of the extension settings. Returns an empty
# Hash when that section is missing (nil or false).
#
# @return [Hash, Object] normally a Hash
def endpoint_settings
  section = nested_setting(settings, :fleet, :endpoint)
  return section if section

  {}
end
#finite_context_limit(subscription) ⇒ Object
# Source: endpoint_puller.rb, line 197
# The subscription's context limit, or nil when it is unbounded. queue_for
# passes this as the queue's context_window.
#
# @param subscription [Hash]
# @return [Integer, nil]
def finite_context_limit(subscription)
  limit = context_limit(subscription)
  return nil unless limit.finite?

  limit
end
#fleet_scheduler ⇒ Object
# Source: endpoint_puller.rb, line 182
# The configured fleet.scheduler as a Symbol, defaulting to :basic_get.
#
# @return [Symbol]
def fleet_scheduler
  configured = nested_setting(settings, :fleet, :scheduler)
  return :basic_get unless configured

  configured.to_sym
end
#generate_task? ⇒ Boolean
# Source: endpoint_puller.rb, line 26
# Actor hook (presumably consulted by Actors::Every — confirm): this actor
# never asks for a task record to be generated.
#
# @return [false]
def generate_task?
  false
end
#lane_backed_off?(subscription, now) ⇒ Boolean
# Source: endpoint_puller.rb, line 163
# Whether the lane is still inside the backoff window recorded by
# mark_lane_empty. Lanes with no record are never backed off.
#
# @param subscription [Hash]
# @param now [Float] monotonic timestamp for the current tick
# @return [Boolean]
def lane_backed_off?(subscription, now)
  @empty_lanes ||= {}
  resume_at = @empty_lanes.fetch(lane_key(subscription), 0)
  now < resume_at
end
#lane_key(subscription) ⇒ Object
# Source: endpoint_puller.rb, line 171
# Identifier for a lane, used to key the queue cache and the backoff table.
# Its form is "type:model", with ":ctx<limit>" appended when the context
# limit is finite.
#
# @param subscription [Hash]
# @return [String]
def lane_key(subscription)
  base = "#{subscription[:type]}:#{subscription[:model]}"
  limit = context_limit(subscription)
  limit.finite? ? "#{base}:ctx#{limit}" : base
end
#mark_lane_empty(subscription) ⇒ Object
# Source: endpoint_puller.rb, line 167
# Records that a lane produced nothing. lane_backed_off? skips the lane
# until monotonic_time + empty_lane_backoff_seconds.
#
# @param subscription [Hash]
# @return [Float] the monotonic time at which the lane may be polled again
def mark_lane_empty(subscription)
  @empty_lanes ||= {}
  resume_at = monotonic_time + empty_lane_backoff_seconds
  @empty_lanes[lane_key(subscription)] = resume_at
end
#max_consecutive_pulls_per_lane ⇒ Object
# Source: endpoint_puller.rb, line 155
# Maximum number of messages drain_lane takes from one lane before moving on.
# Zero (the default) or a negative value means unbounded.
#
# The raw fleet.endpoint.max_consecutive_pulls_per_lane setting is coerced
# to an Integer, just as the other numeric endpoint settings are coerced with
# to_f:
# - Strings are parsed as base-10 ("5" -> 5), so drain_lane no longer raises
#   NoMethodError on #positive?.
# - Numeric values are rounded up, which matches how drain_lane's
#   `pulls >= limit` check already treated fractional caps.
# - Non-finite, unparseable or non-numeric values fall back to 0 (unbounded).
#
# @return [Integer]
def max_consecutive_pulls_per_lane
  raw = setting_value(endpoint_settings, :max_consecutive_pulls_per_lane)
  case raw
  when Numeric then raw.finite? ? raw.ceil : 0
  when String then Integer(raw, 10, exception: false) || 0
  else 0
  end
end
#metadata_header(metadata, key) ⇒ Object
# Source: endpoint_puller.rb, line 206
# Reads one header from message metadata, accepting either string or symbol
# keys. Returns nil when the metadata is nil, has no headers, or lacks the
# key.
#
# @param metadata [Object, nil] message properties exposing #headers
# @param key [Symbol, String]
# @return [Object, nil]
def metadata_header(metadata, key)
  setting_value(metadata&.headers || {}, key)
end
#monotonic_time ⇒ Object
# Source: endpoint_puller.rb, line 178
# Seconds from the monotonic clock, as a Float. It is used for backoff
# bookkeeping, so wall-clock adjustments cannot shorten or extend a backoff.
#
# @return [Float]
def monotonic_time
  Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_second)
end
#nested_setting(hash, *keys) ⇒ Object
# Source: endpoint_puller.rb, line 210
# Walks a chain of keys through nested settings hashes, using setting_value
# at each level so string and symbol keys both work. Returns nil as soon as
# an intermediate value is not hash-like. With no keys, returns +hash+
# itself.
#
# @param hash [Hash, Object]
# @param keys [Array<Symbol, String>]
# @return [Object, nil]
def nested_setting(hash, *keys)
  current = hash
  keys.each do |key|
    return nil unless current.respond_to?(:key?)

    current = setting_value(current, key)
  end
  current
end
#ordered_subscriptions ⇒ Object
# Source: endpoint_puller.rb, line 53
# Subscriptions in the order action visits them. Sort keys, in order:
# 1. embedding lanes before all others;
# 2. ascending context limit (unbounded lanes last within each group);
# 3. model name.
#
# @return [Array<Hash>]
def ordered_subscriptions
  subscriptions.sort_by do |sub|
    priority = embed_type?(sub[:type].to_s) ? 0 : 1
    [priority, context_limit(sub), sub[:model].to_s]
  end
end
#process_payload(payload, metadata, delivery_info, subscription) ⇒ Object
# Source: endpoint_puller.rb, line 97
# Builds the keyword hash that pull_one passes to Fleet.handle_request for
# one message. Steps, in order:
# 1. Decode the payload.
# 2. Merge in the message headers, with keys symbolized; headers win on
#    collisions.
# 3. Stamp the delivery routing key.
# 4. Fill :request_type, :model and :message_context from the lane's
#    subscription when they are absent.
#
# @param payload [String] raw message body
# @param metadata [Object, nil] message properties (may expose #headers)
# @param delivery_info [Object] delivery details (may expose #routing_key)
# @param subscription [Hash] the lane this message was pulled from
# @return [Hash] the assembled request
def process_payload(payload, metadata, delivery_info, subscription)
  message = decode_payload(payload, metadata)
  message = message.merge(metadata.headers.transform_keys(&:to_sym)) if metadata&.headers
  message[:routing_key] = delivery_info.routing_key if delivery_info.respond_to?(:routing_key)
  message[:request_type] ||= subscription[:type].to_s
  message[:model] ||= subscription[:model].to_s
  message[:message_context] ||= {}
  message
end
#pull_one(queue, subscription) ⇒ Object
# Source: endpoint_puller.rb, line 73
# Takes at most one message from +queue+ with a manual-ack pop and hands it
# to Fleet.handle_request.
#
# Returns true when a message was actually taken, whether it was handled and
# acknowledged or failed and was rejected (without requeue), so drain_lane
# keeps going. Returns false when the queue was empty or when the pop itself
# raised.
#
# Returning false on a failed pop is deliberate. With the default unbounded
# per-lane cap, a queue/channel that raises on every pop would otherwise keep
# drain_lane looping forever.
#
# @param queue [Object] queue built by queue_for
# @param subscription [Hash] the lane being drained
# @return [Boolean]
def pull_one(queue, subscription)
  delivery_info, metadata, payload = queue.pop(manual_ack: true)
  return false unless delivery_info

  message = process_payload(payload, metadata, delivery_info, subscription)
  Legion::Extensions::Ollama::Runners::Fleet.handle_request(**message)
  queue.acknowledge(delivery_info.delivery_tag)
  true
rescue StandardError => e
  handle_exception(e, lex: lex_name, routing_key: delivery_info&.routing_key)
  # The pop itself failed: nothing was taken, so tell drain_lane to stop.
  return false unless delivery_info

  queue.reject(delivery_info.delivery_tag, requeue: false)
  true
end
#queue_config ⇒ Object
# Source: endpoint_puller.rb, line 137
# Queue tuning options read from the fleet settings section. Only keys whose
# value is not nil are included, in a fixed order.
#
# @return [Hash{Symbol => Object}]
def queue_config
  keys = %i[queue_expires_ms message_ttl_ms queue_max_length delivery_limit consumer_ack_timeout_ms]
  keys.each_with_object({}) do |name, config|
    value = nested_setting(settings, :fleet, name)
    config[name] = value unless value.nil?
  end
end
#queue_for(subscription) ⇒ Object
# Source: endpoint_puller.rb, line 87
# Returns the queue object for a lane, building it on first use and caching
# it by lane_key. The queue class comes from ModelWorker.queue_class_for and
# depends on the subscription's type, model, finite context limit and
# queue_config.
#
# @param subscription [Hash]
# @return [Object] a queue instance
def queue_for(subscription)
  @queues ||= {}
  cache_key = lane_key(subscription)
  return @queues[cache_key] if @queues[cache_key]

  queue_class = ModelWorker.queue_class_for(
    request_type: subscription[:type],
    model: subscription[:model],
    context_window: finite_context_limit(subscription),
    queue_config: queue_config
  )
  @queues[cache_key] = queue_class.new
end
#runner_class ⇒ Object
# Source: endpoint_puller.rb, line 10
# Actor hook: the actor acts as its own runner, so this returns the actor's
# own class.
#
# @return [Class]
def runner_class
  self.class
end
#runner_function ⇒ Object
# Source: endpoint_puller.rb, line 14
# Actor hook: name of the method invoked on each tick.
#
# @return [String]
def runner_function
  'action'
end
#setting_value(hash, key) ⇒ Object
# Source: endpoint_puller.rb, line 218
# Looks up +key+ in a hash-like object, trying the string form first and then
# the key as given. Non-hash inputs and missing keys yield nil; stored false
# values are returned as-is.
#
# @param hash [Hash, Object]
# @param key [Symbol, String]
# @return [Object, nil]
def setting_value(hash, key)
  return nil unless hash.respond_to?(:key?)

  [key.to_s, key].each do |candidate|
    return hash[candidate] if hash.key?(candidate)
  end
  nil
end
#subscriptions ⇒ Object
# Source: endpoint_puller.rb, line 123
# Valid subscription entries from the :subscriptions setting, with keys
# symbolized. Returns [] when the setting is not an Array. Entries that are
# not Hashes, or that lack a :type or :model, are dropped.
#
# @return [Array<Hash{Symbol => Object}>]
def subscriptions
  configured = setting_value(settings, :subscriptions)
  return [] unless configured.is_a?(Array)

  configured
    .select { |entry| entry.is_a?(Hash) }
    .map { |entry| entry.transform_keys(&:to_sym) }
    .select { |entry| entry[:type] && entry[:model] }
end
#time ⇒ Object
# Source: endpoint_puller.rb, line 37
# Interval between ticks, in seconds (presumably read by Actors::Every as its
# period — confirm). It is read in milliseconds from
# fleet.endpoint.idle_backoff_ms, defaults to 1000 ms, and is converted with
# to_f.
#
# @return [Float]
def time
  idle_ms = setting_value(endpoint_settings, :idle_backoff_ms) || 1_000
  idle_ms.to_f / 1000
end
#use_runner? ⇒ Boolean
# Source: endpoint_puller.rb, line 18
# Actor hook (presumably consulted by Actors::Every — confirm): this actor
# does not dispatch through a separate runner.
#
# @return [false]
def use_runner?
  false
end