Class: OMQ::DropQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/omq/drop_queue.rb

Overview

A bounded queue that drops messages when full instead of blocking.

Two drop strategies:

:drop_newest — discard the incoming message (tail drop)
:drop_oldest — discard the head, then enqueue the new message

Used by SUB/XSUB/DISH recv queues when on_mute is a drop strategy.

Instance Method Summary collapse

Constructor Details

#initialize(limit, strategy: :drop_newest) ⇒ DropQueue

Returns a new instance of DropQueue.

Parameters:

  • limit (Integer)

    maximum number of items

  • strategy (Symbol) (defaults to: :drop_newest)

    :drop_newest or :drop_oldest



16
17
18
19
# File 'lib/omq/drop_queue.rb', line 16

def initialize(limit, strategy: :drop_newest)
  @queue    = Thread::SizedQueue.new(limit)
  @strategy = strategy
end

Instance Method Details

#dequeue(timeout: nil) ⇒ Object

Removes and returns the next item, blocking if empty.

Returns:

  • (Object)


42
43
44
45
46
47
48
# File 'lib/omq/drop_queue.rb', line 42

def dequeue(timeout: nil)
  if timeout
    @queue.pop(timeout: timeout)
  else
    @queue.pop
  end
end

#empty?Boolean

Returns:

  • (Boolean)


53
54
55
# File 'lib/omq/drop_queue.rb', line 53

def empty?
  @queue.empty?
end

#enqueue(item) ⇒ void

This method returns an undefined value.

Enqueues an item. Drops according to the configured strategy if full.

Parameters:

  • item (Object)


27
28
29
30
31
32
33
34
35
# File 'lib/omq/drop_queue.rb', line 27

def enqueue(item)
  @queue.push(item, true)
rescue ThreadError
  return if @strategy == :drop_newest

  # :drop_oldest — discard head, enqueue new
  @queue.pop(true) rescue nil
  retry
end