Class: CarbonFiber::Native::Selector

Inherits:
Object
  • Object
show all
Defined in:
lib/carbon_fiber/native/fallback.rb

Overview

Pure-Ruby fallback selector using threads and condition variables.

Loaded automatically when the native Zig extension is unavailable. Provides the same Selector API as the native implementation so the Scheduler and Async adapter work unchanged.

Direct Known Subclasses

Async::Selector

Instance Method Summary collapse

Constructor Details

#initialize(loop_fiber) ⇒ Selector

Returns a new instance of Selector.

Parameters:

  • loop_fiber (Fiber)

    the event loop fiber



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/carbon_fiber/native/fallback.rb', line 14

# Set up an empty selector bound to the given event-loop fiber.
#
# @param loop_fiber [Fiber] the event loop fiber
def initialize(loop_fiber)
  @loop_fiber = loop_fiber

  # Lock and condition variable used around the queues and tables below.
  @mutex = Thread::Mutex.new
  @cv = Thread::ConditionVariable.new

  # Ready queue plus the timer and read-wait tables, all initially empty.
  @ready = []
  @timers = {}
  @read_waits = {}

  # Both token counters start at 1.
  @next_timer = 1
  @next_wait_token = 1

  # fiber => sleep-timer token (or nil) for fibers that parked themselves
  # voluntarily in block() or do_io_wait. flush_ready looks here to tell a
  # parked fiber apart from one whose transfer returned unexpectedly because
  # it was interrupted mid-execution (Ruby 4.0 Fiber#raise bypass) and so
  # has to be re-queued.
  @blocked_fibers = {}
end

Instance Method Details

#block(fiber, timeout = nil) ⇒ Object

Suspend the current fiber until unblocked or timed out.



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/carbon_fiber/native/fallback.rb', line 144

# Park a fiber on the event loop until it is resumed or its timeout fires.
#
# @param fiber [Fiber] fiber being parked; used as the @blocked_fibers key
# @param timeout [Numeric, nil] when truthy, a wake-up is armed through
#   resume_after and its token is remembered for later cancellation
# @return [Object] the value handed back by the transfer to the loop fiber
def block(fiber, timeout = nil)
  timer_token = timeout ? resume_after(fiber, timeout, false) : nil
  @mutex.synchronize { @blocked_fibers[fiber] = timer_token }

  wake_value = @loop_fiber.transfer

  # Ordinary wake-up: forget the fiber and cancel a timer that is still
  # armed. When raise() got there first it has already cancelled the timer
  # and cleared the token; in that case the unwind skips this code and
  # cancel_block_timer (called from fiber_done) removes the entry instead.
  @mutex.synchronize do
    armed = @blocked_fibers.delete(fiber)
    @timers.delete(armed) if armed
  end

  wake_value
end

#cancel_block_timer(fiber) ⇒ Object

Called from Scheduler#fiber_done’s ensure block. Removes the fiber from the blocked set and cancels any still-armed sleep timer. This is the only cleanup path when a raise() unwinds the fiber past block()’s normal return.



181
182
183
184
185
186
# File 'lib/carbon_fiber/native/fallback.rb', line 181

# Drop a fiber from the blocked set and cancel its sleep timer, if any.
#
# @param fiber [Fiber] fiber whose @blocked_fibers entry should be removed
# @return [Object, nil] the removed @timers entry, or nil when the fiber
#   had no token recorded
def cancel_block_timer(fiber)
  @mutex.synchronize do
    armed = @blocked_fibers.delete(fiber)
    armed ? @timers.delete(armed) : nil
  end
end

#cancel_timer(token) ⇒ Object

Cancel a pending timer by token.



173
174
175
# File 'lib/carbon_fiber/native/fallback.rb', line 173

# Remove the timer stored under +token+ from @timers.
#
# @param token [Object] key of the timer in @timers
# @return [Boolean] true when the removal yielded a truthy entry, else false
def cancel_timer(token)
  @mutex.synchronize do
    @timers.delete(token) ? true : false
  end
end

#destroy ⇒ Object

No-op; nothing to release.



32
33
34
# File 'lib/carbon_fiber/native/fallback.rb', line 32

# No resources to release here; always reports success.
#
# @return [true]
def destroy = true

#io_close(fd, exception) ⇒ Object

Cancel pending waiters on a closed descriptor.



205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/carbon_fiber/native/fallback.rb', line 205

# Deliver +exception+ to the waiter registered for a closed descriptor.
#
# @param fd [Object] key looked up (and removed) in @read_waits
# @param exception [Exception] error queued for the waiting fiber
# @return [Boolean] true when a waiter was found and queued, false otherwise
def io_close(fd, exception)
  @mutex.synchronize do
    waiter = @read_waits.delete(fd)
    next false unless waiter

    # Queue a :raise entry for the waiter's fiber and signal the condition
    # variable.
    @ready << [:raise, waiter[:fiber], exception, true]
    @cv.signal
    true
  end
end

#io_read(_fd, _buffer, _length, _offset) ⇒ Object

Returns nil; the Scheduler handles io_read via background thread.



226
227
228
# File 'lib/carbon_fiber/native/fallback.rb', line 226

# Not implemented by this selector: ignores its arguments and returns nil.
#
# @return [nil]
def io_read(_fd, _buffer, _length, _offset) = nil

#io_wait(fiber, fd, events) ⇒ Object

Wait for read readiness on a file descriptor via IO.select on a background thread. Returns nil for non-READABLE events (handled by the Scheduler fallback).



191
192
193
194
195
# File 'lib/carbon_fiber/native/fallback.rb', line 191

# Wait for read readiness on +fd+ with no timeout.
#
# @param fiber [Fiber] fiber to park
# @param fd [Integer] descriptor to watch
# @param events [Integer] event mask; only exactly IO::READABLE is handled
# @return [Object, nil] the do_io_wait result, or nil for any other mask
def io_wait(fiber, fd, events)
  do_io_wait(fiber, fd, nil) if events == IO::READABLE
end

#io_wait_with_timeout(fiber, fd, events, timeout) ⇒ Object

Like #io_wait but with a timeout.



198
199
200
201
202
# File 'lib/carbon_fiber/native/fallback.rb', line 198

# Wait for read readiness on +fd+, passing +timeout+ on to do_io_wait.
#
# @param fiber [Fiber] fiber to park
# @param fd [Integer] descriptor to watch
# @param events [Integer] event mask; only exactly IO::READABLE is handled
# @param timeout [Numeric, nil] forwarded unchanged to do_io_wait
# @return [Object, nil] the do_io_wait result, or nil for any other mask
def io_wait_with_timeout(fiber, fd, events, timeout)
  do_io_wait(fiber, fd, timeout) if events == IO::READABLE
end

#io_write(_fd, _buffer, _length, _offset) ⇒ Object

Returns nil; the Scheduler handles io_write via background thread.



231
232
233
# File 'lib/carbon_fiber/native/fallback.rb', line 231

# Not implemented by this selector: ignores its arguments and returns nil.
#
# @return [nil]
def io_write(_fd, _buffer, _length, _offset) = nil

#kernel_sleep(duration = nil) ⇒ Object

Mirrors `Selector#kernel_sleep` on the native side so `Scheduler#kernel_sleep` can delegate to `@selector.kernel_sleep` in both paths. Branches on the duration: nil parks the fiber on the loop without a timer, non-positive yields, and positive parks the fiber via #block with a timeout of `duration` seconds.



132
133
134
135
136
137
138
139
140
141
# File 'lib/carbon_fiber/native/fallback.rb', line 132

# Sleep the current fiber, choosing the strategy from +duration+.
#
# @param duration [Numeric, nil] nil transfers to the loop with no timer;
#   a value <= 0 yields; anything else blocks with +duration+ as timeout
# @return [true]
def kernel_sleep(duration = nil)
  if duration.nil?
    transfer
  else
    duration <= 0 ? self.yield : block(Fiber.current, duration)
  end

  true
end

#pending? ⇒ Boolean

Returns whether there is pending work.

Returns:

  • (Boolean)

    whether there is pending work



37
38
39
# File 'lib/carbon_fiber/native/fallback.rb', line 37

# Report whether the ready queue, the timer table, or the read-wait table
# holds anything.
#
# @return [Boolean] whether there is pending work
def pending?
  @mutex.synchronize do
    [@ready, @timers, @read_waits].any?(&:any?)
  end
end

#poll_readable_now(fd) ⇒ Object

Non-destructive check if a descriptor has data available to read.



236
237
238
239
240
241
242
243
244
# File 'lib/carbon_fiber/native/fallback.rb', line 236

# Check, without blocking, whether +fd+ currently selects as readable.
#
# @param fd [Integer] raw file descriptor number
# @return [Boolean] true when a zero-timeout IO.select reports the
#   descriptor; false otherwise, including when wrapping or selecting raises
#   IOError or SystemCallError
def poll_readable_now(fd)
  handle = nil

  begin
    # Wrap the descriptor in a temporary IO with autoclose disabled.
    handle = IO.new(fd, autoclose: false)
    IO.select([handle], nil, nil, 0) ? true : false
  rescue IOError, SystemCallError
    false
  ensure
    handle.close unless handle.nil? || handle.closed?
  end
end

#process_wait(_fiber, _pid, _flags) ⇒ Object

Returns nil; the Scheduler handles process_wait via background thread.



221
222
223
# File 'lib/carbon_fiber/native/fallback.rb', line 221

# Not implemented by this selector: ignores its arguments and returns nil.
#
# @return [nil]
def process_wait(_fiber, _pid, _flags) = nil

#push(fiber) ⇒ Object

Enqueue a fiber into the ready queue.



42
43
44
45
46
47
48
# File 'lib/carbon_fiber/native/fallback.rb', line 42

# Append a plain :resume entry for +fiber+ to the ready queue and signal
# the condition variable.
#
# @param fiber [Fiber] fiber to enqueue
# @return [Fiber] the same fiber
def push(fiber)
  entry = [:resume, fiber, nil, false]

  @mutex.synchronize do
    @ready.push(entry)
    @cv.signal
  end

  fiber
end

#raise(fiber, exception) ⇒ Object

Enqueue an exception delivery to a fiber.



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/carbon_fiber/native/fallback.rb', line 60

# Queue +exception+ for delivery to +fiber+ and signal the condition
# variable.
#
# @param fiber [Fiber] target fiber
# @param exception [Exception] error to deliver
# @return [Fiber] the same fiber
def raise(fiber, exception)
  delivery = [:raise, fiber, exception, true]

  @mutex.synchronize do
    # A fiber parked in block() may still have a sleep timer armed; cancel
    # it so that wake-up cannot fire after the exception. Only the token is
    # cleared: the blocked_fibers entry itself stays until
    # cancel_block_timer runs from the fiber's ensure, so flush_ready's
    # re-queue check keeps seeing the fiber as "parked" until it exits.
    if @blocked_fibers.key?(fiber)
      armed = @blocked_fibers[fiber]
      @blocked_fibers[fiber] = nil
      @timers.delete(armed) if armed
    end

    @ready << delivery
    @cv.signal
  end

  fiber
end

#raise_after(fiber, exception, duration) ⇒ Object

Schedule an exception to be raised on a fiber after duration seconds.



168
169
170
# File 'lib/carbon_fiber/native/fallback.rb', line 168

# Schedule a :raise timer that delivers +exception+ to +fiber+.
#
# @param fiber [Fiber] target fiber
# @param exception [Exception] error to deliver
# @param duration [Numeric] delay handed to schedule_timer
# @return [Object] whatever schedule_timer returns
def raise_after(fiber, exception, duration) = schedule_timer(duration, :raise, fiber, exception)

#resume(fiber, value) ⇒ Object

Enqueue a fiber with a return value.



51
52
53
54
55
56
57
# File 'lib/carbon_fiber/native/fallback.rb', line 51

# Append a :resume entry carrying +value+ for +fiber+ to the ready queue
# and signal the condition variable.
#
# @param fiber [Fiber] fiber to enqueue
# @param value [Object] value stored in the queue entry
# @return [Fiber] the same fiber
def resume(fiber, value)
  entry = [:resume, fiber, value, true]

  @mutex.synchronize do
    @ready.push(entry)
    @cv.signal
  end

  fiber
end

#select(timeout = nil) ⇒ Object

Run one event loop iteration.



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/carbon_fiber/native/fallback.rb', line 101

# Run one event-loop iteration: flush the ready queue, then wait under the
# mutex until the ready queue is non-empty or the deadline has passed, then
# collect expired timers and flush again.
#
# @param timeout [Numeric, nil] handed to next_wait_deadline to derive the
#   wait deadline
# @return [Object] 0 when nothing is pending after the first flush;
#   otherwise the result of the final flush_ready call
def select(timeout = nil)
  flush_ready
  return 0 unless pending?

  # NOTE(review): presumably a monotonic-clock instant (it is compared with
  # monotonic_time below), or nil meaning "no deadline" — confirm against
  # next_wait_deadline.
  deadline = next_wait_deadline(timeout)

  @mutex.synchronize do
    until @ready.any?
      # Checked before each wait; the ready queue is re-tested right after
      # in case this call filled it.
      collect_expired_timers_locked
      break if @ready.any?

      if deadline
        remaining = deadline - monotonic_time
        break if remaining <= 0

        # Timed wait; the loop condition is re-evaluated after it returns.
        @cv.wait(@mutex, remaining)
      else
        # NOTE(review): with no deadline, a signal that arrives while the
        # ready queue is still empty (e.g. from wakeup) just loops back
        # into this wait — confirm that is intended.
        @cv.wait(@mutex)
      end
    end
  end

  # Called outside the synchronize block, unlike the _locked variant above.
  collect_expired_timers
  flush_ready
end

#transfer ⇒ Object

Transfer control to the event loop fiber.



85
86
87
88
89
# File 'lib/carbon_fiber/native/fallback.rb', line 85

# Hand control to the event loop fiber.
#
# @return [Object, nil] nil when already running on the loop fiber;
#   otherwise the value returned by the loop fiber's transfer
def transfer
  @loop_fiber.transfer unless Fiber.current.equal?(@loop_fiber)
end

#unblock(fiber) ⇒ Object

Resume a fiber previously suspended by #block.



163
164
165
# File 'lib/carbon_fiber/native/fallback.rb', line 163

# Wake a fiber parked by #block by resuming it with the value true.
#
# @param fiber [Fiber] fiber to wake
# @return [Object] whatever #resume returns
def unblock(fiber) = resume(fiber, true)

#wakeup ⇒ Object

Wake the event loop.



79
80
81
82
# File 'lib/carbon_fiber/native/fallback.rb', line 79

# Signal the selector's condition variable while holding the mutex.
#
# @return [true]
def wakeup
  @mutex.synchronize do
    @cv.signal
  end

  true
end

#yield ⇒ Object

Transfer to the event loop fiber. flush_ready’s re-queue logic puts us back in the ready queue on the next pass—no explicit self-push needed, and avoiding it prevents duplicate ready entries.



94
95
96
97
98
# File 'lib/carbon_fiber/native/fallback.rb', line 94

# Give up the CPU by transferring to the event loop fiber. Nothing is
# pushed onto the ready queue here.
#
# @return [Object, nil] nil when already running on the loop fiber;
#   otherwise the value returned by the loop fiber's transfer
def yield
  Fiber.current.equal?(@loop_fiber) ? nil : @loop_fiber.transfer
end