Class: OMQ::Routing::FairQueue

Inherits:
Object
Defined in:
lib/omq/routing/fair_queue.rb

Overview

Per-connection recv queue aggregator.

Maintains one bounded queue per connected peer. #dequeue returns the next available message from any peer in fair round-robin order, blocking until one arrives.

Recv pumps do not enqueue directly — they write through a SignalingQueue wrapper, which also wakes a blocked #dequeue.
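The round-robin behaviour described above can be illustrated with a small stand-in. This is a sketch under stated assumptions, not the OMQ implementation: RoundRobinSketch and its try_dequeue are hypothetical names, plain Arrays stand in for the bounded per-peer queues, and the rotation is tracked with a simple index.

```ruby
# Hypothetical stand-in for illustration only; not OMQ::Routing::FairQueue.
class RoundRobinSketch
  def initialize
    @queues = []  # one Array per peer, in registration order
    @index  = 0   # position of the queue to try next
  end

  def add_queue(q)
    @queues << q
  end

  # Returns the next message, or nil when every queue is empty.
  # Each call starts from the queue after the one tried last, so a
  # busy peer cannot starve the others.
  def try_dequeue
    @queues.size.times do
      q = @queues[@index % @queues.size]
      @index = (@index + 1) % @queues.size
      return q.shift unless q.empty?
    end
    nil
  end
end

rr = RoundRobinSketch.new
rr.add_queue([["a1"], ["a2"], ["a3"]])  # chatty peer
rr.add_queue([["b1"]])                  # quiet peer

order = []
while (msg = rr.try_dequeue)
  order << msg.first
end
# order is now ["a1", "b1", "a2", "a3"]
```

The quiet peer's single message is delivered second rather than after all three messages from the chatty peer.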

Constructor Details

#initialize ⇒ FairQueue

Creates an empty fair queue with no per-connection queues.



# File 'lib/omq/routing/fair_queue.rb', line 17

def initialize
  @queues    = []              # ordered list of per-connection inner queues
  @drain     = []              # orphaned queues, drained before active queues
  @mapping   = {}              # connection => inner queue
  @cycle     = @queues.cycle  # live reference — sees adds/removes
  @condition = Async::Condition.new
  @pending   = 0              # signals received before #dequeue waits
  @closed    = false
end

Instance Method Details

#add_queue(conn, q) ⇒ Object

Registers a per-connection queue. Called when a connection is added.

Parameters:

  • conn (Connection)
  • q (Async::LimitedQueue)


# File 'lib/omq/routing/fair_queue.rb', line 33

def add_queue(conn, q)
  @mapping[conn] = q
  @queues << q
end

#dequeue(timeout: nil) ⇒ Array<String>?

Returns the next message from any per-connection queue, in fair round-robin order. Blocks until a message is available, or returns nil once the queue has been closed and no messages remain.

Pass timeout: 0 for a non-blocking poll (returns nil immediately if no messages are available).

Parameters:

  • timeout (Numeric, nil) (defaults to: nil)

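    0 = non-blocking poll; nil = block until a message arrives or the queue is closed. In the source shown here, no other value sets a deadline.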

Returns:

  • (Array<String>, nil)


# File 'lib/omq/routing/fair_queue.rb', line 73

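def dequeue(timeout: nil)
  return try_dequeue if timeout == 0  # non-blocking poll

  loop do
    # Closed and nothing left to deliver.
    if @closed && @drain.empty? && @queues.all? { |q| q.empty? }
      return nil
    end

    msg = try_dequeue
    return msg if msg

    # A signal arrived while we were not waiting; consume it and retry.
    if @pending > 0
      @pending -= 1
      next
    end

    @condition.wait
  end
end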

#empty? ⇒ Boolean

Returns true when neither the drain list nor any active per-connection queue holds a message.

Returns:

  • (Boolean)


# File 'lib/omq/routing/fair_queue.rb', line 105

def empty?
  @drain.empty? && @queues.all? { |q| q.empty? }
end

#push(nil_sentinel) ⇒ Object

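Marks the queue as closed and wakes a waiting #dequeue, which returns nil once any remaining messages have been consumed. The nil_sentinel argument itself is not used. Called by Engine on close or fatal error.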



# File 'lib/omq/routing/fair_queue.rb', line 97

def push(nil_sentinel)
  @closed = true
  @condition.signal
end

#remove_queue(conn) ⇒ Object

Removes the per-connection queue for a disconnected peer.

If the queue still has pending messages it moves to the priority drain list so those messages are consumed before any active connection’s messages — preserving FIFO for sequential connections.

Parameters:

  • conn (Connection)


# File 'lib/omq/routing/fair_queue.rb', line 48

def remove_queue(conn)
  q = @mapping.delete(conn)
  return unless q
  @queues.delete(q)
  @drain << q unless q.empty?
end
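
The drain-list behaviour can be sketched with a small model. The names DrainSketch and try_dequeue are hypothetical, plain Arrays stand in for Async::LimitedQueue, and the dequeue policy (empty the drain list first, then serve active queues in order) is an assumption based on the description above, since the real lookup method is not shown on this page.

```ruby
# Hypothetical model of the bookkeeping above; not the OMQ implementation.
class DrainSketch
  def initialize
    @queues  = []  # active per-connection queues
    @drain   = []  # orphaned queues that still hold messages
    @mapping = {}  # connection => queue
  end

  def add_queue(conn, q)
    @mapping[conn] = q
    @queues << q
  end

  def remove_queue(conn)
    q = @mapping.delete(conn)
    return unless q
    # Arrays compare by value, so this sketch removes by identity.
    @queues.delete_if { |other| other.equal?(q) }
    @drain << q unless q.empty?
  end

  # Assumed policy: orphaned queues are emptied before active ones.
  def try_dequeue
    until @drain.empty?
      q = @drain.first
      return q.shift unless q.empty?
      @drain.shift  # fully drained, forget it
    end
    @queues.each { |q| return q.shift unless q.empty? }
    nil
  end
end

fq = DrainSketch.new
fq.add_queue(:conn_a, [["a-1"]])            # stays connected
fq.add_queue(:conn_b, [["b-1"], ["b-2"]])   # about to disconnect
fq.remove_queue(:conn_b)                    # two messages still unread

received = 3.times.map { fq.try_dequeue }
# received is now [["b-1"], ["b-2"], ["a-1"]]
```

The disconnected peer's unread messages are delivered before the active peer's message, and nothing is dropped on disconnect.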

#signal ⇒ Object

Wakes a blocked #dequeue. Called by SignalingQueue after each enqueue.



# File 'lib/omq/routing/fair_queue.rb', line 58

def signal
  @pending += 1
  @condition.signal
end
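
How a wrapper might call #signal after each enqueue can be sketched as follows. The real SignalingQueue is not shown on this page, so SignalingQueueSketch, its constructor arguments, and CountingNotifier are all hypothetical; the sketch only illustrates the "enqueue, then signal" ordering described above.

```ruby
# Hypothetical wrapper; the real SignalingQueue may differ.
class SignalingQueueSketch
  def initialize(inner, notifier)
    @inner    = inner     # the per-connection queue (an Array here)
    @notifier = notifier  # any object responding to #signal
  end

  # Store the message first, then wake the consumer, so that a woken
  # consumer always finds the message it was signalled about.
  def enqueue(msg)
    @inner << msg
    @notifier.signal
  end
end

# Stand-in for the fair queue: counts signals the way @pending does.
class CountingNotifier
  attr_reader :pending

  def initialize
    @pending = 0
  end

  def signal
    @pending += 1
  end
end

inner    = []
notifier = CountingNotifier.new
queue    = SignalingQueueSketch.new(inner, notifier)

queue.enqueue(["hello"])
queue.enqueue(["world"])
# inner now holds both messages and notifier.pending is 2
```

Counting signals rather than only signalling means a wake-up sent while the consumer is busy is remembered for the next pass through the #dequeue loop.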