Module: Legion::Extensions::Llm::Transport::FleetLane

Defined in:
lib/legion/extensions/llm/transport/fleet_lane.rb

Overview

Shared RabbitMQ live-work lane defaults for provider fleet workers.

Constant Summary collapse

# Baseline lane settings. queue_options merges caller overrides on top of
# these, and queue_arguments maps each entry onto a queue argument.
DEFAULTS = {
  queue_expires_ms: 60_000, # sent as 'x-expires'
  message_ttl_ms: 120_000, # sent as 'x-message-ttl'
  queue_max_length: 100, # sent as 'x-max-length'
  delivery_limit: 3, # sent as 'x-delivery-limit'
  consumer_ack_timeout_ms: 300_000 # sent as 'x-consumer-timeout'
}.freeze

Class Method Summary collapse

Class Method Details

.build_queue_class(queue_name:, exchange_class:, routing_key: queue_name, base_queue_class: nil, settings: {}) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/legion/extensions/llm/transport/fleet_lane.rb', line 41

# Builds an anonymous queue class for one fleet lane.
#
# The generated class inherits from +base_queue_class+ (falling back to
# whatever legion_queue_class returns), answers +queue_name+,
# +queue_options+ and +dlx_enabled+ with fixed values, and on construction
# calls the parent initializer with no arguments before binding itself to a
# fresh +exchange_class+ instance.
#
# @param queue_name [String] value returned by the generated #queue_name
#   (type assumed; it is passed through untouched)
# @param exchange_class [Class] instantiated with no arguments every time
#   the generated class is instantiated
# @param routing_key [Object] routing key handed to #bind; defaults to
#   +queue_name+
# @param base_queue_class [Class, nil] explicit parent class
# @param settings [Hash, nil] overrides forwarded to queue_options
# @return [Class] the newly created subclass
# @raise [ArgumentError] when neither +base_queue_class+ nor
#   legion_queue_class yields a parent class
def build_queue_class(queue_name:, exchange_class:, routing_key: queue_name, base_queue_class: nil,
                      settings: {})
  parent_class = base_queue_class || legion_queue_class
  missing_parent = 'base_queue_class is required when Legion::Transport::Queue is not loaded'
  raise ArgumentError, missing_parent unless parent_class

  # Computed once; every instance of the generated class returns this same hash.
  lane_options = queue_options(settings)
  fixed_readers = { queue_name: queue_name, queue_options: lane_options, dlx_enabled: false }

  Class.new(parent_class) do
    fixed_readers.each do |reader, value|
      define_method(reader) { value }
    end

    define_method(:initialize) do
      # Explicit parentheses are required: implicit-argument super is not
      # permitted inside define_method.
      super()
      bind(exchange_class.new, routing_key: routing_key)
    end
  end
end

.legion_queue_class ⇒ Object



61
62
63
64
65
# File 'lib/legion/extensions/llm/transport/fleet_lane.rb', line 61

# Looks up the default parent queue class without forcing it to load.
#
# @return [Class, nil] ::Legion::Transport::Queue when that constant is
#   defined, otherwise nil
def legion_queue_class
  ::Legion::Transport::Queue if defined?(::Legion::Transport::Queue)
end

.queue_arguments(config) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/legion/extensions/llm/transport/fleet_lane.rb', line 28

# Translates lane configuration into the queue argument hash.
#
# Five entries are read from +config+; the queue type, leader locator and
# overflow policy are fixed literals.
#
# @param config [#fetch] must provide :queue_expires_ms, :message_ttl_ms,
#   :queue_max_length, :delivery_limit and :consumer_ack_timeout_ms
# @return [Hash{String => Object}] arguments keyed by their "x-" names
# @raise [KeyError] when +config+ is a Hash lacking one of the required keys
def queue_arguments(config)
  # Read in the same order the arguments are listed so the first missing key
  # is the one reported.
  expires_ms = config.fetch(:queue_expires_ms)
  ttl_ms = config.fetch(:message_ttl_ms)
  max_length = config.fetch(:queue_max_length)
  delivery_limit = config.fetch(:delivery_limit)
  ack_timeout_ms = config.fetch(:consumer_ack_timeout_ms)

  {
    'x-queue-type' => 'quorum',
    'x-queue-leader-locator' => 'balanced',
    'x-expires' => expires_ms,
    'x-message-ttl' => ttl_ms,
    'x-overflow' => 'reject-publish',
    'x-max-length' => max_length,
    'x-delivery-limit' => delivery_limit,
    'x-consumer-timeout' => ack_timeout_ms
  }
end

.queue_options(settings = {}) ⇒ Object



19
20
21
22
23
24
25
26
# File 'lib/legion/extensions/llm/transport/fleet_lane.rb', line 19

# Produces the queue declaration options for a fleet lane.
#
# Caller settings are layered over DEFAULTS: nil values are discarded so they
# cannot mask a default, and keys are converted to symbols so string-keyed
# settings line up with the defaults.
#
# @param settings [Hash, nil] optional overrides; nil is treated as empty
# @return [Hash] +durable: true+, +auto_delete: false+, and +arguments+ as
#   built by queue_arguments from the merged configuration
def queue_options(settings = {})
  overrides = (settings || {}).compact.transform_keys(&:to_sym)
  effective = DEFAULTS.merge(overrides)

  { durable: true, auto_delete: false, arguments: queue_arguments(effective) }
end