Class: Tep::Scheduler
- Inherits: Object
  - Object
  - Tep::Scheduler
- Defined in: lib/tep/scheduler.rb
Constant Summary
- READ = 1
  Mode bits for io_wait. Mirror sphttp’s wire encoding so the C side and Ruby side stay aligned.
- WRITE = 2
Class Method Summary
- .alive_count ⇒ Object
- .any_io_waiter ⇒ Object
- .any_time_ready ⇒ Object
  Is any alive fiber’s wake_at already <= now? Used by tick() to decide whether poll() can block: if anyone is time-due, the poll timeout collapses to 0 (non-blocking peek) so we don’t waste wall time idling when there’s runnable work.
- .clear ⇒ Object
  Reset the schedulable set.
- .io_wait(fd, mode, timeout_seconds) ⇒ Object
  Park the current fiber until `fd` is ready for the given `mode` bits (1=READ, 2=WRITE, 3=both) OR `timeout_seconds` elapses.
- .next_wake ⇒ Object
- .pause(seconds) ⇒ Object
  Called from within a fiber’s body to suspend until at-or-after `seconds` from now.
- .poll_round(timeout_ms) ⇒ Object
  Build poll set from parked-on-I/O fibers, call poll(2), and write observed-ready bits back into the parallel arrays.
- .run_for(seconds) ⇒ Object
  Drain until `seconds` has elapsed OR every fiber’s done.
- .run_until_empty ⇒ Object
  Drain.
- .scheduled_context? ⇒ Boolean
  True iff a Tep::Scheduler-managed fiber is currently executing.
- .spawn_fiber(f) ⇒ Object
- .tick(poll_timeout_ms) ⇒ Object
  One scheduler pass.
Class Method Details
.alive_count ⇒ Object
# File 'lib/tep/scheduler.rb', line 320
def self.alive_count
  n = 0
  i = 0
  total = Tep::APP.sched_fibers.length
  while i < total
    if Tep::APP.sched_fibers[i].f.alive?
      n += 1
    end
    i += 1
  end
  n
end
.any_io_waiter ⇒ Object
# File 'lib/tep/scheduler.rb', line 226
def self.any_io_waiter
  i = 0
  n = Tep::APP.sched_fibers.length
  while i < n
    if Tep::APP.sched_fibers[i].f.alive? &&
       Tep::APP.sched_io_fd[i] >= 0 &&
       Tep::APP.sched_io_ready[i] == 0
      return true
    end
    i += 1
  end
  false
end
.any_time_ready ⇒ Object
Is any alive fiber’s wake_at already <= now? Used by tick() to decide whether poll() can block: if anyone is time-due, the poll timeout collapses to 0 (non-blocking peek) so we don’t waste wall time idling when there’s runnable work.
# File 'lib/tep/scheduler.rb', line 244
def self.any_time_ready
  now = Time.now.to_i
  i = 0
  n = Tep::APP.sched_fibers.length
  while i < n
    if Tep::APP.sched_fibers[i].f.alive? && Tep::APP.sched_wake_at[i] <= now
      return true
    end
    i += 1
  end
  false
end
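The timeout collapse described for any_time_ready can be looked at in isolation. The following is a minimal sketch, not the Tep implementation: `effective_poll_timeout` and the plain `alive` / `wake_at` arrays are hypothetical stand-ins for the scheduler's parallel arrays.

```ruby
# Hypothetical helper: decide which timeout a tick should hand to poll().
# alive   - array of booleans, one per slot (stand-in for f.alive?)
# wake_at - array of integer epoch seconds, parallel to alive
def effective_poll_timeout(requested_ms, alive, wake_at, now)
  # A slot is time-due when it is alive and its wake_at has passed.
  due = alive.each_index.any? { |i| alive[i] && wake_at[i] <= now }
  # Runnable work exists: do not block in poll, just peek.
  due ? 0 : requested_ms
end

now = 1_000
effective_poll_timeout(500, [true], [-1], now)        # fresh fiber (wake_at = -1) is due
effective_poll_timeout(500, [true], [now + 30], now)  # nothing due, keep the requested timeout
```

A freshly spawned slot carries wake_at = -1, so it always counts as due and forces the non-blocking peek.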
.clear ⇒ Object
Reset the schedulable set. Useful between worker-loop iterations or between tests.
# File 'lib/tep/scheduler.rb', line 309
def self.clear
  while Tep::APP.sched_fibers.length > 0
    Tep::APP.sched_fibers.delete_at(0)
    Tep::APP.sched_wake_at.delete_at(0)
    Tep::APP.sched_io_fd.delete_at(0)
    Tep::APP.sched_io_mode.delete_at(0)
    Tep::APP.sched_io_ready.delete_at(0)
  end
  0
end
.io_wait(fd, mode, timeout_seconds) ⇒ Object
Park the current fiber until `fd` is ready for the given `mode` bits (1=READ, 2=WRITE, 3=both) OR `timeout_seconds` elapses. Returns the observed-ready bits (0 on timeout). When called from outside a fiber, falls back to a single poll() call so the same code works at top level.
# File 'lib/tep/scheduler.rb', line 280
def self.io_wait(fd, mode, timeout_seconds)
  idx = Tep::APP.sched_current
  if idx < 0
    # No fiber context -- single-shot poll inline.
    Sock.sphttp_poll_reset
    slot = Sock.sphttp_poll_add(fd, mode)
    Sock.sphttp_poll_run(timeout_seconds * 1000)
    return Sock.sphttp_poll_ready(slot)
  end
  Tep::APP.sched_io_fd[idx] = fd
  Tep::APP.sched_io_mode[idx] = mode
  Tep::APP.sched_io_ready[idx] = 0
  if timeout_seconds < 0
    # "Wait forever for I/O": -1 would mean "ready now" to the
    # tick picker, so use a far-future wake_at as the sentinel.
    Tep::APP.sched_wake_at[idx] = Time.now.to_i + 86400
  else
    Tep::APP.sched_wake_at[idx] = Time.now.to_i + timeout_seconds
  end
  Fiber.yield
  ready = Tep::APP.sched_io_ready[idx]
  Tep::APP.sched_io_fd[idx] = -1
  Tep::APP.sched_io_mode[idx] = 0
  Tep::APP.sched_io_ready[idx] = 0
  ready
end
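Two details of io_wait are easy to check by hand: how the wake_at deadline is chosen for a negative timeout, and how a caller might decode the returned bits. The sketch below assumes nothing beyond the listing above; `IoWaitSketch`, `wake_at_for` and `describe_ready` are hypothetical names, and the sketch only models the READ and WRITE bits.

```ruby
# Hypothetical stand-in module; bit values taken from the constant summary.
module IoWaitSketch
  READ  = 1
  WRITE = 2
  FAR_FUTURE_SECONDS = 86_400 # the offset the listing adds for negative timeouts

  module_function

  # Deadline stored in wake_at while the fiber is parked on I/O.
  def wake_at_for(now, timeout_seconds)
    return now + FAR_FUTURE_SECONDS if timeout_seconds < 0
    now + timeout_seconds
  end

  # Turn a ready mask into symbols; 0 is the documented timeout value.
  def describe_ready(ready)
    return [:timeout] if ready == 0
    names = []
    names << :readable if (ready & READ) != 0
    names << :writable if (ready & WRITE) != 0
    names
  end
end

IoWaitSketch.wake_at_for(1_000, -1)                                 # one day past `now`
IoWaitSketch.describe_ready(IoWaitSketch::READ | IoWaitSketch::WRITE)
```

Because the sentinel is a finite offset, a "wait forever" call in the listing appears to become time-due after one day and would then return 0 like any other timeout.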
.next_wake ⇒ Object
# File 'lib/tep/scheduler.rb', line 208
def self.next_wake
  best = -1
  i = 0
  n = Tep::APP.sched_fibers.length
  while i < n
    if Tep::APP.sched_fibers[i].f.alive?
      if best < 0 || Tep::APP.sched_wake_at[i] < Tep::APP.sched_wake_at[best]
        best = i
      end
    end
    i += 1
  end
  if best < 0
    return -1
  end
  Tep::APP.sched_wake_at[best]
end
.pause(seconds) ⇒ Object
Called from within a fiber’s body to suspend until at-or-after `seconds` from now. Named `pause` rather than `sleep` to keep the semantics distinct from `Kernel#sleep`: this is a fiber-aware yield that returns control to the cooperative scheduler’s dispatch loop, not an OS-level sleep. Outside a fiber it falls through to bare `sleep(seconds)`.
# File 'lib/tep/scheduler.rb', line 263
def self.pause(seconds)
  idx = Tep::APP.sched_current
  if idx < 0
    # Called from outside any fiber -- fall back to POSIX sleep.
    sleep(seconds)
    return 0
  end
  Tep::APP.sched_wake_at[idx] = Time.now.to_i + seconds
  Fiber.yield
  0
end
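To see the "record a deadline, then yield" shape of pause without the rest of Tep, here is a stripped-down model. `PauseSketch` and `resume_slot` are hypothetical names for illustration; they only imitate the slot-index bookkeeping visible in the listing.

```ruby
# PauseSketch is a hypothetical, simplified model; it is not Tep::Scheduler.
module PauseSketch
  @fibers  = []
  @wake_at = []
  @current = -1 # index of the running fiber, -1 outside any fiber

  class << self
    attr_accessor :current
    attr_reader :fibers, :wake_at

    # Register a fiber and return its slot index.
    def spawn_fiber(f)
      @fibers.push(f)
      @wake_at.push(-1) # -1 = due immediately
      @fibers.length - 1
    end

    def pause(seconds)
      idx = @current
      if idx < 0
        sleep(seconds) # no fiber context: plain blocking sleep
        return 0
      end
      @wake_at[idx] = Time.now.to_i + seconds
      Fiber.yield # hand control back to whoever resumed us
      0
    end

    # Resume one slot, bracketing the call the way a dispatch loop would.
    def resume_slot(idx)
      @current = idx
      @fibers[idx].resume
    ensure
      @current = -1
    end
  end
end

log = []
idx = PauseSketch.spawn_fiber(Fiber.new do
  log << :before
  PauseSketch.pause(60)
  log << :after
end)
PauseSketch.resume_slot(idx)
# The fiber is now parked: only :before has run and its wake_at lies in the future.
```

Nothing in this model resumes the fiber a second time; in the real scheduler that is the job of tick once wake_at has passed.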
.poll_round(timeout_ms) ⇒ Object
Build poll set from parked-on-I/O fibers, call poll(2), and write observed-ready bits back into the parallel arrays. `timeout_ms` is the poll() timeout (-1 = block forever, 0 = non-blocking peek). Idempotent for an empty set.
# File 'lib/tep/scheduler.rb', line 123
def self.poll_round(timeout_ms)
  Sock.sphttp_poll_reset
  slots = [-1] # slot index parallel to sched_fibers; -1 = not polled
  slots.delete_at(0)
  added = 0
  i = 0
  n = Tep::APP.sched_fibers.length
  while i < n
    slot = -1
    if Tep::APP.sched_fibers[i].f.alive? &&
       Tep::APP.sched_io_fd[i] >= 0 &&
       Tep::APP.sched_io_ready[i] == 0
      slot = Sock.sphttp_poll_add(Tep::APP.sched_io_fd[i],
                                  Tep::APP.sched_io_mode[i])
      added += 1
    end
    slots.push(slot)
    i += 1
  end
  if added == 0
    return 0
  end
  Sock.sphttp_poll_run(timeout_ms)
  now = Time.now.to_i
  i = 0
  while i < n
    if slots[i] >= 0
      ready = Sock.sphttp_poll_ready(slots[i])
      if ready > 0
        Tep::APP.sched_io_ready[i] = ready
        Tep::APP.sched_wake_at[i] = now
      end
    end
    i += 1
  end
  added
end
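The build-set, wait, write-back pattern can be tried with plain Ruby. The sketch below swaps poll(2) and the Sock.sphttp_poll_* calls for the standard `IO.select`, so it is an analogue and not the library's mechanism; `PollSketch` and `poll_once` are hypothetical names.

```ruby
# Hypothetical analogue of one poll round, built on IO.select instead of poll(2).
module PollSketch
  READ  = 1
  WRITE = 2

  module_function

  # waiters: array of [io, mode] pairs.
  # Returns an array of ready bits parallel to waiters (0 = not ready).
  def poll_once(waiters, timeout_seconds)
    readers = waiters.select { |_io, mode| (mode & READ) != 0 }.map(&:first)
    writers = waiters.select { |_io, mode| (mode & WRITE) != 0 }.map(&:first)
    # Empty set: nothing to wait on, report nothing ready.
    return Array.new(waiters.length, 0) if readers.empty? && writers.empty?

    # IO.select returns nil on timeout.
    ready_r, ready_w = IO.select(readers, writers, nil, timeout_seconds) || [[], []]
    waiters.map do |io, mode|
      bits = 0
      bits |= READ  if (mode & READ) != 0 && ready_r.include?(io)
      bits |= WRITE if (mode & WRITE) != 0 && ready_w.include?(io)
      bits
    end
  end
end

r, w = IO.pipe
waiters = [[r, PollSketch::READ], [w, PollSketch::WRITE]]
first = PollSketch.poll_once(waiters, 0)  # non-blocking peek on an empty pipe
w.write("x")
w.flush
second = PollSketch.poll_once(waiters, 0) # the read end now has data
```

A timeout of 0 plays the role of the non-blocking peek, and the returned array stays index-aligned with the input in the same way poll_round keeps `slots` parallel to sched_fibers.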
.run_for(seconds) ⇒ Object
Drain until `seconds` has elapsed OR every fiber’s done. Between empty passes, blocks in poll(2) (or sleep, if no I/O waits) until the next wake-up.
# File 'lib/tep/scheduler.rb', line 176
def self.run_for(seconds)
  deadline = Time.now.to_i + seconds
  while Time.now.to_i < deadline
    if !Scheduler.tick(0)
      # Nothing ready this pass. Compute the next deadline:
      # min(next_wake, overall_deadline). If any fiber is
      # parked on I/O, block in poll() until that or the
      # timer hits.
      next_at = Scheduler.next_wake
      gap = deadline - Time.now.to_i
      if next_at >= 0
        tgap = next_at - Time.now.to_i
        if tgap < gap
          gap = tgap
        end
      end
      if gap < 0
        gap = 0
      end
      if Scheduler.any_io_waiter
        # Park in poll for up to `gap` seconds.
        Scheduler.poll_round(gap * 1000)
      elsif next_at < 0
        return 0
      elsif gap > 0
        sleep gap
      end
    end
  end
  0
end
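The idle-gap arithmetic inside run_for is simple to check on its own. This is a minimal sketch under the assumption that all times are integer epoch seconds, as in the listing; `idle_gap` is a hypothetical helper name.

```ruby
# Hypothetical helper mirroring the gap computation in run_for.
# now, deadline - integer epoch seconds
# next_at       - earliest wake_at among alive fibers, or -1 when there is none
def idle_gap(now, deadline, next_at)
  gap = deadline - now
  if next_at >= 0
    tgap = next_at - now
    gap = tgap if tgap < gap # wake earlier if a fiber is due sooner
  end
  gap < 0 ? 0 : gap          # never hand a negative wait to poll or sleep
end

idle_gap(100, 110, -1)  # no pending wake: wait out the overall deadline
idle_gap(100, 110, 103) # a fiber is due sooner: wait only until then
idle_gap(100, 110, 95)  # a wake time already passed: do not wait
```

The result is in seconds; the listing multiplies it by 1000 before passing it to poll_round and passes it unchanged to sleep.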
.run_until_empty ⇒ Object
Drain. Resumes everything ready until the schedulable set is empty (every fiber finished or all are waiting for a future wake_at / I/O). Returns the number of resumes performed. Pure non-blocking; no poll() wait between passes.
# File 'lib/tep/scheduler.rb', line 165
def self.run_until_empty
  n = 0
  while Scheduler.tick(0)
    n += 1
  end
  n
end
.scheduled_context? ⇒ Boolean
True iff a Tep::Scheduler-managed fiber is currently executing. Set by tick() right before f.resume and reset right after, so this is the canonical “am I in cooperative context?” check for callers that want to pick a blocking vs. fiber-yielding path (e.g. Tep::Http – see lib/tep/http.rb#send_req).
# File 'lib/tep/scheduler.rb', line 338
def self.scheduled_context?
  Tep::APP.sched_current >= 0
end
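A caller that wants to choose between a blocking path and a fiber-yielding path could branch on this predicate. The sketch below is an assumption-laden illustration: `CtxSketch`, `wait_strategy` and `run_in_context` are hypothetical and only imitate the set-before-resume, reset-after behaviour described above.

```ruby
# Hypothetical model of the "am I in cooperative context?" check.
module CtxSketch
  @current = -1 # -1 = no scheduler-managed fiber is running

  class << self
    attr_accessor :current

    def scheduled_context?
      @current >= 0
    end

    # Pick a waiting strategy based on the context flag.
    def wait_strategy
      scheduled_context? ? :fiber_yield : :blocking
    end

    # Bracket a resume the way the description says tick() does.
    def run_in_context(idx, fiber)
      @current = idx
      fiber.resume
    ensure
      @current = -1
    end
  end
end

seen = []
CtxSketch.run_in_context(0, Fiber.new { seen << CtxSketch.wait_strategy })
seen << CtxSketch.wait_strategy
# seen now holds the strategy chosen inside the fiber, then the one chosen outside.
```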
.spawn_fiber(f) ⇒ Object
# File 'lib/tep/scheduler.rb', line 48
def self.spawn_fiber(f)
  Tep::APP.sched_fibers.push(Tep::FiberSlot.new(f))
  Tep::APP.sched_wake_at.push(-1)
  Tep::APP.sched_io_fd.push(-1)
  Tep::APP.sched_io_mode.push(0)
  Tep::APP.sched_io_ready.push(0)
  f
end
.tick(poll_timeout_ms) ⇒ Object
One scheduler pass. If any fibers are parked on I/O, build a poll set, run poll(2) for up to `poll_timeout_ms`, and mark ready ones. Then resume the soonest-due fiber whose wake_at is <= now. Returns true if it resumed something.
If a fiber is already time-due (wake_at <= now – e.g. a newly spawned fiber with wake_at=-1, or a fiber that just called pause(0)), poll() must NOT block: we have runnable work and any wait is wasted wall time. This matters for the cooperative request path – when an outer handler parks on io_wait and the accept fiber spawns an inner connection-fiber, the next tick has a wake_at=-1 fiber ready; without this short-circuit each “hand off to the freshly-spawned fiber” step costs a full poll-timeout’s worth of latency.
# File 'lib/tep/scheduler.rb', line 71
def self.tick(poll_timeout_ms)
  # Reclaim trailing dead slots. Without this, the parallel
  # arrays grow once per accepted connection and never shrink --
  # a slow leak and per-tick iteration tax in a long-running
  # Scheduled server. Tail-only (stop at first alive) is
  # deliberate: it keeps every surviving slot's index stable,
  # so external captures of sched_current held across Fiber.yield
  # (e.g. pg.rb's PG::Pool @waiter_idxs) stay valid. Middle
  # dead slots aren't reclaimed until the tail catches up; for
  # FIFO request lifecycles that's the common case.
  i = Tep::APP.sched_fibers.length - 1
  while i >= 0 && !Tep::APP.sched_fibers[i].f.alive?
    Tep::APP.sched_fibers.delete_at(i)
    Tep::APP.sched_wake_at.delete_at(i)
    Tep::APP.sched_io_fd.delete_at(i)
    Tep::APP.sched_io_mode.delete_at(i)
    Tep::APP.sched_io_ready.delete_at(i)
    i -= 1
  end
  ms = poll_timeout_ms
  if Scheduler.any_time_ready
    ms = 0
  end
  Scheduler.poll_round(ms)
  now = Time.now.to_i
  best = -1
  i = 0
  n = Tep::APP.sched_fibers.length
  while i < n
    if Tep::APP.sched_fibers[i].f.alive? &&
       Tep::APP.sched_wake_at[i] <= now
      if best < 0 || Tep::APP.sched_wake_at[i] < Tep::APP.sched_wake_at[best]
        best = i
      end
    end
    i += 1
  end
  if best < 0
    return false
  end
  Tep::APP.sched_current = best
  Tep::APP.sched_wake_at[best] = -1
  Tep::APP.sched_fibers[best].f.resume
  Tep::APP.sched_current = -1
  true
end
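Two steps of tick lend themselves to a standalone check: tail-only reclamation and the choice of the soonest-due slot. The sketch below uses plain boolean and integer arrays as hypothetical stand-ins for the parallel arrays; `reclaim_tail` and `pick_due` are illustrative names, not part of Tep.

```ruby
# Drop dead slots from the tail only, so surviving indices never shift.
# alive and wake_at are parallel arrays and are modified in place.
def reclaim_tail(alive, wake_at)
  while !alive.empty? && !alive.last
    alive.pop
    wake_at.pop
  end
  alive.length
end

# Index of the alive slot with the smallest wake_at that is <= now, or -1.
# On a tie the lowest index wins, matching the strict `<` in the listing.
def pick_due(alive, wake_at, now)
  best = -1
  alive.each_index do |i|
    next unless alive[i] && wake_at[i] <= now
    best = i if best < 0 || wake_at[i] < wake_at[best]
  end
  best
end

alive   = [true, false, true, false, false]
wake_at = [105,  -1,    100,  -1,    -1]
reclaim_tail(alive, wake_at) # trailing dead slots go; the dead slot at index 1 stays
pick_due(alive, wake_at, 100) # index 2 is the only alive slot that is due
```

Leaving the dead slot in the middle is what keeps index 2 valid for any code that captured it before yielding.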