Class: CarbonFiber::Native::Selector
- Inherits: Object
  - Object
    - CarbonFiber::Native::Selector
- Defined in: lib/carbon_fiber/native/fallback.rb
Overview
Pure-Ruby fallback selector using threads and condition variables.
Loaded automatically when the native Zig extension is unavailable. Provides the same Selector API as the native implementation so the Scheduler and Async adapter work unchanged.
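The automatic fallback described above usually follows the common "try the native extension, rescue LoadError" idiom. The following is a minimal sketch of that idiom only; the method name `load_selector_backend` and the require path are hypothetical and are not taken from the gem's actual loader. The path is deliberately one that cannot be loaded, so the rescue branch runs.

```ruby
# Hypothetical loader sketch. The require path passed in below is an
# assumption for illustration, not the gem's real file layout.
def load_selector_backend(native_path)
  require native_path
  :native
rescue LoadError
  # Native extension missing or not built: use the pure-Ruby fallback.
  :fallback
end

backend = load_selector_backend("carbon_fiber/native/hypothetical_missing_extension")
puts backend # prints "fallback" because the path cannot be loaded
```

Because both backends expose the same Selector API, callers would not need to know which branch was taken.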
Direct Known Subclasses
Instance Method Summary
- #block(fiber, timeout = nil) ⇒ Object
  Suspend the current fiber until unblocked or timed out.
- #cancel_block_timer(fiber) ⇒ Object
  Called from the ensure block of Scheduler#fiber_done.
- #cancel_timer(token) ⇒ Object
  Cancel a pending timer by token.
- #destroy ⇒ Object
  No-op; nothing to release.
- #initialize(loop_fiber) ⇒ Selector (constructor)
  A new instance of Selector.
- #io_close(fd, exception) ⇒ Object
  Cancel pending waiters on a closed descriptor.
- #io_read(_fd, _buffer, _length, _offset) ⇒ Object
  Returns nil; the Scheduler handles io_read via background thread.
- #io_wait(fiber, fd, events) ⇒ Object
  Wait for read readiness on a file descriptor via IO.select on a background thread.
- #io_wait_with_timeout(fiber, fd, events, timeout) ⇒ Object
  Like #io_wait but with a timeout.
- #io_write(_fd, _buffer, _length, _offset) ⇒ Object
  Returns nil; the Scheduler handles io_write via background thread.
- #pending? ⇒ Boolean
  Whether there is pending work.
- #poll_readable_now(fd) ⇒ Object
  Non-destructive check if a descriptor has data available to read.
- #process_wait(_fiber, _pid, _flags) ⇒ Object
  Returns nil; the Scheduler handles process_wait via background thread.
- #push(fiber) ⇒ Object
  Enqueue a fiber into the ready queue.
- #raise(fiber, exception) ⇒ Object
  Enqueue an exception delivery to a fiber.
- #raise_after(fiber, exception, duration) ⇒ Object
  Schedule an exception to be raised on a fiber after duration seconds.
- #resume(fiber, value) ⇒ Object
  Enqueue a fiber with a return value.
- #select(timeout = nil) ⇒ Object
  Run one event loop iteration.
- #transfer ⇒ Object
  Transfer control to the event loop fiber.
- #unblock(fiber) ⇒ Object
  Resume a fiber previously suspended by #block.
- #wakeup ⇒ Object
  Wake the event loop.
- #yield ⇒ Object
  Transfer to the event loop fiber.
Constructor Details
#initialize(loop_fiber) ⇒ Selector
Returns a new instance of Selector.
    # File 'lib/carbon_fiber/native/fallback.rb', line 14

    def initialize(loop_fiber)
      @loop_fiber = loop_fiber
      @mutex = Thread::Mutex.new
      @cv = Thread::ConditionVariable.new
      @ready = []
      @timers = {}
      @next_timer = 1
      @read_waits = {}
      @next_wait_token = 1

      # Fibers voluntarily parked in block() or do_io_wait, mapped to the
      # sleep timer token (or nil). flush_ready consults this set to decide
      # whether a fiber whose transfer returned unexpectedly was interrupted
      # mid-execution (Ruby 4.0 Fiber#raise bypass) and needs re-queueing.
      @blocked_fibers = {}
    end
Instance Method Details
#block(fiber, timeout = nil) ⇒ Object
Suspend the current fiber until unblocked or timed out.
    # File 'lib/carbon_fiber/native/fallback.rb', line 128

    def block(fiber, timeout = nil)
      token = nil
      token = resume_after(fiber, timeout, false) if timeout
      @mutex.synchronize { @blocked_fibers[fiber] = token }
      result = @loop_fiber.transfer

      # Normal wakeup path: drop the tracking entry and cancel any still-
      # armed sleep timer. If raise() ran first, it already cancelled the
      # timer and zeroed the token; the raise-unwind path then relies on
      # cancel_block_timer (invoked from fiber_done) to remove the entry.
      @mutex.synchronize do
        stored = @blocked_fibers.delete(fiber)
        @timers.delete(stored) if stored
      end

      result
    end
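The park-and-wake handshake in #block can be tried in isolation with plain fibers. This is a simplified sketch, not the Selector itself: `TinyParker` is a hypothetical stand-in with no mutex, no timers, and an `unblock` that transfers directly to the fiber, whereas the real #unblock enqueues the fiber for the event loop.

```ruby
# Simplified stand-in for the block/unblock handshake; not the real Selector.
class TinyParker
  attr_reader :blocked

  def initialize(loop_fiber)
    @loop_fiber = loop_fiber
    @blocked = {} # fiber => timer token (always nil here: no timeout support)
  end

  # Park the calling fiber: record it, then hand control to the loop fiber.
  def block(fiber)
    @blocked[fiber] = nil
    result = @loop_fiber.transfer # returns once something transfers back
    @blocked.delete(fiber)
    result
  end

  # Wake a parked fiber directly; the real selector enqueues it instead.
  def unblock(fiber)
    fiber.transfer(true)
  end
end

# Run on a fresh thread so Fiber.current is that thread's root fiber.
parked, woke_with, cleaned_up = Thread.new do
  loop_fiber = Fiber.current
  parker = TinyParker.new(loop_fiber)
  seen = []

  worker = Fiber.new do
    seen << parker.block(Fiber.current)
    loop_fiber.transfer # hand control back instead of terminating
  end

  worker.transfer                          # worker runs until it parks
  was_parked = parker.blocked.key?(worker)
  parker.unblock(worker)                   # block returns true inside worker
  [was_parked, seen, parker.blocked.empty?]
end.value

puts parked     # prints "true"
puts cleaned_up # prints "true"
```

The value passed to `transfer` becomes the return value of `block`, which is how a wakeup can carry a result back to the parked fiber.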
#cancel_block_timer(fiber) ⇒ Object
Called from the ensure block of Scheduler#fiber_done. Removes the fiber from the blocked set and cancels any still-armed sleep timer. This is the only cleanup path when a raise() unwinds the fiber past the normal return of block().
    # File 'lib/carbon_fiber/native/fallback.rb', line 165

    def cancel_block_timer(fiber)
      @mutex.synchronize do
        stored = @blocked_fibers.delete(fiber)
        @timers.delete(stored) if stored
      end
    end
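The interplay between raise(), the zeroed token, and cancel_block_timer is easier to follow as a small state walk-through. The sketch below mirrors only the bookkeeping from the source; `BlockLedger`, `park`, and `deliver_raise` are hypothetical names, symbols stand in for fibers, and the mutex is omitted for brevity.

```ruby
# Bookkeeping-only sketch; symbols stand in for fibers, no locking.
class BlockLedger
  attr_reader :blocked, :timers

  def initialize
    @blocked = {} # fiber => timer token or nil
    @timers = {}  # token => payload
  end

  # Record a parked fiber together with its armed sleep timer.
  def park(fiber, token)
    @timers[token] = :wake
    @blocked[fiber] = token
  end

  # Mirrors raise(): cancel the timer, zero the token, keep the entry.
  def deliver_raise(fiber)
    return unless @blocked.key?(fiber)

    token = @blocked[fiber]
    @timers.delete(token) if token
    @blocked[fiber] = nil
  end

  # Mirrors cancel_block_timer: drop the entry and any remaining timer.
  def cancel_block_timer(fiber)
    stored = @blocked.delete(fiber)
    @timers.delete(stored) if stored
  end
end

ledger = BlockLedger.new
ledger.park(:fiber_a, 1)
ledger.deliver_raise(:fiber_a)
entry_kept = ledger.blocked.key?(:fiber_a) # true: still counted as parked
timer_gone = ledger.timers.empty?          # true: no spurious wakeup later
ledger.cancel_block_timer(:fiber_a)
fully_clean = ledger.blocked.empty?        # true: entry removed at fiber exit
```

Keeping the entry until the fiber exits is what lets the re-queue check continue to treat the fiber as parked while the exception unwinds.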
#cancel_timer(token) ⇒ Object
Cancel a pending timer by token.
    # File 'lib/carbon_fiber/native/fallback.rb', line 157

    def cancel_timer(token)
      @mutex.synchronize { !!@timers.delete(token) }
    end
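The boolean result comes from Hash#delete, which returns the removed value or nil. A minimal sketch of a token-keyed timer table, assuming integer tokens handed out from a counter as the constructor's @next_timer suggests; `TimerTable` and `schedule` are hypothetical names, not part of the Selector API.

```ruby
# Hypothetical token-keyed timer table illustrating cancel-by-token.
class TimerTable
  def initialize
    @mutex = Thread::Mutex.new
    @timers = {}
    @next_timer = 1
  end

  # Register a timer and return its integer token.
  def schedule(duration, payload)
    @mutex.synchronize do
      token = @next_timer
      @next_timer += 1
      fire_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + duration
      @timers[token] = [fire_at, payload]
      token
    end
  end

  # True if the timer was still pending, false otherwise.
  def cancel(token)
    @mutex.synchronize { !!@timers.delete(token) }
  end
end

timers = TimerTable.new
token = timers.schedule(5.0, :wake)
first_cancel = timers.cancel(token)  # true: the timer was pending
second_cancel = timers.cancel(token) # false: already removed
```

Cancelling twice is therefore harmless, which matters when both the normal wakeup path and a cleanup path may try to cancel the same timer.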
#destroy ⇒ Object
No-op; nothing to release.
    # File 'lib/carbon_fiber/native/fallback.rb', line 32

    def destroy
      true
    end
#io_close(fd, exception) ⇒ Object
Cancel pending waiters on a closed descriptor.
    # File 'lib/carbon_fiber/native/fallback.rb', line 189

    def io_close(fd, exception)
      woke = false

      @mutex.synchronize do
        wait = @read_waits.delete(fd)
        if wait
          @ready << [:raise, wait[:fiber], exception, true]
          woke = true
          @cv.signal
        end
      end

      woke
    end
#io_read(_fd, _buffer, _length, _offset) ⇒ Object
Returns nil; the Scheduler handles io_read via background thread.
    # File 'lib/carbon_fiber/native/fallback.rb', line 210

    def io_read(_fd, _buffer, _length, _offset)
      nil
    end
#io_wait(fiber, fd, events) ⇒ Object
Wait for read readiness on a file descriptor via IO.select on a background thread. Returns nil when events is anything other than exactly IO::READABLE; those cases are handled by the Scheduler fallback.
    # File 'lib/carbon_fiber/native/fallback.rb', line 175

    def io_wait(fiber, fd, events)
      return nil unless events == IO::READABLE

      do_io_wait(fiber, fd, nil)
    end
#io_wait_with_timeout(fiber, fd, events, timeout) ⇒ Object
Like #io_wait but with a timeout.
    # File 'lib/carbon_fiber/native/fallback.rb', line 182

    def io_wait_with_timeout(fiber, fd, events, timeout)
      return nil unless events == IO::READABLE

      do_io_wait(fiber, fd, timeout)
    end
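The body of do_io_wait is not shown on this page, so the following is only a sketch of the general technique the descriptions name: running IO.select on a helper thread and reporting the outcome back. The helper `wait_readable_in_background`, the use of a queue for the result, and the choice of false as the timeout result are all assumptions for illustration.

```ruby
# Hypothetical helper: wait for readability on a helper thread and report
# the outcome through a queue. Returning false on timeout is an assumption.
def wait_readable_in_background(io, timeout, results)
  Thread.new do
    ready = IO.select([io], nil, nil, timeout)
    results << (ready ? IO::READABLE : false)
  end
end

results = Thread::Queue.new

# Case 1: data arrives, so the waiter reports readiness.
reader, writer = IO.pipe
waiter = wait_readable_in_background(reader, 2.0, results)
writer.write("x")
writer.flush
ready_outcome = results.pop
waiter.join

# Case 2: nothing is written, so the 50 ms timeout elapses.
idle_reader, idle_writer = IO.pipe
idle_waiter = wait_readable_in_background(idle_reader, 0.05, results)
timeout_outcome = results.pop
idle_waiter.join

[reader, writer, idle_reader, idle_writer].each(&:close)
```

Passing nil as the timeout makes IO.select wait indefinitely, which matches how #io_wait forwards nil while #io_wait_with_timeout forwards the caller's value.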
#io_write(_fd, _buffer, _length, _offset) ⇒ Object
Returns nil; the Scheduler handles io_write via background thread.
    # File 'lib/carbon_fiber/native/fallback.rb', line 215

    def io_write(_fd, _buffer, _length, _offset)
      nil
    end
#pending? ⇒ Boolean
Returns true if the ready queue, the timer table, or the read-wait table holds any entries.
    # File 'lib/carbon_fiber/native/fallback.rb', line 37

    def pending?
      @mutex.synchronize { @ready.any? || @timers.any? || @read_waits.any? }
    end
#poll_readable_now(fd) ⇒ Object
Non-destructive check if a descriptor has data available to read.
    # File 'lib/carbon_fiber/native/fallback.rb', line 220

    def poll_readable_now(fd)
      io = IO.new(fd, autoclose: false)
      ready = IO.select([io], nil, nil, 0)
      !!ready
    rescue IOError, SystemCallError
      false
    ensure
      io.close if io && !io.closed?
    end
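The check is non-destructive because IO.select with a zero timeout only reports readiness and never consumes data. A small sketch using a pipe shows the behavior; it works on IO objects directly rather than wrapping a raw descriptor as the method above does.

```ruby
reader, writer = IO.pipe

# Zero timeout: IO.select returns immediately, nil when nothing is ready.
empty_before = IO.select([reader], nil, nil, 0).nil?

writer.write("ping")
writer.flush
ready_after = !!IO.select([reader], nil, nil, 0)

# The readiness check consumed nothing; the data is still there to read.
payload = reader.read_nonblock(4)

reader.close
writer.close

puts empty_before # prints "true"
puts ready_after  # prints "true"
puts payload      # prints "ping"
```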
#process_wait(_fiber, _pid, _flags) ⇒ Object
Returns nil; the Scheduler handles process_wait via background thread.
    # File 'lib/carbon_fiber/native/fallback.rb', line 205

    def process_wait(_fiber, _pid, _flags)
      nil
    end
#push(fiber) ⇒ Object
Enqueue a fiber into the ready queue.
    # File 'lib/carbon_fiber/native/fallback.rb', line 42

    def push(fiber)
      @mutex.synchronize do
        @ready << [:resume, fiber, nil, false]
        @cv.signal
      end
      fiber
    end
#raise(fiber, exception) ⇒ Object
Enqueue an exception delivery to a fiber.
    # File 'lib/carbon_fiber/native/fallback.rb', line 60

    def raise(fiber, exception)
      @mutex.synchronize do
        # Cancel any armed sleep timer for this fiber so its block() wakeup
        # doesn't spuriously fire after the raise. Zero the token but leave
        # the blocked_fibers entry; it's removed by cancel_block_timer when
        # the fiber's ensure runs, so flush_ready's re-queue check still
        # correctly treats the fiber as "parked" until it exits.
        if @blocked_fibers.key?(fiber)
          token = @blocked_fibers[fiber]
          @timers.delete(token) if token
          @blocked_fibers[fiber] = nil
        end
        @ready << [:raise, fiber, exception, true]
        @cv.signal
      end
      fiber
    end
#raise_after(fiber, exception, duration) ⇒ Object
Schedule an exception to be raised on a fiber after duration seconds.
    # File 'lib/carbon_fiber/native/fallback.rb', line 152

    def raise_after(fiber, exception, duration)
      schedule_timer(duration, :raise, fiber, exception)
    end
#resume(fiber, value) ⇒ Object
Enqueue a fiber with a return value.
    # File 'lib/carbon_fiber/native/fallback.rb', line 51

    def resume(fiber, value)
      @mutex.synchronize do
        @ready << [:resume, fiber, value, true]
        @cv.signal
      end
      fiber
    end
#select(timeout = nil) ⇒ Object
Run one event loop iteration.
    # File 'lib/carbon_fiber/native/fallback.rb', line 101

    def select(timeout = nil)
      flush_ready
      return 0 unless pending?

      deadline = next_wait_deadline(timeout)

      @mutex.synchronize do
        until @ready.any?
          collect_expired_timers_locked
          break if @ready.any?

          if deadline
            remaining = deadline - monotonic_time
            break if remaining <= 0

            @cv.wait(@mutex, remaining)
          else
            @cv.wait(@mutex)
          end
        end
      end

      collect_expired_timers
      flush_ready
    end
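The core of #select is a deadline-bounded wait on a condition variable: recompute the remaining time on every pass, because a wait can return early or spuriously. The sketch below isolates that loop; `WaitLoop` and its methods are hypothetical names, and timers, fibers, and flush_ready are left out.

```ruby
# Isolated sketch of a deadline-bounded condition-variable wait.
class WaitLoop
  def initialize
    @mutex = Thread::Mutex.new
    @cv = Thread::ConditionVariable.new
    @ready = []
  end

  # Queue an item and wake a waiter; safe to call from another thread.
  def push(item)
    @mutex.synchronize do
      @ready << item
      @cv.signal
    end
  end

  # Block until an item is queued or the timeout elapses; drain the queue.
  def wait(timeout = nil)
    deadline = timeout && (now + timeout)
    @mutex.synchronize do
      until @ready.any?
        if deadline
          remaining = deadline - now
          break if remaining <= 0

          @cv.wait(@mutex, remaining)
        else
          @cv.wait(@mutex)
        end
      end
      @ready.shift(@ready.size)
    end
  end

  private

  # Monotonic clock, so wall-clock adjustments cannot skew the deadline.
  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

wait_loop = WaitLoop.new
timed_out = wait_loop.wait(0.05) # nothing queued: returns [] after about 50 ms

producer = Thread.new do
  sleep 0.05
  wait_loop.push(:job)
end
woken = wait_loop.wait(2.0)      # returns once the producer signals
producer.join
```

Checking the queue in the loop condition, rather than trusting the return of `wait`, is what makes the loop robust against both spurious wakeups and items pushed before the wait began.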
#transfer ⇒ Object
Transfer control to the event loop fiber.
    # File 'lib/carbon_fiber/native/fallback.rb', line 85

    def transfer
      return nil if Fiber.current.equal?(@loop_fiber)

      @loop_fiber.transfer
    end
#unblock(fiber) ⇒ Object
Resume a fiber previously suspended by #block.
    # File 'lib/carbon_fiber/native/fallback.rb', line 147

    def unblock(fiber)
      resume(fiber, true)
    end
#wakeup ⇒ Object
Wake the event loop.
    # File 'lib/carbon_fiber/native/fallback.rb', line 79

    def wakeup
      @mutex.synchronize { @cv.signal }
      true
    end
#yield ⇒ Object
Transfer to the event loop fiber. The re-queue logic in flush_ready puts the calling fiber back in the ready queue on the next pass, so no explicit self-push is needed; skipping it also prevents duplicate ready entries.
    # File 'lib/carbon_fiber/native/fallback.rb', line 94

    def yield
      return nil if Fiber.current.equal?(@loop_fiber)

      @loop_fiber.transfer
    end