Class: Pgtk::Pool::IterableQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/pgtk/pool.rb

Overview

Thread-safe queue implementation that supports iteration. Unlike Ruby’s Queue class, this implementation allows safe iteration over all elements while maintaining thread safety for concurrent access.

This class is used internally by Pool to store database connections and provide the ability to iterate over them for inspection purposes.

The queue is bounded by size. When an item is taken out, it remains in the internal array but is marked as “taken”. When returned, it’s placed back in its original slot and marked as available.

Instance Method Summary collapse

Constructor Details

#initialize(size, timeout) ⇒ IterableQueue

Returns a new instance of IterableQueue.



207
208
209
210
211
212
213
214
215
# File 'lib/pgtk/pool.rb', line 207

def initialize(size, timeout)
  @size = size
  @timeout = timeout
  @items = []
  @taken = []
  @free = []
  @mutex = Mutex.new
  @condition = ConditionVariable.new
end

Instance Method Details

#mapObject



257
258
259
260
261
# File 'lib/pgtk/pool.rb', line 257

def map(&)
  @mutex.synchronize do
    @items.map(&)
  end
end

#popObject



237
238
239
240
241
242
243
244
245
246
247
248
249
# File 'lib/pgtk/pool.rb', line 237

def pop
  @mutex.synchronize do
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout
    while @free.empty?
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      raise(Busy, "No free connection appeared in the pool after #{@timeout}s of waiting") if remaining <= 0
      @condition.wait(@mutex, remaining)
    end
    index = @free.shift
    @taken[index] = true
    @items[index]
  end
end

#push(item) ⇒ Object



217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/pgtk/pool.rb', line 217

def push(item)
  @mutex.synchronize do
    if @items.size < @size
      @items << item
      @taken << false
      @free << (@items.size - 1)
    else
      index = @items.index(item)
      if index.nil?
        index = @taken.index(true)
        raise(StandardError, 'No taken slot found') if index.nil?
        @items[index] = item
      end
      @taken[index] = false
      @free << index
    end
    @condition.signal
  end
end

#sizeObject



251
252
253
254
255
# File 'lib/pgtk/pool.rb', line 251

def size
  @mutex.synchronize do
    @items.size
  end
end