Class: WaterDrop::Polling::Latch

Inherits:
Object
  • Object
show all
Defined in:
lib/waterdrop/polling/latch.rb

Overview

A thread-safe latch for synchronizing producer close operations

When a producer is closed, two threads are involved:

  1. The caller thread (user code calling producer.close)

  2. The poller thread (background thread running IO.select)

The close sequence:

  1. Caller calls producer.close -> unregister_from_poller -> Poller#unregister

  2. Poller#unregister signals via control pipe and calls state.wait_for_close (blocks on latch)

  3. Poller thread receives control signal, drains queue, calls state.close

  4. state.close releases the latch via release!

  5. Caller’s wait_for_close returns, unregister completes

This ensures the producer is fully drained and removed from the poller before returning control to the caller, preventing race conditions.

Instance Method Summary collapse

Constructor Details

#initializeLatch

Initializes a new latch in the unreleased state.



22
23
24
25
26
# File 'lib/waterdrop/polling/latch.rb', line 22

def initialize
  @mutex = Mutex.new
  @cv = ConditionVariable.new
  @released = false
end

Instance Method Details

#release!Object

Releases the latch and wakes any waiting threads



29
30
31
32
33
34
# File 'lib/waterdrop/polling/latch.rb', line 29

def release!
  @mutex.synchronize do
    @released = true
    @cv.broadcast
  end
end

#released?Boolean

Returns whether the latch has been released.

Returns:

  • (Boolean)

    whether the latch has been released



45
46
47
# File 'lib/waterdrop/polling/latch.rb', line 45

def released?
  @released
end

#waitObject

Waits until the latch is released Returns immediately if already released



38
39
40
41
42
# File 'lib/waterdrop/polling/latch.rb', line 38

def wait
  @mutex.synchronize do
    @cv.wait(@mutex) until @released
  end
end