Class: Winloop::Scheduler
- Inherits: Object
  - Object
    - Winloop::Scheduler
- Defined in: lib/winloop/scheduler.rb
Overview
A Fiber::Scheduler for MRI on Windows, backed by an I/O Completion Port.
Strategy (matches libuv/mio/wepoll on Windows):
* io_wait -> IOCTL_AFD_POLL submitted as an overlapped op on the IOCP (the only way to get *readiness* onto a completion port). Polls are COALESCED per socket: at most one AFD poll is outstanding per socket, covering the union of every waiter's interest, and its completion fans out to all of them. (The AFD driver completes only ONE pending poll per readiness edge, so independent polls on the same socket would lose wakeups.)
* io_read / io_write -> recv_nonblock / write_nonblock driven by io_wait
* kernel_sleep / timeout_after -> a monotonic timer min-heap (the loop's GetQueuedCompletionStatusEx timeout)
* block / unblock -> in-process waiter lists; cross-thread unblock wakes the loop with PostQueuedCompletionStatus
* close -> drives the run loop until every fiber has finished
Single thread-of-use: only the owning thread runs the loop and resumes fibers. #unblock may be called from another thread and only mutates the (locked) ready queue + posts a wakeup; it never resumes a fiber itself.
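The cross-thread rule above can be illustrated with a minimal sketch in plain Ruby. Everything here is a hypothetical stand-in: a Thread::Queue plays the role of the completion port, pushing :wakeup stands in for PostQueuedCompletionStatus, and symbols stand in for fibers. It shows only the division of labor (foreign threads enqueue and wake, the owner drains), not the scheduler's actual code.

```ruby
# Hypothetical stand-in for the locked ready queue plus loop wakeup.
class ReadyQueueSketch
  def initialize
    @ready = []                 # [[token, value], ...] shared with other threads
    @lock  = Thread::Mutex.new  # guards @ready
    @port  = Thread::Queue.new  # stand-in for the completion port
    @owner = Thread.current     # only this thread may drain and "resume"
  end

  # Safe from any thread: mutate the locked queue, then post a wakeup.
  # Nothing is resumed here.
  def unblock(token, value = true)
    @lock.synchronize { @ready << [token, value] }
    @port << :wakeup
  end

  # Owner thread only: block until a wakeup arrives, then drain the queue.
  def run_once
    raise "run_once called off the owning thread" unless Thread.current.equal?(@owner)

    @port.pop
    @lock.synchronize { @ready.shift(@ready.size) }
  end
end

loop_sketch = ReadyQueueSketch.new
Thread.new { loop_sketch.unblock(:fiber_a) }.join  # a foreign thread enqueues
drained = loop_sketch.run_once                     # the owner drains
```

Keeping the resume on the owning thread means fibers never migrate between threads, which is why the foreign thread is limited to the queue and the wakeup.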
Defined Under Namespace
Classes: IoWaiter, PollEntry, Timer, Waiter
Instance Method Summary
- #address_resolve(hostname) ⇒ Object
- #await_on_thread(&block) ⇒ Object
  Run a blocking operation on a fresh, scheduler-FREE thread and park the calling fiber on it cooperatively (Thread#value -> our block hook).
- #block(blocker, timeout = nil) ⇒ Object
  block / unblock (Mutex, ConditionVariable, Queue, Thread#join).
- #close ⇒ Object
  MRI calls this automatically at thread/program exit.
- #closed? ⇒ Boolean
- #fiber(&block) ⇒ Object
  Fiber creation.
- #initialize ⇒ Scheduler (constructor)
  A new instance of Scheduler.
- #io_read(io, buffer, length, offset = 0) ⇒ Object
  Stream read/write (recv_nonblock + io_wait).
- #io_wait(io, events, timeout) ⇒ Object
  Wait until `io` is ready for `events`; returns the ready-events bitmask (0 on timeout).
- #io_write(io, buffer, length, offset = 0) ⇒ Object
- #kernel_sleep(duration = nil) ⇒ Object
  Timers.
- #monotonic ⇒ Object
- #process_wait(pid, flags) ⇒ Object
- #run ⇒ Object
  The run loop.
- #timeout_after(duration, exception_class, *exception_arguments) ⇒ Object
- #unblock(_blocker, fiber) ⇒ Object
  Called from ANY thread: only mutate shared state under the lock and wake the loop; never resume the fiber here.
Constructor Details
#initialize ⇒ Scheduler
Returns a new instance of Scheduler.
# File 'lib/winloop/scheduler.rb', line 42
def initialize
  @backend = Backend.new
  @ready   = []                 # [[fiber, resume_value], ...] (shared: unblock pushes here)
  @timers  = MinHeap.new        # of Timer, keyed on :deadline
  @polls   = {}                 # socket key (fileno) => PollEntry
  @ops     = {}                 # backend poll id => PollEntry
  @parked  = {}                 # fiber => Waiter (its CURRENT block/kernel_sleep park)
  @lock    = Thread::Mutex.new  # guards @ready + @parked (the cross-thread path)
  @loop    = Fiber.current      # the thread's root fiber == the loop
  @closed  = false
end
Instance Method Details
#address_resolve(hostname) ⇒ Object
# File 'lib/winloop/scheduler.rb', line 233
def address_resolve(hostname)
  await_on_thread do
    Addrinfo.getaddrinfo(hostname, nil, nil, :STREAM).map(&:ip_address).uniq
  end
rescue SocketError
  []
end
#await_on_thread(&block) ⇒ Object
Run a blocking operation on a fresh, scheduler-FREE thread and park the calling fiber on it cooperatively (Thread#value -> our block hook). The worker is killed if this fiber unwinds first (Timeout.timeout / Fiber kill) so it can never outlive us, leak, or race a later operation. Per the listing below, the worker is then joined after the kill, so that it cannot read (and discard) bytes that a later operation on the same IO expects; Thread#kill interrupts the blocking syscall, so the join is not expected to block.
# File 'lib/winloop/scheduler.rb', line 216
def await_on_thread(&block)
  worker = Thread.new(&block)
  worker.report_on_exception = false # expected errors are surfaced via #value
  begin
    worker.value
  ensure
    if worker.alive?
      # Unwound first (Timeout/kill): interrupt the worker out of its blocking
      # syscall and WAIT for it to die before returning, so it can't read (and
      # discard) bytes that a later operation on the same IO expects. kill
      # interrupts readpartial immediately, so this join does not block.
      worker.kill
      worker.join
    end
  end
end
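The kill-on-unwind behavior can be observed outside any scheduler. The sketch below repeats the shape of the listing above in plain Ruby; `await_on_thread_sketch` and its `started` hook are hypothetical names added here only so that the worker thread can be inspected afterwards.

```ruby
require "timeout"

# Same shape as await_on_thread, run without a scheduler. `started` is a
# hypothetical hook that hands the worker thread back for inspection.
def await_on_thread_sketch(started = nil, &block)
  worker = Thread.new(&block)
  worker.report_on_exception = false
  started&.call(worker)
  begin
    worker.value
  ensure
    if worker.alive?
      worker.kill   # interrupt the blocking call
      worker.join   # wait until the worker is really gone
    end
  end
end

seen = nil
outcome =
  begin
    Timeout.timeout(0.05) do
      await_on_thread_sketch(->(thread) { seen = thread }) { sleep 5 }
    end
  rescue Timeout::Error
    :timed_out
  end
```

Here the caller unwinds via Timeout.timeout while the worker is still sleeping, so the ensure block kills and joins it before the exception propagates.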
#block(blocker, timeout = nil) ⇒ Object
block / unblock (Mutex, ConditionVariable, Queue, Thread#join)
# File 'lib/winloop/scheduler.rb', line 161
def block(blocker, timeout = nil)
  fiber = Fiber.current
  waiter = Waiter.new(fiber, false)
  @lock.synchronize { @parked[fiber] = waiter }
  @timers.push(Timer.new(monotonic + timeout, waiter, false, nil)) if timeout
  Fiber.yield # true (unblocked) or false (timed out)
ensure
  @lock.synchronize do
    waiter.settled = true
    @parked.delete(fiber)
  end
end
#close ⇒ Object
MRI calls this automatically at thread/program exit. It IS the event loop.
# File 'lib/winloop/scheduler.rb', line 248
def close
  return if @closed
  @closed = true
  run
ensure
  @backend.shutdown
  # NEVER call Fiber.set_scheduler(nil) here: it raises SystemStackError
  # during MRI's implicit close at thread exit.
end
#closed? ⇒ Boolean
# File 'lib/winloop/scheduler.rb', line 258
def closed? = @closed
#fiber(&block) ⇒ Object
Fiber creation
# File 'lib/winloop/scheduler.rb', line 202
def fiber(&block)
  fiber = Fiber.new(blocking: false, &block)
  fiber.resume
  fiber
end
#io_read(io, buffer, length, offset = 0) ⇒ Object
Stream read/write (recv_nonblock + io_wait)
`length` is the MINIMUM bytes to transfer; 0 means "the whole region [offset, buffer.size)". MUST use recv_nonblock: read_nonblock, sysread, and readpartial re-enter this hook on mswin and recurse forever.
# File 'lib/winloop/scheduler.rb', line 90
def io_read(io, buffer, length, offset = 0)
  return blocking_read(io, buffer, length, offset) unless io.respond_to?(:recv_nonblock)
  total = 0
  minimum = length.zero? ? 1 : length
  while total < minimum
    maximum = buffer.size - offset - total
    break if maximum <= 0
    result = io.recv_nonblock(maximum, exception: false)
    case result
    when :wait_readable then io_wait(io, READABLE, nil)
    when nil then break # EOF
    else
      buffer.set_string(result, offset + total)
      total += result.bytesize
    end
  end
  total
end
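The minimum-length loop can be traced with a scripted source. This is a sketch under stated assumptions: `ScriptedSource` and `read_at_least` are hypothetical, a String replaces the IO::Buffer, and a counter marks the places where the real hook would park in io_wait. As in the listing above, a `length` of 0 is treated as a minimum of one byte.

```ruby
# Hypothetical nonblocking source: hands out scripted results in order, the
# way recv_nonblock(exception: false) can (a String, :wait_readable, or nil).
class ScriptedSource
  def initialize(*results)
    @results = results
  end

  def recv_nonblock(_maximum, exception: false)
    @results.shift
  end
end

# Read until at least `length` bytes arrived (1 byte when length is 0),
# stopping early on EOF or when `capacity` is exhausted.
def read_at_least(source, capacity, length)
  data = +""
  waits = 0
  minimum = length.zero? ? 1 : length
  while data.bytesize < minimum
    room = capacity - data.bytesize
    break if room <= 0

    case (result = source.recv_nonblock(room, exception: false))
    when :wait_readable then waits += 1  # the real hook parks in io_wait here
    when nil then break                  # EOF
    else data << result
    end
  end
  [data, waits]
end

source = ScriptedSource.new("ab", :wait_readable, "cd", "ef")
data, waits = read_at_least(source, 16, 3)
short, = read_at_least(ScriptedSource.new("x", "y"), 16, 0)
```

The first call keeps reading past the short "ab" chunk because the minimum of 3 bytes has not been met, then stops as soon as it is, leaving "ef" unread.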
#io_wait(io, events, timeout) ⇒ Object
Wait until `io` is ready for `events`; returns the ready-events bitmask (0 on timeout). events is an OR of Winloop::READABLE/WRITABLE/PRIORITY.
# File 'lib/winloop/scheduler.rb', line 62
def io_wait(io, events, timeout)
  key = io.fileno
  waiter = IoWaiter.new(Fiber.current, events, false)
  entry = (@polls[key] ||= PollEntry.new(key, io, nil, 0, []))
  entry.waiters << waiter
  arm_poll(entry)
  @timers.push(Timer.new(monotonic + timeout, waiter, 0, nil)) if timeout
  Fiber.yield # resumed with the ready-events mask (or 0 on timeout)
ensure
  # On the normal completion path the loop already removed us from the entry
  # and re-armed/dropped it. If we are unwinding by any other path (timeout,
  # or Timeout.timeout raised into us), detach and re-arm/cancel for the rest.
  waiter.settled = true
  if entry.waiters.delete(waiter)
    if entry.waiters.empty?
      cancel_entry(entry)
    else
      arm_poll(entry) # interest shrank; existing poll still covers it (no-op)
    end
  end
end
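The per-socket coalescing that io_wait relies on can be sketched with plain bitmasks. The structs below are hypothetical stand-ins for IoWaiter and PollEntry, and the event values are assumed to mirror Ruby's IO::READABLE, IO::PRIORITY, and IO::WRITABLE; the real arm_poll and completion handling are not shown in this page, so this is only one plausible shape.

```ruby
# Event bits, assumed to mirror IO::READABLE / IO::PRIORITY / IO::WRITABLE.
READABLE = 1
PRIORITY = 2
WRITABLE = 4

# Hypothetical stand-ins for the IoWaiter / PollEntry structs.
SketchWaiter = Struct.new(:name, :events)
SketchEntry  = Struct.new(:waiters) do
  # The single outstanding poll must cover every waiter's interest.
  def interest
    waiters.reduce(0) { |mask, waiter| mask | waiter.events }
  end

  # One completion fans out: each waiter whose interest intersects the ready
  # mask is woken with its own slice of it; the rest wait for the next poll.
  def complete(ready)
    woken, waiting = waiters.partition { |waiter| (waiter.events & ready) != 0 }
    self.waiters = waiting
    woken.map { |waiter| [waiter.name, waiter.events & ready] }
  end
end

entry = SketchEntry.new([
  SketchWaiter.new(:reader, READABLE),
  SketchWaiter.new(:writer, WRITABLE),
  SketchWaiter.new(:both, READABLE | WRITABLE)
])
union     = entry.interest            # what the one poll is armed with
woken     = entry.complete(READABLE)  # a readable edge arrives
remaining = entry.interest            # interest to re-arm with afterwards
```

After the readable edge, only the writer is left, so the next poll would be armed for WRITABLE alone.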
#io_write(io, buffer, length, offset = 0) ⇒ Object
# File 'lib/winloop/scheduler.rb', line 109
def io_write(io, buffer, length, offset = 0)
  total = 0
  maximum = buffer.size - offset
  return 0 if maximum <= 0
  minimum = length.zero? ? maximum : length
  minimum = maximum if minimum > maximum # never spin writing empty chunks
  while total < minimum
    chunk = buffer.get_string(offset + total, maximum - total)
    break if chunk.empty?
    result = io.write_nonblock(chunk, exception: false)
    if result == :wait_writable
      io_wait(io, WRITABLE, nil)
    else
      total += result
    end
  end
  total
end
#kernel_sleep(duration = nil) ⇒ Object
Timers
# File 'lib/winloop/scheduler.rb', line 130
def kernel_sleep(duration = nil)
  fiber = Fiber.current
  waiter = Waiter.new(fiber, false)
  # Park under @parked so a cross-thread #unblock can find us:
  # ConditionVariable#wait with a timeout parks HERE, and a racing signal must
  # wake us through the SAME one-shot guard as the timer, never through a
  # second, competing wakeup.
  @lock.synchronize { @parked[fiber] = waiter }
  @timers.push(Timer.new(monotonic + duration, waiter, nil, nil)) if duration
  Fiber.yield
  duration
ensure
  @lock.synchronize do
    waiter.settled = true # disarm a not-yet-fired timer on abnormal exit
    @parked.delete(fiber)
  end
end
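How deadline-keyed timers and the `settled` flag interact can be sketched as follows. This is not the scheduler's MinHeap: a plain Array scanned with min_by stands in for it, and `ParkedSketch`, `TimerSketch`, and `TimerQueueSketch` are hypothetical names. The sketch assumes the loop derives its wait timeout from the earliest live deadline and skips timers whose waiter has already settled.

```ruby
ParkedSketch = Struct.new(:name, :settled)
TimerSketch  = Struct.new(:deadline, :waiter)

# Hypothetical timer queue; an Array stands in for the min-heap.
class TimerQueueSketch
  def initialize
    @timers = []
  end

  def push(timer)
    @timers << timer
  end

  # How long the loop may block: time until the earliest live deadline,
  # or nil when no timer is pending.
  def next_timeout(now)
    @timers.reject! { |timer| timer.waiter.settled }
    earliest = @timers.min_by(&:deadline)
    earliest && [earliest.deadline - now, 0].max
  end

  # Fire every due timer in deadline order; settled waiters are skipped
  # because they were already woken or disarmed.
  def fire_due(now)
    due, @timers = @timers.partition { |timer| timer.deadline <= now }
    due.sort_by(&:deadline).each_with_object([]) do |timer, fired|
      next if timer.waiter.settled

      timer.waiter.settled = true
      fired << timer.waiter.name
    end
  end
end

queue = TimerQueueSketch.new
a = ParkedSketch.new(:a, false)
b = ParkedSketch.new(:b, false)
c = ParkedSketch.new(:c, false)
queue.push(TimerSketch.new(3.0, a))
queue.push(TimerSketch.new(1.0, b))
queue.push(TimerSketch.new(2.0, c))
c.settled = true                   # c finished early, as its ensure block would mark it
timeout = queue.next_timeout(0.5)  # earliest live deadline is b's
fired   = queue.fire_due(3.0)
```

Marking the waiter settled instead of removing its timer keeps the abnormal-exit path cheap: the stale timer is simply ignored when it comes due.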
#monotonic ⇒ Object
# File 'lib/winloop/scheduler.rb', line 54
def monotonic
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
#process_wait(pid, flags) ⇒ Object
# File 'lib/winloop/scheduler.rb', line 241
def process_wait(pid, flags)
  await_on_thread { Process::Status.wait(pid, flags) }
end
#run ⇒ Object
The run loop
# File 'lib/winloop/scheduler.rb', line 262
def run
  run_once until done?
end
#timeout_after(duration, exception_class, *exception_arguments) ⇒ Object
# File 'lib/winloop/scheduler.rb', line 147
def timeout_after(duration, exception_class, *exception_arguments)
  waiter = Waiter.new(Fiber.current, false)
  @timers.push(Timer.new(monotonic + duration, waiter, nil,
                         [exception_class, exception_arguments]))
  begin
    yield duration
  ensure
    # disarm if the block finished before the deadline
    @lock.synchronize { waiter.settled = true }
  end
end
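The timer above carries an `[exception_class, exception_arguments]` payload, but the code that delivers it is not part of this page. One plausible mechanism, shown here purely as an assumption, is Fiber#raise: the loop raises the exception inside the parked fiber, which then unwinds through its ensure blocks. `DeadlineSketchError` is a hypothetical class used only for the demonstration.

```ruby
class DeadlineSketchError < StandardError; end

# A fiber parks itself; the "loop" (here: the main fiber) delivers the
# timer's exception payload by raising it inside the parked fiber.
log = []
parked = Fiber.new do
  begin
    Fiber.yield       # parked, as a hook would be inside the timed block
    log << :finished
  rescue DeadlineSketchError => e
    log << [:timed_out, e.message]
  ensure
    log << :disarmed  # an ensure like the one in timeout_after runs either way
  end
end

parked.resume                                        # run up to the park point
exception_class, exception_arguments = [DeadlineSketchError, ["took too long"]]
parked.raise(exception_class, *exception_arguments)  # deadline reached
```

Because the exception is raised at the fiber's last Fiber.yield, cleanup code around the park point runs exactly as it would for any other unwind.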
#unblock(_blocker, fiber) ⇒ Object
Called from ANY thread: only mutate shared state under the lock and wake the loop — never resume the fiber here.
We wake by FIBER identity through its single parked Waiter, ignoring `blocker`. That is deliberate and important:
* Thread#join calls block(thread) but MRI calls unblock(<a DIFFERENT object>, fiber); keying on the fiber wakes it correctly anyway.
* A fiber can have more than one live wakeup source (a timeout timer AND this unblock, e.g. ConditionVariable#wait(mutex, timeout) racing a signal). Routing through the one Waiter's `settled` guard means exactly one of them resumes the fiber; the loser is dropped. A raw, unguarded "@ready << fiber" would double-resume and cut the fiber's NEXT wait short.
# File 'lib/winloop/scheduler.rb', line 186
def unblock(_blocker, fiber)
  @lock.synchronize do
    waiter = @parked[fiber]
    if waiter
      wake_locked(waiter, true)
    elsif @ready.none? { |f, _| f.equal?(fiber) }
      # Not parked in any wait we track (spurious/late unblock): last resort,
      # de-duplicated so it can never double-enqueue.
      @ready << [fiber, true]
    end
  end
  @backend.wakeup
end
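The one-shot guard described above can be reduced to a few lines. `wake_locked` itself is not shown in this page, so `wake_once` below is a hypothetical analogue under that assumption: the first wakeup source flips `settled` and enqueues the fiber, and every later source for the same waiter is dropped.

```ruby
GuardedWaiter = Struct.new(:fiber_name, :settled)

# Hypothetical analogue of wake_locked: the first caller wins, later callers
# for the same waiter are ignored. (The real method runs under @lock.)
def wake_once(ready, waiter, value)
  return false if waiter.settled

  waiter.settled = true
  ready << [waiter.fiber_name, value]
  true
end

ready  = []
waiter = GuardedWaiter.new(:worker, false)
signal_won = wake_once(ready, waiter, true)   # the signal arrives first
timer_won  = wake_once(ready, waiter, false)  # the timeout fires second and loses
```

With the guard in place the ready queue holds a single entry for the fiber, so its next wait cannot be cut short by a stale second resume.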