Class: Winloop::Scheduler

Inherits:
Object
Defined in:
lib/winloop/scheduler.rb

Overview

A Fiber::Scheduler for MRI on Windows, backed by an I/O Completion Port.

Strategy (matches libuv/mio/wepoll on Windows):

* io_wait          -> IOCTL_AFD_POLL submitted as an overlapped op on the IOCP
                      (the only way to get *readiness* onto a completion port).
                      Polls are COALESCED per socket: at most one AFD poll is
                      outstanding per socket, covering the union of every
                      waiter's interest, and its completion fans out to all of
                      them. (The AFD driver completes only ONE pending poll per
                      readiness edge, so independent polls on the same socket
                      would lose wakeups.)
* io_read/io_write -> recv_nonblock/write_nonblock driven by io_wait
* kernel_sleep /
  timeout_after    -> a monotonic timer min-heap (the loop's GQCSEx timeout)
* block / unblock  -> in-process waiter lists; cross-thread unblock wakes the
                      loop with PostQueuedCompletionStatus
* close            -> drives the run loop until every fiber has finished
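
The coalescing rule for io_wait can be illustrated in isolation. The following is a minimal sketch, not Winloop's implementation: SketchWaiter and SketchPollEntry are hypothetical stand-ins for the documented IoWaiter and PollEntry, and Ruby's own IO::READABLE / IO::WRITABLE bit flags are assumed to play the role of the Winloop event constants. It shows one armed mask covering the union of every waiter's interest, and one completion fanning out to each waiter whose interest overlaps the ready bits.

```ruby
# Hypothetical stand-ins for IoWaiter / PollEntry (illustration only).
SketchWaiter = Struct.new(:name, :events)

class SketchPollEntry
  attr_reader :waiters

  def initialize
    @waiters = []
  end

  # The single outstanding poll must cover the union of all interests.
  def armed_mask
    @waiters.reduce(0) { |mask, waiter| mask | waiter.events }
  end

  # One completion fans out: every waiter whose interest overlaps the ready
  # mask is detached and reported with only the bits it asked for.
  def complete(ready_mask)
    woken, @waiters = @waiters.partition { |w| (w.events & ready_mask) != 0 }
    woken.map { |w| [w.name, w.events & ready_mask] }
  end
end

entry = SketchPollEntry.new
entry.waiters << SketchWaiter.new(:reader, IO::READABLE)
entry.waiters << SketchWaiter.new(:both, IO::READABLE | IO::WRITABLE)
entry.complete(IO::READABLE) # wakes both waiters; nothing is left to re-arm
```

Any waiters left on the entry after a completion would need a fresh poll armed with the new, smaller union.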

Single thread-of-use: only the owning thread runs the loop and resumes fibers. #unblock may be called from another thread and only mutates the (locked) ready queue + posts a wakeup; it never resumes a fiber itself.

Defined Under Namespace

Classes: IoWaiter, PollEntry, Timer, Waiter

Instance Method Summary

Constructor Details

#initialize ⇒ Scheduler

Returns a new instance of Scheduler.



# File 'lib/winloop/scheduler.rb', line 42

def initialize
  @backend = Backend.new
  @ready   = []     # [[fiber, resume_value], ...]   (shared: unblock pushes here)
  @timers  = MinHeap.new # of Timer, keyed on :deadline
  @polls   = {}     # socket key (fileno) => PollEntry
  @ops     = {}     # backend poll id => PollEntry
  @parked  = {}     # fiber => Waiter   (its CURRENT block/kernel_sleep park)
  @lock    = Thread::Mutex.new # guards @ready + @parked (the cross-thread path)
  @loop    = Fiber.current     # the thread's root fiber == the loop
  @closed  = false
end

Instance Method Details

#address_resolve(hostname) ⇒ Object



# File 'lib/winloop/scheduler.rb', line 233

def address_resolve(hostname)
  await_on_thread do
    Addrinfo.getaddrinfo(hostname, nil, nil, :STREAM).map(&:ip_address).uniq
  end
rescue SocketError
  []
end

#await_on_thread(&block) ⇒ Object

Run a blocking operation on a fresh, scheduler-FREE thread and park the calling fiber on it cooperatively (Thread#value -> our block hook). If this fiber unwinds first (Timeout.timeout / Fiber kill), the worker is killed and then joined, so it can never outlive us, leak, or race a later operation on the same IO. Thread#kill interrupts the blocking syscall, so the join returns promptly.



# File 'lib/winloop/scheduler.rb', line 216

def await_on_thread(&block)
  worker = Thread.new(&block)
  worker.report_on_exception = false # expected errors are surfaced via #value
  begin
    worker.value
  ensure
    if worker.alive?
      # Unwound first (Timeout/kill): interrupt the worker out of its blocking
      # syscall and WAIT for it to die before returning, so it can't read (and
      # discard) bytes that a later operation on the same IO expects. kill
      # interrupts readpartial immediately, so this join does not block.
      worker.kill
      worker.join
    end
  end
end
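
The same kill-then-join shape can be tried outside the scheduler. This is a minimal sketch using only plain threads and the standard timeout library; run_on_worker is a hypothetical name, and without a Fiber scheduler installed Thread#value simply blocks the calling thread instead of parking a fiber.

```ruby
require "timeout"

# Hypothetical helper with the same shape as await_on_thread (no scheduler).
def run_on_worker(&block)
  worker = Thread.new(&block)
  worker.report_on_exception = false # errors are re-raised by #value instead
  begin
    worker.value
  ensure
    if worker.alive?
      # We are unwinding before the worker finished: stop it and wait for it.
      worker.kill
      worker.join
    end
  end
end

run_on_worker { 6 * 7 } # the block's value comes back through Thread#value

begin
  Timeout.timeout(0.1) { run_on_worker { sleep 5 } }
rescue Timeout::Error
  # The ensure block has already killed and joined the sleeping worker.
end
```

Because the join happens inside the ensure block, the caller never observes a worker that is still running after the helper has returned or raised.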

#block(blocker, timeout = nil) ⇒ Object

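Block/unblock hooks (used by Mutex, ConditionVariable, Queue, Thread#join). Parks the calling fiber until #unblock wakes it or the optional timeout expires; returns true if unblocked, false if timed out.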



# File 'lib/winloop/scheduler.rb', line 161

def block(blocker, timeout = nil)
  fiber  = Fiber.current
  waiter = Waiter.new(fiber, false)
  @lock.synchronize { @parked[fiber] = waiter }
  @timers.push(Timer.new(monotonic + timeout, waiter, false, nil)) if timeout
  Fiber.yield # true (unblocked) or false (timed out)
ensure
  @lock.synchronize do
    waiter.settled = true
    @parked.delete(fiber)
  end
end

#close ⇒ Object

MRI calls this automatically at thread/program exit. It IS the event loop.



# File 'lib/winloop/scheduler.rb', line 248

def close
  return if @closed
  @closed = true
  run
ensure
  @backend.shutdown
  # NEVER call Fiber.set_scheduler(nil) here — it raises SystemStackError
  # during MRI's implicit close at thread exit.
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/winloop/scheduler.rb', line 258

def closed? = @closed

#fiber(&block) ⇒ Object

Fiber creation: builds a non-blocking fiber, resumes it immediately, and returns it.



# File 'lib/winloop/scheduler.rb', line 202

def fiber(&block)
  fiber = Fiber.new(blocking: false, &block)
  fiber.resume
  fiber
end

#io_read(io, buffer, length, offset = 0) ⇒ Object

Stream read/write hooks, built on recv_nonblock/write_nonblock plus io_wait.

`length` is the MINIMUM number of bytes to transfer. For writes, 0 means “the whole region [offset, buffer.size)”; for reads, 0 means “return after the first successful read of up to that region”. MUST use recv_nonblock: read_nonblock / sysread / readpartial re-enter this hook on mswin and recurse forever.



# File 'lib/winloop/scheduler.rb', line 90

def io_read(io, buffer, length, offset = 0)
  return blocking_read(io, buffer, length, offset) unless io.respond_to?(:recv_nonblock)
  total   = 0
  minimum = length.zero? ? 1 : length
  while total < minimum
    maximum = buffer.size - offset - total
    break if maximum <= 0
    result = io.recv_nonblock(maximum, exception: false)
    case result
    when :wait_readable then io_wait(io, READABLE, nil)
    when nil then break # EOF
    else
      buffer.set_string(result, offset + total)
      total += result.bytesize
    end
  end
  total
end
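
The minimum-length loop can be exercised without a socket. The sketch below makes several assumptions: ScriptedSource is a hypothetical object that replays a fixed script of results through a recv_nonblock-shaped method, a plain String stands in for the IO::Buffer, and the :wait_readable branch just retries where the real hook would park in io_wait.

```ruby
# Hypothetical stand-in for a non-blocking socket: replays scripted results.
class ScriptedSource
  def initialize(*results)
    @results = results
  end

  # Returns up to `max` bytes of the next scripted String, or the scripted
  # :wait_readable / nil (EOF) marker. Also returns nil once the script ends.
  def recv_nonblock(max, exception: false)
    item = @results.shift
    return item unless item.is_a?(String)
    rest = item.byteslice(max, item.bytesize)
    @results.unshift(rest) unless rest.nil? || rest.empty?
    item.byteslice(0, max)
  end
end

# Reads until at least `length` bytes arrived (0 = one successful read),
# never exceeding `capacity` bytes in total.
def read_at_least(source, capacity, length)
  out     = +""
  minimum = length.zero? ? 1 : length
  while out.bytesize < minimum
    room = capacity - out.bytesize
    break if room <= 0
    case (result = source.recv_nonblock(room, exception: false))
    when :wait_readable then next  # the real hook parks in io_wait here
    when nil            then break # EOF
    else out << result
    end
  end
  out
end

read_at_least(ScriptedSource.new("ab", :wait_readable, "cdef"), 16, 4)
```

Note that a single read may overshoot the minimum: the loop asks for all remaining room each time and only checks the minimum between reads.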

#io_wait(io, events, timeout) ⇒ Object

Wait until `io` is ready for `events`; returns the ready-events bitmask (0 on timeout). `events` is an OR of Winloop::READABLE/WRITABLE/PRIORITY.



# File 'lib/winloop/scheduler.rb', line 62

def io_wait(io, events, timeout)
  key    = io.fileno
  waiter = IoWaiter.new(Fiber.current, events, false)
  entry  = (@polls[key] ||= PollEntry.new(key, io, nil, 0, []))
  entry.waiters << waiter
  arm_poll(entry)
  @timers.push(Timer.new(monotonic + timeout, waiter, 0, nil)) if timeout
  Fiber.yield # resumed with the ready-events mask (or 0 on timeout)
ensure
  # On the normal completion path the loop already removed us from the entry
  # and re-armed/dropped it. If we are unwinding by any other path (timeout,
  # or Timeout.timeout raised into us), detach and re-arm/cancel for the rest.
  waiter.settled = true
  if entry.waiters.delete(waiter)
    if entry.waiters.empty?
      cancel_entry(entry)
    else
      arm_poll(entry) # interest shrank; existing poll still covers it (no-op)
    end
  end
end

#io_write(io, buffer, length, offset = 0) ⇒ Object



# File 'lib/winloop/scheduler.rb', line 109

def io_write(io, buffer, length, offset = 0)
  total   = 0
  maximum = buffer.size - offset
  return 0 if maximum <= 0
  minimum = length.zero? ? maximum : length
  minimum = maximum if minimum > maximum # never spin writing empty chunks
  while total < minimum
    chunk = buffer.get_string(offset + total, maximum - total)
    break if chunk.empty?
    result = io.write_nonblock(chunk, exception: false)
    if result == :wait_writable
      io_wait(io, WRITABLE, nil)
    else
      total += result
    end
  end
  total
end
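
The write side differs from the read side in how it treats a zero length and in its clamp against the available region. As a rough sketch under stated assumptions (ScriptedSink is a hypothetical sink that accepts a scripted number of bytes per call, a String stands in for the IO::Buffer, and :wait_writable simply retries instead of parking in io_wait):

```ruby
# Hypothetical sink: each script entry is a positive byte count to accept, or
# :wait_writable. Once the script is exhausted it accepts whole chunks.
class ScriptedSink
  attr_reader :written

  def initialize(*script)
    @script  = script
    @written = +""
  end

  def write_nonblock(chunk, exception: false)
    step = @script.shift || chunk.bytesize
    return step if step == :wait_writable
    accepted = chunk.byteslice(0, step)
    @written << accepted
    accepted.bytesize
  end
end

# Writes until at least `length` bytes are accepted; 0 means all of `data`.
def write_at_least(sink, data, length)
  total   = 0
  maximum = data.bytesize
  return 0 if maximum <= 0
  minimum = length.zero? ? maximum : [length, maximum].min # clamp to the region
  while total < minimum
    chunk = data.byteslice(total, maximum - total)
    break if chunk.empty?
    result = sink.write_nonblock(chunk, exception: false)
    next if result == :wait_writable # the real hook parks in io_wait here
    total += result
  end
  total
end

write_at_least(ScriptedSink.new(2, :wait_writable, 3), "abcdefgh", 0)
```

The clamp matters when a caller asks for more bytes than the region holds: without it the loop would keep requesting empty chunks.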

#kernel_sleep(duration = nil) ⇒ Object

Timer hooks. Parks the calling fiber for `duration` seconds; with no duration it stays parked until #unblock wakes it.



# File 'lib/winloop/scheduler.rb', line 130

def kernel_sleep(duration = nil)
  fiber  = Fiber.current
  waiter = Waiter.new(fiber, false)
  # Park under @parked so a cross-thread #unblock can find us. For example,
  # ConditionVariable#wait with a timeout parks HERE, and a racing signal must
  # wake us through the SAME one-shot guard as the timer, never through a
  # second, competing wakeup.
  @lock.synchronize { @parked[fiber] = waiter }
  @timers.push(Timer.new(monotonic + duration, waiter, nil, nil)) if duration
  Fiber.yield
  duration
ensure
  @lock.synchronize do
    waiter.settled = true # disarm a not-yet-fired timer on abnormal exit
    @parked.delete(fiber)
  end
end
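
How a deadline-ordered timer set and the `settled` flag interact can be sketched in a few lines. Everything here is an assumption made for illustration: SketchTimers keeps a sorted array rather than the MinHeap the scheduler uses, SketchTimer and SketchSleeper are hypothetical structs, and the current time is passed in explicitly instead of being read from the monotonic clock.

```ruby
# Hypothetical stand-ins for Timer / Waiter (illustration only).
SketchTimer   = Struct.new(:deadline, :waiter)
SketchSleeper = Struct.new(:name, :settled)

class SketchTimers
  def initialize
    @timers = []
  end

  # A sorted insert stands in for the min-heap: earliest deadline first.
  def push(timer)
    index = @timers.bsearch_index { |t| t.deadline > timer.deadline } || @timers.size
    @timers.insert(index, timer)
  end

  # Seconds the loop may block before the next live timer is due
  # (nil means there is no live timer, so it may block indefinitely).
  def next_timeout(now)
    @timers.shift while @timers.first&.waiter&.settled
    first = @timers.first
    first && [first.deadline - now, 0].max
  end

  # Pops every due timer; waiters that were already settled by another
  # wake source are dropped without being woken a second time.
  def expire(now)
    fired = []
    while (timer = @timers.first) && timer.deadline <= now
      @timers.shift
      next if timer.waiter.settled
      timer.waiter.settled = true
      fired << timer.waiter.name
    end
    fired
  end
end

timers = SketchTimers.new
timers.push(SketchTimer.new(1.0, SketchSleeper.new(:a, false)))
timers.next_timeout(0.5) # how long the loop could wait for completions
```

Disarming a timer is then just a matter of setting `settled` on its waiter; the stale entry is discarded lazily when it reaches the front.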

#monotonic ⇒ Object



# File 'lib/winloop/scheduler.rb', line 54

def monotonic
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

#process_wait(pid, flags) ⇒ Object



# File 'lib/winloop/scheduler.rb', line 241

def process_wait(pid, flags)
  await_on_thread { Process::Status.wait(pid, flags) }
end

#run ⇒ Object

The run loop: calls run_once until done? returns true.



# File 'lib/winloop/scheduler.rb', line 262

def run
  run_once until done?
end

#timeout_after(duration, exception_class, *exception_arguments) ⇒ Object



# File 'lib/winloop/scheduler.rb', line 147

def timeout_after(duration, exception_class, *exception_arguments)
  waiter = Waiter.new(Fiber.current, false)
  @timers.push(Timer.new(monotonic + duration, waiter, nil,
                         [exception_class, exception_arguments]))
  begin
    yield duration
  ensure
    # disarm if the block finished before the deadline
    @lock.synchronize { waiter.settled = true }
  end
end

#unblock(_blocker, fiber) ⇒ Object

Called from ANY thread: only mutate shared state under the lock and wake the loop — never resume the fiber here.

We wake by FIBER identity through its single parked Waiter, ignoring `blocker`. That is deliberate and important:

* Thread#join calls block(thread) but MRI calls unblock(<a DIFFERENT
  object>, fiber); keying on the fiber wakes it correctly anyway.
* A fiber can have more than one live wakeup source (a timeout timer AND
  this unblock — e.g. ConditionVariable#wait(mutex, timeout) racing a
  signal). Routing through the one Waiter's `settled` guard means exactly
  one of them resumes the fiber; the loser is dropped. A raw, unguarded
  "@ready << fiber" would double-resume and cut the fiber's NEXT wait short.


# File 'lib/winloop/scheduler.rb', line 186

def unblock(_blocker, fiber)
  @lock.synchronize do
    waiter = @parked[fiber]
    if waiter
      wake_locked(waiter, true)
    elsif @ready.none? { |f, _| f.equal?(fiber) }
      # Not parked in any wait we track (spurious/late unblock) — last resort,
      # de-duplicated so it can never double-enqueue.
      @ready << [fiber, true]
    end
  end
  @backend.wakeup
end
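
The one-shot guard described above can be demonstrated with plain threads. This is a minimal sketch, not the scheduler's wake_locked: GuardedWaiter and ReadyQueueSketch are hypothetical names, and a symbol stands in for the fiber. Several wake sources race on one waiter, and only the first to take the lock enqueues a resume.

```ruby
# Hypothetical stand-in for the documented Waiter (illustration only).
GuardedWaiter = Struct.new(:fiber, :settled)

class ReadyQueueSketch
  attr_reader :ready

  def initialize
    @ready = []
    @lock  = Thread::Mutex.new
  end

  # Every wake source (timer expiry, cross-thread unblock) goes through here.
  # Only the first caller for a given waiter enqueues a resume; the rest are
  # dropped, so the fiber is never resumed twice for one wait.
  def wake(waiter, value)
    @lock.synchronize do
      return false if waiter.settled
      waiter.settled = true
      @ready << [waiter.fiber, value]
      true
    end
  end
end

queue  = ReadyQueueSketch.new
waiter = GuardedWaiter.new(:fiber_a, false)
racers = 8.times.map { |i| Thread.new { queue.wake(waiter, i) } }
racers.each(&:join)
queue.ready.size # exactly one entry, whichever racer won
```

The losing wake sources return without touching the ready queue, which is what keeps a late signal from cutting the fiber's next wait short.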