Class: Winloop::Scheduler

Inherits:
Object
Defined in:
lib/winloop/scheduler.rb

Overview

A Fiber::Scheduler for MRI on Windows, backed by an I/O Completion Port.

Strategy (matches libuv/mio/wepoll on Windows):

* io_wait          -> IOCTL_AFD_POLL submitted as an overlapped op on the IOCP
                    (the only way to get *readiness* onto a completion port).
                    Polls are COALESCED per socket: at most one AFD poll is
                    outstanding per socket, covering the union of every
                    waiter's interest, and its completion fans out to all of
                    them. (The AFD driver completes only ONE pending poll per
                    readiness edge, so independent polls on the same socket
                    would lose wakeups.)
* io_read/io_write -> recv_nonblock/write_nonblock driven by io_wait
* kernel_sleep /
  timeout_after    -> a monotonic timer min-heap (its earliest deadline is the
                    loop's GetQueuedCompletionStatusEx timeout)
* block / unblock  -> in-process waiter lists; cross-thread unblock wakes the
                    loop with PostQueuedCompletionStatus
* close            -> drives the run loop until every fiber has finished
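The per-socket coalescing rule for io_wait can be modelled without any IOCP. In this minimal sketch, one entry per socket key holds every waiter, the single poll is (re)armed only when the union of interests grows, and one completion fans out to every waiter whose interest intersects the ready mask. `WaiterRec`, `Entry`, `Coalescer`, `wait`, `arm` and `complete` are hypothetical stand-ins for Winloop's IoWaiter/PollEntry/arm_poll, and "submitting" a poll is reduced to recording the mask a real backend would send to AFD. It uses Ruby's own IO::READABLE/IO::WRITABLE bits.

```ruby
# Hypothetical stand-ins for a waiter and a per-socket poll entry.
WaiterRec = Struct.new(:name, :events, :result)
Entry     = Struct.new(:key, :waiters, :armed_mask)

class Coalescer
  attr_reader :submitted

  def initialize
    @entries   = {} # socket key => Entry (at most one outstanding poll each)
    @submitted = [] # [key, mask] pairs a real backend would submit to AFD
  end

  # Register a waiter and make sure the ONE poll for this socket covers it.
  def wait(key, waiter)
    entry = (@entries[key] ||= Entry.new(key, [], 0))
    entry.waiters << waiter
    arm(entry)
    entry
  end

  # Re-submit only when the union of interests is not already covered.
  # (A real backend would cancel the outstanding poll and issue a wider one.)
  def arm(entry)
    union = entry.waiters.reduce(0) { |mask, w| mask | w.events }
    return if (union & ~entry.armed_mask).zero? # shrank or unchanged: no-op
    entry.armed_mask = union
    @submitted << [entry.key, union]
  end

  # One completion fans out to every waiter whose interest intersects it.
  def complete(key, ready_mask)
    entry = @entries[key] or return []
    woken, rest = entry.waiters.partition { |w| (w.events & ready_mask).nonzero? }
    woken.each { |w| w.result = w.events & ready_mask }
    entry.waiters    = rest
    entry.armed_mask = 0 # the completed poll is no longer outstanding
    if rest.empty?
      @entries.delete(key) # nobody left: drop the entry
    else
      arm(entry)           # re-arm for the remaining waiters only
    end
    woken
  end
end
```

Tracking the armed mask per entry is what makes a shrinking interest set a no-op, which is the situation the comment in #io_wait's ensure describes.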

Single thread-of-use: only the owning thread runs the loop and resumes fibers. #unblock may be called from another thread and only mutates the (locked) ready queue + posts a wakeup; it never resumes a fiber itself.

Defined Under Namespace

Classes: IoWaiter, PollEntry, Timer, Waiter


Constructor Details

#initialize ⇒ Scheduler

Returns a new instance of Scheduler.



# File 'lib/winloop/scheduler.rb', line 42

def initialize
  @backend = Backend.new
  @ready   = []     # [[fiber, resume_value], ...]   (shared: unblock pushes here)
  @timers  = MinHeap.new # of Timer, keyed on :deadline
  @polls   = {}     # socket key (fileno) => PollEntry
  @ops     = {}     # backend poll id => PollEntry
  @parked  = {}     # fiber => Waiter   (its CURRENT block/kernel_sleep park)
  @lock    = Thread::Mutex.new # guards @ready + @parked (the cross-thread path)
  @loop    = Fiber.current     # the thread's root fiber == the loop
  @closed  = false

  # ---- generic OVERLAPPED op state (winloop 0.2) ----
  @thread     = Thread.current # the op API is loop-thread-only (enforced)
  @op_waiters = {} # op_id => Waiter        (a fiber parked in await_op)
  @op_orphans = {} # op_id => true          (timed-out/unwound awaits; the late
                   #                         packet is retired without waking anyone)
  @op_done    = {} # op_id => [bytes, error] (completed before anyone awaited)
  @op_shut    = false # true once the loop has fully finished (backend shut down).
                      # NOT @closed: under Winloop.run, #close IS the event loop,
                      # so @closed is true while op-protocol fibers still run.
end

Instance Method Details

#address_resolve(hostname) ⇒ Object



# File 'lib/winloop/scheduler.rb', line 243

def address_resolve(hostname)
  await_on_thread do
    Addrinfo.getaddrinfo(hostname, nil, nil, :STREAM).map(&:ip_address).uniq
  end
rescue SocketError
  []
end

#await_on_thread(&block) ⇒ Object

Run a blocking operation on a fresh, scheduler-FREE thread and park the calling fiber on it cooperatively (Thread#value goes through our #block hook). If this fiber unwinds first (Timeout.timeout, Fiber#kill), the worker is killed and then joined before we return, so it can never outlive us, leak, or race a later operation on the same IO (for example by reading and discarding bytes that operation expects). Thread#kill interrupts the worker's blocking syscall, so the join returns promptly.



# File 'lib/winloop/scheduler.rb', line 226

def await_on_thread(&block)
  worker = Thread.new(&block)
  worker.report_on_exception = false # expected errors are surfaced via #value
  begin
    worker.value
  ensure
    if worker.alive?
      # Unwound first (Timeout/kill): interrupt the worker out of its blocking
      # syscall and WAIT for it to die before returning, so it can't read (and
      # discard) bytes that a later operation on the same IO expects. kill
      # interrupts readpartial immediately, so this join does not block.
      worker.kill
      worker.join
    end
  end
end
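The kill-then-join discipline of #await_on_thread can be exercised with plain threads and no scheduler at all. In this sketch the calling thread simply blocks in Thread#value (under Winloop the fiber would park via #block instead). `run_on_worker` and its `started` test hook are hypothetical names used only for illustration.

```ruby
require "timeout"

# Run a blocking block on a worker thread. If the caller unwinds first
# (e.g. Timeout::Error), kill the worker and join it before returning,
# so the worker can never outlive this call.
def run_on_worker(started = nil, &block)
  worker = Thread.new(&block)
  worker.report_on_exception = false # errors surface via #value instead
  started&.push(worker)              # hypothetical hook: lets a test inspect the worker
  begin
    worker.value
  ensure
    if worker.alive?
      worker.kill # interrupts the blocking call (here: sleep)
      worker.join # wait until it is really gone
    end
  end
end
```

A normal call returns the worker's value; a timed-out call leaves no live worker behind.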

#await_op(op_id, timeout: nil) ⇒ Object

Park the calling fiber until op_id's completion packet is reaped by the loop, then return [bytes, error, data] (op_result is pulled HERE, in the woken fiber, so orphaned completions never allocate Strings). On timeout: returns nil, auto-cancels and orphans the op. Either way the op is retired: after await_op returns — value or nil — the op_id must not be used again.

timeout: nil (wait forever; safe only if the op was submitted on the handle that was associated via op_associate, since a completion for any other handle never reaches this loop) or a positive Numeric in seconds. Recommendation: pass a real timeout while first integrating a new native API, and switch to nil once the submit path is proven.



# File 'lib/winloop/scheduler.rb', line 313

def await_op(op_id, timeout: nil)
  op_guard!
  validate_op_timeout!(timeout)
  if @op_done.delete(op_id)              # early completion: no yield
    return @backend.op_result(op_id)
  end
  raise Error, "winloop: op #{op_id} already has a waiter" if @op_waiters.key?(op_id)
  raise Error, "winloop: op #{op_id} has not been submitted" if @backend.op_state(op_id) == :prepared
  waiter = Waiter.new(Fiber.current, false)
  @op_waiters[op_id] = waiter
  @timers.push(Timer.new(monotonic + timeout, waiter, nil, nil)) if timeout
  completed = Fiber.yield                # [bytes, error] or nil (timer fired)
  completed ? @backend.op_result(op_id) : nil
ensure
  # NOTHING in this ensure may raise — it runs during Timeout::Error /
  # Fiber#raise / Fiber#kill unwinds and must never mask the in-flight
  # exception (op_cancel's no-raise-on-CancelIoEx contract exists for this).
  if waiter
    waiter.settled = true
    if @op_waiters.delete(op_id)         # loop did NOT deliver: timeout or unwind
      @op_orphans[op_id] = true
      begin
        @backend.op_cancel(op_id)        # late packet -> orphan path retires it
      rescue Error
        # backend already shut down mid-unwind: nothing left to cancel
      end
    elsif !completed                     # loop DID deliver, but we were unwound
      # (Fiber#kill, or a sibling raise) before resuming with the value: the
      # record sits OP_COMPLETED with nobody left to op_result it — free it
      # now. Guarded: the loop's settled/timer branch may have retired the id.
      @backend.op_free(op_id) if op_completed_unconsumed?(op_id)
    end
  end
end
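The loop-side half of this protocol is not shown on this page. Based only on the three tables described in #initialize (@op_waiters, @op_orphans, @op_done) and the branches in #await_op, a completion packet for an op_id plausibly lands in exactly one place. The sketch below is hypothetical (`OpRouter`, `on_completion`, `freed` are illustrative names); the real loop additionally consults the waiter's settled flag and calls into the backend to free records.

```ruby
# Hypothetical routing of one completion packet, mirroring the tables
# documented in Scheduler#initialize.
class OpRouter
  Waiter = Struct.new(:name, :delivered)

  attr_reader :op_waiters, :op_orphans, :op_done, :freed

  def initialize
    @op_waiters = {} # op_id => Waiter        (a fiber parked in await_op)
    @op_orphans = {} # op_id => true          (timed-out/unwound awaits)
    @op_done    = {} # op_id => [bytes, error] (completed before anyone awaited)
    @freed      = [] # op ids retired without waking anyone
  end

  def on_completion(op_id, bytes, error)
    if (waiter = @op_waiters.delete(op_id))
      waiter.delivered = [bytes, error] # real loop: resume the parked fiber
    elsif @op_orphans.delete(op_id)
      @freed << op_id                   # late packet: retire it, wake nobody
    else
      @op_done[op_id] = [bytes, error]  # early completion: await_op will not yield
    end
  end
end
```

Each packet is consumed exactly once, which is why #await_op can promise that the op_id is retired whether it returns a value or nil.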

#block(blocker, timeout = nil) ⇒ Object

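block / unblock (Mutex, ConditionVariable, Queue, Thread#join). Parks the calling fiber until #unblock wakes it or the optional timeout (in seconds) expires; returns true if unblocked, false if it timed out.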



# File 'lib/winloop/scheduler.rb', line 171

def block(blocker, timeout = nil)
  fiber  = Fiber.current
  waiter = Waiter.new(fiber, false)
  @lock.synchronize { @parked[fiber] = waiter }
  @timers.push(Timer.new(monotonic + timeout, waiter, false, nil)) if timeout
  Fiber.yield # true (unblocked) or false (timed out)
ensure
  @lock.synchronize do
    waiter.settled = true
    @parked.delete(fiber)
  end
end

#close ⇒ Object

MRI calls this automatically at thread/program exit. It IS the event loop.



# File 'lib/winloop/scheduler.rb', line 351

def close
  return if @closed
  @closed = true
  run
ensure
  @op_shut = true # the op protocol is over; op_* / await_op raise from here on
  @backend.shutdown
  # NEVER call Fiber.set_scheduler(nil) here — it raises SystemStackError
  # during MRI's implicit close at thread exit.
end
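"close IS the event loop" is easiest to see in a toy scheduler. The sketch below is NOT Winloop: `ToySleepScheduler` is a hypothetical, timer-only scheduler that implements just enough of the Fiber::Scheduler interface (block, unblock, kernel_sleep and io_wait must exist for Fiber.set_scheduler to accept it; only kernel_sleep and fiber do real work). Nothing runs after scheduling until MRI calls #close at thread exit, and #close then drives every parked fiber to completion.

```ruby
# A toy, timer-only Fiber::Scheduler (not Winloop): #close drives the
# run loop until every scheduled fiber has finished.
class ToySleepScheduler
  def initialize
    @timers = [] # [deadline, fiber] pairs, sorted on demand (Winloop uses a heap)
    @closed = false
  end

  def now = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # Fiber.schedule lands here: create a non-blocking fiber and start it now.
  def fiber(&block)
    f = Fiber.new(blocking: false, &block)
    f.resume
    f
  end

  def kernel_sleep(duration = nil)
    raise NotImplementedError, "toy scheduler: sleep needs a duration" unless duration
    @timers << [now + duration, Fiber.current]
    Fiber.yield # back to whoever resumed us (#fiber or #run)
    duration
  end

  # Required by Fiber.set_scheduler, unused by this toy.
  def block(_blocker, _timeout = nil) = raise(NotImplementedError)
  def unblock(_blocker, _fiber) = raise(NotImplementedError)
  def io_wait(_io, _events, _timeout) = raise(NotImplementedError)

  def close
    return if @closed
    @closed = true
    run
  end

  def run
    until @timers.empty?
      @timers.sort_by!(&:first)
      deadline, fiber = @timers.shift
      delay = deadline - now
      sleep(delay) if delay.positive? # root fiber is blocking: no hook re-entry
      fiber.resume
    end
  end
end
```

Used from a thread, both fibers are parked when the thread body ends, and they finish in deadline order during the implicit close.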

#closed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/winloop/scheduler.rb', line 362

def closed? = @closed

#fiber(&block) ⇒ Object

Fiber creation: wraps the block in a non-blocking fiber, starts it immediately, and returns it. This is the hook behind Fiber.schedule.



# File 'lib/winloop/scheduler.rb', line 212

def fiber(&block)
  fiber = Fiber.new(blocking: false, &block)
  fiber.resume
  fiber
end

#io_read(io, buffer, length, offset = 0) ⇒ Object

Stream read/write (recv_nonblock + io_wait).

length is the MINIMUM number of bytes to transfer; 0 means "read at least once and return whatever is available" (never more than the region [offset, buffer.size)). MUST use recv_nonblock: read_nonblock, sysread and readpartial re-enter this hook on mswin and recurse forever.



# File 'lib/winloop/scheduler.rb', line 100

def io_read(io, buffer, length, offset = 0)
  return blocking_read(io, buffer, length, offset) unless io.respond_to?(:recv_nonblock)
  total   = 0
  minimum = length.zero? ? 1 : length
  while total < minimum
    maximum = buffer.size - offset - total
    break if maximum <= 0
    result = io.recv_nonblock(maximum, exception: false)
    case result
    when :wait_readable then io_wait(io, READABLE, nil)
    when nil then break # EOF
    else
      buffer.set_string(result, offset + total)
      total += result.bytesize
    end
  end
  total
end
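The minimum-length loop in #io_read can be checked against a scripted fake socket and a real IO::Buffer (Ruby 3.1+). `ScriptedSocket` and `read_min` are hypothetical; where the real hook parks in io_wait, this sketch only records that a wait happened. It shows that length 0 returns after the first non-empty read, that a positive length keeps reading across :wait_readable, and that EOF (nil) ends the loop early.

```ruby
# Scripted stand-in for a socket: each recv_nonblock call returns the next
# step, which is a String, :wait_readable, or nil (EOF).
class ScriptedSocket
  def initialize(*steps)
    @steps = steps
  end

  def recv_nonblock(maxlen, exception: false)
    step = @steps.shift
    step.is_a?(String) ? step.byteslice(0, maxlen) : step
  end
end

# Same loop shape as Scheduler#io_read, minus the fiber parking.
def read_min(io, buffer, length, offset = 0, waits: [])
  total   = 0
  minimum = length.zero? ? 1 : length
  while total < minimum
    maximum = buffer.size - offset - total
    break if maximum <= 0
    case (result = io.recv_nonblock(maximum, exception: false))
    when :wait_readable then waits << :readable # real hook: io_wait(io, READABLE, nil)
    when nil then break                         # EOF
    else
      buffer.set_string(result, offset + total)
      total += result.bytesize
    end
  end
  total
end
```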

#io_wait(io, events, timeout) ⇒ Object

Wait until io is ready for events; returns the ready-events bitmask (0 on timeout). events is an OR of Winloop::READABLE/WRITABLE/PRIORITY.



# File 'lib/winloop/scheduler.rb', line 72

def io_wait(io, events, timeout)
  key    = io.fileno
  waiter = IoWaiter.new(Fiber.current, events, false)
  entry  = (@polls[key] ||= PollEntry.new(key, io, nil, 0, []))
  entry.waiters << waiter
  arm_poll(entry)
  @timers.push(Timer.new(monotonic + timeout, waiter, 0, nil)) if timeout
  Fiber.yield # resumed with the ready-events mask (or 0 on timeout)
ensure
  # On the normal completion path the loop already removed us from the entry
  # and re-armed/dropped it. If we are unwinding by any other path (timeout,
  # or Timeout.timeout raised into us), detach and re-arm/cancel for the rest.
  # Guarded: if io.fileno raised (e.g. a closed IO) nothing was registered, and
  # touching nil here would mask the original exception.
  if waiter && entry
    waiter.settled = true
    if entry.waiters.delete(waiter)
      if entry.waiters.empty?
        cancel_entry(entry)
      else
        arm_poll(entry) # interest shrank; existing poll still covers it (no-op)
      end
    end
  end
end

#io_write(io, buffer, length, offset = 0) ⇒ Object



# File 'lib/winloop/scheduler.rb', line 119

def io_write(io, buffer, length, offset = 0)
  total   = 0
  maximum = buffer.size - offset
  return 0 if maximum <= 0
  minimum = length.zero? ? maximum : length
  minimum = maximum if minimum > maximum # never spin writing empty chunks
  while total < minimum
    chunk = buffer.get_string(offset + total, maximum - total)
    break if chunk.empty?
    result = io.write_nonblock(chunk, exception: false)
    if result == :wait_writable
      io_wait(io, WRITABLE, nil)
    else
      total += result
    end
  end
  total
end

#kernel_sleep(duration = nil) ⇒ Object

Timers. Parks the calling fiber for duration seconds (indefinitely if duration is nil, until an #unblock arrives) and returns duration.



# File 'lib/winloop/scheduler.rb', line 140

def kernel_sleep(duration = nil)
  fiber  = Fiber.current
  waiter = Waiter.new(fiber, false)
  # Park under @parked so a cross-thread #unblock (e.g. ConditionVariable#wait
  # with a timeout parks HERE, and a racing signal must wake us through the
  # SAME one-shot guard as the timer — never a second, competing wakeup).
  @lock.synchronize { @parked[fiber] = waiter }
  @timers.push(Timer.new(monotonic + duration, waiter, nil, nil)) if duration
  Fiber.yield
  duration
ensure
  @lock.synchronize do
    waiter.settled = true # disarm a not-yet-fired timer on abnormal exit
    @parked.delete(fiber)
  end
end
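Setting `waiter.settled = true` in an ensure disarms a timer without removing it from the heap: the loop simply skips settled waiters when their deadline comes up. The sketch below shows that lazy-cancellation pattern on a plain binary min-heap keyed on deadline. `HeapTimer`, `Parked`, `DeadlineHeap`, `fire_due` and `wait_timeout` are hypothetical; Winloop's own MinHeap interface is not shown on this page.

```ruby
# Hypothetical timer record and parked-waiter record.
HeapTimer = Struct.new(:deadline, :waiter, :value)
Parked    = Struct.new(:name, :settled)

# Binary min-heap keyed on deadline.
class DeadlineHeap
  def initialize
    @items = []
  end

  def size = @items.size
  def peek = @items.first

  def push(timer)
    @items << timer
    i = @items.size - 1
    while i.positive? # sift up
      parent = (i - 1) / 2
      break if @items[parent].deadline <= @items[i].deadline
      @items[parent], @items[i] = @items[i], @items[parent]
      i = parent
    end
    self
  end

  def pop
    return nil if @items.empty?
    top  = @items.first
    last = @items.pop
    unless @items.empty? # sift the former last element down from the root
      @items[0] = last
      i = 0
      loop do
        l = 2 * i + 1
        r = l + 1
        smallest = i
        smallest = l if l < @items.size && @items[l].deadline < @items[smallest].deadline
        smallest = r if r < @items.size && @items[r].deadline < @items[smallest].deadline
        break if smallest == i
        @items[i], @items[smallest] = @items[smallest], @items[i]
        i = smallest
      end
    end
    top
  end
end

# Fire every due timer; settled waiters (woken elsewhere or unwound) are skipped.
def fire_due(heap, now)
  fired = []
  while (t = heap.peek) && t.deadline <= now
    heap.pop
    next if t.waiter.settled
    t.waiter.settled = true
    fired << [t.waiter.name, t.value]
  end
  fired
end

# How long the loop may block (nil = no timers). A stale settled timer only
# causes an early, harmless wakeup.
def wait_timeout(heap, now)
  t = heap.peek
  t && [t.deadline - now, 0].max
end
```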

#monotonic ⇒ Object



# File 'lib/winloop/scheduler.rb', line 64

def monotonic
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

#op_abandon(op_id) ⇒ Object



# File 'lib/winloop/scheduler.rb', line 288

def op_abandon(op_id)
  op_guard!
  @backend.op_abandon(op_id)
end

#op_associate(handle) ⇒ Object

Generic OVERLAPPED completions (the cross-gem op protocol).

Client gems feature-detect with Fiber.scheduler.respond_to?(:await_op) and talk ONLY to these wrappers, never to the Backend directly, so they cannot reach #wait/#shutdown and corrupt the loop. The contract loop:

op_associate(h)                            # once, at handle-open time
op_id, ov, buf = op_prepare(h, capacity: n)
ok = <the client's own native submit using ov/buf>
if ok # native call returned TRUE or GetLastError == ERROR_IO_PENDING
  op_submitted(op_id)
  bytes, error, data = await_op(op_id, timeout: t)  # fiber parks here
else  # synchronous failure: no packet will ever arrive
  op_abandon(op_id)
end

All of these are loop-thread-only and raise once the scheduler has shut down.



# File 'lib/winloop/scheduler.rb', line 273

def op_associate(handle)
  op_guard!
  @backend.associate(handle)
end
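A client gem will usually wrap the contract loop once. The sketch below is a hypothetical helper (`perform_op`, `SubmitFailed`) that assumes op_associate was already called when the handle was opened; the block stands in for the client's own native submit and must return true when the call returned TRUE or failed with ERROR_IO_PENDING. `FakeOpScheduler` is a recording fake for illustration only, exposing the documented wrapper names with made-up return values.

```ruby
SubmitFailed = Class.new(StandardError) # hypothetical client-side error

# Hypothetical client helper following the documented contract.
def perform_op(sched, handle, capacity:, timeout: nil)
  op_id, ov, buf = sched.op_prepare(handle, capacity: capacity)
  unless yield(ov, buf) # the client's native submit
    sched.op_abandon(op_id) # synchronous failure: no packet will ever arrive
    raise SubmitFailed, "native submit failed for op #{op_id}"
  end
  sched.op_submitted(op_id)
  sched.await_op(op_id, timeout: timeout) # [bytes, error, data], or nil on timeout
end

# Recording fake (not Winloop): same method names, canned results.
class FakeOpScheduler
  attr_reader :calls

  def initialize
    @calls = []
  end

  def op_prepare(handle, tag: 0, capacity: 0)
    @calls << [:prepare, handle, capacity]
    [7, :ov_ptr, "\0" * capacity]
  end

  def op_submitted(op_id) = @calls << [:submitted, op_id]
  def op_abandon(op_id) = @calls << [:abandon, op_id]

  def await_op(op_id, timeout: nil)
    @calls << [:await, op_id, timeout]
    [4, 0, "data"]
  end
end
```

The important property is that every prepared op ends in exactly one of op_submitted + await_op or op_abandon.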

#op_cancel(op_id) ⇒ Object



# File 'lib/winloop/scheduler.rb', line 293

def op_cancel(op_id)
  op_guard!
  @backend.op_cancel(op_id)
end

#op_prepare(handle, tag: 0, capacity: 0) ⇒ Object



# File 'lib/winloop/scheduler.rb', line 278

def op_prepare(handle, tag: 0, capacity: 0)
  op_guard!
  @backend.op_prepare(handle, tag: tag, capacity: capacity)
end

#op_state(op_id) ⇒ Object



# File 'lib/winloop/scheduler.rb', line 298

def op_state(op_id)
  op_guard!
  @backend.op_state(op_id)
end

#op_submitted(op_id) ⇒ Object



# File 'lib/winloop/scheduler.rb', line 283

def op_submitted(op_id)
  op_guard!
  @backend.op_submitted(op_id)
end

#process_wait(pid, flags) ⇒ Object



# File 'lib/winloop/scheduler.rb', line 251

def process_wait(pid, flags)
  await_on_thread { Process::Status.wait(pid, flags) }
end

#run ⇒ Object

Run loop: calls run_once repeatedly until done? is true.



# File 'lib/winloop/scheduler.rb', line 366

def run
  run_once until done?
end

#timeout_after(duration, exception_class, *exception_arguments) ⇒ Object



# File 'lib/winloop/scheduler.rb', line 157

def timeout_after(duration, exception_class, *exception_arguments)
  waiter = Waiter.new(Fiber.current, false)
  @timers.push(Timer.new(monotonic + duration, waiter, nil,
                         [exception_class, exception_arguments]))
  begin
    yield duration
  ensure
    # disarm if the block finished before the deadline
    @lock.synchronize { waiter.settled = true }
  end
end
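The timer pushed by #timeout_after carries `[exception_class, exception_arguments]` instead of a resume value. One plausible way for the loop to deliver such a timer is Fiber#raise, which raises inside a suspended fiber at its Fiber.yield. `deliver` is a hypothetical helper; this page does not show Winloop's actual timer-firing code.

```ruby
require "timeout"

# Hypothetical delivery of a fired timer to a parked fiber:
# payload nil            -> resume it with a plain value
# payload [klass, args]  -> raise klass(*args) inside it
def deliver(fiber, value, payload)
  if payload
    klass, args = payload
    fiber.raise(klass, *args)
  else
    fiber.resume(value)
  end
end
```

A fiber parked at Fiber.yield sees the exception exactly where it was suspended, which is how a timeout can interrupt a block that is waiting on I/O or a sleep.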

#unblock(_blocker, fiber) ⇒ Object

Called from ANY thread: only mutate shared state under the lock and wake the loop — never resume the fiber here.

We wake by FIBER identity through its single parked Waiter, ignoring blocker. That is deliberate and important:

* Thread#join calls block(thread) but MRI calls unblock(<a DIFFERENT
  object>, fiber); keying on the fiber wakes it correctly anyway.
* A fiber can have more than one live wakeup source (a timeout timer AND
  this unblock, e.g. ConditionVariable#wait(mutex, timeout) racing a
  signal). Routing through the one Waiter's `settled` guard means exactly
  one of them resumes the fiber; the loser is dropped. A raw, unguarded
  "@ready << fiber" would double-resume and cut the fiber's NEXT wait short.


# File 'lib/winloop/scheduler.rb', line 196

def unblock(_blocker, fiber)
  @lock.synchronize do
    waiter = @parked[fiber]
    if waiter
      wake_locked(waiter, true)
    elsif @ready.none? { |f, _| f.equal?(fiber) }
      # Not parked in any wait we track (spurious/late unblock) — last resort,
      # de-duplicated so it can never double-enqueue.
      @ready << [fiber, true]
    end
  end
  @backend.wakeup
end
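The one-shot guard described above can be reproduced with a mutex and a settled flag. In this sketch several threads race to wake the same parked waiter, standing in for a timeout timer and a cross-thread unblock, and exactly one enqueues a resume. `GuardedWaiter`, `ReadyQueue`, `wake` and `drain` are hypothetical; they model the behaviour of wake_locked, whose body is not shown on this page.

```ruby
# Hypothetical one-shot wake guard: the first waker wins, later ones are dropped.
GuardedWaiter = Struct.new(:fiber, :settled)

class ReadyQueue
  def initialize
    @lock  = Thread::Mutex.new
    @ready = [] # [[fiber, resume_value], ...]
  end

  # Returns true only for the call that actually enqueued the resume.
  def wake(waiter, value)
    @lock.synchronize do
      next false if waiter.settled
      waiter.settled = true
      @ready << [waiter.fiber, value]
      true
    end
  end

  # Loop side: take everything queued so far.
  def drain
    @lock.synchronize { @ready.dup.tap { @ready.clear } }
  end
end
```

Because the settled check and the enqueue happen under the same lock, a losing waker can never produce a second resume that would cut the fiber's next wait short.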