Class: Pgtk::Pool::IterableQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/pgtk/pool.rb

Overview

Thread-safe queue implementation that supports iteration. Unlike Ruby’s Queue class, this implementation allows safe iteration over all elements while maintaining thread safety for concurrent access.

This class is used internally by Pool to store database connections and provide the ability to iterate over them for inspection purposes.

The queue is bounded by size. When an item is taken out, it remains in the internal array but is marked as “taken”. When returned, it’s placed back in its original slot and marked as available.

Instance Method Summary collapse

Constructor Details

#initialize(size, timeout) ⇒ IterableQueue

Returns a new instance of IterableQueue.



245
246
247
248
249
250
251
252
# File 'lib/pgtk/pool.rb', line 245

def initialize(size, timeout)
  @size = size
  @timeout = timeout
  @items = []
  @taken = []
  @mutex = Mutex.new
  @condition = ConditionVariable.new
end

Instance Method Details

#mapObject



292
293
294
295
296
# File 'lib/pgtk/pool.rb', line 292

def map(&)
  @mutex.synchronize do
    @items.map(&)
  end
end

#popObject



272
273
274
275
276
277
278
279
280
281
282
283
284
# File 'lib/pgtk/pool.rb', line 272

def pop
  @mutex.synchronize do
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @timeout
    while @taken.all? || @items.empty?
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      raise(Busy, "No free connection appeared in the pool after #{@timeout}s of waiting") if remaining <= 0
      @condition.wait(@mutex, remaining)
    end
    index = @taken.index(false)
    @taken[index] = true
    @items[index]
  end
end

#push(item) ⇒ Object



254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
# File 'lib/pgtk/pool.rb', line 254

def push(item)
  @mutex.synchronize do
    if @items.size < @size
      @items << item
      @taken << false
    else
      index = @items.index(item)
      if index.nil?
        index = @taken.index(true)
        raise(StandardError, 'No taken slot found') if index.nil?
        @items[index] = item
      end
      @taken[index] = false
    end
    @condition.signal
  end
end

#sizeObject



286
287
288
289
290
# File 'lib/pgtk/pool.rb', line 286

def size
  @mutex.synchronize do
    @items.size
  end
end