Class: CarbonFiber::Scheduler
- Inherits:
-
Object
- Object
- CarbonFiber::Scheduler
- Defined in:
- lib/carbon_fiber/scheduler.rb
Overview
Implements the Ruby Fiber Scheduler interface.
Delegates I/O and timer operations to a native Zig selector (io_uring on Linux, kqueue on macOS). Operations the native layer doesn’t cover (DNS, process_wait) run on background threads.
Instance Method Summary collapse
-
#address_resolve(hostname) ⇒ Array<String>
Resolve a hostname to addresses via Resolv.
-
#block(_blocker, timeout = nil) ⇒ Object
Suspend the current fiber until unblocked or timed out.
-
#blocking_operation_wait(work) ⇒ Object
Run an arbitrary callable on a background thread.
-
#close(internal = false) ⇒ Object
Drain pending work and release the native selector.
-
#closed? ⇒ Boolean
Whether the scheduler has been closed.
-
#current_time ⇒ Float
Monotonic clock used by the scheduler for timers.
-
#fiber { ... } ⇒ Fiber
Create and schedule a non-blocking fiber.
-
#fiber_interrupt(fiber, exception) ⇒ Object
Deliver an exception to a fiber from another fiber.
-
#initialize(root_fiber = Fiber.current, selector: CarbonFiber::Native::Selector) ⇒ Scheduler
constructor
A new instance of Scheduler.
-
#io_close(io) ⇒ Object
Cancel pending waiters on an IO and close the descriptor.
-
#io_read(io, buffer, length, offset = 0) ⇒ Integer
Read from an IO into a buffer via the native selector.
-
#io_select ⇒ Object
Blocking IO.select on a background thread.
-
#io_wait(io, events, timeout = nil) ⇒ Integer, false
Wait for I/O readiness on a file descriptor.
-
#io_write(io, buffer, length, offset = 0) ⇒ Integer
Write from a buffer to an IO via the native selector.
-
#kernel_sleep(duration = nil) ⇒ Object
Intercept Kernel#sleep.
-
#process_wait(pid, flags) ⇒ Process::Status
Wait for a child process on a background thread.
-
#push(fiber) ⇒ Object
Enqueue a fiber into the ready queue.
-
#raise(fiber, exception) ⇒ Object
Deliver an exception to a suspended fiber.
-
#resume(fiber, *arguments) ⇒ Object
Resume a fiber, optionally passing a value.
-
#run ⇒ Object
Run the event loop until all fibers and background operations complete.
-
#run_once(timeout = nil) ⇒ Object
Run one event loop iteration.
-
#scheduler_close ⇒ Object
Called by Ruby when Fiber.set_scheduler(nil) is invoked.
-
#select(timeout = nil) ⇒ Object
Run one iteration of the event loop.
-
#timeout_after(duration, klass = Timeout::Error, message = "execution expired", &block) ⇒ Object
Run a block with a timeout, raising an exception if it expires.
-
#transfer ⇒ Object
Transfer control to the next ready fiber or the event loop.
-
#unblock(_blocker, fiber) ⇒ Object
Resume a fiber previously suspended by #block.
-
#wakeup ⇒ Object
Wake the event loop (thread-safe).
-
#yield ⇒ Object
Re-enqueue the current fiber and transfer to the event loop.
Constructor Details
#initialize(root_fiber = Fiber.current, selector: CarbonFiber::Native::Selector) ⇒ Scheduler
Returns a new instance of Scheduler.
46 47 48 49 50 51 52 53 54 |
# File 'lib/carbon_fiber/scheduler.rb', line 46 def initialize(root_fiber = Fiber.current, selector: CarbonFiber::Native::Selector) @root_fiber = root_fiber @scheduler_thread = Thread.current @selector = selector.new(root_fiber) @active_fibers = 0 @background_count = 0 @closed = false @closing = false end |
Instance Method Details
#address_resolve(hostname) ⇒ Array<String>
Resolve a hostname to addresses via Resolv.
284 285 286 287 288 289 |
# File 'lib/carbon_fiber/scheduler.rb', line 284 def address_resolve(hostname) if hostname.include?("%") hostname = hostname.split("%", 2).first end Resolv.getaddresses(hostname) end |
#block(_blocker, timeout = nil) ⇒ Object
Suspend the current fiber until unblocked or timed out.
157 158 159 |
# File 'lib/carbon_fiber/scheduler.rb', line 157 def block(_blocker, timeout = nil) @selector.block(Fiber.current, timeout) end |
#blocking_operation_wait(work) ⇒ Object
Run an arbitrary callable on a background thread.
293 294 295 296 297 |
# File 'lib/carbon_fiber/scheduler.rb', line 293 def blocking_operation_wait(work) await_background_operation do work.call end end |
#close(internal = false) ⇒ Object
Drain pending work and release the native selector.
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/carbon_fiber/scheduler.rb', line 62 def close(internal = false) return true if @closed || @closing unless internal return Fiber.set_scheduler(nil) if Fiber.scheduler == self end @closing = true run true ensure unless @closed @selector&.destroy @closed = true @closing = false freeze end end |
#closed? ⇒ Boolean
Returns whether the scheduler has been closed.
82 83 84 |
# File 'lib/carbon_fiber/scheduler.rb', line 82 def closed? @closed end |
#current_time ⇒ Float
Monotonic clock used by the scheduler for timers.
88 89 90 |
# File 'lib/carbon_fiber/scheduler.rb', line 88 def current_time Process.clock_gettime(Process::CLOCK_MONOTONIC) end |
#fiber { ... } ⇒ Fiber
Create and schedule a non-blocking fiber.
95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/carbon_fiber/scheduler.rb', line 95 def fiber(&block) fiber = Fiber.new(blocking: false) do block.call ensure fiber_done end @active_fibers += 1 @selector.push(fiber) @selector.wakeup unless Thread.current.equal?(@scheduler_thread) fiber end |
#fiber_interrupt(fiber, exception) ⇒ Object
Deliver an exception to a fiber from another fiber.
302 303 304 305 306 |
# File 'lib/carbon_fiber/scheduler.rb', line 302 def fiber_interrupt(fiber, exception) @selector.raise(fiber, exception) @selector.wakeup true end |
#io_close(io) ⇒ Object
Cancel pending waiters on an IO and close the descriptor.
251 252 253 254 255 256 257 258 259 260 261 |
# File 'lib/carbon_fiber/scheduler.rb', line 251 def io_close(io) descriptor = io.respond_to?(:to_i) ? io.to_i : io @selector.io_close(descriptor, IOError.new("stream closed while waiting")) Fiber.blocking do target = io.is_a?(IO) ? io : IO.for_fd(descriptor.to_i) target.close unless target.closed? end true end |
#io_read(io, buffer, length, offset = 0) ⇒ Integer
Read from an IO into a buffer via the native selector. Falls back to a background thread for non-socket descriptors.
207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/carbon_fiber/scheduler.rb', line 207 def io_read(io, buffer, length, offset = 0) # Native io_read_object extracts the descriptor in Zig, skipping a # `respond_to?(:fileno)` + `io.fileno` method-send pair per call. native_result = @selector.io_read_object(io, buffer, length, offset) return native_result unless native_result.nil? await_background_operation do Fiber.blocking { buffer.read(io, length, offset) } end rescue NoMethodError, TypeError await_background_operation do Fiber.blocking { buffer.read(io, length, offset) } end end |
#io_select ⇒ Object
Blocking IO.select on a background thread.
243 244 245 246 247 |
# File 'lib/carbon_fiber/scheduler.rb', line 243 def io_select(...) await_background_operation do Fiber.blocking { IO.select(...) } end end |
#io_wait(io, events, timeout = nil) ⇒ Integer, false
Wait for I/O readiness on a file descriptor.
188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/carbon_fiber/scheduler.rb', line 188 def io_wait(io, events, timeout = nil) return poll_io_now(io, events) if timeout == 0 # Native io_wait_object handles fileno extraction, Fiber.current, # and nil/numeric timeout in Zig — skipping a Ruby frame + branch # per call on Net::HTTP's hot read/write loop. result = @selector.io_wait_object(io, events, timeout) result.nil? ? await_background_operation { io_select_readiness(io, events, timeout) } : result rescue NoMethodError, TypeError await_background_operation { io_select_readiness(io, events, timeout) } end |
#io_write(io, buffer, length, offset = 0) ⇒ Integer
Write from a buffer to an IO via the native selector. Falls back to a background thread for non-socket descriptors.
229 230 231 232 233 234 235 236 237 238 239 240 |
# File 'lib/carbon_fiber/scheduler.rb', line 229 def io_write(io, buffer, length, offset = 0) native_result = @selector.io_write_object(io, buffer, length, offset) return native_result unless native_result.nil? await_background_operation do Fiber.blocking { buffer.write(io, length, offset) } end rescue NoMethodError, TypeError await_background_operation do Fiber.blocking { buffer.write(io, length, offset) } end end |
#kernel_sleep(duration = nil) ⇒ Object
Intercept Kernel#sleep. Parks the fiber on a native timer.
171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/carbon_fiber/scheduler.rb', line 171 def kernel_sleep(duration = nil) if duration.nil? transfer elsif duration <= 0 self.yield else block(nil, duration) end true end |
#process_wait(pid, flags) ⇒ Process::Status
Wait for a child process on a background thread.
267 268 269 270 271 272 273 274 275 276 277 278 279 |
# File 'lib/carbon_fiber/scheduler.rb', line 267 def process_wait(pid, flags) # Ruby 4.0 bug: rb_process_status_wait re-enters the scheduler hook, # so native process_wait produces an incorrect status. Background-thread # waitpid avoids this because new threads have no scheduler installed. await_background_operation do if flags.zero? Process::Status.wait(pid, flags) else _waited_pid, status = Process.waitpid2(pid, flags) status end end end |
#push(fiber) ⇒ Object
Enqueue a fiber into the ready queue.
121 122 123 |
# File 'lib/carbon_fiber/scheduler.rb', line 121 def push(fiber) @selector.push(fiber) end |
#raise(fiber, exception) ⇒ Object
Deliver an exception to a suspended fiber.
139 140 141 |
# File 'lib/carbon_fiber/scheduler.rb', line 139 def raise(fiber, exception) @selector.raise(fiber, exception) end |
#resume(fiber, *arguments) ⇒ Object
Resume a fiber, optionally passing a value.
128 129 130 131 132 133 134 |
# File 'lib/carbon_fiber/scheduler.rb', line 128 def resume(fiber, *arguments) if arguments.empty? @selector.push(fiber) else @selector.resume(fiber, arguments.first) end end |
#run ⇒ Object
Run the event loop until all fibers and background operations complete.
326 327 328 329 330 331 |
# File 'lib/carbon_fiber/scheduler.rb', line 326 def run Kernel.raise RuntimeError, "Scheduler has been closed" if closed? run_once until idle? true end |
#run_once(timeout = nil) ⇒ Object
Run one event loop iteration. Alias for #select.
321 322 323 |
# File 'lib/carbon_fiber/scheduler.rb', line 321 def run_once(timeout = nil) @selector.select(timeout) end |
#scheduler_close ⇒ Object
Called by Ruby when Fiber.set_scheduler(nil) is invoked.
57 58 59 |
# File 'lib/carbon_fiber/scheduler.rb', line 57 def scheduler_close close(true) end |
#select(timeout = nil) ⇒ Object
Run one iteration of the event loop.
150 151 152 |
# File 'lib/carbon_fiber/scheduler.rb', line 150 def select(timeout = nil) @selector.select(timeout) end |
#timeout_after(duration, klass = Timeout::Error, message = "execution expired", &block) ⇒ Object
Run a block with a timeout, raising an exception if it expires.
312 313 314 315 316 317 318 |
# File 'lib/carbon_fiber/scheduler.rb', line 312 def timeout_after(duration, klass = Timeout::Error, = "execution expired", &block) exc = klass.is_a?(Class) ? klass.new() : klass token = @selector.raise_after(Fiber.current, exc, duration) block.call(duration) ensure @selector.cancel_timer(token) if token end |
#transfer ⇒ Object
Transfer control to the next ready fiber or the event loop.
110 111 112 |
# File 'lib/carbon_fiber/scheduler.rb', line 110 def transfer @selector.transfer end |
#unblock(_blocker, fiber) ⇒ Object
Resume a fiber previously suspended by #block.
164 165 166 167 |
# File 'lib/carbon_fiber/scheduler.rb', line 164 def unblock(_blocker, fiber) @selector.unblock(fiber) true end |
#wakeup ⇒ Object
Wake the event loop (thread-safe).
144 145 146 |
# File 'lib/carbon_fiber/scheduler.rb', line 144 def wakeup @selector.wakeup end |
#yield ⇒ Object
Re-enqueue the current fiber and transfer to the event loop.
115 116 117 |
# File 'lib/carbon_fiber/scheduler.rb', line 115 def yield @selector.yield end |