Module: Legion::Extensions::Llm::Transport::FleetLane
- Defined in:
- lib/legion/extensions/llm/transport/fleet_lane.rb
Overview
Shared RabbitMQ live-work lane defaults for provider fleet workers.
Constant Summary
- DEFAULTS =
  {
    queue_expires_ms: 60_000,
    message_ttl_ms: 120_000,
    queue_max_length: 100,
    delivery_limit: 3,
    consumer_ack_timeout_ms: 300_000
  }.freeze
Class Method Summary
- .build_queue_class(queue_name:, exchange_class:, routing_key: queue_name, base_queue_class: nil, settings: {}) ⇒ Object
- .legion_queue_class ⇒ Object
- .queue_arguments(config) ⇒ Object
- .queue_options(settings = {}) ⇒ Object
Class Method Details
.build_queue_class(queue_name:, exchange_class:, routing_key: queue_name, base_queue_class: nil, settings: {}) ⇒ Object
# File 'lib/legion/extensions/llm/transport/fleet_lane.rb', line 41
def build_queue_class(queue_name:, exchange_class:, routing_key: queue_name, base_queue_class: nil, settings: {})
  parent = base_queue_class || legion_queue_class
  unless parent
    raise ArgumentError, 'base_queue_class is required when Legion::Transport::Queue is not loaded'
  end

  options = queue_options(settings)
  Class.new(parent) do
    define_method(:queue_name) { queue_name }
    define_method(:queue_options) { options }
    define_method(:dlx_enabled) { false }
    define_method(:initialize) do
      super()
      bind(exchange_class.new, routing_key: routing_key)
    end
  end
end
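The class-building pattern used by build_queue_class can be tried without Legion or RabbitMQ loaded. The following is a minimal sketch, not the library's code: `QueueClassSketch`, `StubQueue`, `StubExchange`, the `options:` keyword, and the queue name `llm.fleet.example` are all hypothetical stand-ins, and the sketch takes ready-made options instead of deriving them from settings.

```ruby
# Hypothetical stand-in for FleetLane.build_queue_class. It follows the same
# shape as the listing: an anonymous subclass whose methods close over the
# arguments, and whose initializer binds itself to a new exchange instance.
module QueueClassSketch
  module_function

  def build_queue_class(queue_name:, exchange_class:, routing_key: queue_name, base_queue_class: nil, options: {})
    parent = base_queue_class
    raise ArgumentError, 'base_queue_class is required in this sketch' unless parent

    Class.new(parent) do
      define_method(:queue_name) { queue_name }
      define_method(:queue_options) { options }
      define_method(:dlx_enabled) { false }
      define_method(:initialize) do
        super() # explicit parentheses are required inside define_method
        bind(exchange_class.new, routing_key: routing_key)
      end
    end
  end
end

# Stub exchange: only needs to be instantiable with no arguments.
class StubExchange; end

# Stub base queue: records bindings instead of talking to a broker.
class StubQueue
  attr_reader :bindings

  def initialize
    @bindings = []
  end

  def bind(exchange, routing_key:)
    @bindings << [exchange.class, routing_key]
  end
end

lane_class = QueueClassSketch.build_queue_class(
  queue_name: 'llm.fleet.example',
  exchange_class: StubExchange,
  base_queue_class: StubQueue
)
lane = lane_class.new

puts lane.queue_name        # llm.fleet.example
puts lane.bindings.inspect  # routing key defaults to the queue name
```

Because routing_key defaults to queue_name, a caller only needs to pass it when the binding key should differ from the queue's own name.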
.legion_queue_class ⇒ Object
# File 'lib/legion/extensions/llm/transport/fleet_lane.rb', line 61
def legion_queue_class
  return nil unless defined?(::Legion::Transport::Queue)

  ::Legion::Transport::Queue
end
.queue_arguments(config) ⇒ Object
# File 'lib/legion/extensions/llm/transport/fleet_lane.rb', line 28
def queue_arguments(config)
  {
    'x-queue-type' => 'quorum',
    'x-queue-leader-locator' => 'balanced',
    'x-expires' => config.fetch(:queue_expires_ms),
    'x-message-ttl' => config.fetch(:message_ttl_ms),
    'x-overflow' => 'reject-publish',
    'x-max-length' => config.fetch(:queue_max_length),
    'x-delivery-limit' => config.fetch(:delivery_limit),
    'x-consumer-timeout' => config.fetch(:consumer_ack_timeout_ms)
  }
end
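Every tunable value in queue_arguments is read with Hash#fetch, so a config hash that lacks one of the five keys raises KeyError instead of sending a nil argument to the broker. The sketch below copies the method body into a hypothetical `ArgumentsSketch` module so it can run standalone; the `complete` and `partial` hashes are illustrative inputs.

```ruby
# Hypothetical standalone copy of the queue_arguments listing.
module ArgumentsSketch
  module_function

  def queue_arguments(config)
    {
      'x-queue-type' => 'quorum',
      'x-queue-leader-locator' => 'balanced',
      'x-expires' => config.fetch(:queue_expires_ms),
      'x-message-ttl' => config.fetch(:message_ttl_ms),
      'x-overflow' => 'reject-publish',
      'x-max-length' => config.fetch(:queue_max_length),
      'x-delivery-limit' => config.fetch(:delivery_limit),
      'x-consumer-timeout' => config.fetch(:consumer_ack_timeout_ms)
    }
  end
end

# A complete config, using the same values as DEFAULTS.
complete = {
  queue_expires_ms: 60_000,
  message_ttl_ms: 120_000,
  queue_max_length: 100,
  delivery_limit: 3,
  consumer_ack_timeout_ms: 300_000
}

args = ArgumentsSketch.queue_arguments(complete)
puts args['x-message-ttl'] # 120000

# Dropping one key shows the fail-fast behavior of fetch.
partial = complete.reject { |key, _value| key == :delivery_limit }
begin
  ArgumentsSketch.queue_arguments(partial)
rescue KeyError => e
  puts "rejected incomplete config: #{e.message}"
end
```

In normal use the config passed here comes from queue_options, which merges caller settings over DEFAULTS, so all five keys are present.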
.queue_options(settings = {}) ⇒ Object
# File 'lib/legion/extensions/llm/transport/fleet_lane.rb', line 19
def queue_options(settings = {})
  config = DEFAULTS.merge((settings || {}).compact.transform_keys(&:to_sym))
  {
    durable: true,
    auto_delete: false,
    arguments: queue_arguments(config)
  }
end
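The merge expression in queue_options tolerates three kinds of loose input: a nil settings argument, string keys, and nil values. A small sketch of just that expression, assuming a hypothetical `MergeSketch` module and `merged_config` helper name (neither exists in the library):

```ruby
# Hypothetical standalone copy of DEFAULTS and the merge expression.
module MergeSketch
  DEFAULTS = {
    queue_expires_ms: 60_000,
    message_ttl_ms: 120_000,
    queue_max_length: 100,
    delivery_limit: 3,
    consumer_ack_timeout_ms: 300_000
  }.freeze

  module_function

  def merged_config(settings = {})
    # nil settings become {}, nil values are dropped, keys are symbolized,
    # and whatever remains overrides the defaults.
    DEFAULTS.merge((settings || {}).compact.transform_keys(&:to_sym))
  end
end

overrides = { 'queue_max_length' => 500, delivery_limit: nil }
config = MergeSketch.merged_config(overrides)

puts config[:queue_max_length] # 500, string key was symbolized and applied
puts config[:delivery_limit]   # 3, nil override was dropped so the default stays
puts MergeSketch.merged_config(nil) == MergeSketch::DEFAULTS # true
```

Dropping nil values before merging means an unset configuration entry cannot overwrite a default, which keeps the later fetch calls in queue_arguments from returning nil.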