Class: AtomicRuby::AtomicConditionVariable
- Inherits:
-
Object
- Object
- AtomicRuby::AtomicConditionVariable
- Defined in:
- lib/atomic-ruby/atomic_condition_variable.rb
Overview
This class is NOT Ractor-safe as it parks ‘Thread` references, which cannot be shared across ractors.
Provides lock-free wait/signal coordination using atomic operations.
AtomicConditionVariable lets one or more threads park until another thread signals them, without the paired ‘Mutex` that Ruby’s ‘ConditionVariable` requires. Coordination is done by publishing the set of parked threads through an Atom, so all in-process synchronisation stays on the gem’s CAS path. Parking still uses ‘Thread.stop` and `Thread#wakeup`, which are the standard kernel-level primitives Ruby exposes for sleeping a thread.
The lost-wakeup race in a naive ‘check-then-park` consumer is avoided by the #wait contract: a waiter registers itself before re-evaluating the predicate, so any signal that fires after the producer makes the predicate true is guaranteed to see the waiter and wake it. Ruby also remembers pending wakeups across `Thread.stop`, so a wakeup that arrives between the predicate check and the actual park is not lost.
Instance Method Summary collapse
-
#broadcast ⇒ Integer
Wakes every parked waiter.
-
#initialize ⇒ AtomicConditionVariable
constructor
Creates a new condition variable with no parked threads.
-
#signal ⇒ true, false
Wakes one parked waiter, or no-ops if none are parked.
-
#wait ⇒ untyped
Blocks until the given block returns a truthy value, then returns that value.
-
#waiter_count ⇒ Integer
Returns the number of currently parked waiters.
Constructor Details
#initialize ⇒ AtomicConditionVariable
Creates a new condition variable with no parked threads.
54 55 56 |
# File 'lib/atomic-ruby/atomic_condition_variable.rb', line 54 def initialize @waiters = Atom.new([].freeze) end |
Instance Method Details
#broadcast ⇒ Integer
Wakes every parked waiter.
Each woken thread observes the wake the same way as with #signal.
115 116 117 118 119 120 121 122 123 |
# File 'lib/atomic-ruby/atomic_condition_variable.rb', line 115 def broadcast targets = nil @waiters.swap do |waiters| targets = waiters [].freeze end targets.each { |thread| thread.wakeup rescue nil } targets.size end |
#signal ⇒ true, false
Wakes one parked waiter, or no-ops if none are parked.
If a waiter has registered itself but is not yet inside ‘Thread.stop`, Ruby remembers the wakeup and the next `Thread.stop` returns immediately.
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/atomic-ruby/atomic_condition_variable.rb', line 88 def signal target = nil @waiters.swap do |waiters| if waiters.empty? waiters else target = waiters.first waiters.drop(1).freeze end end return false unless target target.wakeup rescue nil true end |
#wait ⇒ untyped
Blocks until the given block returns a truthy value, then returns that value.
The block is evaluated optimistically first. If it returns truthy on that pass, no waiter is registered and the call returns immediately. Otherwise the calling thread registers itself, re-evaluates the block, and parks via ‘Thread.stop` until a #signal or #broadcast wakes it. The block may run more than once and may run concurrently with a signalling thread.
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/atomic-ruby/atomic_condition_variable.rb', line 151 def wait result = yield return result if result self_thread = Thread.current loop do @waiters.swap { |waiters| (waiters + [self_thread]).freeze } result = yield if result @waiters.swap { |waiters| (waiters - [self_thread]).freeze } return result end Thread.stop @waiters.swap { |waiters| (waiters - [self_thread]).freeze } end end |
#waiter_count ⇒ Integer
Returns the number of currently parked waiters.
This operation is atomic and thread-safe. The returned value reflects the state at the time of the call, but may change immediately after in concurrent environments.
71 72 73 |
# File 'lib/atomic-ruby/atomic_condition_variable.rb', line 71 def waiter_count @waiters.value.size end |