Class: CarbonFiber::Native::Selector

Inherits:
Object
  • Object
show all
Defined in:
lib/carbon_fiber/native/fallback.rb

Overview

Pure-Ruby fallback selector using threads and condition variables.

Loaded automatically when the native Zig extension is unavailable. Provides the same Selector API as the native implementation so the Scheduler and Async adapter work unchanged.

Direct Known Subclasses

Async::Selector

Instance Method Summary collapse

Constructor Details

#initialize(loop_fiber) ⇒ Selector

Returns a new instance of Selector.

Parameters:

  • loop_fiber (Fiber)

    the event loop fiber



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/carbon_fiber/native/fallback.rb', line 14

# Set up the selector's bookkeeping around the given event loop fiber.
#
# @param loop_fiber [Fiber] the event loop fiber
def initialize(loop_fiber)
  @loop_fiber = loop_fiber

  # Lock and condition variable shared by every method that touches the
  # collections below.
  @cv = Thread::ConditionVariable.new
  @mutex = Thread::Mutex.new

  # Queue of pending wakeups, each stored as a four-element array.
  @ready = []

  # Timers keyed by token. NOTE(review): @next_timer is presumably the
  # next token to hand out; the allocator is not visible here.
  @timers = {}
  @next_timer = 1

  # Read waiters keyed by file descriptor. NOTE(review): @next_wait_token
  # is presumably consumed by do_io_wait; confirm there.
  @read_waits = {}
  @next_wait_token = 1

  # Fibers that parked themselves voluntarily (block() or do_io_wait),
  # each mapped to its sleep timer token, or nil when no timer is armed.
  # flush_ready reads this map to tell a parked fiber apart from one whose
  # transfer came back unexpectedly because it was interrupted
  # mid-execution (Ruby 4.0 Fiber#raise bypass) and must be re-queued.
  @blocked_fibers = {}
end

Instance Method Details

#block(fiber, timeout = nil) ⇒ Object

Suspend the current fiber until unblocked or timed out.



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/carbon_fiber/native/fallback.rb', line 128

# Park +fiber+ by transferring to the event loop fiber until it is
# unblocked or its timeout fires.
#
# @param fiber [Fiber] the fiber being suspended
# @param timeout [Object, nil] when truthy, a wakeup is armed through
#   resume_after(fiber, timeout, false) before parking
# @return [Object] whatever value the loop fiber's transfer hands back
def block(fiber, timeout = nil)
  timer_token = timeout ? resume_after(fiber, timeout, false) : nil
  @mutex.synchronize { @blocked_fibers[fiber] = timer_token }

  wake_value = @loop_fiber.transfer

  # Only reached on a normal wakeup: forget the fiber and disarm a timer
  # that is still pending. When raise() got there first it has already
  # cancelled the timer and nil-ed the token; in that unwind path this
  # code never runs and cancel_block_timer (called from fiber_done) is
  # what removes the entry.
  @mutex.synchronize do
    armed = @blocked_fibers.delete(fiber)
    @timers.delete(armed) if armed
  end

  wake_value
end

#cancel_block_timer(fiber) ⇒ Object

Called from Scheduler#fiber_done’s ensure block. Removes the fiber from the blocked set and cancels any still-armed sleep timer. This is the only cleanup path when a raise() unwinds the fiber past block()’s normal return.



165
166
167
168
169
170
# File 'lib/carbon_fiber/native/fallback.rb', line 165

# Drop +fiber+ from the blocked set and disarm its sleep timer if one is
# still registered. Invoked from Scheduler#fiber_done's ensure block; it is
# the only cleanup path when a raise() unwinds the fiber past block()'s
# normal return.
#
# @param fiber [Fiber] the fiber that has finished
# @return [Object, nil] the removed timer entry, or nil when the fiber had
#   no token recorded
def cancel_block_timer(fiber)
  @mutex.synchronize do
    token = @blocked_fibers.delete(fiber)
    next unless token

    @timers.delete(token)
  end
end

#cancel_timer(token) ⇒ Object

Cancel a pending timer by token.



157
158
159
# File 'lib/carbon_fiber/native/fallback.rb', line 157

# Cancel a pending timer by token.
#
# @param token [Object] key of the timer in the timer table
# @return [Boolean] true when a truthy entry was removed, false otherwise
def cancel_timer(token)
  removed = @mutex.synchronize { @timers.delete(token) }
  removed ? true : false
end

#destroy ⇒ Object

No-op; nothing to release.



32
33
34
# File 'lib/carbon_fiber/native/fallback.rb', line 32

# Tear-down hook. This selector has nothing to release, so the call is a
# no-op.
#
# @return [true] always
def destroy
  true
end

#io_close(fd, exception) ⇒ Object

Cancel pending waiters on a closed descriptor.



189
190
191
192
193
194
195
196
197
198
199
200
201
202
# File 'lib/carbon_fiber/native/fallback.rb', line 189

# Cancel the pending read waiter on a closed descriptor by queueing
# +exception+ for delivery to the waiting fiber.
#
# @param fd [Object] key of the waiter in the read-wait table
# @param exception [Object] the exception to deliver to the waiter
# @return [Boolean] true when a waiter was found and queued, else false
def io_close(fd, exception)
  @mutex.synchronize do
    waiter = @read_waits.delete(fd)
    next false unless waiter

    @ready << [:raise, waiter[:fiber], exception, true]
    # Wake the event loop so the queued raise is picked up.
    @cv.signal
    true
  end
end

#io_read(_fd, _buffer, _length, _offset) ⇒ Object

Returns nil; the Scheduler handles io_read via background thread.



210
211
212
# File 'lib/carbon_fiber/native/fallback.rb', line 210

# Read hook that this selector does not implement; it always returns nil.
# Per the class documentation the Scheduler handles io_read via a
# background thread instead. All arguments are ignored.
#
# @return [nil] always
def io_read(_fd, _buffer, _length, _offset)
  nil
end

#io_wait(fiber, fd, events) ⇒ Object

Wait for read readiness on a file descriptor via IO.select on a background thread. Returns nil for non-READABLE events (handled by the Scheduler fallback).



175
176
177
178
179
# File 'lib/carbon_fiber/native/fallback.rb', line 175

# Wait for read readiness on a file descriptor, without a timeout.
#
# Only an +events+ value exactly equal to IO::READABLE is handled here;
# for anything else nil is returned (handled by the Scheduler fallback).
#
# @param fiber [Fiber] the waiting fiber
# @param fd [Object] the descriptor, forwarded to do_io_wait
# @param events [Integer] requested event mask
# @return [Object, nil] do_io_wait's result, or nil for unsupported events
def io_wait(fiber, fd, events)
  do_io_wait(fiber, fd, nil) if events == IO::READABLE
end

#io_wait_with_timeout(fiber, fd, events, timeout) ⇒ Object

Like #io_wait but with a timeout.



182
183
184
185
186
# File 'lib/carbon_fiber/native/fallback.rb', line 182

# Like #io_wait, but forwards +timeout+ to do_io_wait.
#
# Only an +events+ value exactly equal to IO::READABLE is handled here;
# for anything else nil is returned.
#
# @param fiber [Fiber] the waiting fiber
# @param fd [Object] the descriptor, forwarded to do_io_wait
# @param events [Integer] requested event mask
# @param timeout [Object] timeout value, forwarded to do_io_wait
# @return [Object, nil] do_io_wait's result, or nil for unsupported events
def io_wait_with_timeout(fiber, fd, events, timeout)
  do_io_wait(fiber, fd, timeout) if events == IO::READABLE
end

#io_write(_fd, _buffer, _length, _offset) ⇒ Object

Returns nil; the Scheduler handles io_write via background thread.



215
216
217
# File 'lib/carbon_fiber/native/fallback.rb', line 215

# Write hook that this selector does not implement; it always returns nil.
# Per the class documentation the Scheduler handles io_write via a
# background thread instead. All arguments are ignored.
#
# @return [nil] always
def io_write(_fd, _buffer, _length, _offset)
  nil
end

#pending? ⇒ Boolean

Returns whether there is pending work.

Returns:

  • (Boolean)

    whether there is pending work



37
38
39
# File 'lib/carbon_fiber/native/fallback.rb', line 37

# Report whether there is pending work: a queued wakeup, a registered
# timer, or a registered read waiter.
#
# @return [Boolean] whether there is pending work
def pending?
  @mutex.synchronize do
    [@ready, @timers, @read_waits].any?(&:any?)
  end
end

#poll_readable_now(fd) ⇒ Object

Non-destructive check if a descriptor has data available to read.



220
221
222
223
224
225
226
227
228
# File 'lib/carbon_fiber/native/fallback.rb', line 220

# Non-destructive check if a descriptor has data available to read.
#
# Wraps +fd+ in a temporary IO (created with autoclose: false) and polls
# it with a zero-timeout IO.select.
#
# @param fd [Integer] the file descriptor number to probe
# @return [Boolean] true when IO.select reports the descriptor readable;
#   false when it does not, or when IOError / SystemCallError is raised
def poll_readable_now(fd)
  probe = IO.new(fd, autoclose: false)
  IO.select([probe], nil, nil, 0) ? true : false
rescue IOError, SystemCallError
  false
ensure
  # probe is nil when IO.new itself raised.
  probe.close unless probe.nil? || probe.closed?
end

#process_wait(_fiber, _pid, _flags) ⇒ Object

Returns nil; the Scheduler handles process_wait via background thread.



205
206
207
# File 'lib/carbon_fiber/native/fallback.rb', line 205

# Process-wait hook that this selector does not implement; it always
# returns nil. Per the class documentation the Scheduler handles
# process_wait via a background thread instead. All arguments are ignored.
#
# @return [nil] always
def process_wait(_fiber, _pid, _flags)
  nil
end

#push(fiber) ⇒ Object

Enqueue a fiber into the ready queue.



42
43
44
45
46
47
48
# File 'lib/carbon_fiber/native/fallback.rb', line 42

# Enqueue a fiber into the ready queue and signal the condition variable.
#
# The entry is queued as a :resume with a nil value and a trailing false.
# NOTE(review): the trailing flag presumably tells flush_ready that no
# value accompanies the wakeup; confirm against flush_ready.
#
# @param fiber [Fiber] the fiber to make runnable
# @return [Fiber] the same fiber
def push(fiber)
  entry = [:resume, fiber, nil, false]
  @mutex.synchronize do
    @ready.push(entry)
    @cv.signal
  end
  fiber
end

#raise(fiber, exception) ⇒ Object

Enqueue an exception delivery to a fiber.



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/carbon_fiber/native/fallback.rb', line 60

# Enqueue an exception delivery to a fiber.
#
# @param fiber [Fiber] the fiber that should receive the exception
# @param exception [Object] the exception to deliver
# @return [Fiber] the same fiber
def raise(fiber, exception)
  @mutex.synchronize do
    # A fiber parked in block() may still have a sleep timer armed; disarm
    # it so the timer cannot fire a spurious wakeup after the raise. The
    # token is cleared but the blocked_fibers key is kept on purpose:
    # cancel_block_timer removes it once the fiber's ensure runs, so until
    # then flush_ready's re-queue check keeps seeing the fiber as "parked".
    if @blocked_fibers.key?(fiber)
      armed = @blocked_fibers[fiber]
      @timers.delete(armed) if armed
      @blocked_fibers[fiber] = nil
    end

    @ready.push([:raise, fiber, exception, true])
    @cv.signal
  end
  fiber
end

#raise_after(fiber, exception, duration) ⇒ Object

Schedule an exception to be raised on a fiber after duration seconds.



152
153
154
# File 'lib/carbon_fiber/native/fallback.rb', line 152

# Schedule an exception to be raised on a fiber after +duration+ seconds.
# Delegates to schedule_timer with the :raise action.
#
# @param fiber [Fiber] the fiber that should receive the exception
# @param exception [Object] the exception to deliver
# @param duration [Numeric] delay in seconds
# @return [Object] whatever schedule_timer returns
def raise_after(fiber, exception, duration)
  schedule_timer(duration, :raise, fiber, exception)
end

#resume(fiber, value) ⇒ Object

Enqueue a fiber with a return value.



51
52
53
54
55
56
57
# File 'lib/carbon_fiber/native/fallback.rb', line 51

# Enqueue a fiber together with the value it should be resumed with, then
# signal the condition variable.
#
# @param fiber [Fiber] the fiber to make runnable
# @param value [Object] the value stored in the queued entry
# @return [Fiber] the same fiber
def resume(fiber, value)
  entry = [:resume, fiber, value, true]
  @mutex.synchronize do
    @ready.push(entry)
    @cv.signal
  end
  fiber
end

#select(timeout = nil) ⇒ Object

Run one event loop iteration.



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/carbon_fiber/native/fallback.rb', line 101

# Run one event loop iteration.
#
# Flushes the ready queue, then (if anything is still pending) waits on the
# condition variable until an entry appears in @ready or the deadline
# passes, and finally flushes again.
#
# @param timeout [Object, nil] forwarded to next_wait_deadline, which
#   derives the wait deadline (helper defined outside this view)
# @return [Object] 0 when nothing is pending after the first flush,
#   otherwise the result of the final flush_ready call
def select(timeout = nil)
  flush_ready
  # Nothing queued, no timers, no read waiters: return without waiting.
  return 0 unless pending?

  deadline = next_wait_deadline(timeout)

  @mutex.synchronize do
    until @ready.any?
      # NOTE(review): presumably moves due timers into @ready; the helper
      # is defined outside this view. The re-check below relies on that.
      collect_expired_timers_locked
      break if @ready.any?

      if deadline
        remaining = deadline - monotonic_time
        # Deadline reached: stop waiting even though nothing became ready.
        break if remaining <= 0

        @cv.wait(@mutex, remaining)
      else
        # A falsy deadline means wait until @cv is signalled.
        @cv.wait(@mutex)
      end
    end
  end

  collect_expired_timers
  flush_ready
end

#transfer ⇒ Object

Transfer control to the event loop fiber.



85
86
87
88
89
# File 'lib/carbon_fiber/native/fallback.rb', line 85

# Transfer control to the event loop fiber.
#
# @return [Object, nil] nil when already running on the loop fiber,
#   otherwise whatever the loop fiber's transfer returns
def transfer
  @loop_fiber.transfer unless Fiber.current.equal?(@loop_fiber)
end

#unblock(fiber) ⇒ Object

Resume a fiber previously suspended by #block.



147
148
149
# File 'lib/carbon_fiber/native/fallback.rb', line 147

# Resume a fiber previously suspended by #block. Implemented as a #resume
# with the value true.
#
# @param fiber [Fiber] the fiber to wake
# @return [Object] whatever #resume returns
def unblock(fiber)
  resume(fiber, true)
end

#wakeup ⇒ Object

Wake the event loop.



79
80
81
82
# File 'lib/carbon_fiber/native/fallback.rb', line 79

# Wake the event loop by signalling the condition variable while holding
# the mutex.
#
# @return [true] always
def wakeup
  @mutex.synchronize do
    @cv.signal
  end
  true
end

#yield ⇒ Object

Transfer to the event loop fiber. flush_ready’s re-queue logic puts us back in the ready queue on the next pass—no explicit self-push needed, and avoiding it prevents duplicate ready entries.



94
95
96
97
98
# File 'lib/carbon_fiber/native/fallback.rb', line 94

# Hand control to the event loop fiber. The caller is not pushed onto the
# ready queue here: flush_ready's re-queue logic puts it back on the next
# pass, and skipping the explicit self-push prevents duplicate ready
# entries.
#
# @return [Object, nil] nil when already running on the loop fiber,
#   otherwise whatever the loop fiber's transfer returns
def yield
  @loop_fiber.transfer unless Fiber.current.equal?(@loop_fiber)
end