Class: OMQ::Routing::FairQueue
- Inherits: Object
  - Object
  - OMQ::Routing::FairQueue
- Defined in: lib/omq/routing/fair_queue.rb
Overview
Per-connection recv queue aggregator.
Maintains one bounded queue per connected peer. #dequeue returns the next available message from any peer in fair round-robin order, blocking until one arrives.
Recv pumps do not enqueue directly — they write through a SignalingQueue wrapper, which also wakes a blocked #dequeue.
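The round-robin behaviour described above can be illustrated with a small single-threaded model. This is only a sketch: `TinyFairQueue` and its index-based rotation are hypothetical stand-ins, not OMQ's implementation (the real class keeps a `@queues.cycle` enumerator and blocks on an `Async::Condition`), and plain strings are used in place of multipart messages.

```ruby
# Simplified model of fair round-robin dequeueing across per-peer queues.
# TinyFairQueue is a hypothetical name, not part of OMQ.
class TinyFairQueue
  def initialize
    @queues = []  # one Array per peer, used as a FIFO
    @next   = 0   # index of the queue to try first on the next call
  end

  def add_queue(q)
    @queues << q
  end

  # Returns the next message, or nil if every queue is empty.
  # Each call visits every queue at most once, starting where the last call stopped.
  def try_dequeue
    @queues.size.times do
      q = @queues[@next]
      @next = (@next + 1) % @queues.size
      return q.shift unless q.empty?
    end
    nil
  end
end

a = %w[a1 a2 a3]  # a busy peer
b = %w[b1]        # a quiet peer

fq = TinyFairQueue.new
fq.add_queue(a)
fq.add_queue(b)

order = []
while (msg = fq.try_dequeue)
  order << msg
end

p order  # => ["a1", "b1", "a2", "a3"]
```

The quiet peer's single message is delivered second rather than after the busy peer's whole backlog, which is the point of the fair ordering.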
Instance Method Summary
- #add_queue(conn, q) ⇒ Object
  Registers a per-connection queue.
- #dequeue(timeout: nil) ⇒ Array<String>?
  Returns the next message from any per-connection queue, in fair round-robin order.
- #empty? ⇒ Boolean
- #initialize ⇒ FairQueue (constructor)
  Creates an empty fair queue with no per-connection queues.
- #push(nil_sentinel) ⇒ Object
  Injects a nil sentinel to unblock a waiting #dequeue.
- #remove_queue(conn) ⇒ Object
  Removes the per-connection queue for a disconnected peer.
- #signal ⇒ Object
  Wakes a blocked #dequeue.
Constructor Details
#initialize ⇒ FairQueue
Creates an empty fair queue with no per-connection queues.
    # File 'lib/omq/routing/fair_queue.rb', line 17

    def initialize
      @queues    = []              # ordered list of per-connection inner queues
      @drain     = []              # orphaned queues, drained before active queues
      @mapping   = {}              # connection => inner queue
      @cycle     = @queues.cycle   # live reference — sees adds/removes
      @condition = Async::Condition.new
      @pending   = 0               # signals received before #dequeue waits
      @closed    = false
    end
Instance Method Details
#add_queue(conn, q) ⇒ Object
Registers a per-connection queue. Called when a connection is added.
    # File 'lib/omq/routing/fair_queue.rb', line 33

    def add_queue(conn, q)
      @mapping[conn] = q
      @queues << q
    end
#dequeue(timeout: nil) ⇒ Array<String>?
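Returns the next message from any per-connection queue, in fair round-robin order. Blocks until a message is available, or returns nil once the queue has been closed via #push and every drain and active queue is empty.
Pass timeout: 0 for a non-blocking poll (returns nil immediately if no messages are available). The listed source compares timeout only against 0, so any other value appears to block exactly like nil.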
    # File 'lib/omq/routing/fair_queue.rb', line 73

    def dequeue(timeout: nil)
      return try_dequeue if timeout == 0

      loop do
        if @closed && @drain.empty? && @queues.all? { |q| q.empty? }
          return nil
        end

        msg = try_dequeue
        return msg if msg

        if @pending > 0
          @pending -= 1
          next
        end

        @condition.wait
      end
    end
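A possible use of the non-blocking form is to collect everything that is ready right now. The sketch below relies only on the documented `dequeue(timeout: 0)` contract (a message, or nil when nothing is available). `drain_available` and `StubFairQueue` are hypothetical names introduced so the example runs without OMQ installed.

```ruby
# Collects every message that is available right now, without blocking.
# Works with any object whose #dequeue(timeout: 0) returns nil when empty.
# drain_available is a hypothetical helper, not part of OMQ.
def drain_available(fair_queue)
  messages = []
  while (msg = fair_queue.dequeue(timeout: 0))
    messages << msg
  end
  messages
end

# Hypothetical stand-in for OMQ::Routing::FairQueue; supports polling only.
class StubFairQueue
  def initialize(messages)
    @messages = messages
  end

  def dequeue(timeout: nil)
    raise NotImplementedError, "stub only supports timeout: 0" unless timeout == 0

    @messages.shift
  end
end

stub  = StubFairQueue.new([["hello"], ["multi", "part"]])
batch = drain_available(stub)

p batch  # => [["hello"], ["multi", "part"]]
```

Each message is an array of strings, matching the documented `Array<String>?` return type.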
#empty? ⇒ Boolean
    # File 'lib/omq/routing/fair_queue.rb', line 105

    def empty?
      @drain.empty? && @queues.all? { |q| q.empty? }
    end
#push(nil_sentinel) ⇒ Object
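Injects a nil sentinel to unblock a waiting #dequeue. Called by Engine on close or fatal error. In the listed source the argument itself is not stored: the call sets the closed flag and signals the condition, and #dequeue returns nil once every drain and active queue is empty.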
    # File 'lib/omq/routing/fair_queue.rb', line 97

    def push(nil_sentinel)
      @closed = true
      @condition.signal
    end
#remove_queue(conn) ⇒ Object
Removes the per-connection queue for a disconnected peer.
If the queue still has pending messages it moves to the priority drain list so those messages are consumed before any active connection’s messages — preserving FIFO for sequential connections.
    # File 'lib/omq/routing/fair_queue.rb', line 48

    def remove_queue(conn)
      q = @mapping.delete(conn)
      return unless q
      @queues.delete(q)
      @drain << q unless q.empty?
    end
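The effect of the drain list can be seen in a small model. `add_queue` and `remove_queue` below mirror the listed source, but `try_dequeue` is not shown on this page, so the version here is an assumption about how the drain-first ordering might work. For brevity it scans active queues in order instead of round-robin. `DrainModel` is a hypothetical name.

```ruby
# Model of the "drain before active" ordering. DrainModel is hypothetical.
class DrainModel
  def initialize
    @queues  = []  # active per-connection queues (Arrays used as FIFOs)
    @drain   = []  # orphaned queues that still hold messages
    @mapping = {}  # connection => queue
  end

  def add_queue(conn, q)
    @mapping[conn] = q
    @queues << q
  end

  def remove_queue(conn)
    q = @mapping.delete(conn)
    return unless q

    @queues.delete(q)
    @drain << q unless q.empty?
  end

  # Assumed behaviour: orphaned queues are emptied first, then active ones.
  def try_dequeue
    until @drain.empty?
      q = @drain.first
      return q.shift unless q.empty?

      @drain.shift  # orphan fully consumed, forget it
    end
    @queues.each { |q| return q.shift unless q.empty? }
    nil
  end
end

fq = DrainModel.new
fq.add_queue(:old_conn, %w[old-1 old-2])
fq.add_queue(:new_conn, %w[new-1])
fq.remove_queue(:old_conn)  # peer disconnects with two unread messages

order = []
while (msg = fq.try_dequeue)
  order << msg
end

p order  # => ["old-1", "old-2", "new-1"]
```

Both messages from the disconnected peer are delivered before the newer connection's message, so a peer that reconnects does not see its earlier messages overtaken.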
#signal ⇒ Object
Wakes a blocked #dequeue. Called by SignalingQueue after each enqueue.
    # File 'lib/omq/routing/fair_queue.rb', line 58

    def signal
      @pending += 1
      @condition.signal
    end
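The `@pending` counter appears to guard against lost wakeups: a signal that arrives while no fiber is waiting on the condition would otherwise be forgotten. The sketch below models only the counter logic from #signal and the #dequeue loop, with the condition variable omitted. `WakeupCounter` and `next_step` are hypothetical names.

```ruby
# Models the @pending bookkeeping shared by #signal and #dequeue.
# WakeupCounter is a hypothetical name; the condition variable is left out.
class WakeupCounter
  attr_reader :pending

  def initialize
    @pending = 0
  end

  # Mirrors #signal: remember that an enqueue happened.
  def signal
    @pending += 1
  end

  # Mirrors the tail of the #dequeue loop: consume one recorded signal and
  # retry, or report that the caller would now block on the condition.
  def next_step
    if @pending > 0
      @pending -= 1
      :retry
    else
      :wait
    end
  end
end

w = WakeupCounter.new
w.signal  # two enqueues happen before the consumer reaches its wait
w.signal

steps = 3.times.map { w.next_step }

p steps  # => [:retry, :retry, :wait]
```

Only after both recorded signals are consumed does the consumer decide to wait, so an enqueue that happened just before the wait still triggers another dequeue attempt.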