Class: Phronomy::Concurrency::AsyncQueue Private

Inherits:
Object
  • Object
show all
Defined in:
lib/phronomy/concurrency/async_queue.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

A thread-safe FIFO queue for passing values between concurrent tasks.

Wraps +Thread::Queue+ so that callers do not need to reference the Ruby standard-library type directly. A future implementation may replace the backing primitive without changing call sites.

Examples:

Producer / consumer

queue = Phronomy::Concurrency::AsyncQueue.new
Runtime.instance.spawn { queue.push(expensive_io()) }
value = queue.pop   # blocks until the producer pushes

Instance Method Summary collapse

Constructor Details

#initialize(max_size: nil) ⇒ AsyncQueue

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of AsyncQueue.

Parameters:

  • max_size (Integer, nil) (defaults to: nil)

    optional upper bound on queue depth. When set, #push blocks the caller until a slot is available.



20
21
22
23
# File 'lib/phronomy/concurrency/async_queue.rb', line 20

def initialize(max_size: nil)
  @queue = max_size ? SizedQueue.new(max_size) : Thread::Queue.new
  @max_size = max_size
end

Instance Method Details

#closeself

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Closes the queue. Subsequent #pop calls raise +ClosedQueueError+.

Returns:

  • (self)


101
102
103
104
# File 'lib/phronomy/concurrency/async_queue.rb', line 101

def close
  @queue.close
  self
end

#empty?Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns +true+ when the queue contains no items.

Returns:

  • (Boolean)


94
95
96
# File 'lib/phronomy/concurrency/async_queue.rb', line 94

def empty?
  @queue.empty?
end

#pop(timeout: nil) ⇒ Object?

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Note:

The :fiber backend is EXPERIMENTAL. Real-time timeout behaviour in production workloads is not guaranteed and may differ from wall-clock expectations.

Note:

Cooperative timeout limitation: on the cooperative path, the deadline is re-checked after a wake-up signal arrives. If virtual time has already passed the deadline when the consumer is woken by a producer push, the consumer returns +nil+ rather than the pushed item. Without any wake-up signal the waiting Fiber remains suspended even after +scheduler.advance+ — the timeout does not self-fire.

Dequeues and returns the next item. In a cooperative scheduler context, suspends the current Fiber (yielding control back to the scheduler) rather than blocking the OS thread.

When +timeout+ is given the semantics depend on the active backend:

  • Thread backend (:thread) — uses real wall-clock time via +Thread::Queue#pop(timeout:)+. Requires Ruby 3.2+. Returns +nil+ if no item arrives within the specified number of real seconds.
  • DeterministicScheduler / :fiber backend — uses the scheduler's virtual time (+scheduler.virtual_time+). The timeout elapses only when the virtual clock is advanced (e.g. via Testing::FakeClock#advance). In tests this means the timeout is fully deterministic and does not depend on actual elapsed wall time. However, in production :fiber mode the timeout may never expire unless the scheduler explicitly advances virtual time.

Parameters:

  • timeout (Numeric, nil) (defaults to: nil)

    seconds to wait before returning +nil+. Semantics are wall-clock on :thread and virtual-time on :fiber.

Returns:

  • (Object, nil)

    the next item, or +nil+ when timeout expires



73
74
75
76
77
78
79
80
81
82
# File 'lib/phronomy/concurrency/async_queue.rb', line 73

def pop(timeout: nil)
  scheduler = Phronomy::Runtime::Scheduler.current
  if scheduler
    _pop_cooperative(scheduler, timeout: timeout)
  elsif timeout
    @queue.pop(timeout: timeout)
  else
    @queue.pop
  end
end

#push(item) ⇒ self

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Enqueues +item+. In a cooperative scheduler context with a bounded queue (max_size:), suspends the current Fiber via a scheduler signal when the queue is full rather than blocking the OS thread. Without a scheduler, falls back to the standard SizedQueue blocking behaviour.

Parameters:

  • item (Object)

    value to enqueue

Returns:

  • (self)


33
34
35
36
37
38
39
40
41
42
# File 'lib/phronomy/concurrency/async_queue.rb', line 33

def push(item)
  scheduler = Phronomy::Runtime::Scheduler.current
  if scheduler && @max_size
    _push_cooperative(scheduler, item)
  else
    @queue.push(item)
    scheduler.raise_signal(@coop_signal) if scheduler && @coop_signal
  end
  self
end

#sizeInteger

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the current number of items in the queue.

Returns:

  • (Integer)


87
88
89
# File 'lib/phronomy/concurrency/async_queue.rb', line 87

def size
  @queue.size
end