Class: Philiprehberger::QueueStack::Queue
- Inherits: Object
- Includes: Enumerable
- Defined in: lib/philiprehberger/queue_stack/queue.rb
Overview
Thread-safe FIFO queue with optional capacity limit and blocking operations.
Instance Method Summary
- #clear ⇒ void
  Remove all items without returning them.
- #close ⇒ void
  Mark the queue as closed.
- #closed? ⇒ Boolean
  Whether the queue has been closed.
- #dequeue ⇒ Object?
  Remove and return the front item.
- #dequeue_if {|item| ... } ⇒ Object?
  Conditionally dequeue the front item.
- #drain ⇒ Array
  Remove and return all items as an array (FIFO order).
- #each {|item| ... } ⇒ Enumerator, self
  Iterate items without removing them (snapshot of current state, FIFO order).
- #empty? ⇒ Boolean
  Whether the queue is empty.
- #enqueue(item) ⇒ void
  Add an item to the back of the queue.
- #full? ⇒ Boolean
  Whether the queue is at capacity.
- #initialize(capacity: nil) ⇒ Queue (constructor)
  Create a new queue.
- #peek ⇒ Object?
  Peek at the front item without removing it.
- #peek_at(index) ⇒ Object?
  Peek at the item at the given position without removing it.
- #size ⇒ Integer
  Return the number of items in the queue.
- #to_a ⇒ Array
  Return a snapshot of items as an array (FIFO order).
- #try_dequeue(timeout:) ⇒ Object?
  Try to dequeue an item with a timeout.
- #try_enqueue(item, timeout: nil) ⇒ Boolean
  Try to enqueue an item without blocking indefinitely.
Constructor Details
#initialize(capacity: nil) ⇒ Queue
Create a new queue.
# File 'lib/philiprehberger/queue_stack/queue.rb', line 17

def initialize(capacity: nil)
  @items = []
  @capacity = capacity
  @closed = false
  @mutex = Mutex.new
  @not_empty = ConditionVariable.new
  @not_full = ConditionVariable.new
end
Instance Method Details
#clear ⇒ void
This method returns an undefined value.
Remove all items without returning them. Signals any blocked producers.
# File 'lib/philiprehberger/queue_stack/queue.rb', line 92

def clear
  @mutex.synchronize do
    @items.clear
    @not_full.broadcast
  end
end
#close ⇒ void
This method returns an undefined value.
Mark the queue as closed. New enqueue calls will raise ClosedError. Existing items can still be dequeued. Wakes all waiting threads.
# File 'lib/philiprehberger/queue_stack/queue.rb', line 174

def close
  @mutex.synchronize do
    @closed = true
    @not_empty.broadcast
    @not_full.broadcast
  end
end
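The close semantics described above can be sketched without the gem installed. `ClosedQueueSketch` and its nested `ClosedError` are hypothetical stand-ins that copy only the `enqueue`, `dequeue`, and `close` logic quoted on this page (capacity handling is left out); with the gem available, the real class should be used instead.

```ruby
# Stand-in for illustration only: ClosedQueueSketch and its ClosedError are
# hypothetical names, not the library's constants.
class ClosedQueueSketch
  ClosedError = Class.new(StandardError)

  def initialize
    @items = []
    @closed = false
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
  end

  def enqueue(item)
    @mutex.synchronize do
      raise ClosedError, 'cannot enqueue on a closed queue' if @closed

      @items.push(item)
      @not_empty.signal
    end
  end

  def dequeue
    @mutex.synchronize do
      while @items.empty?
        return nil if @closed

        @not_empty.wait(@mutex)
      end
      @items.shift
    end
  end

  def close
    @mutex.synchronize do
      @closed = true
      @not_empty.broadcast
    end
  end
end

queue = ClosedQueueSketch.new
queue.enqueue(:a)
queue.enqueue(:b)
queue.close

# Items enqueued before close are still delivered in FIFO order.
drained = [queue.dequeue, queue.dequeue]
# Once the queue is closed and empty, dequeue returns nil instead of blocking.
after_drain = queue.dequeue

# New items are rejected after close.
rejected = begin
  queue.enqueue(:c)
  false
rescue ClosedQueueSketch::ClosedError
  true
end

p drained      # [:a, :b]
p after_drain  # nil
p rejected     # true
```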
#closed? ⇒ Boolean
Whether the queue has been closed.
# File 'lib/philiprehberger/queue_stack/queue.rb', line 185

def closed?
  @mutex.synchronize { @closed }
end
#dequeue ⇒ Object?
Remove and return the front item. Blocks while the queue is empty; returns nil once the queue is closed and empty.
# File 'lib/philiprehberger/queue_stack/queue.rb', line 44

def dequeue
  @mutex.synchronize do
    while @items.empty?
      return nil if @closed

      @not_empty.wait(@mutex)
    end
    item = @items.shift
    @not_full.signal
    item
  end
end
#dequeue_if {|item| ... } ⇒ Object?
Conditionally dequeue the front item. The block is called with the item that would be dequeued next. If the block returns truthy, the item is removed and returned. Otherwise the item is left in place and nil is returned. Returns nil immediately if the queue is empty (non-blocking).
# File 'lib/philiprehberger/queue_stack/queue.rb', line 106

def dequeue_if
  @mutex.synchronize do
    return nil if @items.empty?
    return nil unless yield(@items.first)

    item = @items.shift
    @not_full.signal
    item
  end
end
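As a rough illustration of conditional dequeueing, the sketch below takes items from the front only while they satisfy a predicate. `ConditionalQueueSketch` is a hypothetical stand-in that copies the `dequeue_if` logic shown above (minus the `@not_full` signal, since it has no capacity) so the snippet runs on its own.

```ruby
# Stand-in for illustration only; the class name and the array-taking
# constructor are assumptions, not part of the library.
class ConditionalQueueSketch
  def initialize(items)
    @items = items.dup
    @mutex = Mutex.new
  end

  def dequeue_if
    @mutex.synchronize do
      return nil if @items.empty?
      return nil unless yield(@items.first)

      @items.shift
    end
  end

  def size
    @mutex.synchronize { @items.length }
  end
end

queue = ConditionalQueueSketch.new([2, 4, 5, 6])

# Take items from the front for as long as they are even.
evens = []
while (item = queue.dequeue_if(&:even?))
  evens << item
end

p evens       # [2, 4]
p queue.size  # 2 (5 blocks the front, so 6 stays queued behind it)
```

Because the block runs while the queue's mutex is held, it should be quick and should not call back into the same queue: Ruby's Mutex is not reentrant, so a nested call from the block would raise ThreadError.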
#drain ⇒ Array
Remove and return all items as an array (FIFO order). Non-blocking.
# File 'lib/philiprehberger/queue_stack/queue.rb', line 141

def drain
  @mutex.synchronize do
    result = @items.dup
    @items.clear
    @not_full.broadcast
    result
  end
end
#each {|item| ... } ⇒ Enumerator, self
Iterate items without removing them (snapshot of current state, FIFO order). Returns an Enumerator if no block is given.
# File 'lib/philiprehberger/queue_stack/queue.rb', line 155

def each(&block)
  snapshot = @mutex.synchronize { @items.dup }
  return snapshot.each unless block

  snapshot.each(&block)
  self
end
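The snapshot behaviour of `each` can be illustrated as follows. `SnapshotQueueSketch` is a hypothetical stand-in that copies the `each` logic above; its `push` method is an assumed non-blocking substitute for `enqueue`, used only to mutate the queue during iteration.

```ruby
# Stand-in for illustration only; the class name, the array-taking
# constructor and #push are assumptions, not part of the library.
class SnapshotQueueSketch
  include Enumerable

  def initialize(items)
    @items = items.dup
    @mutex = Mutex.new
  end

  def push(item)
    @mutex.synchronize { @items.push(item) }
  end

  def each(&block)
    snapshot = @mutex.synchronize { @items.dup }
    return snapshot.each unless block

    snapshot.each(&block)
    self
  end

  def size
    @mutex.synchronize { @items.length }
  end
end

queue = SnapshotQueueSketch.new([1, 2, 3])

seen = []
queue.each do |item|
  seen << item
  queue.push(item * 10) # allowed: the lock is released before the block runs
end

p seen                    # [1, 2, 3] (items added mid-iteration are not visited)
p queue.size              # 6
p queue.select(&:even?)   # [2, 10, 20, 30] (Enumerable methods build on #each)
```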
#empty? ⇒ Boolean
Whether the queue is empty.
# File 'lib/philiprehberger/queue_stack/queue.rb', line 218

def empty?
  @mutex.synchronize { @items.empty? }
end
#enqueue(item) ⇒ void
This method returns an undefined value.
Add an item to the back of the queue. Blocks if at capacity. Raises ClosedError if the queue is closed.
# File 'lib/philiprehberger/queue_stack/queue.rb', line 31

def enqueue(item)
  @mutex.synchronize do
    raise ClosedError, 'cannot enqueue on a closed queue' if @closed

    @not_full.wait(@mutex) while @capacity && @items.length >= @capacity
    @items.push(item)
    @not_empty.signal
  end
end
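A minimal producer/consumer sketch shows how the blocking `enqueue` and `dequeue` interact with a capacity limit. `BlockingQueueSketch` and its nested `ClosedError` are hypothetical stand-ins that copy the `initialize`, `enqueue`, `dequeue`, and `close` logic quoted on this page so the snippet runs without the gem.

```ruby
# Stand-in for illustration only; the class name and nested ClosedError are
# assumptions, not the library's constants.
class BlockingQueueSketch
  ClosedError = Class.new(StandardError)

  def initialize(capacity: nil)
    @items = []
    @capacity = capacity
    @closed = false
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
    @not_full = ConditionVariable.new
  end

  def enqueue(item)
    @mutex.synchronize do
      raise ClosedError, 'cannot enqueue on a closed queue' if @closed

      @not_full.wait(@mutex) while @capacity && @items.length >= @capacity
      @items.push(item)
      @not_empty.signal
    end
  end

  def dequeue
    @mutex.synchronize do
      while @items.empty?
        return nil if @closed

        @not_empty.wait(@mutex)
      end
      item = @items.shift
      @not_full.signal
      item
    end
  end

  def close
    @mutex.synchronize do
      @closed = true
      @not_empty.broadcast
      @not_full.broadcast
    end
  end
end

queue = BlockingQueueSketch.new(capacity: 2)
received = []

# The consumer stops when dequeue returns nil (queue closed and empty).
consumer = Thread.new do
  while (item = queue.dequeue)
    received << item
  end
end

# The producer blocks whenever two items are already pending.
producer = Thread.new do
  5.times { |i| queue.enqueue(i + 1) }
  queue.close
end

[producer, consumer].each(&:join)
p received # [1, 2, 3, 4, 5]
```

The `while (item = queue.dequeue)` loop treats nil as the end-of-stream marker, so it only suits queues that never carry nil or false as items.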
#full? ⇒ Boolean
Whether the queue is at capacity.
# File 'lib/philiprehberger/queue_stack/queue.rb', line 225

def full?
  @mutex.synchronize { @capacity ? @items.length >= @capacity : false }
end
#peek ⇒ Object?
Peek at the front item without removing it.
# File 'lib/philiprehberger/queue_stack/queue.rb', line 192

def peek
  @mutex.synchronize { @items.first }
end
#peek_at(index) ⇒ Object?
Peek at the item at the given position without removing it.
Indexing follows Array#[] semantics: 0 is the front of the queue, size - 1 is the back, and negative indices count from the back (-1 is the last item). Returns nil when the index is out of range.
# File 'lib/philiprehberger/queue_stack/queue.rb', line 204

def peek_at(index)
  @mutex.synchronize { @items[index] }
end
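Since `peek_at` delegates to `Array#[]`, its indexing rules can be checked on a plain array. The helper `peek_at_sketch` below is a hypothetical stand-in that performs the same lookup without the locking.

```ruby
# Hypothetical helper mirroring the @items[index] lookup in peek_at.
def peek_at_sketch(items, index)
  items[index]
end

items = %i[first second third] # index 0 is the front of the queue

p peek_at_sketch(items, 0)   # :first  (front)
p peek_at_sketch(items, 2)   # :third  (size - 1 is the back)
p peek_at_sketch(items, -1)  # :third  (negative indices count from the back)
p peek_at_sketch(items, 3)   # nil     (past the back)
p peek_at_sketch(items, -4)  # nil     (before the front)
```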
#size ⇒ Integer
Return the number of items in the queue.
# File 'lib/philiprehberger/queue_stack/queue.rb', line 211

def size
  @mutex.synchronize { @items.length }
end
#to_a ⇒ Array
Return a snapshot of items as an array (FIFO order).
# File 'lib/philiprehberger/queue_stack/queue.rb', line 166

def to_a
  @mutex.synchronize { @items.dup }
end
#try_dequeue(timeout:) ⇒ Object?
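Try to dequeue an item with a timeout. Returns nil if the queue is still empty when the timeout (in seconds) expires, or if the queue is closed and empty.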
# File 'lib/philiprehberger/queue_stack/queue.rb', line 121

def try_dequeue(timeout:)
  deadline = Time.now + timeout
  @mutex.synchronize do
    while @items.empty?
      return nil if @closed

      remaining = deadline - Time.now
      return nil if remaining <= 0

      @not_empty.wait(@mutex, remaining)
    end
    item = @items.shift
    @not_full.signal
    item
  end
end
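The deadline loop in `try_dequeue` can be exercised with a small stand-in. `TimedQueueSketch` is hypothetical: it copies the timeout logic above, drops the closed check, and adds an assumed non-blocking `push` so a producer thread can feed it.

```ruby
# Stand-in for illustration only; the class name and #push are assumptions,
# not part of the library.
class TimedQueueSketch
  def initialize
    @items = []
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
  end

  def push(item)
    @mutex.synchronize do
      @items.push(item)
      @not_empty.signal
    end
  end

  def try_dequeue(timeout:)
    deadline = Time.now + timeout
    @mutex.synchronize do
      while @items.empty?
        remaining = deadline - Time.now
        return nil if remaining <= 0

        @not_empty.wait(@mutex, remaining)
      end
      @items.shift
    end
  end
end

queue = TimedQueueSketch.new

# Nothing arrives within 50 ms, so the call gives up.
p queue.try_dequeue(timeout: 0.05) # nil

# A producer delivers an item well inside the 2 s budget.
producer = Thread.new do
  sleep 0.05
  queue.push(:job)
end
p queue.try_dequeue(timeout: 2)    # :job
producer.join
```

Recomputing `remaining` on every pass keeps the total wait bounded by the original timeout even when the thread wakes up and finds the queue still empty, for example because another consumer took the item first.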
#try_enqueue(item, timeout: nil) ⇒ Boolean
Try to enqueue an item without blocking indefinitely.
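With timeout: nil (or a non-positive timeout), returns false immediately if the queue is full. With a positive numeric timeout, waits up to that many seconds for space to become available and returns false if none does. Returns true once the item has been added. Raises ClosedError if the queue is closed.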
# File 'lib/philiprehberger/queue_stack/queue.rb', line 66

def try_enqueue(item, timeout: nil)
  @mutex.synchronize do
    raise ClosedError, 'cannot enqueue on a closed queue' if @closed

    if @capacity && @items.length >= @capacity
      return false if timeout.nil? || timeout <= 0

      deadline = Time.now + timeout
      while @items.length >= @capacity
        remaining = deadline - Time.now
        return false if remaining <= 0

        @not_full.wait(@mutex, remaining)
        raise ClosedError, 'cannot enqueue on a closed queue' if @closed
      end
    end

    @items.push(item)
    @not_empty.signal
    true
  end
end
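The three outcomes of `try_enqueue` (immediate success, immediate refusal, and a timed wait) can be sketched as below. `BoundedQueueSketch` is a hypothetical stand-in that copies the capacity and timeout logic above and omits the closed checks; `dequeue_now` is an assumed non-blocking helper used only to free a slot.

```ruby
# Stand-in for illustration only; the class name and #dequeue_now are
# assumptions, not part of the library.
class BoundedQueueSketch
  def initialize(capacity:)
    @items = []
    @capacity = capacity
    @mutex = Mutex.new
    @not_full = ConditionVariable.new
  end

  def try_enqueue(item, timeout: nil)
    @mutex.synchronize do
      if @items.length >= @capacity
        return false if timeout.nil? || timeout <= 0

        deadline = Time.now + timeout
        while @items.length >= @capacity
          remaining = deadline - Time.now
          return false if remaining <= 0

          @not_full.wait(@mutex, remaining)
        end
      end
      @items.push(item)
      true
    end
  end

  def dequeue_now
    @mutex.synchronize do
      item = @items.shift
      @not_full.signal
      item
    end
  end
end

queue = BoundedQueueSketch.new(capacity: 1)

p queue.try_enqueue(:a)                 # true  (there is room)
p queue.try_enqueue(:b)                 # false (full, no timeout given)
p queue.try_enqueue(:b, timeout: 0.05)  # false (nobody frees a slot in time)

# A consumer frees the slot while the producer is waiting.
consumer = Thread.new do
  sleep 0.05
  queue.dequeue_now
end
p queue.try_enqueue(:b, timeout: 2)     # true
consumer.join
```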