Class: WaterDrop::Polling::Latch
- Inherits:
-
Object
- Object
- WaterDrop::Polling::Latch
- Defined in:
- lib/waterdrop/polling/latch.rb
Overview
A thread-safe latch for synchronizing producer close operations
When a producer is closed, two threads are involved:
-
The caller thread (user code calling producer.close)
-
The poller thread (background thread running IO.select)
The close sequence:
-
Caller calls producer.close -> unregister_from_poller -> Poller#unregister
-
Poller#unregister signals via control pipe and calls state.wait_for_close (blocks on latch)
-
Poller thread receives control signal, drains queue, calls state.close
-
state.close releases the latch via release!
-
Caller’s wait_for_close returns, unregister completes
This ensures the producer is fully drained and removed from the poller before returning control to the caller, preventing race conditions.
Instance Method Summary collapse
-
#initialize ⇒ Latch
constructor
Initializes a new latch in the unreleased state.
-
#release! ⇒ Object
Releases the latch and wakes any waiting threads.
-
#released? ⇒ Boolean
Whether the latch has been released.
-
#wait ⇒ Object
Waits until the latch is released Returns immediately if already released.
Constructor Details
#initialize ⇒ Latch
Initializes a new latch in the unreleased state.
22 23 24 25 26 |
# File 'lib/waterdrop/polling/latch.rb', line 22 def initialize @mutex = Mutex.new @cv = ConditionVariable.new @released = false end |
Instance Method Details
#release! ⇒ Object
Releases the latch and wakes any waiting threads
29 30 31 32 33 34 |
# File 'lib/waterdrop/polling/latch.rb', line 29 def release! @mutex.synchronize do @released = true @cv.broadcast end end |
#released? ⇒ Boolean
Returns whether the latch has been released.
45 46 47 |
# File 'lib/waterdrop/polling/latch.rb', line 45 def released? @released end |
#wait ⇒ Object
Waits until the latch is released Returns immediately if already released
38 39 40 41 42 |
# File 'lib/waterdrop/polling/latch.rb', line 38 def wait @mutex.synchronize do @cv.wait(@mutex) until @released end end |