Class: CarbonFiber::Scheduler

Inherits:
Object
Defined in:
lib/carbon_fiber/scheduler.rb

Overview

Implements the Ruby Fiber Scheduler interface.

Delegates I/O and timer operations to a native Zig selector (io_uring on Linux, kqueue on macOS). Operations the native layer doesn’t cover (DNS, process_wait) run on background threads.

Examples:

scheduler = CarbonFiber::Scheduler.new
Fiber.set_scheduler(scheduler)
Fiber.schedule { sleep 1; puts "done" }
scheduler.run
Fiber.set_scheduler(nil)


Constructor Details

#initialize(root_fiber = Fiber.current, selector: CarbonFiber::Native::Selector) ⇒ Scheduler

Returns a new instance of Scheduler.

Parameters:

  • root_fiber (Fiber) (defaults to: Fiber.current)

    the event loop fiber (defaults to current)

  • selector (Class) (defaults to: CarbonFiber::Native::Selector)

    native selector class to instantiate



# File 'lib/carbon_fiber/scheduler.rb', line 46

def initialize(root_fiber = Fiber.current, selector: CarbonFiber::Native::Selector)
  @root_fiber = root_fiber
  @scheduler_thread = Thread.current
  @selector = selector.new(root_fiber)
  @active_fibers = 0
  @background_count = 0
  @closed = false
  @closing = false
end

Instance Method Details

#address_resolve(hostname) ⇒ Array<String>

Resolve a hostname to addresses via Resolv. Any zone identifier (the part after "%", as in "fe80::1%eth0") is stripped before the lookup.

Parameters:

  • hostname (String)

Returns:

  • (Array<String>)


# File 'lib/carbon_fiber/scheduler.rb', line 284

def address_resolve(hostname)
  if hostname.include?("%")
    hostname = hostname.split("%", 2).first
  end
  Resolv.getaddresses(hostname)
end
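
The "%" handling above can be tried on its own. This is a minimal sketch: `strip_zone_id` is a hypothetical helper name, not part of CarbonFiber, and it mirrors only the string handling, not the Resolv lookup.

```ruby
# Hypothetical helper mirroring the "%" handling in #address_resolve.
def strip_zone_id(hostname)
  # split("%", 2) yields at most two parts; the first is the bare address.
  hostname.include?("%") ? hostname.split("%", 2).first : hostname
end

scoped = strip_zone_id("fe80::1%eth0") # zone identifier removed
plain = strip_zone_id("example.com")   # returned unchanged
```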

#block(_blocker, timeout = nil) ⇒ Object

Suspend the current fiber until unblocked or timed out.

Parameters:

  • _blocker (Object)

    unused, required by the protocol

  • timeout (Float, nil) (defaults to: nil)

    seconds before automatic resume



# File 'lib/carbon_fiber/scheduler.rb', line 157

def block(_blocker, timeout = nil)
  @selector.block(Fiber.current, timeout)
end

#blocking_operation_wait(work) ⇒ Object

Run an arbitrary callable on a background thread.

Parameters:

  • work (#call)


# File 'lib/carbon_fiber/scheduler.rb', line 293

def blocking_operation_wait(work)
  await_background_operation do
    work.call
  end
end
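
The idea of moving a callable onto another thread can be sketched with plain threads. This is only an approximation: the private `await_background_operation` helper is not shown on this page. The stand-in below blocks the caller with `Thread#value`, whereas the scheduler presumably suspends only the calling fiber.

```ruby
# Any object responding to #call qualifies as "work".
work = -> { (1..10).sum }

# Stand-in for the background hand-off: run the callable on a new thread
# and collect its return value. Thread#value joins the thread and
# re-raises any exception the callable raised.
result = Thread.new { work.call }.value
```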

#close(internal = false) ⇒ Object

Drain pending work and release the native selector.



# File 'lib/carbon_fiber/scheduler.rb', line 62

def close(internal = false)
  return true if @closed || @closing

  unless internal
    return Fiber.set_scheduler(nil) if Fiber.scheduler == self
  end

  @closing = true
  run
  true
ensure
  unless @closed
    @selector&.destroy
    @closed = true
    @closing = false
    freeze
  end
end

#closed? ⇒ Boolean

Returns whether the scheduler has been closed.

Returns:

  • (Boolean)

    whether the scheduler has been closed



# File 'lib/carbon_fiber/scheduler.rb', line 82

def closed?
  @closed
end

#current_time ⇒ Float

Monotonic clock used by the scheduler for timers.

Returns:

  • (Float)

    seconds since an arbitrary epoch



# File 'lib/carbon_fiber/scheduler.rb', line 88

def current_time
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
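
Because the epoch is arbitrary, only differences between two readings are meaningful. A minimal sketch of computing an elapsed time and a remaining budget follows; `monotonic_now` is a hypothetical stand-in for calling `#current_time` on a scheduler instance.

```ruby
# Hypothetical stand-in for scheduler.current_time.
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

started = monotonic_now
deadline = started + 0.05          # a 50 ms budget from now
sleep 0.01
elapsed = monotonic_now - started  # never negative on a monotonic clock
remaining = deadline - monotonic_now
```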

#fiber { ... } ⇒ Fiber

Create and schedule a non-blocking fiber.

Yields:

  • the block to run inside the fiber

Returns:

  • (Fiber)


# File 'lib/carbon_fiber/scheduler.rb', line 95

def fiber(&block)
  fiber = Fiber.new(blocking: false) do
    block.call
  ensure
    fiber_done
  end

  @active_fibers += 1
  @selector.push(fiber)
  @selector.wakeup unless Thread.current.equal?(@scheduler_thread)

  fiber
end
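
The `ensure` clause is what keeps the active-fiber count accurate when a block raises. The sketch below reproduces that bookkeeping with plain fibers and no scheduler. The local `active` counter is a stand-in for `@active_fibers`, and decrementing it is only an assumption about what the private `fiber_done` does.

```ruby
active = 0
log = []

# A non-blocking fiber whose ensure clause always runs the bookkeeping.
fiber = Fiber.new(blocking: false) do
  log << :ran
ensure
  active -= 1
end
active += 1
non_blocking = !fiber.blocking?
fiber.resume

# The same bookkeeping still runs when the block raises.
failing = Fiber.new(blocking: false) do
  raise ArgumentError, "boom"
ensure
  active -= 1
end
active += 1
begin
  failing.resume
rescue ArgumentError
  log << :failed
end
```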

#fiber_interrupt(fiber, exception) ⇒ Object

Deliver an exception to a fiber from another fiber.

Parameters:

  • fiber (Fiber)
  • exception (Exception)


# File 'lib/carbon_fiber/scheduler.rb', line 302

def fiber_interrupt(fiber, exception)
  @selector.raise(fiber, exception)
  @selector.wakeup
  true
end

#io_close(io) ⇒ Object

Cancel pending waiters on an IO and close the descriptor.

Parameters:

  • io (IO)


# File 'lib/carbon_fiber/scheduler.rb', line 251

def io_close(io)
  descriptor = io.respond_to?(:to_i) ? io.to_i : io
  @selector.io_close(descriptor, IOError.new("stream closed while waiting"))

  Fiber.blocking do
    target = io.is_a?(IO) ? io : IO.for_fd(descriptor.to_i)
    target.close unless target.closed?
  end

  true
end

#io_read(io, buffer, length, offset = 0) ⇒ Integer

Read from an IO into a buffer via the native selector. Falls back to a background thread for non-socket descriptors.

Parameters:

  • io (IO)
  • buffer (IO::Buffer)
  • length (Integer)
  • offset (Integer) (defaults to: 0)

Returns:

  • (Integer)

    bytes read, or negative errno



# File 'lib/carbon_fiber/scheduler.rb', line 207

def io_read(io, buffer, length, offset = 0)
  # Native io_read_object extracts the descriptor in Zig, skipping a
  # `respond_to?(:fileno)` + `io.fileno` method-send pair per call.
  native_result = @selector.io_read_object(io, buffer, length, offset)
  return native_result unless native_result.nil?

  await_background_operation do
    Fiber.blocking { buffer.read(io, length, offset) }
  end
rescue NoMethodError, TypeError
  await_background_operation do
    Fiber.blocking { buffer.read(io, length, offset) }
  end
end
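
The control flow above (try the native path, fall back when it returns nil or rejects the object) is a general pattern. A minimal sketch with lambdas standing in for the native call and the background-thread read is shown below; `with_fallback` is a hypothetical name, not a CarbonFiber method.

```ruby
# Hypothetical helper: use the fast path unless it declines (returns nil)
# or cannot handle its argument (NoMethodError or TypeError).
def with_fallback(fast_path, slow_path)
  result = fast_path.call
  result.nil? ? slow_path.call : result
rescue NoMethodError, TypeError
  slow_path.call
end

handled = with_fallback(-> { 42 }, -> { :slow })              # fast path wins
declined = with_fallback(-> { nil }, -> { :slow })            # nil means "not handled"
unsupported = with_fallback(-> { nil.fileno }, -> { :slow })  # raises NoMethodError
```

One consequence of this shape is that a NoMethodError or TypeError raised by the slow path inside the method body would also be rescued, and the slow path would then run a second time.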

#io_select(...) ⇒ Object

Blocking IO.select on a background thread.



# File 'lib/carbon_fiber/scheduler.rb', line 243

def io_select(...)
  await_background_operation do
    Fiber.blocking { IO.select(...) }
  end
end
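
For reference, this is the plain `IO.select` call that the hook forwards to, shown on a pipe without any scheduler installed. It returns nil on timeout and otherwise an array of three arrays (readable, writable, errored).

```ruby
reader, writer = IO.pipe

# Nothing has been written yet, so a zero-timeout select times out.
idle = IO.select([reader], nil, nil, 0)

writer.write("x")
ready = IO.select([reader], nil, nil, 1)
readable = ready.first # the IOs that are ready for reading

reader.close
writer.close
```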

#io_wait(io, events, timeout = nil) ⇒ Integer, false

Wait for I/O readiness on a file descriptor.

Parameters:

  • io (IO)
  • events (Integer)

    bitmask of IO::READABLE, IO::WRITABLE

  • timeout (Float, nil) (defaults to: nil)

Returns:

  • (Integer, false)

    readiness bitmask, or false on timeout



# File 'lib/carbon_fiber/scheduler.rb', line 188

def io_wait(io, events, timeout = nil)
  return poll_io_now(io, events) if timeout == 0

  # Native io_wait_object handles fileno extraction, Fiber.current,
  # and nil/numeric timeout in Zig — skipping a Ruby frame + branch
  # per call on Net::HTTP's hot read/write loop.
  result = @selector.io_wait_object(io, events, timeout)
  result.nil? ? await_background_operation { io_select_readiness(io, events, timeout) } : result
rescue NoMethodError, TypeError
  await_background_operation { io_select_readiness(io, events, timeout) }
end
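
The `events` argument and the return value are both bitmasks built from the `IO::READABLE` and `IO::WRITABLE` constants. A minimal sketch of composing and decoding such a mask follows; `describe_events` is a hypothetical helper, not part of CarbonFiber.

```ruby
# Hypothetical helper: turn a readiness bitmask (or false) into symbols.
def describe_events(mask)
  return [:timed_out] unless mask # io_wait returns false on timeout

  names = []
  names << :readable if (mask & IO::READABLE) != 0
  names << :writable if (mask & IO::WRITABLE) != 0
  names
end

both = describe_events(IO::READABLE | IO::WRITABLE)
write_only = describe_events(IO::WRITABLE)
timed_out = describe_events(false)
```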

#io_write(io, buffer, length, offset = 0) ⇒ Integer

Write from a buffer to an IO via the native selector. Falls back to a background thread for non-socket descriptors.

Parameters:

  • io (IO)
  • buffer (IO::Buffer)
  • length (Integer)
  • offset (Integer) (defaults to: 0)

Returns:

  • (Integer)

    bytes written, or negative errno



# File 'lib/carbon_fiber/scheduler.rb', line 229

def io_write(io, buffer, length, offset = 0)
  native_result = @selector.io_write_object(io, buffer, length, offset)
  return native_result unless native_result.nil?

  await_background_operation do
    Fiber.blocking { buffer.write(io, length, offset) }
  end
rescue NoMethodError, TypeError
  await_background_operation do
    Fiber.blocking { buffer.write(io, length, offset) }
  end
end

#kernel_sleep(duration = nil) ⇒ Object

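Intercept Kernel#sleep. A nil duration suspends the fiber until it is explicitly resumed, a zero or negative duration yields once to the event loop, and a positive duration parks the fiber on a native timer.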

Parameters:

  • duration (Float, nil) (defaults to: nil)

    seconds to sleep; nil sleeps forever



# File 'lib/carbon_fiber/scheduler.rb', line 171

def kernel_sleep(duration = nil)
  if duration.nil?
    transfer
  elsif duration <= 0
    self.yield
  else
    block(nil, duration)
  end

  true
end
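
The three branches can be summarised as a small decision function. This sketch only names the branch that would be taken; `sleep_strategy` is a hypothetical helper, and the symbols refer to the scheduler methods `#transfer`, `#yield` and `#block`.

```ruby
# Hypothetical helper mirroring the branching in #kernel_sleep.
def sleep_strategy(duration)
  if duration.nil?
    :transfer # sleep until explicitly resumed
  elsif duration <= 0
    :yield    # give other fibers one turn, then continue
  else
    :block    # park on a timer for `duration` seconds
  end
end

forever = sleep_strategy(nil)
zero = sleep_strategy(0)
timed = sleep_strategy(0.25)
```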

#process_wait(pid, flags) ⇒ Process::Status

Wait for a child process on a background thread.

Parameters:

  • pid (Integer)
  • flags (Integer)

    waitpid flags

Returns:

  • (Process::Status)


# File 'lib/carbon_fiber/scheduler.rb', line 267

def process_wait(pid, flags)
  # Ruby 4.0 bug: rb_process_status_wait re-enters the scheduler hook,
  # so native process_wait produces an incorrect status. Background-thread
  # waitpid avoids this because new threads have no scheduler installed.
  await_background_operation do
    if flags.zero?
      Process::Status.wait(pid, flags)
    else
      _waited_pid, status = Process.waitpid2(pid, flags)
      status
    end
  end
end
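
Waiting for a child from a separate thread can be sketched with the standard library alone. The example assumes the current Ruby executable can be spawned as a child process, and it uses `Thread#value` as a stand-in for the private `await_background_operation` helper.

```ruby
require "rbconfig"

# Spawn a short-lived child that exits with status 3.
pid = Process.spawn(RbConfig.ruby, "-e", "exit 3")

# Wait on a separate thread, as the comment in #process_wait describes.
# Process.waitpid2 returns the pid together with a Process::Status.
waited_pid, status = Thread.new { Process.waitpid2(pid, 0) }.value
exit_code = status.exitstatus
```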

#push(fiber) ⇒ Object

Enqueue a fiber into the ready queue.

Parameters:

  • fiber (Fiber)


# File 'lib/carbon_fiber/scheduler.rb', line 121

def push(fiber)
  @selector.push(fiber)
end

#raise(fiber, exception) ⇒ Object

Deliver an exception to a suspended fiber.

Parameters:

  • fiber (Fiber)
  • exception (Exception)


# File 'lib/carbon_fiber/scheduler.rb', line 139

def raise(fiber, exception)
  @selector.raise(fiber, exception)
end

#resume(fiber, *arguments) ⇒ Object

Resume a fiber, optionally passing a value.

Parameters:

  • fiber (Fiber)
  • arguments (Array)

    at most one value to pass to the fiber



# File 'lib/carbon_fiber/scheduler.rb', line 128

def resume(fiber, *arguments)
  if arguments.empty?
    @selector.push(fiber)
  else
    @selector.resume(fiber, arguments.first)
  end
end

#run ⇒ Object

Run the event loop until all fibers and background operations complete.



# File 'lib/carbon_fiber/scheduler.rb', line 326

def run
  Kernel.raise RuntimeError, "Scheduler has been closed" if closed?

  run_once until idle?
  true
end

#run_once(timeout = nil) ⇒ Object

Run one event loop iteration. Behaves identically to #select: both delegate to the native selector's select.



# File 'lib/carbon_fiber/scheduler.rb', line 321

def run_once(timeout = nil)
  @selector.select(timeout)
end

#scheduler_close ⇒ Object

Called by Ruby when Fiber.set_scheduler(nil) is invoked.



# File 'lib/carbon_fiber/scheduler.rb', line 57

def scheduler_close
  close(true)
end

#select(timeout = nil) ⇒ Object

Run one iteration of the event loop.

Parameters:

  • timeout (Float, nil) (defaults to: nil)

    maximum seconds to wait



# File 'lib/carbon_fiber/scheduler.rb', line 150

def select(timeout = nil)
  @selector.select(timeout)
end

#timeout_after(duration, klass = Timeout::Error, message = "execution expired", &block) ⇒ Object

Run a block with a timeout, raising an exception if it expires.

Parameters:

  • duration (Float)

    seconds

  • klass (Class, Exception) (defaults to: Timeout::Error)

    exception class or instance

  • message (String) (defaults to: "execution expired")


# File 'lib/carbon_fiber/scheduler.rb', line 312

def timeout_after(duration, klass = Timeout::Error, message = "execution expired", &block)
  exc = klass.is_a?(Class) ? klass.new(message) : klass
  token = @selector.raise_after(Fiber.current, exc, duration)
  block.call(duration)
ensure
  @selector.cancel_timer(token) if token
end
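
The first line of the method accepts either an exception class or an already-built exception. That normalisation can be exercised in isolation; `build_timeout_exception` is a hypothetical helper name used only for this sketch.

```ruby
require "timeout"

# Hypothetical helper mirroring the first line of #timeout_after.
def build_timeout_exception(klass = Timeout::Error, message = "execution expired")
  # A class is instantiated with the message; an instance is used as given.
  klass.is_a?(Class) ? klass.new(message) : klass
end

default_error = build_timeout_exception
custom_class = build_timeout_exception(IOError, "too slow")
prebuilt = IOError.new("already built")
passthrough = build_timeout_exception(prebuilt, "ignored")
```

When an instance is passed, the `message` argument is ignored, because the instance already carries its own message.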

#transfer ⇒ Object

Transfer control to the next ready fiber or the event loop.



# File 'lib/carbon_fiber/scheduler.rb', line 110

def transfer
  @selector.transfer
end

#unblock(_blocker, fiber) ⇒ Object

Resume a fiber previously suspended by #block.

Parameters:

  • _blocker (Object)

    unused, required by the protocol

  • fiber (Fiber)


# File 'lib/carbon_fiber/scheduler.rb', line 164

def unblock(_blocker, fiber)
  @selector.unblock(fiber)
  true
end

#wakeup ⇒ Object

Wake the event loop (thread-safe).



# File 'lib/carbon_fiber/scheduler.rb', line 144

def wakeup
  @selector.wakeup
end

#yield ⇒ Object

Re-enqueue the current fiber and transfer to the event loop.



# File 'lib/carbon_fiber/scheduler.rb', line 115

def yield
  @selector.yield
end