Class: Phronomy::AsyncQueue Private

Inherits:
Object
  • Object
show all
Defined in:
lib/phronomy/async_queue.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

A thread-safe FIFO queue for passing values between concurrent tasks.

Wraps +Thread::Queue+ so that callers do not need to reference the Ruby standard-library type directly. A future implementation may replace the backing primitive without changing call sites.

Examples:

Producer / consumer

queue = Phronomy::AsyncQueue.new
Runtime.instance.spawn { queue.push(expensive_io()) }
value = queue.pop   # blocks until the producer pushes

Instance Method Summary collapse

Constructor Details

#initialize(max_size: nil) ⇒ AsyncQueue

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of AsyncQueue.

Parameters:

  • max_size (Integer, nil) (defaults to: nil)

    optional upper bound on queue depth. When set, #push blocks the caller until a slot is available.



19
20
21
22
# File 'lib/phronomy/async_queue.rb', line 19

def initialize(max_size: nil)
  @queue = max_size ? SizedQueue.new(max_size) : Thread::Queue.new
  @max_size = max_size
end

Instance Method Details

#closeself

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Closes the queue. Subsequent #pop calls raise +ClosedQueueError+.

Returns:

  • (self)


100
101
102
103
# File 'lib/phronomy/async_queue.rb', line 100

def close
  @queue.close
  self
end

#empty?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns +true+ when the queue contains no items.

Returns:

  • (Boolean)


93
94
95
# File 'lib/phronomy/async_queue.rb', line 93

def empty?
  @queue.empty?
end

#pop(timeout: nil) ⇒ Object?

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Note:

The :fiber backend is EXPERIMENTAL. Real-time timeout behaviour in production workloads is not guaranteed and may differ from wall-clock expectations.

Note:

Cooperative timeout limitation: on the cooperative path, the deadline is re-checked after a wake-up signal arrives. If virtual time has already passed the deadline when the consumer is woken by a producer push, the consumer returns +nil+ rather than the pushed item. Without any wake-up signal the waiting Fiber remains suspended even after +scheduler.advance+ — the timeout does not self-fire.

Dequeues and returns the next item. In a cooperative scheduler context, suspends the current Fiber (yielding control back to the scheduler) rather than blocking the OS thread.

When +timeout+ is given the semantics depend on the active backend:

  • Thread backend (:thread) — uses real wall-clock time via +Thread::Queue#pop(timeout:)+. Requires Ruby 3.2+. Returns +nil+ if no item arrives within the specified number of real seconds.
  • DeterministicScheduler / :fiber backend — uses the scheduler's virtual time (+scheduler.virtual_time+). The timeout elapses only when the virtual clock is advanced (e.g. via Testing::FakeClock#advance). In tests this means the timeout is fully deterministic and does not depend on actual elapsed wall time. However, in production :fiber mode the timeout may never expire unless the scheduler explicitly advances virtual time.

Parameters:

  • timeout (Numeric, nil) (defaults to: nil)

    seconds to wait before returning +nil+. Semantics are wall-clock on :thread and virtual-time on :fiber.

Returns:

  • (Object, nil)

    the next item, or +nil+ when timeout expires



72
73
74
75
76
77
78
79
80
81
# File 'lib/phronomy/async_queue.rb', line 72

def pop(timeout: nil)
  scheduler = Phronomy::Runtime::Scheduler.current
  if scheduler
    _pop_cooperative(scheduler, timeout: timeout)
  elsif timeout
    @queue.pop(timeout: timeout)
  else
    @queue.pop
  end
end

#push(item) ⇒ self

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Enqueues +item+. In a cooperative scheduler context with a bounded queue (max_size:), suspends the current Fiber via a scheduler signal when the queue is full rather than blocking the OS thread. Without a scheduler, falls back to the standard SizedQueue blocking behaviour.

Parameters:

  • item (Object)

    value to enqueue

Returns:

  • (self)


32
33
34
35
36
37
38
39
40
41
# File 'lib/phronomy/async_queue.rb', line 32

def push(item)
  scheduler = Phronomy::Runtime::Scheduler.current
  if scheduler && @max_size
    _push_cooperative(scheduler, item)
  else
    @queue.push(item)
    scheduler.raise_signal(@coop_signal) if scheduler && @coop_signal
  end
  self
end

#sizeInteger

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the current number of items in the queue.

Returns:

  • (Integer)


86
87
88
# File 'lib/phronomy/async_queue.rb', line 86

def size
  @queue.size
end