Class: CarbonFiber::Async::Selector
- Inherits: Native::Selector
  - Object
    - Native::Selector
      - CarbonFiber::Async::Selector
- Defined in: lib/carbon_fiber/async.rb
Overview
IO::Event::Selector-compatible adapter backed by our native Zig selector. Subclasses Native::Selector so hot-path methods (transfer, yield, wakeup) dispatch directly to native code without an extra Ruby method frame.
Registration:
require "async"
require "carbon_fiber/async"
CarbonFiber::Async.default!
Or via environment:
IO_EVENT_SELECTOR=CarbonFiberSelector ruby app.rb
Constant Summary
- EAGAIN = -Errno::EAGAIN::Errno
  Native Zig io_read/io_write use recv/send with kernel buffer draining. Falls back to Ruby-level nonblock+io_wait for non-socket fds (pipes, files) where native returns nil.
- EWOULDBLOCK = -Errno::EWOULDBLOCK::Errno
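The constants above follow the "negative errno" convention used by #io_read and #io_write. Here is a minimal sketch of how a caller might interpret such a result. The helper names `would_block?` and `unwrap_io_result` are hypothetical and not part of this class.

```ruby
# Negated errno values, defined the same way as the constants above.
EAGAIN = -Errno::EAGAIN::Errno
EWOULDBLOCK = -Errno::EWOULDBLOCK::Errno

# Hypothetical helper: true when the result means "try again later".
def would_block?(result)
  result == EAGAIN || result == EWOULDBLOCK
end

# Hypothetical helper: returns a byte count unchanged and turns a
# negative errno into the matching Errno exception.
def unwrap_io_result(result, label = "io")
  return result if result >= 0
  raise SystemCallError.new(label, -result)
end
```

On most platforms EAGAIN and EWOULDBLOCK share a value, but checking both keeps the test portable.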
Instance Attribute Summary
- #idle_duration ⇒ Float (readonly)
  Seconds spent idle in the last #select call.
- #loop ⇒ Fiber (readonly)
  The event loop fiber.
Instance Method Summary
- #close ⇒ Object
  Release native resources.
- #initialize(loop) ⇒ Selector (constructor)
  A new instance of Selector.
- #io_close(io) ⇒ Object
  Cancel pending waiters and close the descriptor.
- #io_read(fiber, io, buffer, length, offset = 0) ⇒ Integer
  Bytes read, or negative errno.
- #io_wait(fiber, io, events) ⇒ Integer, false
  Wait for I/O readiness.
- #io_write(fiber, io, buffer, length, offset = 0) ⇒ Integer
  Bytes written, or negative errno.
- #process_wait(fiber, pid, flags) ⇒ Process::Status
  Wait for a child process on a background thread.
- #push(fiber) ⇒ Object
  Enqueue a fiber or fiber-like object into the ready queue.
- #raise(fiber, *arguments, **options) ⇒ Object
  Re-enqueue the current fiber and raise on fiber.
- #ready? ⇒ Boolean
  Whether there is pending work.
- #resume(fiber, *arguments) ⇒ Object
  Re-enqueue the current fiber and transfer to fiber with arguments.
- #select(duration = nil) ⇒ Object
  Run one event loop iteration, draining the auxiliary queue before and after the native select.
Methods inherited from Native::Selector
#block, #cancel_block_timer, #cancel_timer, #destroy, #io_wait_with_timeout, #pending?, #poll_readable_now, #raise_after, #transfer, #unblock, #wakeup, #yield
Constructor Details
#initialize(loop) ⇒ Selector
Returns a new instance of Selector.
# File 'lib/carbon_fiber/async.rb', line 29
def initialize(loop)
  super
  @loop = loop
  @idle_duration = 0.0

  # Auxiliary ready queue for non-Fiber pushables (e.g. FiberInterrupt)
  # and cross-thread pushes of non-Fiber objects. Thread-safe via mutex.
  @auxiliary = []
  @auxiliary_mutex = Mutex.new
end
Instance Attribute Details
#idle_duration ⇒ Float (readonly)
Returns seconds spent idle in the last #select call.
# File 'lib/carbon_fiber/async.rb', line 23
def idle_duration
  @idle_duration
end
#loop ⇒ Fiber (readonly)
Returns the event loop fiber.
# File 'lib/carbon_fiber/async.rb', line 26
def loop
  @loop
end
Instance Method Details
#close ⇒ Object
Release native resources.
# File 'lib/carbon_fiber/async.rb', line 45
def close
  destroy
end
#io_close(io) ⇒ Object
Cancel pending waiters and close the descriptor.
# File 'lib/carbon_fiber/async.rb', line 148
def io_close(io)
  fd = io.respond_to?(:fileno) ? io.fileno : io.to_i
  super(fd, IOError.new("stream closed while waiting"))
end
#io_read(fiber, io, buffer, length, offset = 0) ⇒ Integer
Returns bytes read, or negative errno.
# File 'lib/carbon_fiber/async.rb', line 126
def io_read(fiber, io, buffer, length, offset = 0)
  result = native_io_read(io.fileno, buffer, length, offset)
  return result unless result.nil?

  ruby_io_read(fiber, io, buffer, length, offset)
end
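The source of ruby_io_read is not shown on this page. As a rough illustration of a Ruby-level non-blocking fallback for pipes and files, the following sketch reads into a String and reports either the byte count or a negative errno. The name `fallback_read` and the String target are assumptions made for this example. The real method works on the buffer object passed in by the scheduler.

```ruby
require "io/wait"

# Hypothetical fallback: try a non-blocking read, wait for readiness
# when the descriptor is not ready, and report the result using the
# "bytes read, or negative errno" convention.
def fallback_read(io, out, length)
  loop do
    chunk = io.read_nonblock(length, exception: false)
    case chunk
    when :wait_readable
      io.wait_readable # block until the descriptor becomes readable
    when nil
      return 0 # end of file
    else
      out << chunk
      return chunk.bytesize
    end
  end
rescue SystemCallError => error
  -error.errno
end
```

Inside a fiber scheduler, the `wait_readable` call would be routed through the scheduler's io_wait hook rather than blocking the thread.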
#io_wait(fiber, io, events) ⇒ Integer, false
Wait for I/O readiness. Falls back to IO.select on a background thread when the native path returns nil (kqueue WRITE bypass, closed fd, duplicate waiter).
# File 'lib/carbon_fiber/async.rb', line 106
def io_wait(fiber, io, events)
  result = native_io_wait(fiber, io.fileno, events)
  return result unless result.nil?

  fallback_io_wait(io, events)
end
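fallback_io_wait is not listed on this page, so its exact shape is unknown. A minimal sketch of the described technique (IO.select on a background thread, returning the ready event mask or false) might look like this. The name `select_on_thread` and its `timeout` parameter are hypothetical.

```ruby
# Hypothetical sketch: run IO.select on a helper thread and translate
# the result into an event bitmask (IO::READABLE / IO::WRITABLE),
# or false when the wait times out.
def select_on_thread(io, events, timeout = nil)
  Thread.new do
    readers = (events & IO::READABLE).zero? ? nil : [io]
    writers = (events & IO::WRITABLE).zero? ? nil : [io]

    ready = IO.select(readers, writers, nil, timeout)
    next false unless ready

    mask = 0
    mask |= IO::READABLE if ready[0].include?(io)
    mask |= IO::WRITABLE if ready[1].include?(io)
    mask
  end.value
end
```

Joining the thread with `value` lets a calling fiber suspend while the helper thread blocks in the kernel.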
#io_write(fiber, io, buffer, length, offset = 0) ⇒ Integer
Returns bytes written, or negative errno.
# File 'lib/carbon_fiber/async.rb', line 139
def io_write(fiber, io, buffer, length, offset = 0)
  result = native_io_write(io.fileno, buffer, length, offset)
  return result unless result.nil?

  ruby_io_write(fiber, io, buffer, length, offset)
end
#process_wait(fiber, pid, flags) ⇒ Process::Status
Wait for a child process on a background thread.
# File 'lib/carbon_fiber/async.rb', line 158
def process_wait(fiber, pid, flags)
  Thread.new do
    Thread.current.report_on_exception = false
    Process::Status.wait(pid, flags)
  end.value
end
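The same pattern can be tried outside the selector. This standalone sketch assumes Ruby 3.0 or later (for Process::Status.wait). The helper name `wait_on_thread` is hypothetical.

```ruby
require "rbconfig"

# Hypothetical standalone version of the pattern above: block on the
# child in a helper thread and hand back its Process::Status.
def wait_on_thread(pid, flags = 0)
  Thread.new do
    Thread.current.report_on_exception = false
    Process::Status.wait(pid, flags)
  end.value
end

# Spawn a child Ruby process that exits with status 3, then wait for it.
pid = Process.spawn(RbConfig.ruby, "-e", "exit 3")
status = wait_on_thread(pid)
puts status.exitstatus
```

Because the blocking wait happens on a separate thread, the calling fiber only has to wait on `Thread#value`, which a fiber scheduler can suspend on.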
#push(fiber) ⇒ Object
Enqueue a fiber or fiber-like object into the ready queue.
# File 'lib/carbon_fiber/async.rb', line 51
def push(fiber)
  if fiber.is_a?(Fiber)
    super
  else
    @auxiliary_mutex.synchronize { @auxiliary << fiber }
  end
end
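To illustrate the split between the native queue and the mutex-protected auxiliary queue, here is a self-contained sketch. `ReadyQueue` is a hypothetical pure-Ruby stand-in for Native::Selector, and `SplitQueue` and `auxiliary_size` are names invented for this example.

```ruby
# Hypothetical stand-in for the native ready queue (Fibers only).
class ReadyQueue
  attr_reader :items

  def initialize
    @items = []
  end

  def push(fiber)
    @items << fiber
  end
end

# Routes real Fibers to the parent queue and everything else to a
# mutex-protected auxiliary array, as #push does above.
class SplitQueue < ReadyQueue
  def initialize
    super
    @auxiliary = []
    @auxiliary_mutex = Mutex.new
  end

  def push(item)
    if item.is_a?(Fiber)
      super
    else
      @auxiliary_mutex.synchronize { @auxiliary << item }
    end
  end

  def auxiliary_size
    @auxiliary_mutex.synchronize { @auxiliary.size }
  end
end
```

The mutex matters only for the auxiliary path, which is the one that other threads may push to.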
#raise(fiber, *arguments, **options) ⇒ Object
Re-enqueue the current fiber and raise on fiber.
# File 'lib/carbon_fiber/async.rb', line 70
def raise(fiber, *arguments, **)
  current = Fiber.current
  native_push(current) unless current.equal?(@loop)
  fiber.raise(*arguments, **)
end
#ready? ⇒ Boolean
Returns whether there is pending work.
# File 'lib/carbon_fiber/async.rb', line 77
def ready?
  !@auxiliary.empty? || pending?
end
#resume(fiber, *arguments) ⇒ Object
Re-enqueue the current fiber and transfer to fiber with arguments.
# File 'lib/carbon_fiber/async.rb', line 62
def resume(fiber, *arguments)
  current = Fiber.current
  native_push(current) unless current.equal?(@loop)
  fiber.transfer(*arguments)
end
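The "re-enqueue, then transfer" behavior can be demonstrated with a tiny pure-Ruby scheduler. `TinyScheduler` is a hypothetical class written for this sketch. It uses a plain Array where the real selector uses the native ready queue, and it assumes the event loop runs on the thread's root fiber.

```ruby
# Hypothetical miniature of the resume logic above.
class TinyScheduler
  def initialize
    @loop = Fiber.current # the fiber that drives the loop
    @ready = []
  end

  # Re-enqueue the caller (unless it is the loop itself), then switch.
  def resume(fiber, *arguments)
    current = Fiber.current
    @ready << current unless current.equal?(@loop)
    fiber.transfer(*arguments)
  end

  # Run every fiber that was re-enqueued by #resume.
  def run
    while (fiber = @ready.shift)
      fiber.transfer if fiber.alive?
    end
  end
end

log = []
scheduler = TinyScheduler.new
b = Fiber.new { |message| log << "b got #{message}" }
a = Fiber.new do
  log << "a start"
  scheduler.resume(b, :hello) # a is re-enqueued before b runs
  log << "a end"
end

scheduler.resume(a)
scheduler.run
```

Without the re-enqueue step, fiber `a` would never run again after handing control to `b`.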
#select(duration = nil) ⇒ Object
Run one event loop iteration, draining the auxiliary queue before and after the native select.
Note: idle_duration is not actually measured; it stays at 0.0. Async uses it only for load statistics, not for correctness, and the two Process.clock_gettime calls plus a Float allocation cost roughly 1-2% on select-heavy workloads.
# File 'lib/carbon_fiber/async.rb', line 91
def select(duration = nil)
  drain_auxiliary
  super
  drain_auxiliary
end
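drain_auxiliary is private and not shown on this page. One plausible shape is to swap the array out under the mutex and then run the batch outside the lock. That approach is an assumption here and is not confirmed by the source. In this sketch `DrainingLoop` is a hypothetical class, lambdas stand in for fiber-like objects, and a block stands in for the native select.

```ruby
# Hypothetical illustration of draining before and after the select.
class DrainingLoop
  attr_reader :log

  # The block stands in for the native select call.
  def initialize(&native_select)
    @native_select = native_select
    @auxiliary = []
    @auxiliary_mutex = Mutex.new
    @log = []
  end

  def push(item)
    @auxiliary_mutex.synchronize { @auxiliary << item }
  end

  def select(duration = nil)
    drain_auxiliary # items queued before this iteration
    @native_select&.call(self, duration)
    drain_auxiliary # items queued while the select was running
  end

  private

  # Take the whole batch under the lock, then run it outside the lock
  # so the items are free to push more work.
  def drain_auxiliary
    batch = @auxiliary_mutex.synchronize do
      taken = @auxiliary
      @auxiliary = []
      taken
    end
    batch.each(&:call)
  end
end
```

Draining a second time means work queued during the select runs in the same iteration instead of waiting for the next one.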