Class: Phronomy::AsyncQueue Private
- Inherits: Object
  - Object
    - Phronomy::AsyncQueue
- Defined in: lib/phronomy/async_queue.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
A thread-safe FIFO queue for passing values between concurrent tasks.
Wraps +Thread::Queue+ so that callers do not need to reference the Ruby standard-library type directly. A future implementation may replace the backing primitive without changing call sites.
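As a point of reference, the thread-safe FIFO behaviour described above can be sketched with the standard-library +Thread::Queue+ alone. This is a minimal illustration, not Phronomy code: the +:done+ sentinel and the variable names are assumptions made for the example.

```ruby
# Plain standard-library sketch; Phronomy::AsyncQueue itself is not used here.
queue = Thread::Queue.new

producer = Thread.new do
  3.times { |i| queue.push(i) }
  queue.push(:done) # sentinel chosen for this example only
end

received = []
consumer = Thread.new do
  # pop blocks until an item is available, so no polling is needed
  while (item = queue.pop) != :done
    received << item
  end
end

[producer, consumer].each(&:join)
puts received.inspect
```

Items come out in the order they were pushed, which is the property the wrapper relies on.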
Instance Method Summary
- #close ⇒ self (private): Closes the queue.
- #empty? ⇒ Boolean (private): Returns +true+ when the queue contains no items.
- #initialize(max_size: nil) ⇒ AsyncQueue (constructor, private): A new instance of AsyncQueue.
- #pop(timeout: nil) ⇒ Object? (private): Dequeues and returns the next item.
- #push(item) ⇒ self (private): Enqueues +item+.
- #size ⇒ Integer (private): Returns the current number of items in the queue.
Constructor Details
#initialize(max_size: nil) ⇒ AsyncQueue
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of AsyncQueue.
    # File 'lib/phronomy/async_queue.rb', line 19
    def initialize(max_size: nil)
      @queue = max_size ? SizedQueue.new(max_size) : Thread::Queue.new
      @max_size = max_size
    end
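The constructor's choice between a bounded and an unbounded backing queue can be tried out with the standard library directly. The helper name +build_backing_queue+ below is hypothetical and exists only for this sketch; it mirrors the ternary in the listing above and is not part of Phronomy.

```ruby
# Hypothetical helper mirroring the constructor's choice of backing queue.
def build_backing_queue(max_size = nil)
  max_size ? SizedQueue.new(max_size) : Thread::Queue.new
end

unbounded = build_backing_queue
bounded = build_backing_queue(2)

bounded.push(:a)
bounded.push(:b)

# A non-blocking push on a full SizedQueue raises ThreadError instead of waiting.
overflow_error =
  begin
    bounded.push(:c, true)
    nil
  rescue ThreadError => e
    e
  end

puts "bounded.max=#{bounded.max}, overflow=#{overflow_error.class}"
```

The non-blocking form is used here only so the example terminates; a plain +push+ on the full queue would block the calling thread.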
Instance Method Details
#close ⇒ self
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
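Closes the queue. Subsequent #pop calls are documented as raising +ClosedQueueError+; when no scheduler is active, however, calls go straight to the backing +Thread::Queue+, where it is #push on a closed queue that raises +ClosedQueueError+, while #pop returns any remaining items and then +nil+.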
    # File 'lib/phronomy/async_queue.rb', line 100
    def close
      @queue.close
      self
    end
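Because +close+ delegates to the backing queue, it may help to see what the standard-library +Thread::Queue+ does once closed. The sketch below uses +Thread::Queue+ directly rather than AsyncQueue, so it only reflects the path taken when no scheduler is active.

```ruby
queue = Thread::Queue.new
queue.push(:last_item)
queue.close

# Pushing to a closed queue raises ClosedQueueError.
push_error =
  begin
    queue.push(:too_late)
    nil
  rescue ClosedQueueError => e
    e
  end

drained = queue.pop     # a remaining item is still delivered after close
after_drain = queue.pop # closed and empty: returns nil without blocking

puts "push_error=#{push_error.class}, drained=#{drained.inspect}, after_drain=#{after_drain.inspect}"
```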
#empty? ⇒ Boolean
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns +true+ when the queue contains no items.
    # File 'lib/phronomy/async_queue.rb', line 93
    def empty?
      @queue.empty?
    end
#pop(timeout: nil) ⇒ Object?
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
The :fiber backend is EXPERIMENTAL. Real-time timeout behaviour in production workloads is not guaranteed and may differ from wall-clock expectations.
Cooperative timeout limitation: on the cooperative path, the deadline is re-checked after a wake-up signal arrives. If virtual time has already passed the deadline when the consumer is woken by a producer push, the consumer returns +nil+ rather than the pushed item. Without any wake-up signal the waiting Fiber remains suspended even after +scheduler.advance+ — the timeout does not self-fire.
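The limitation above can be modelled with a small toy, shown here purely as an illustration. +ToyClock+ and +ToyWaiter+ are hypothetical names invented for this sketch; they are not Phronomy classes, and the real cooperative path may be structured quite differently. The only assumption carried over from the text is that the deadline is checked when a wake-up arrives and at no other time.

```ruby
# Toy model only: ToyClock and ToyWaiter are hypothetical, not Phronomy classes.
ToyClock = Struct.new(:now) do
  def advance(seconds)
    self.now += seconds
  end
end

class ToyWaiter
  def initialize(clock, items, timeout)
    @clock = clock
    @items = items
    @deadline = clock.now + timeout
  end

  # Runs only when a wake-up signal arrives; advancing the clock never calls it.
  def on_wake
    return nil if @clock.now > @deadline # deadline is re-checked first
    @items.shift
  end
end

clock = ToyClock.new(0.0)
items = []
waiter = ToyWaiter.new(clock, items, 1.0)

clock.advance(5.0)      # virtual time passes the deadline; the waiter stays suspended
items << :late_item     # a producer push is what finally wakes the waiter
result = waiter.on_wake # the deadline check wins over the available item

puts "result=#{result.inspect}, still queued=#{items.inspect}"
```

In this model the pushed item stays in the queue for a later consumer, while the timed-out consumer sees +nil+.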
Dequeues and returns the next item. In a cooperative scheduler context, suspends the current Fiber (yielding control back to the scheduler) rather than blocking the OS thread.
When +timeout+ is given, the semantics depend on the active backend:
- Thread backend (:thread): uses real wall-clock time via +Thread::Queue#pop(timeout:)+. Requires Ruby 3.2+. Returns +nil+ if no item arrives within the specified number of real seconds.
- DeterministicScheduler / :fiber backend: uses the scheduler's virtual time (+scheduler.virtual_time+). The timeout elapses only when the virtual clock is advanced (e.g. via Testing::FakeClock#advance). In tests this means the timeout is fully deterministic and does not depend on actual elapsed wall time. However, in production :fiber mode the timeout may never expire unless the scheduler explicitly advances virtual time.
    # File 'lib/phronomy/async_queue.rb', line 72
    def pop(timeout: nil)
      scheduler = Phronomy::Runtime::Scheduler.current
      if scheduler
        _pop_cooperative(scheduler, timeout: timeout)
      elsif timeout
        @queue.pop(timeout: timeout)
      else
        @queue.pop
      end
    end
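The wall-clock timeout on the thread path comes from +Thread::Queue#pop(timeout:)+, which can be exercised on its own. The following is a minimal sketch against the standard library, guarded by a version check because the keyword is only available from Ruby 3.2; the 0.05 second timeout is an arbitrary value chosen for the example.

```ruby
queue = Thread::Queue.new

# pop(timeout:) was added to Thread::Queue in Ruby 3.2.
major_minor = RUBY_VERSION.split(".").first(2).map(&:to_i)
timeout_supported = (major_minor <=> [3, 2]) >= 0

if timeout_supported
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = queue.pop(timeout: 0.05) # nothing is ever pushed, so this times out
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  puts "result=#{result.inspect} after about #{elapsed.round(2)}s of real time"
else
  puts "Thread::Queue#pop(timeout:) needs Ruby 3.2+"
end
```

On older Rubies passing +timeout:+ raises an +ArgumentError+, which is why the listing above only forwards the keyword when a timeout was actually requested.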
#push(item) ⇒ self
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Enqueues +item+. In a cooperative scheduler context with a bounded queue (max_size:), suspends the current Fiber via a scheduler signal when the queue is full rather than blocking the OS thread. Without a scheduler, falls back to the standard SizedQueue blocking behaviour.
    # File 'lib/phronomy/async_queue.rb', line 32
    def push(item)
      scheduler = Phronomy::Runtime::Scheduler.current
      if scheduler && @max_size
        _push_cooperative(scheduler, item)
      else
        @queue.push(item)
        scheduler.raise_signal(@coop_signal) if scheduler && @coop_signal
      end
      self
    end
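The fallback "standard SizedQueue blocking behaviour" mentioned above can be observed with the standard library alone. This sketch does not involve a Phronomy scheduler; the capacity of 1 and the short +sleep+ are assumptions chosen to make the blocking visible.

```ruby
bounded = SizedQueue.new(1)
bounded.push(:first) # fills the queue

producer = Thread.new do
  bounded.push(:second) # blocks the thread until a slot is freed
  :pushed
end

sleep 0.05 # give the producer time to reach the blocking push
blocked_while_full = producer.alive?

first = bounded.pop              # frees a slot, which unblocks the producer
producer_result = producer.value # joins the thread and returns its result
second = bounded.pop

puts "blocked=#{blocked_while_full}, order=#{[first, second].inspect}"
```

Here the whole OS thread waits inside +push+, which is the behaviour the cooperative path is described as avoiding by suspending only the current Fiber.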
#size ⇒ Integer
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the current number of items in the queue.
    # File 'lib/phronomy/async_queue.rb', line 86
    def size
      @queue.size
    end