Class: Philiprehberger::QueueStack::Queue

Inherits:
Object
Includes:
Enumerable
Defined in:
lib/philiprehberger/queue_stack/queue.rb

Overview

Thread-safe FIFO queue with optional capacity limit and blocking operations.

Examples:

q = Philiprehberger::QueueStack::Queue.new(capacity: 10)
q.enqueue('item')
q.dequeue  # => 'item'

Constructor Details

#initialize(capacity: nil) ⇒ Queue

Create a new queue.

Parameters:

  • capacity (Integer, nil) (defaults to: nil)

    maximum number of items (nil for unlimited)



# File 'lib/philiprehberger/queue_stack/queue.rb', line 17

def initialize(capacity: nil)
  @items = []
  @capacity = capacity
  @closed = false
  @mutex = Mutex.new
  @not_empty = ConditionVariable.new
  @not_full = ConditionVariable.new
end

Instance Method Details

#clear ⇒ void

This method returns an undefined value.

Remove all items without returning them. Signals any blocked producers.



# File 'lib/philiprehberger/queue_stack/queue.rb', line 92

def clear
  @mutex.synchronize do
    @items.clear
    @not_full.broadcast
  end
end

#close ⇒ void

This method returns an undefined value.

Mark the queue as closed. New enqueue calls will raise ClosedError. Existing items can still be dequeued. Wakes all waiting threads.



# File 'lib/philiprehberger/queue_stack/queue.rb', line 156

def close
  @mutex.synchronize do
    @closed = true
    @not_empty.broadcast
    @not_full.broadcast
  end
end

#closed? ⇒ Boolean

Whether the queue has been closed.

Returns:

  • (Boolean)


# File 'lib/philiprehberger/queue_stack/queue.rb', line 167

def closed?
  @mutex.synchronize { @closed }
end

#dequeue ⇒ Object?

Remove and return the front item. Blocks while the queue is empty; returns nil once the queue is closed and empty.

Returns:

  • (Object, nil)

    the dequeued item or nil if closed and empty



# File 'lib/philiprehberger/queue_stack/queue.rb', line 44

def dequeue
  @mutex.synchronize do
    while @items.empty?
      return nil if @closed

      @not_empty.wait(@mutex)
    end
    item = @items.shift
    @not_full.signal
    item
  end
end
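
As a rough illustration of how blocking, capacity, and closing fit together, the sketch below runs one producer against one consumer on a queue of capacity 2. SketchQueue is a hypothetical stand-in that copies initialize, enqueue, dequeue, and close from the listings on this page so the snippet runs without the gem installed; the ClosedError definition is an assumption (a plain StandardError subclass).

```ruby
# Hypothetical stand-in: a trimmed copy of the methods listed on this page.
class SketchQueue
  ClosedError = Class.new(StandardError) # assumed superclass

  def initialize(capacity: nil)
    @items = []
    @capacity = capacity
    @closed = false
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
    @not_full = ConditionVariable.new
  end

  def enqueue(item)
    @mutex.synchronize do
      raise ClosedError, 'cannot enqueue on a closed queue' if @closed

      @not_full.wait(@mutex) while @capacity && @items.length >= @capacity
      @items.push(item)
      @not_empty.signal
    end
  end

  def dequeue
    @mutex.synchronize do
      while @items.empty?
        return nil if @closed

        @not_empty.wait(@mutex)
      end
      item = @items.shift
      @not_full.signal
      item
    end
  end

  def close
    @mutex.synchronize do
      @closed = true
      @not_empty.broadcast
      @not_full.broadcast
    end
  end
end

queue = SketchQueue.new(capacity: 2)

# The producer blocks inside enqueue whenever two items are already waiting.
producer = Thread.new do
  5.times { |i| queue.enqueue(i) }
  queue.close # lets the consumer's dequeue return nil once the queue is empty
end

received = []
# dequeue returns nil only after close here, so nil ends the loop
# (this sketch never enqueues nil or false itself).
while (item = queue.dequeue)
  received << item
end
producer.join

# After close, a new enqueue call raises instead of adding the item.
rejected = begin
  queue.enqueue(:late)
  false
rescue SketchQueue::ClosedError
  true
end
```

Because nil doubles as the "closed and empty" signal, a caller that needs to store nil or false as real items would probably want to wrap them in another object first.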

#drain ⇒ Array

Remove and return all items as an array (FIFO order). Non-blocking.

Returns:

  • (Array)

    all items in FIFO order



# File 'lib/philiprehberger/queue_stack/queue.rb', line 123

def drain
  @mutex.synchronize do
    result = @items.dup
    @items.clear
    @not_full.broadcast
    result
  end
end
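
The difference between draining and taking a snapshot can be sketched as follows. DrainSketch is a hypothetical stand-in: drain and to_a are copied from the listings on this page, while its enqueue is a simplified assumption (unbounded, never closed) that keeps the example short.

```ruby
# Hypothetical stand-in; only drain and to_a mirror the listed source.
class DrainSketch
  def initialize
    @items = []
    @mutex = Mutex.new
    @not_full = ConditionVariable.new
  end

  # Simplified enqueue (assumption: no capacity limit, no closed state).
  def enqueue(item)
    @mutex.synchronize { @items.push(item) }
  end

  def to_a
    @mutex.synchronize { @items.dup }
  end

  def drain
    @mutex.synchronize do
      result = @items.dup
      @items.clear
      @not_full.broadcast
      result
    end
  end
end

queue = DrainSketch.new
%w[a b c].each { |job| queue.enqueue(job) }

snapshot = queue.to_a   # copy only; the queue still holds three items
batch    = queue.drain  # removes everything in one locked step
leftover = queue.drain  # nothing left, so an empty array (no blocking)
```

Since drain copies and clears under a single lock, a batch consumer sees either all of a concurrent producer's item or none of it, never a half-updated array.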

#each {|item| ... } ⇒ Enumerator, self

Iterate items without removing them (snapshot of current state, FIFO order). Returns an Enumerator if no block is given.

Yields:

  • (item)

    each item in FIFO order

Returns:

  • (Enumerator, self)


# File 'lib/philiprehberger/queue_stack/queue.rb', line 137

def each(&block)
  snapshot = @mutex.synchronize { @items.dup }
  return snapshot.each unless block

  snapshot.each(&block)
  self
end
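
A small sketch of what the snapshot behaviour means in practice. SnapshotSketch is a hypothetical stand-in: each and size are copied from the listings on this page, but the constructor that seeds the items is an assumption made only to keep the example short.

```ruby
# Hypothetical stand-in; each and size mirror the listed source.
class SnapshotSketch
  include Enumerable

  # Assumption: seeds the queue directly instead of calling enqueue.
  def initialize(items)
    @items = items.dup
    @mutex = Mutex.new
  end

  def each(&block)
    snapshot = @mutex.synchronize { @items.dup }
    return snapshot.each unless block

    snapshot.each(&block)
    self
  end

  def size
    @mutex.synchronize { @items.length }
  end
end

queue = SnapshotSketch.new(%w[a b c])

upcased   = queue.map(&:upcase)  # Enumerable methods are built on #each
first_two = queue.each.take(2)   # no block: an Enumerator over the snapshot
returned  = queue.each { |_item| } # with a block: the queue itself

# The lock is released before the block runs, so the block may call other
# queue methods without hitting Mutex's recursive-locking error.
sizes_seen = []
queue.each { |_item| sizes_seen << queue.size }
```

The trade-off is that the block iterates over a copy: items enqueued or dequeued by other threads during iteration are not reflected in what the block sees.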

#empty? ⇒ Boolean

Whether the queue is empty.

Returns:

  • (Boolean)


# File 'lib/philiprehberger/queue_stack/queue.rb', line 200

def empty?
  @mutex.synchronize { @items.empty? }
end

#enqueue(item) ⇒ void

This method returns an undefined value.

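Add an item to the back of the queue. Blocks while the queue is at capacity. The closed check runs only on entry: in the source listing for this method, a producer that is already waiting when close is called is woken but does not raise; it re-checks capacity and either enqueues or waits again.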

Parameters:

  • item (Object)

    the item to enqueue

Raises:

  • (ClosedError)

    if the queue is already closed when the call is made


# File 'lib/philiprehberger/queue_stack/queue.rb', line 31

def enqueue(item)
  @mutex.synchronize do
    raise ClosedError, 'cannot enqueue on a closed queue' if @closed

    @not_full.wait(@mutex) while @capacity && @items.length >= @capacity
    @items.push(item)
    @not_empty.signal
  end
end

#full? ⇒ Boolean

Whether the queue is at capacity.

Returns:

  • (Boolean)


# File 'lib/philiprehberger/queue_stack/queue.rb', line 207

def full?
  @mutex.synchronize { @capacity ? @items.length >= @capacity : false }
end

#peek ⇒ Object?

Peek at the front item without removing it.

Returns:

  • (Object, nil)

    the front item or nil if empty



# File 'lib/philiprehberger/queue_stack/queue.rb', line 174

def peek
  @mutex.synchronize { @items.first }
end

#peek_at(index) ⇒ Object?

Peek at the item at the given position without removing it.

Indexing follows Array#[] semantics: 0 is the front of the queue, size - 1 is the back, and negative indices count from the back (-1 is the last item). Returns nil when the index is out of range.

Parameters:

  • index (Integer)

    position in the queue (supports negative indices)

Returns:

  • (Object, nil)

    the item at the position or nil if out of range



# File 'lib/philiprehberger/queue_stack/queue.rb', line 186

def peek_at(index)
  @mutex.synchronize { @items[index] }
end
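
The indexing rules can be checked with a short sketch. PeekSketch is a hypothetical stand-in: peek_at is copied from the listing on this page, and the seeding constructor is an assumption.

```ruby
# Hypothetical stand-in; peek_at mirrors the listed source.
class PeekSketch
  # Assumption: seeds the queue directly instead of calling enqueue.
  def initialize(items)
    @items = items.dup
    @mutex = Mutex.new
  end

  def peek_at(index)
    @mutex.synchronize { @items[index] }
  end
end

queue = PeekSketch.new(%w[first second third])

front        = queue.peek_at(0)   # front of the queue
back         = queue.peek_at(-1)  # negative indices count from the back
past_end     = queue.peek_at(3)   # out of range
too_negative = queue.peek_at(-4)  # out of range on the negative side
```

A nil result is ambiguous when nil itself may be stored in the queue; comparing the index against size is one way to tell the two cases apart, although the size can change between the two calls when other threads are active.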

#size ⇒ Integer

Return the number of items in the queue.

Returns:

  • (Integer)


# File 'lib/philiprehberger/queue_stack/queue.rb', line 193

def size
  @mutex.synchronize { @items.length }
end

#to_a ⇒ Array

Return a snapshot of items as an array (FIFO order).

Returns:

  • (Array)


# File 'lib/philiprehberger/queue_stack/queue.rb', line 148

def to_a
  @mutex.synchronize { @items.dup }
end

#try_dequeue(timeout:) ⇒ Object?

Try to dequeue an item, waiting up to timeout seconds for one to arrive. Returns nil immediately if the queue is closed and empty.

Parameters:

  • timeout (Numeric)

    seconds to wait

Returns:

  • (Object, nil)

the dequeued item, or nil on timeout or if the queue is closed and empty



# File 'lib/philiprehberger/queue_stack/queue.rb', line 103

def try_dequeue(timeout:)
  deadline = Time.now + timeout
  @mutex.synchronize do
    while @items.empty?
      return nil if @closed

      remaining = deadline - Time.now
      return nil if remaining <= 0

      @not_empty.wait(@mutex, remaining)
    end
    item = @items.shift
    @not_full.signal
    item
  end
end
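
The two outcomes of a timed dequeue can be sketched as follows. TimedSketch is a hypothetical stand-in: try_dequeue is copied from the listing on this page, while its enqueue is a simplified assumption (unbounded, never closed).

```ruby
# Hypothetical stand-in; try_dequeue mirrors the listed source.
class TimedSketch
  def initialize
    @items = []
    @closed = false
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
    @not_full = ConditionVariable.new
  end

  # Simplified enqueue (assumption: no capacity limit, no closed state).
  def enqueue(item)
    @mutex.synchronize do
      @items.push(item)
      @not_empty.signal
    end
  end

  def try_dequeue(timeout:)
    deadline = Time.now + timeout
    @mutex.synchronize do
      while @items.empty?
        return nil if @closed

        remaining = deadline - Time.now
        return nil if remaining <= 0

        @not_empty.wait(@mutex, remaining)
      end
      item = @items.shift
      @not_full.signal
      item
    end
  end
end

queue = TimedSketch.new

# Nothing is enqueued, so this gives up after roughly 50 ms.
timed_out = queue.try_dequeue(timeout: 0.05)

# A producer delivers while the consumer is waiting (or shortly before).
producer = Thread.new do
  sleep 0.02
  queue.enqueue(:job)
end
delivered = queue.try_dequeue(timeout: 2)
producer.join
```

The loop recomputes the remaining time after every wake-up, so a spurious wake-up does not extend the total wait. The deadline is based on Time.now, which follows the wall clock; a variant built on Process.clock_gettime(Process::CLOCK_MONOTONIC) would be unaffected by system clock adjustments, though that is not what the listed source does.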

#try_enqueue(item, timeout: nil) ⇒ Boolean

Try to enqueue an item without blocking indefinitely.

With timeout: nil (or a timeout of zero or less), returns false immediately if the queue is full. With a positive numeric timeout, waits up to that many seconds for space to become available.

Parameters:

  • item (Object)

    the item to enqueue

  • timeout (Numeric, nil) (defaults to: nil)

    seconds to wait, or nil for non-blocking

Returns:

  • (Boolean)

    true if enqueued, false if full (or timed out)

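Raises:

  • (ClosedError)

    if the queue is closed when the call is made, or is found closed after waking from a wait for space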


# File 'lib/philiprehberger/queue_stack/queue.rb', line 66

def try_enqueue(item, timeout: nil)
  @mutex.synchronize do
    raise ClosedError, 'cannot enqueue on a closed queue' if @closed

    if @capacity && @items.length >= @capacity
      return false if timeout.nil? || timeout <= 0

      deadline = Time.now + timeout
      while @items.length >= @capacity
        remaining = deadline - Time.now
        return false if remaining <= 0

        @not_full.wait(@mutex, remaining)
        raise ClosedError, 'cannot enqueue on a closed queue' if @closed
      end
    end

    @items.push(item)
    @not_empty.signal
    true
  end
end
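
The three possible outcomes on a full queue (immediate refusal, timeout, and success once a consumer frees a slot) can be sketched as follows. BoundedSketch is a hypothetical stand-in that copies initialize, try_enqueue, and dequeue from the listings on this page; the ClosedError definition is an assumption (a plain StandardError subclass).

```ruby
# Hypothetical stand-in: a trimmed copy of the methods listed on this page.
class BoundedSketch
  ClosedError = Class.new(StandardError) # assumed superclass

  def initialize(capacity: nil)
    @items = []
    @capacity = capacity
    @closed = false
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
    @not_full = ConditionVariable.new
  end

  def try_enqueue(item, timeout: nil)
    @mutex.synchronize do
      raise ClosedError, 'cannot enqueue on a closed queue' if @closed

      if @capacity && @items.length >= @capacity
        return false if timeout.nil? || timeout <= 0

        deadline = Time.now + timeout
        while @items.length >= @capacity
          remaining = deadline - Time.now
          return false if remaining <= 0

          @not_full.wait(@mutex, remaining)
          raise ClosedError, 'cannot enqueue on a closed queue' if @closed
        end
      end

      @items.push(item)
      @not_empty.signal
      true
    end
  end

  def dequeue
    @mutex.synchronize do
      while @items.empty?
        return nil if @closed

        @not_empty.wait(@mutex)
      end
      item = @items.shift
      @not_full.signal
      item
    end
  end
end

queue = BoundedSketch.new(capacity: 1)

accepted = queue.try_enqueue(:a)                 # space available
refused  = queue.try_enqueue(:b)                 # full, no timeout: returns at once
gave_up  = queue.try_enqueue(:b, timeout: 0.05)  # full, and nobody consumes

# A consumer frees the single slot while the producer is waiting.
consumer = Thread.new do
  sleep 0.02
  queue.dequeue
end
squeezed_in = queue.try_enqueue(:b, timeout: 2)
taken = consumer.value
```

Returning false rather than raising on a full queue lets a producer apply its own back-pressure policy, such as dropping the item, retrying later, or logging the overflow.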