Class: Philiprehberger::QueueStack::Queue
- Inherits:
-
Object
- Object
- Philiprehberger::QueueStack::Queue
- Includes:
- Enumerable
- Defined in:
- lib/philiprehberger/queue_stack/queue.rb
Overview
Thread-safe FIFO queue with optional capacity limit and blocking operations.
Instance Method Summary collapse
-
#clear ⇒ void
Remove all items without returning them.
-
#close ⇒ void
Mark the queue as closed.
-
#closed? ⇒ Boolean
Whether the queue has been closed.
-
#dequeue ⇒ Object?
Remove and return the front item.
-
#drain ⇒ Array
Remove and return all items as an array (FIFO order).
-
#each {|item| ... } ⇒ Enumerator, self
Iterate items without removing them (snapshot of current state, FIFO order).
-
#empty? ⇒ Boolean
Whether the queue is empty.
-
#enqueue(item) ⇒ void
Add an item to the back of the queue.
-
#full? ⇒ Boolean
Whether the queue is at capacity.
-
#initialize(capacity: nil) ⇒ Queue
constructor
Create a new queue.
-
#peek ⇒ Object?
Peek at the front item without removing it.
-
#peek_at(index) ⇒ Object?
Peek at the item at the given position without removing it.
-
#size ⇒ Integer
Return the number of items in the queue.
-
#to_a ⇒ Array
Return a snapshot of items as an array (FIFO order).
-
#try_dequeue(timeout:) ⇒ Object?
Try to dequeue an item with a timeout.
-
#try_enqueue(item, timeout: nil) ⇒ Boolean
Try to enqueue an item without blocking indefinitely.
Constructor Details
#initialize(capacity: nil) ⇒ Queue
Create a new queue.
17 18 19 20 21 22 23 24 |
# File 'lib/philiprehberger/queue_stack/queue.rb', line 17 def initialize(capacity: nil) @items = [] @capacity = capacity @closed = false @mutex = Mutex.new @not_empty = ConditionVariable.new @not_full = ConditionVariable.new end |
Instance Method Details
#clear ⇒ void
This method returns an undefined value.
Remove all items without returning them. Signals any blocked producers.
92 93 94 95 96 97 |
# File 'lib/philiprehberger/queue_stack/queue.rb', line 92 def clear @mutex.synchronize do @items.clear @not_full.broadcast end end |
#close ⇒ void
This method returns an undefined value.
Mark the queue as closed. New enqueue calls will raise ClosedError. Existing items can still be dequeued. Wakes all waiting threads.
156 157 158 159 160 161 162 |
# File 'lib/philiprehberger/queue_stack/queue.rb', line 156 def close @mutex.synchronize do @closed = true @not_empty.broadcast @not_full.broadcast end end |
#closed? ⇒ Boolean
Whether the queue has been closed.
167 168 169 |
# File 'lib/philiprehberger/queue_stack/queue.rb', line 167 def closed? @mutex.synchronize { @closed } end |
#dequeue ⇒ Object?
Remove and return the front item. Blocks if empty (returns nil if closed and empty).
44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/philiprehberger/queue_stack/queue.rb', line 44 def dequeue @mutex.synchronize do while @items.empty? return nil if @closed @not_empty.wait(@mutex) end item = @items.shift @not_full.signal item end end |
#drain ⇒ Array
Remove and return all items as an array (FIFO order). Non-blocking.
123 124 125 126 127 128 129 130 |
# File 'lib/philiprehberger/queue_stack/queue.rb', line 123 def drain @mutex.synchronize do result = @items.dup @items.clear @not_full.broadcast result end end |
#each {|item| ... } ⇒ Enumerator, self
Iterate items without removing them (snapshot of current state, FIFO order). Returns an Enumerator if no block is given.
137 138 139 140 141 142 143 |
# File 'lib/philiprehberger/queue_stack/queue.rb', line 137 def each(&block) snapshot = @mutex.synchronize { @items.dup } return snapshot.each unless block snapshot.each(&block) self end |
#empty? ⇒ Boolean
Whether the queue is empty.
200 201 202 |
# File 'lib/philiprehberger/queue_stack/queue.rb', line 200 def empty? @mutex.synchronize { @items.empty? } end |
#enqueue(item) ⇒ void
This method returns an undefined value.
Add an item to the back of the queue. Blocks if at capacity.
31 32 33 34 35 36 37 38 39 |
# File 'lib/philiprehberger/queue_stack/queue.rb', line 31 def enqueue(item) @mutex.synchronize do raise ClosedError, 'cannot enqueue on a closed queue' if @closed @not_full.wait(@mutex) while @capacity && @items.length >= @capacity @items.push(item) @not_empty.signal end end |
#full? ⇒ Boolean
Whether the queue is at capacity.
207 208 209 |
# File 'lib/philiprehberger/queue_stack/queue.rb', line 207 def full? @mutex.synchronize { @capacity ? @items.length >= @capacity : false } end |
#peek ⇒ Object?
Peek at the front item without removing it.
174 175 176 |
# File 'lib/philiprehberger/queue_stack/queue.rb', line 174 def peek @mutex.synchronize { @items.first } end |
#peek_at(index) ⇒ Object?
Peek at the item at the given position without removing it.
Indexing follows Array#[] semantics: 0 is the front of the queue, size - 1 is the back, and negative indices count from the back (-1 is the last item). Returns nil when the index is out of range.
186 187 188 |
# File 'lib/philiprehberger/queue_stack/queue.rb', line 186 def peek_at(index) @mutex.synchronize { @items[index] } end |
#size ⇒ Integer
Return the number of items in the queue.
193 194 195 |
# File 'lib/philiprehberger/queue_stack/queue.rb', line 193 def size @mutex.synchronize { @items.length } end |
#to_a ⇒ Array
Return a snapshot of items as an array (FIFO order).
148 149 150 |
# File 'lib/philiprehberger/queue_stack/queue.rb', line 148 def to_a @mutex.synchronize { @items.dup } end |
#try_dequeue(timeout:) ⇒ Object?
Try to dequeue an item with a timeout.
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/philiprehberger/queue_stack/queue.rb', line 103 def try_dequeue(timeout:) deadline = Time.now + timeout @mutex.synchronize do while @items.empty? return nil if @closed remaining = deadline - Time.now return nil if remaining <= 0 @not_empty.wait(@mutex, remaining) end item = @items.shift @not_full.signal item end end |
#try_enqueue(item, timeout: nil) ⇒ Boolean
Try to enqueue an item without blocking indefinitely.
With timeout: nil, returns immediately. With a numeric timeout, waits up to that many seconds for space to become available.
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/philiprehberger/queue_stack/queue.rb', line 66 def try_enqueue(item, timeout: nil) @mutex.synchronize do raise ClosedError, 'cannot enqueue on a closed queue' if @closed if @capacity && @items.length >= @capacity return false if timeout.nil? || timeout <= 0 deadline = Time.now + timeout while @items.length >= @capacity remaining = deadline - Time.now return false if remaining <= 0 @not_full.wait(@mutex, remaining) raise ClosedError, 'cannot enqueue on a closed queue' if @closed end end @items.push(item) @not_empty.signal true end end |