Class: Legion::Gaia::NotificationGate::DelayQueue
- Inherits: Object
- Inheritance chain: Object → Legion::Gaia::NotificationGate::DelayQueue
- Defined in:
- lib/legion/gaia/notification_gate/delay_queue.rb
Instance Attribute Summary collapse
-
#max_delay ⇒ Object
readonly
Returns the value of attribute max_delay.
-
#max_size ⇒ Object
readonly
Returns the value of attribute max_size.
Instance Method Summary collapse
- #clear ⇒ Object
- #drain_expired ⇒ Object
- #enqueue(frame) ⇒ Object
- #flush ⇒ Object
-
#initialize(max_size: 100, max_delay: 14_400) ⇒ DelayQueue
constructor
A new instance of DelayQueue.
- #pending ⇒ Object
- #requeue(entry) ⇒ Object
- #size ⇒ Object
Constructor Details
#initialize(max_size: 100, max_delay: 14_400) ⇒ DelayQueue
Returns a new instance of DelayQueue.
9 10 11 12 13 14 |
# File 'lib/legion/gaia/notification_gate/delay_queue.rb', line 9
#
# Creates an empty queue guarded by its own mutex.
#
# @param max_size [Integer] entry count at which #enqueue and #requeue start
#   evicting the oldest entry before appending a new one
# @param max_delay [Numeric] age threshold used by #drain_expired; it is
#   subtracted from a Time there, so it is interpreted as seconds
#   (the default 14_400 is four hours)
# @return [DelayQueue] a new instance of DelayQueue
def initialize(max_size: 100, max_delay: 14_400)
  @max_size = max_size
  @max_delay = max_delay
  @entries = [] # oldest entry first; new entries are appended at the tail
  @mutex = Mutex.new # every access to @entries goes through this lock
end
Instance Attribute Details
#max_delay ⇒ Object (readonly)
Returns the value of attribute max_delay.
7 8 9 |
# File 'lib/legion/gaia/notification_gate/delay_queue.rb', line 7
#
# Returns the value of attribute max_delay.
#
# @return [Object] the max_delay given to the constructor (14_400 by
#   default); #drain_expired subtracts it from the current UTC time to
#   decide which entries have expired
def max_delay
  @max_delay
end
#max_size ⇒ Object (readonly)
Returns the value of attribute max_size.
7 8 9 |
# File 'lib/legion/gaia/notification_gate/delay_queue.rb', line 7
#
# Returns the value of attribute max_size.
#
# @return [Object] the max_size given to the constructor (100 by default);
#   #enqueue and #requeue evict the oldest entry once the queue holds at
#   least this many entries
def max_size
  @max_size
end
Instance Method Details
#clear ⇒ Object
58 59 60 |
# File 'lib/legion/gaia/notification_gate/delay_queue.rb', line 58
#
# Discards every queued entry while holding the queue's mutex.
#
# @return [Array] the now-empty internal entries array (the result of
#   Array#clear); callers should not mutate it
def clear
  @mutex.synchronize { @entries.clear }
end
#drain_expired ⇒ Object
42 43 44 45 46 47 48 |
# File 'lib/legion/gaia/notification_gate/delay_queue.rb', line 42
#
# Removes and returns every entry whose :queued_at is strictly older than
# max_delay (relative to the current UTC time). Entries that are not yet
# expired stay queued in their original order.
#
# @return [Array<Hash>] the expired entries, oldest first; empty when
#   nothing has expired
def drain_expired
  @mutex.synchronize do
    cutoff = Time.now.utc - @max_delay
    # partition yields [matching, rest]; the rest becomes the new queue.
    expired, @entries = @entries.partition { |e| e[:queued_at] < cutoff }
    expired
  end
end
#enqueue(frame) ⇒ Object
16 17 18 19 20 21 22 23 |
# File 'lib/legion/gaia/notification_gate/delay_queue.rb', line 16
#
# Appends a frame to the tail of the queue, stamped with the current UTC
# time and a retry_count of 0. When the queue already holds max_size (or
# more) entries, the oldest entry is dropped first to make room.
#
# @param frame [Object] the payload to hold; stored as-is under :frame
# @return [Hash, nil] the entry that was evicted to make room, or nil when
#   nothing was evicted
def enqueue(frame)
  @mutex.synchronize do
    # The local is nil whenever the guard is false, so no pre-assignment
    # is needed: nil means "nothing was evicted".
    evicted = @entries.shift if @entries.size >= @max_size
    @entries << { frame: frame, queued_at: Time.now.utc, retry_count: 0 }
    evicted
  end
end
#flush ⇒ Object
50 51 52 53 54 55 56 |
# File 'lib/legion/gaia/notification_gate/delay_queue.rb', line 50
#
# Empties the queue and hands every entry to the caller.
#
# @return [Array<Hash>] all entries that were queued, oldest first; empty
#   when the queue was already empty
def flush
  @mutex.synchronize do
    # Hand the current array to the caller and start over with a fresh one;
    # this avoids copying the entries just to clear the original.
    drained, @entries = @entries, []
    drained
  end
end
#pending ⇒ Object
38 39 40 |
# File 'lib/legion/gaia/notification_gate/delay_queue.rb', line 38
#
# Takes a snapshot of the queue without removing anything.
#
# @return [Array<Hash>] a shallow copy of the queued entries, oldest first;
#   adding to or removing from the copy does not affect the queue, but the
#   entry hashes themselves are shared with it
def pending
  @mutex.synchronize { @entries.dup }
end
#requeue(entry) ⇒ Object
25 26 27 28 29 30 31 32 |
# File 'lib/legion/gaia/notification_gate/delay_queue.rb', line 25
#
# Puts a previously dequeued entry back at the tail of the queue with its
# retry_count incremented by one (a missing retry_count counts as 0). All
# other keys, including :queued_at, are kept unchanged, and the entry
# passed in is not mutated. When the queue already holds max_size (or
# more) entries, the oldest entry is dropped first to make room.
#
# @param entry [Hash] an entry hash, e.g. one returned by #flush or
#   #drain_expired
# @return [Hash, nil] the entry that was evicted to make room, or nil when
#   nothing was evicted
def requeue(entry)
  @mutex.synchronize do
    # The local is nil whenever the guard is false, so no pre-assignment
    # is needed: nil means "nothing was evicted".
    evicted = @entries.shift if @entries.size >= @max_size
    @entries << entry.merge(retry_count: entry[:retry_count].to_i + 1)
    evicted
  end
end
#size ⇒ Object
34 35 36 |
# File 'lib/legion/gaia/notification_gate/delay_queue.rb', line 34
#
# Reports how many entries are currently queued, read under the mutex.
#
# @return [Integer] the number of queued entries
def size
  @mutex.synchronize { @entries.size }
end