Class: Pulsar::Internal::BoundedQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/pulsar/internal/bounded_queue.rb

Overview

Sized queue wrapper with close and timeout behavior.

Instance Method Summary collapse

Constructor Details

#initialize(capacity:) ⇒ BoundedQueue

Returns a new instance of BoundedQueue.

Raises:

  • (ArgumentError)


7
8
9
10
11
12
13
# File 'lib/pulsar/internal/bounded_queue.rb', line 7

# Builds an empty queue that holds at most +capacity+ items.
#
# @param capacity [Numeric] upper bound handed to SizedQueue.new
# @raise [ArgumentError] if capacity is nil, non-numeric, zero or negative
def initialize(capacity:)
  # Guard with respond_to? so nil or other non-numeric input surfaces as
  # ArgumentError rather than a NoMethodError from calling #positive? on it.
  valid = capacity.respond_to?(:positive?) && capacity.positive?
  raise ArgumentError, 'capacity must be positive' unless valid

  @queue = SizedQueue.new(capacity)
  @mutex = Mutex.new
  @closed = false
end

Instance Method Details

#close ⇒ Object



43
44
45
46
# File 'lib/pulsar/internal/bounded_queue.rb', line 43

# Flags the queue as closed. The flag is written while holding the mutex.
#
# @return [nil]
def close
  @mutex.synchronize do
    @closed = true
  end

  nil
end

#pop(timeout:) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/pulsar/internal/bounded_queue.rb', line 27

# Removes and returns the next item, retrying a non-blocking pop until one is
# available or the timeout elapses.
#
# @param timeout [Numeric] seconds to keep trying, measured on the monotonic clock
# @return [Object] the item taken from the underlying queue
# @raise [TimeoutError] once the deadline passes with the queue still empty
def pop(timeout:)
  ensure_open!

  monotonic_now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  expires_at = monotonic_now.call + timeout

  loop do
    begin
      # Re-checked on every attempt; presumably raises once the queue is closed.
      ensure_open!
      # Non-blocking pop raises ThreadError when the queue is empty.
      return @queue.pop(true)
    rescue ThreadError
      time_left = expires_at - monotonic_now.call
      raise TimeoutError, 'operation timed out' if time_left <= 0

      # Wait at most 1 ms before the next attempt.
      sleep([time_left, 0.001].min)
    end
  end
end

#push(value, timeout: nil) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
# File 'lib/pulsar/internal/bounded_queue.rb', line 15

# Adds +value+ to the queue.
#
# A truthy +timeout+ (including 0) delegates to push_with_timeout; nil or
# false delegates to push_until_available.
#
# @param value [Object] item to enqueue
# @param timeout [Numeric, nil] optional limit forwarded to push_with_timeout
# @return [nil]
def push(value, timeout: nil)
  ensure_open!

  timeout ? push_with_timeout(value, timeout) : push_until_available(value)
  nil
end