Class: CarbonFiber::Async::Selector

Inherits:
Native::Selector show all
Defined in:
lib/carbon_fiber/async.rb

Overview

IO::Event::Selector-compatible adapter backed by our native Zig selector. Subclasses Native::Selector so hot-path methods (transfer, yield, wakeup) dispatch directly to native code without an extra Ruby method frame.

Registration:

require "async"
require "carbon_fiber/async"
CarbonFiber::Async.default!

Or via environment:

IO_EVENT_SELECTOR=CarbonFiberSelector ruby app.rb

Constant Summary collapse

EAGAIN =

Native Zig io_read/io_write use recv/send with kernel buffer draining. Falls back to Ruby-level nonblock+io_wait for non-socket fds (pipes, files) where native returns nil.

-Errno::EAGAIN::Errno
EWOULDBLOCK =
-Errno::EWOULDBLOCK::Errno

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Native::Selector

#block, #cancel_block_timer, #cancel_timer, #destroy, #io_wait_with_timeout, #pending?, #poll_readable_now, #raise_after, #transfer, #unblock, #wakeup, #yield

Constructor Details

#initialize(loop) ⇒ Selector

Returns a new instance of Selector.

Parameters:

  • loop (Fiber)

    the Async event loop fiber



29
30
31
32
33
34
35
36
37
38
# File 'lib/carbon_fiber/async.rb', line 29

def initialize(loop)
  super
  @loop = loop
  @idle_duration = 0.0

  # Auxiliary ready queue for non-Fiber pushables (e.g. FiberInterrupt)
  # and cross-thread pushes of non-Fiber objects. Thread-safe via mutex.
  @auxiliary = []
  @auxiliary_mutex = Mutex.new
end

Instance Attribute Details

#idle_durationFloat (readonly)

Returns seconds spent idle in the last #select call.

Returns:

  • (Float)

    seconds spent idle in the last #select call



23
24
25
# File 'lib/carbon_fiber/async.rb', line 23

def idle_duration
  @idle_duration
end

#loopFiber (readonly)

Returns the event loop fiber.

Returns:

  • (Fiber)

    the event loop fiber



26
27
28
# File 'lib/carbon_fiber/async.rb', line 26

def loop
  @loop
end

Instance Method Details

#closeObject

Release native resources.



45
46
47
# File 'lib/carbon_fiber/async.rb', line 45

def close
  destroy
end

#io_close(io) ⇒ Object

Cancel pending waiters and close the descriptor.

Parameters:

  • io (IO)


148
149
150
151
# File 'lib/carbon_fiber/async.rb', line 148

def io_close(io)
  fd = io.respond_to?(:fileno) ? io.fileno : io.to_i
  super(fd, IOError.new("stream closed while waiting"))
end

#io_read(fiber, io, buffer, length, offset = 0) ⇒ Integer

Returns bytes read, or negative errno.

Parameters:

  • fiber (Fiber)
  • io (IO)
  • buffer (IO::Buffer)
  • length (Integer)
  • offset (Integer) (defaults to: 0)

Returns:

  • (Integer)

    bytes read, or negative errno



126
127
128
129
130
131
# File 'lib/carbon_fiber/async.rb', line 126

def io_read(fiber, io, buffer, length, offset = 0)
  result = native_io_read(io.fileno, buffer, length, offset)
  return result unless result.nil?

  ruby_io_read(fiber, io, buffer, length, offset)
end

#io_wait(fiber, io, events) ⇒ Integer, false

Wait for I/O readiness. Falls back to IO.select on a background thread when the native path returns nil (kqueue WRITE bypass, closed fd, duplicate waiter).

Parameters:

  • fiber (Fiber)
  • io (IO)
  • events (Integer)

    bitmask of IO::READABLE, IO::WRITABLE

Returns:

  • (Integer, false)

    readiness bitmask, or false on timeout



106
107
108
109
110
111
# File 'lib/carbon_fiber/async.rb', line 106

def io_wait(fiber, io, events)
  result = native_io_wait(fiber, io.fileno, events)
  return result unless result.nil?

  fallback_io_wait(io, events)
end

#io_write(fiber, io, buffer, length, offset = 0) ⇒ Integer

Returns bytes written, or negative errno.

Parameters:

  • fiber (Fiber)
  • io (IO)
  • buffer (IO::Buffer)
  • length (Integer)
  • offset (Integer) (defaults to: 0)

Returns:

  • (Integer)

    bytes written, or negative errno



139
140
141
142
143
144
# File 'lib/carbon_fiber/async.rb', line 139

def io_write(fiber, io, buffer, length, offset = 0)
  result = native_io_write(io.fileno, buffer, length, offset)
  return result unless result.nil?

  ruby_io_write(fiber, io, buffer, length, offset)
end

#process_wait(fiber, pid, flags) ⇒ Process::Status

Wait for a child process on a background thread.

Parameters:

  • fiber (Fiber)
  • pid (Integer)
  • flags (Integer)

Returns:

  • (Process::Status)


158
159
160
161
162
163
# File 'lib/carbon_fiber/async.rb', line 158

def process_wait(fiber, pid, flags)
  Thread.new do
    Thread.current.report_on_exception = false
    Process::Status.wait(pid, flags)
  end.value
end

#push(fiber) ⇒ Object

Enqueue a fiber or fiber-like object into the ready queue.

Parameters:

  • fiber (Fiber, Object)


51
52
53
54
55
56
57
# File 'lib/carbon_fiber/async.rb', line 51

def push(fiber)
  if fiber.is_a?(Fiber)
    super
  else
    @auxiliary_mutex.synchronize { @auxiliary << fiber }
  end
end

#raise(fiber, *arguments, **options) ⇒ Object

Re-enqueue the current fiber and raise on fiber.

Parameters:

  • fiber (Fiber)


70
71
72
73
74
# File 'lib/carbon_fiber/async.rb', line 70

def raise(fiber, *arguments, **options)
  current = Fiber.current
  native_push(current) unless current.equal?(@loop)
  fiber.raise(*arguments, **options)
end

#ready?Boolean

Returns whether there is pending work.

Returns:

  • (Boolean)

    whether there is pending work



77
78
79
# File 'lib/carbon_fiber/async.rb', line 77

def ready?
  !@auxiliary.empty? || pending?
end

#resume(fiber, *arguments) ⇒ Object

Re-enqueue the current fiber and transfer to fiber with arguments.

Parameters:

  • fiber (Fiber)
  • arguments (Array)


62
63
64
65
66
# File 'lib/carbon_fiber/async.rb', line 62

def resume(fiber, *arguments)
  current = Fiber.current
  native_push(current) unless current.equal?(@loop)
  fiber.transfer(*arguments)
end

#select(duration = nil) ⇒ Object

Run one event loop iteration, draining the auxiliary queue before and after the native select.

Note: idle_duration is not actually measured—it stays at 0.0. Async uses it for load stats only (not correctness), and the two Process.clock_gettime calls plus Float allocation cost ~1-2% on select-heavy workloads.

Parameters:

  • duration (Float, nil) (defaults to: nil)

    maximum seconds to wait



91
92
93
94
95
# File 'lib/carbon_fiber/async.rb', line 91

def select(duration = nil)
  drain_auxiliary
  super
  drain_auxiliary
end