Class: Winloop::Scheduler
- Inherits: Object
  - Object
  - Winloop::Scheduler
- Defined in: lib/winloop/scheduler.rb
Overview
A Fiber::Scheduler for MRI on Windows, backed by an I/O Completion Port.
Strategy (matches libuv/mio/wepoll on Windows):
  * io_wait            -> IOCTL_AFD_POLL submitted as an overlapped op on the IOCP
                          (the only way to get *readiness* onto a completion port).
                          Polls are COALESCED per socket: at most one AFD poll is
                          outstanding per socket, covering the union of every
                          waiter's interest, and its completion fans out to all of
                          them. (The AFD driver completes only ONE pending poll per
                          readiness edge, so independent polls on the same socket
                          would lose wakeups.)
  * io_read/io_write   -> recv_nonblock/write_nonblock driven by io_wait
  * kernel_sleep /
    timeout_after      -> a monotonic timer min-heap (the loop's GQCSEx timeout)
  * block / unblock    -> in-process waiter lists; cross-thread unblock wakes the
                          loop with PostQueuedCompletionStatus
  * close              -> drives the run loop until every fiber has finished
Single thread-of-use: only the owning thread runs the loop and resumes fibers. #unblock may be called from another thread and only mutates the (locked) ready queue + posts a wakeup; it never resumes a fiber itself.
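The coalescing rule for io_wait can be illustrated in isolation. The sketch below is not winloop's code: the constant values, the SketchIoWaiter struct, and both helper methods are assumptions invented for illustration. It shows one poll covering the union of every waiter's interest, and a single completion fanning out to each waiter whose interest overlaps the ready mask.

```ruby
# Illustrative event bits (assumed values; winloop defines its own constants).
SKETCH_READABLE = 1
SKETCH_PRIORITY = 2
SKETCH_WRITABLE = 4

# Hypothetical stand-in for one parked fiber's interest in a socket.
SketchIoWaiter = Struct.new(:name, :events)

# The single outstanding poll must cover the union of all interests.
def union_interest(waiters)
  waiters.reduce(0) { |mask, waiter| mask | waiter.events }
end

# One completion fans out: each waiter whose interest overlaps the ready mask
# is delivered the overlapping bits; the others stay parked for a re-armed poll.
def fan_out(waiters, ready_mask)
  woken, still_waiting = waiters.partition { |w| (w.events & ready_mask) != 0 }
  deliveries = woken.map { |w| [w.name, w.events & ready_mask] }
  [deliveries, still_waiting]
end

waiters = [
  SketchIoWaiter.new(:reader, SKETCH_READABLE),
  SketchIoWaiter.new(:writer, SKETCH_WRITABLE),
  SketchIoWaiter.new(:both, SKETCH_READABLE | SKETCH_WRITABLE)
]
interest = union_interest(waiters)                        # READABLE | WRITABLE
deliveries, remaining = fan_out(waiters, SKETCH_READABLE) # a "readable" edge
```

After the fan-out, only the writer remains, so a re-armed poll in this sketch would need to cover WRITABLE alone.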
Defined Under Namespace
Classes: IoWaiter, PollEntry, Timer, Waiter
Instance Method Summary
- #address_resolve(hostname) ⇒ Object
- #await_on_thread(&block) ⇒ Object
  Run a blocking operation on a fresh, scheduler-FREE thread and park the calling fiber on it cooperatively (Thread#value -> our block hook).
- #await_op(op_id, timeout: nil) ⇒ Object
  Park the calling fiber until op_id's completion packet is reaped by the loop, then return [bytes, error, data] (op_result is pulled HERE, in the woken fiber, so orphaned completions never allocate Strings).
- #block(blocker, timeout = nil) ⇒ Object
  block / unblock (Mutex, ConditionVariable, Queue, Thread#join).
- #close ⇒ Object
  MRI calls this automatically at thread/program exit.
- #closed? ⇒ Boolean
- #fiber(&block) ⇒ Object
  Fiber creation.
- #initialize ⇒ Scheduler (constructor)
  A new instance of Scheduler.
- #io_read(io, buffer, length, offset = 0) ⇒ Object
  Stream read/write (recv_nonblock + io_wait).
- #io_wait(io, events, timeout) ⇒ Object
  Wait until io is ready for events; returns the ready-events bitmask (0 on timeout).
- #io_write(io, buffer, length, offset = 0) ⇒ Object
- #kernel_sleep(duration = nil) ⇒ Object
  Timers.
- #monotonic ⇒ Object
- #op_abandon(op_id) ⇒ Object
- #op_associate(handle) ⇒ Object
  Generic OVERLAPPED completions (the cross-gem op protocol).
- #op_cancel(op_id) ⇒ Object
- #op_prepare(handle, tag: 0, capacity: 0) ⇒ Object
- #op_state(op_id) ⇒ Object
- #op_submitted(op_id) ⇒ Object
- #process_wait(pid, flags) ⇒ Object
- #run ⇒ Object
  The run loop.
- #timeout_after(duration, exception_class, *exception_arguments) ⇒ Object
- #unblock(_blocker, fiber) ⇒ Object
  Called from ANY thread: only mutate shared state under the lock and wake the loop; never resume the fiber here.
Constructor Details
#initialize ⇒ Scheduler
Returns a new instance of Scheduler.
# File 'lib/winloop/scheduler.rb', line 42
def initialize
  @backend = Backend.new
  @ready   = []                 # [[fiber, resume_value], ...] (shared: unblock pushes here)
  @timers  = MinHeap.new        # of Timer, keyed on :deadline
  @polls   = {}                 # socket key (fileno) => PollEntry
  @ops     = {}                 # backend poll id => PollEntry
  @parked  = {}                 # fiber => Waiter (its CURRENT block/kernel_sleep park)
  @lock    = Thread::Mutex.new  # guards @ready + @parked (the cross-thread path)
  @loop    = Fiber.current      # the thread's root fiber == the loop
  @closed  = false

  # ---- generic OVERLAPPED op state (winloop 0.2) ----
  @thread     = Thread.current  # the op API is loop-thread-only (enforced)
  @op_waiters = {}              # op_id => Waiter (a fiber parked in await_op)
  @op_orphans = {}              # op_id => true (timed-out/unwound awaits; the late
                                # packet is retired without waking anyone)
  @op_done    = {}              # op_id => [bytes, error] (completed before anyone awaited)
  @op_shut    = false           # true once the loop has fully finished (backend shut down).
                                # NOT @closed: under Winloop.run, #close IS the event loop,
                                # so @closed is true while op-protocol fibers still run.
end
Instance Method Details
#address_resolve(hostname) ⇒ Object
# File 'lib/winloop/scheduler.rb', line 243
def address_resolve(hostname)
  await_on_thread do
    Addrinfo.getaddrinfo(hostname, nil, nil, :STREAM).map(&:ip_address).uniq
  end
rescue SocketError
  []
end
#await_on_thread(&block) ⇒ Object
Run a blocking operation on a fresh, scheduler-FREE thread and park the calling fiber on it cooperatively (Thread#value -> our block hook). If this fiber unwinds first (Timeout.timeout / Fiber kill), the worker is killed and then joined, so it can never outlive us, leak, or race a later operation on the same IO. Thread#kill interrupts the worker's blocking syscall, so the join returns promptly.
# File 'lib/winloop/scheduler.rb', line 226
def await_on_thread(&block)
  worker = Thread.new(&block)
  worker.report_on_exception = false # expected errors are surfaced via #value
  begin
    worker.value
  ensure
    if worker.alive?
      # Unwound first (Timeout/kill): interrupt the worker out of its blocking
      # syscall and WAIT for it to die before returning, so it can't read (and
      # discard) bytes that a later operation on the same IO expects. kill
      # interrupts readpartial immediately, so this join does not block.
      worker.kill
      worker.join
    end
  end
end
#await_op(op_id, timeout: nil) ⇒ Object
Park the calling fiber until op_id's completion packet is reaped by the loop, then return [bytes, error, data] (op_result is pulled HERE, in the woken fiber, so orphaned completions never allocate Strings). On timeout: returns nil, auto-cancels and orphans the op. Either way the op is retired: after await_op returns — value or nil — the op_id must not be used again.
timeout: nil (wait forever — safe provided the op was submitted on a handle that is still the associated handle) or a positive Numeric in seconds. Recommendation: pass a real timeout while first integrating a new native API; switch to nil once the submit path is proven.
# File 'lib/winloop/scheduler.rb', line 313
def await_op(op_id, timeout: nil)
  op_guard!
  validate_op_timeout!(timeout)
  if @op_done.delete(op_id) # early completion: no yield
    return @backend.op_result(op_id)
  end
  raise Error, "winloop: op #{op_id} already has a waiter" if @op_waiters.key?(op_id)
  raise Error, "winloop: op #{op_id} has not been submitted" if @backend.op_state(op_id) == :prepared
  waiter = Waiter.new(Fiber.current, false)
  @op_waiters[op_id] = waiter
  @timers.push(Timer.new(monotonic + timeout, waiter, nil, nil)) if timeout
  completed = Fiber.yield # [bytes, error] or nil (timer fired)
  completed ? @backend.op_result(op_id) : nil
ensure
  # NOTHING in this ensure may raise: it runs during Timeout::Error /
  # Fiber#raise / Fiber#kill unwinds and must never mask the in-flight
  # exception (op_cancel's no-raise-on-CancelIoEx contract exists for this).
  if waiter
    waiter.settled = true
    if @op_waiters.delete(op_id) # loop did NOT deliver: timeout or unwind
      @op_orphans[op_id] = true
      begin
        @backend.op_cancel(op_id) # late packet -> orphan path retires it
      rescue Error
        # backend already shut down mid-unwind: nothing left to cancel
      end
    elsif !completed # loop DID deliver, but we were unwound
      # (Fiber#kill, or a sibling raise) before resuming with the value: the
      # record sits OP_COMPLETED with nobody left to op_result it, so free it
      # now. Guarded: the loop's settled/timer branch may have retired the id.
      @backend.op_free(op_id) if op_completed_unconsumed?(op_id)
    end
  end
end
#block(blocker, timeout = nil) ⇒ Object
block / unblock (Mutex, ConditionVariable, Queue, Thread#join).
# File 'lib/winloop/scheduler.rb', line 171
def block(blocker, timeout = nil)
  fiber = Fiber.current
  waiter = Waiter.new(fiber, false)
  @lock.synchronize { @parked[fiber] = waiter }
  @timers.push(Timer.new(monotonic + timeout, waiter, false, nil)) if timeout
  Fiber.yield # true (unblocked) or false (timed out)
ensure
  @lock.synchronize do
    waiter.settled = true
    @parked.delete(fiber)
  end
end
#close ⇒ Object
MRI calls this automatically at thread/program exit. It IS the event loop.
# File 'lib/winloop/scheduler.rb', line 351
def close
  return if @closed
  @closed = true
  run
ensure
  @op_shut = true # the op protocol is over; op_* / await_op raise from here on
  @backend.shutdown
  # NEVER call Fiber.set_scheduler(nil) here: it raises SystemStackError
  # during MRI's implicit close at thread exit.
end
#closed? ⇒ Boolean
# File 'lib/winloop/scheduler.rb', line 362
def closed? = @closed
#fiber(&block) ⇒ Object
Fiber creation.
# File 'lib/winloop/scheduler.rb', line 212
def fiber(&block)
  fiber = Fiber.new(blocking: false, &block)
  fiber.resume
  fiber
end
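The create-then-resume pattern used by #fiber means the new fiber runs immediately, up to its first yield, before the caller continues. A minimal standalone sketch of that ordering follows; spawn_fiber and the log array are hypothetical names, and no scheduler is installed here, so Fiber.yield simply returns control to the resumer.

```ruby
# Hypothetical helper mirroring the shape of #fiber: create a non-blocking
# fiber, start it right away, and hand the fiber object back to the caller.
def spawn_fiber(&block)
  fiber = Fiber.new(blocking: false, &block)
  fiber.resume
  fiber
end

log = []
child = spawn_fiber do
  log << :started          # runs before spawn_fiber returns
  Fiber.yield              # park; control goes back to the caller
  log << :resumed
end
log << :caller_continues
child.resume               # in winloop, the run loop would do this
```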
#io_read(io, buffer, length, offset = 0) ⇒ Object
Stream read/write (recv_nonblock + io_wait).
length is the MINIMUM number of bytes to transfer. A length of 0 opens up the whole region [offset, buffer.size): io_read then returns after the first successful receive (at least one byte), while io_write writes the entire region. MUST use recv_nonblock: read_nonblock / sysread / readpartial re-enter this hook on mswin and recurse forever.
# File 'lib/winloop/scheduler.rb', line 100
def io_read(io, buffer, length, offset = 0)
  return blocking_read(io, buffer, length, offset) unless io.respond_to?(:recv_nonblock)
  total = 0
  minimum = length.zero? ? 1 : length
  while total < minimum
    maximum = buffer.size - offset - total
    break if maximum <= 0
    result = io.recv_nonblock(maximum, exception: false)
    case result
    when :wait_readable then io_wait(io, READABLE, nil)
    when nil then break # EOF
    else
      buffer.set_string(result, offset + total)
      total += result.bytesize
    end
  end
  total
end
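The "read until a minimum is met" loop can be tried outside the scheduler. In the sketch below, read_at_least is a hypothetical helper, a String stands in for the IO::Buffer, and IO.select stands in for io_wait; all three are assumptions made so the example runs on its own. It treats both nil and an empty string as end of stream, since the value recv_nonblock returns at EOF has differed between Ruby versions.

```ruby
require "socket"

# Read at least `minimum` bytes (or stop at EOF) from a stream socket, never
# asking for more than `maximum` bytes in total. Assumes minimum <= maximum.
def read_at_least(sock, minimum, maximum)
  data = String.new
  while data.bytesize < minimum
    chunk = sock.recv_nonblock(maximum - data.bytesize, exception: false)
    case chunk
    when :wait_readable then IO.select([sock]) # stand-in for io_wait(READABLE)
    when nil, "" then break                    # end of stream
    else data << chunk
    end
  end
  data
end
```

Because the loop only stops once the minimum is reached, a single call may return more than the minimum when the peer has already sent extra bytes.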
#io_wait(io, events, timeout) ⇒ Object
Wait until io is ready for events; returns the ready-events bitmask
(0 on timeout). events is an OR of Winloop::READABLE/WRITABLE/PRIORITY.
# File 'lib/winloop/scheduler.rb', line 72
def io_wait(io, events, timeout)
  key = io.fileno
  waiter = IoWaiter.new(Fiber.current, events, false)
  entry = (@polls[key] ||= PollEntry.new(key, io, nil, 0, []))
  entry.waiters << waiter
  arm_poll(entry)
  @timers.push(Timer.new(monotonic + timeout, waiter, 0, nil)) if timeout
  Fiber.yield # resumed with the ready-events mask (or 0 on timeout)
ensure
  # On the normal completion path the loop already removed us from the entry
  # and re-armed/dropped it. If we are unwinding by any other path (timeout,
  # or Timeout.timeout raised into us), detach and re-arm/cancel for the rest.
  waiter.settled = true
  if entry.waiters.delete(waiter)
    if entry.waiters.empty?
      cancel_entry(entry)
    else
      arm_poll(entry) # interest shrank; existing poll still covers it (no-op)
    end
  end
end
#io_write(io, buffer, length, offset = 0) ⇒ Object
# File 'lib/winloop/scheduler.rb', line 119
def io_write(io, buffer, length, offset = 0)
  total = 0
  maximum = buffer.size - offset
  return 0 if maximum <= 0
  minimum = length.zero? ? maximum : length
  minimum = maximum if minimum > maximum # never spin writing empty chunks
  while total < minimum
    chunk = buffer.get_string(offset + total, maximum - total)
    break if chunk.empty?
    result = io.write_nonblock(chunk, exception: false)
    if result == :wait_writable
      io_wait(io, WRITABLE, nil)
    else
      total += result
    end
  end
  total
end
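The write side follows the same shape: attempt a non-blocking write, wait for writability when the kernel buffer is full, and advance by however many bytes were accepted. The sketch below is a simplified stand-in rather than winloop's code; write_all is a hypothetical helper, it takes a String instead of an IO::Buffer, and IO.select replaces io_wait.

```ruby
# Write every byte of `data` to `io`, tolerating partial writes.
def write_all(io, data)
  total = 0
  while total < data.bytesize
    chunk = data.byteslice(total, data.bytesize - total)
    result = io.write_nonblock(chunk, exception: false)
    if result == :wait_writable
      IO.select(nil, [io]) # stand-in for io_wait(io, WRITABLE, nil)
    else
      total += result      # the kernel may accept fewer bytes than offered
    end
  end
  total
end
```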
#kernel_sleep(duration = nil) ⇒ Object
Timers.
# File 'lib/winloop/scheduler.rb', line 140
def kernel_sleep(duration = nil)
  fiber = Fiber.current
  waiter = Waiter.new(fiber, false)
  # Park under @parked so a cross-thread #unblock can find us (e.g.
  # ConditionVariable#wait with a timeout parks HERE, and a racing signal must
  # wake us through the SAME one-shot guard as the timer, never through a
  # second, competing wakeup).
  @lock.synchronize { @parked[fiber] = waiter }
  @timers.push(Timer.new(monotonic + duration, waiter, nil, nil)) if duration
  Fiber.yield
  duration
ensure
  @lock.synchronize do
    waiter.settled = true # disarm a not-yet-fired timer on abnormal exit
    @parked.delete(fiber)
  end
end
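How a deadline-keyed min-heap can drive the loop's wait timeout is sketched below. This is an illustration under assumptions, not the MinHeap or Timer classes winloop ships: SketchDeadlineHeap, SketchTimer, wait_budget_ms and pop_due are all hypothetical names. The earliest deadline bounds how long the loop may block, and every timer whose deadline has passed is popped in order.

```ruby
# Minimal binary min-heap ordered by each item's #deadline (illustrative only).
class SketchDeadlineHeap
  def initialize
    @items = []
  end

  def empty?
    @items.empty?
  end

  def peek
    @items.first
  end

  def push(item)
    @items << item
    i = @items.size - 1
    while i > 0
      parent = (i - 1) / 2
      break if @items[parent].deadline <= @items[i].deadline
      @items[parent], @items[i] = @items[i], @items[parent]
      i = parent
    end
    self
  end

  def pop
    return nil if @items.empty?
    top = @items.first
    last = @items.pop
    unless @items.empty?
      @items[0] = last
      sift_down(0)
    end
    top
  end

  private

  def sift_down(i)
    loop do
      left = 2 * i + 1
      right = left + 1
      smallest = i
      smallest = left if left < @items.size && @items[left].deadline < @items[smallest].deadline
      smallest = right if right < @items.size && @items[right].deadline < @items[smallest].deadline
      break if smallest == i
      @items[i], @items[smallest] = @items[smallest], @items[i]
      i = smallest
    end
  end
end

SketchTimer = Struct.new(:deadline, :label)

# Milliseconds the loop may block before the earliest timer is due;
# nil means there are no timers, so the loop may wait indefinitely.
def wait_budget_ms(heap, now)
  return nil if heap.empty?
  [((heap.peek.deadline - now) * 1000).ceil, 0].max
end

# Pop every timer whose deadline has passed, earliest first.
def pop_due(heap, now)
  due = []
  due << heap.pop while !heap.empty? && heap.peek.deadline <= now
  due
end
```

In a real loop, `now` would come from a monotonic clock such as Process.clock_gettime(Process::CLOCK_MONOTONIC), and settled timers would be skipped when popped.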
#monotonic ⇒ Object
# File 'lib/winloop/scheduler.rb', line 64
def monotonic
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
#op_abandon(op_id) ⇒ Object
# File 'lib/winloop/scheduler.rb', line 288
def op_abandon(op_id)
  op_guard!
  @backend.op_abandon(op_id)
end
#op_associate(handle) ⇒ Object
Generic OVERLAPPED completions (the cross-gem op protocol).
Client gems feature-detect with Fiber.scheduler.respond_to?(:await_op) and talk ONLY to these wrappers, never to the Backend directly, so they cannot reach #wait/#shutdown and corrupt the loop. The contract loop:
  op_associate(h)                                      # once, at handle-open time
  op_id, ov, buf = op_prepare(h, capacity: n)
  ok = <the client's own native submit using ov/buf>
  if ok    # native call returned TRUE or GetLastError == ERROR_IO_PENDING
    op_submitted(op_id)
    bytes, error, data = await_op(op_id, timeout: t)   # fiber parks here
  else     # synchronous failure: no packet will ever arrive
    op_abandon(op_id)
  end
All of these are loop-thread-only and raise once the scheduler has shut down.
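The contract loop above can be wrapped in a small client-side helper. The sketch below is hypothetical: overlapped_call is not part of winloop, it assumes op_associate was already called when the handle was opened, and RecordingScheduler is a stand-in that performs no I/O and exists only to exercise the two branches. The block plays the role of the client's native submit and is assumed to return true when the call succeeded or is pending.

```ruby
# Hypothetical helper following the op protocol's submit/await/abandon branches.
def overlapped_call(scheduler, handle, capacity:, timeout: nil)
  op_id, overlapped, buffer = scheduler.op_prepare(handle, capacity: capacity)
  if yield(overlapped, buffer)
    scheduler.op_submitted(op_id)
    # [bytes, error, data], or nil on timeout; op_id is retired either way.
    scheduler.await_op(op_id, timeout: timeout)
  else
    # Synchronous failure: no completion packet will ever arrive.
    scheduler.op_abandon(op_id)
    nil
  end
end

# Stand-in used only to record which wrappers were called. NOT winloop.
class RecordingScheduler
  attr_reader :calls

  def initialize
    @calls = []
  end

  def op_prepare(_handle, tag: 0, capacity: 0)
    @calls << :op_prepare
    [1, :overlapped_placeholder, String.new(capacity: capacity)]
  end

  def op_submitted(_op_id)
    @calls << :op_submitted
  end

  def op_abandon(_op_id)
    @calls << :op_abandon
  end

  def await_op(_op_id, timeout: nil)
    @calls << :await_op
    [0, nil, ""] # placeholder [bytes, error, data]
  end
end

ok_run = RecordingScheduler.new
ok_result = overlapped_call(ok_run, :fake_handle, capacity: 64) { |_ov, _buf| true }

failed_run = RecordingScheduler.new
failed_result = overlapped_call(failed_run, :fake_handle, capacity: 64) { |_ov, _buf| false }
```

A real client would pass Fiber.scheduler as the first argument and call its native API inside the block.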
# File 'lib/winloop/scheduler.rb', line 273
def op_associate(handle)
  op_guard!
  @backend.associate(handle)
end
#op_cancel(op_id) ⇒ Object
# File 'lib/winloop/scheduler.rb', line 293
def op_cancel(op_id)
  op_guard!
  @backend.op_cancel(op_id)
end
#op_prepare(handle, tag: 0, capacity: 0) ⇒ Object
# File 'lib/winloop/scheduler.rb', line 278
def op_prepare(handle, tag: 0, capacity: 0)
  op_guard!
  @backend.op_prepare(handle, tag: tag, capacity: capacity)
end
#op_state(op_id) ⇒ Object
# File 'lib/winloop/scheduler.rb', line 298
def op_state(op_id)
  op_guard!
  @backend.op_state(op_id)
end
#op_submitted(op_id) ⇒ Object
# File 'lib/winloop/scheduler.rb', line 283
def op_submitted(op_id)
  op_guard!
  @backend.op_submitted(op_id)
end
#process_wait(pid, flags) ⇒ Object
# File 'lib/winloop/scheduler.rb', line 251
def process_wait(pid, flags)
  await_on_thread { Process::Status.wait(pid, flags) }
end
#run ⇒ Object
The run loop.
# File 'lib/winloop/scheduler.rb', line 366
def run
  run_once until done?
end
#timeout_after(duration, exception_class, *exception_arguments) ⇒ Object
# File 'lib/winloop/scheduler.rb', line 157
def timeout_after(duration, exception_class, *exception_arguments)
  waiter = Waiter.new(Fiber.current, false)
  @timers.push(Timer.new(monotonic + duration, waiter, nil,
                         [exception_class, exception_arguments]))
  begin
    yield duration
  ensure
    # disarm if the block finished before the deadline
    @lock.synchronize { waiter.settled = true }
  end
end
#unblock(_blocker, fiber) ⇒ Object
Called from ANY thread: only mutate shared state under the lock and wake the loop — never resume the fiber here.
We wake by FIBER identity through its single parked Waiter, ignoring
blocker. That is deliberate and important:
* Thread#join calls block(thread) but MRI calls unblock(<a DIFFERENT
object>, fiber); keying on the fiber wakes it correctly anyway.
* A fiber can have more than one live wakeup source (a timeout timer AND
this unblock — e.g. ConditionVariable#wait(mutex, timeout) racing a
signal). Routing through the one Waiter's `settled` guard means exactly
one of them resumes the fiber; the loser is dropped. A raw, unguarded
"@ready << fiber" would double-resume and cut the fiber's NEXT wait short.
# File 'lib/winloop/scheduler.rb', line 196
def unblock(_blocker, fiber)
  @lock.synchronize do
    waiter = @parked[fiber]
    if waiter
      wake_locked(waiter, true)
    elsif @ready.none? { |f, _| f.equal?(fiber) }
      # Not parked in any wait we track (spurious/late unblock): last resort,
      # de-duplicated so it can never double-enqueue.
      @ready << [fiber, true]
    end
  end
  @backend.wakeup
end
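The one-shot `settled` guard described for #unblock can be shown in miniature. The sketch below is an assumption-laden illustration, not winloop's wake_locked: SketchWaiter and SketchReadyQueue are hypothetical names, and a symbol stands in for the fiber. Two wakeup sources race for the same waiter; the first one enqueues the fiber and the second is dropped.

```ruby
# Hypothetical one-shot waiter: `settled` flips from false to true exactly once.
SketchWaiter = Struct.new(:fiber, :settled)

class SketchReadyQueue
  def initialize
    @lock = Mutex.new
    @ready = []
  end

  # Enqueue the waiter's fiber unless another source already won the race.
  # Returns true for the winner and false for every later caller.
  def wake(waiter, value)
    @lock.synchronize do
      next false if waiter.settled
      waiter.settled = true
      @ready << [waiter.fiber, value]
      true
    end
  end

  # Hand the pending [fiber, value] pairs to the loop and reset the queue.
  def drain
    @lock.synchronize do
      ready = @ready
      @ready = []
      ready
    end
  end
end

queue = SketchReadyQueue.new
waiter = SketchWaiter.new(:fiber_a, false)
signal_won = queue.wake(waiter, true)  # e.g. a ConditionVariable signal
timer_won = queue.wake(waiter, false)  # e.g. the timeout firing afterwards
resumptions = queue.drain
```

Because both sources go through the same guard under the same lock, the fiber appears in the ready queue once, carrying the winner's resume value.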