Class: AtomicRuby::AtomicConditionVariable

Inherits:
Object
Defined in:
lib/atomic-ruby/atomic_condition_variable.rb

Overview

Note:

This class is NOT Ractor-safe as it parks `Thread` references, which cannot be shared across ractors.

Provides lock-free wait/signal coordination using atomic operations.

AtomicConditionVariable lets one or more threads park until another thread signals them, without the paired `Mutex` that Ruby’s `ConditionVariable` requires. Coordination is done by publishing the set of parked threads through an Atom, so all in-process synchronisation stays on the gem’s CAS path. Parking still uses `Thread.stop` and `Thread#wakeup`, the core methods Ruby exposes for putting a thread to sleep and waking it.

The lost-wakeup race in a naive `check-then-park` consumer is avoided by the #wait contract: a waiter registers itself before re-evaluating the predicate, so any signal that fires after the producer makes the predicate true is guaranteed to see the waiter and wake it. Ruby also remembers pending wakeups across `Thread.stop`, so a wakeup that arrives between the predicate check and the actual park is not lost.
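
The two parking primitives named above can be tried in isolation with core Ruby alone. The following is a minimal sketch, not code from the gem: it parks a thread with `Thread.stop` and resumes it with `Thread#wakeup`. The `events` queue and the polling on `Thread#status` are illustrative choices, used so the wakeup is only sent once the thread is really asleep.

```ruby
# Core Ruby only: park a thread with Thread.stop, resume it with Thread#wakeup.
events = Queue.new

parked = Thread.new do
  events << :parking
  Thread.stop              # sleeps until another thread calls #wakeup
  events << :resumed
end

# Thread#status reports "sleep" once the thread is parked in Thread.stop.
sleep 0.001 until parked.status == "sleep"

parked.wakeup              # marks the sleeping thread runnable again
parked.join

log = []
log << events.pop until events.empty?
p log
```

AtomicConditionVariable adds the bookkeeping around these calls: which threads are waiting, and which of them a given signal should wake.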

Examples:

Basic usage

condvar = AtomicConditionVariable.new
ready = AtomicBoolean.new(false)

waiter = Thread.new do
  condvar.wait { ready.true? }
  puts "ready"
end

ready.make_true
condvar.signal
waiter.join

Worker loop draining an atomic queue

condvar.wait do
  work = nil
  queue.swap do |q|
    q.empty? ? q : (work = q.first; q.drop(1).freeze)
  end
  work
end

Constructor Details

#initialize ⇒ AtomicConditionVariable

Creates a new condition variable with no parked threads.

Examples:

condvar = AtomicConditionVariable.new


# File 'lib/atomic-ruby/atomic_condition_variable.rb', line 54

def initialize
  @waiters = Atom.new([].freeze)
end

Instance Method Details

#broadcast ⇒ Integer

Wakes every parked waiter.

Each woken thread observes the wake the same way as with #signal.

Examples:

condvar = AtomicConditionVariable.new
condvar.broadcast #=> 0

Returns:

  • (Integer)

    The number of waiters signalled



# File 'lib/atomic-ruby/atomic_condition_variable.rb', line 115

def broadcast
  targets = nil
  @waiters.swap do |waiters|
    targets = waiters
    [].freeze
  end
  targets.each { |thread| thread.wakeup rescue nil }
  targets.size
end

#signal ⇒ true, false

Wakes one parked waiter, or no-ops if none are parked.

If a waiter has registered itself but is not yet inside `Thread.stop`, Ruby remembers the wakeup and the next `Thread.stop` returns immediately.

Examples:

condvar = AtomicConditionVariable.new
condvar.signal #=> false

Returns:

  • (true, false)

    true if a waiter was signalled, false otherwise



# File 'lib/atomic-ruby/atomic_condition_variable.rb', line 88

def signal
  target = nil
  @waiters.swap do |waiters|
    if waiters.empty?
      waiters
    else
      target = waiters.first
      waiters.drop(1).freeze
    end
  end
  return false unless target

  target.wakeup rescue nil
  true
end
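
The blocks that #signal, #broadcast and #wait pass to `@waiters.swap` are all pure transitions from one frozen array to another. The sketch below isolates them as lambdas so they can be tried without threads. The lambda names are hypothetical, and symbols stand in for `Thread` objects.

```ruby
# Pure update functions mirroring the blocks passed to @waiters.swap.
# Names are illustrative; symbols stand in for Thread objects.
register   = ->(waiters, t) { (waiters + [t]).freeze }   # used by #wait
deregister = ->(waiters, t) { (waiters - [t]).freeze }   # used by #wait
pop_first  = ->(waiters) { waiters.empty? ? waiters : waiters.drop(1).freeze } # #signal
clear      = ->(_waiters) { [].freeze }                  # #broadcast

empty = [].freeze
one   = register.call(empty, :t1)
two   = register.call(one, :t2)

after_signal    = pop_first.call(two)                   # the oldest waiter leaves first
after_dereg     = deregister.call(after_signal, :t1)    # :t1 is already gone, so nothing changes
after_broadcast = clear.call(two)

p one, two, after_signal, after_dereg, after_broadcast
```

Because every transition builds a new frozen array instead of mutating the old one, a snapshot read by another thread is never modified underneath it. It also shows why the deregistration in #wait after `Thread.stop` is harmless when #signal has already removed the thread.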

#wait ⇒ untyped

Blocks until the given block returns a truthy value, then returns that value.

The block is evaluated optimistically first. If it returns truthy on that pass, no waiter is registered and the call returns immediately. Otherwise the calling thread registers itself, re-evaluates the block, and parks via `Thread.stop` until a #signal or #broadcast wakes it. The block may run more than once and may run concurrently with a signalling thread.

Examples:

Simple wait

condvar = AtomicConditionVariable.new
ready = AtomicBoolean.new(false)

Thread.new do
  sleep(1)
  ready.make_true
  condvar.signal
end

condvar.wait { ready.true? } #=> true

Yield Returns:

  • (untyped)

    Truthy to wake, falsy to keep waiting

Returns:

  • (untyped)

    The first truthy value returned by the block



# File 'lib/atomic-ruby/atomic_condition_variable.rb', line 151

def wait
  result = yield
  return result if result

  self_thread = Thread.current
  loop do
    @waiters.swap { |waiters| (waiters + [self_thread]).freeze }
    result = yield
    if result
      @waiters.swap { |waiters| (waiters - [self_thread]).freeze }
      return result
    end

    Thread.stop
    @waiters.swap { |waiters| (waiters - [self_thread]).freeze }
  end
end
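
To experiment with the register, re-check and park sequence without installing the gem, the sketch below copies the #wait, #signal and #waiter_count bodies onto a stand-in atom. `SketchAtom` and `SketchCondVar` are hypothetical names. `SketchAtom` assumes only the `swap` and `value` methods used on this page and guards them with a `Mutex`, so unlike the real Atom it is not lock-free.

```ruby
# Stand-in for the gem's Atom: same swap/value shape, but Mutex-guarded,
# so it is NOT lock-free. Hypothetical helper for illustration only.
class SketchAtom
  def initialize(value)
    @value = value
    @lock = Mutex.new
  end

  def value
    @lock.synchronize { @value }
  end

  def swap
    @lock.synchronize { @value = yield(@value) }
  end
end

# Mirrors the #wait, #signal and #waiter_count bodies shown on this page.
class SketchCondVar
  def initialize
    @waiters = SketchAtom.new([].freeze)
  end

  def waiter_count
    @waiters.value.size
  end

  def signal
    target = nil
    @waiters.swap do |waiters|
      next waiters if waiters.empty?
      target = waiters.first
      waiters.drop(1).freeze
    end
    return false unless target

    begin
      target.wakeup
    rescue ThreadError
      # The target thread already finished; nothing to wake.
    end
    true
  end

  def wait
    result = yield
    return result if result          # fast path: nothing is registered

    me = Thread.current
    loop do
      @waiters.swap { |waiters| (waiters + [me]).freeze }
      result = yield                 # re-check after registering
      if result
        @waiters.swap { |waiters| (waiters - [me]).freeze }
        return result
      end
      Thread.stop
      @waiters.swap { |waiters| (waiters - [me]).freeze }
    end
  end
end

condvar = SketchCondVar.new
flag = false
calls = 0

fast = condvar.wait { :already_ready }   # predicate already truthy

waiter = Thread.new do
  condvar.wait { calls += 1; flag && :woken }
end

# Signal only once the waiter is registered and asleep, so this demo does
# not depend on how a wakeup sent to a still-running thread is handled.
sleep 0.001 until condvar.waiter_count == 1 && waiter.status == "sleep"

flag = true
signalled = condvar.signal
slow = waiter.value
```

The `calls` counter is there to make the "block may run more than once" contract visible: on the parked path the predicate runs on the optimistic pass, again after registration, and again after the wake.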

#waiter_count ⇒ Integer

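Returns the number of currently parked waiters. The count is taken from the registered-waiter list, so it can briefly include a thread that has registered in #wait but has not yet reached `Thread.stop`.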

This operation is atomic and thread-safe. The returned value reflects the state at the time of the call, but may change immediately after in concurrent environments.

Examples:

condvar = AtomicConditionVariable.new
puts condvar.waiter_count #=> 0

Returns:

  • (Integer)

    The number of currently parked waiters



# File 'lib/atomic-ruby/atomic_condition_variable.rb', line 71

def waiter_count
  @waiters.value.size
end