Class: Tep::Scheduler

Inherits:
Object
Defined in:
lib/tep/scheduler.rb

Constant Summary

READ = 1

Mode bits for io_wait. Mirror sphttp’s wire encoding so the C side and Ruby side stay aligned.

WRITE = 2
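
As a small illustration of how the two mode bits combine, the sketch below decodes a ready-bits integer into symbols. `READ_BIT`, `WRITE_BIT`, and `describe_mode` are hypothetical local names that only reuse the values 1 and 2 documented above; they are not part of Tep.

```ruby
# Hypothetical stand-ins for the READ / WRITE constants (values 1 and 2).
READ_BIT  = 1
WRITE_BIT = 2

# Decode a mode or ready-bits integer into a list of symbols.
def describe_mode(bits)
  names = []
  names << :read  if (bits & READ_BIT)  != 0
  names << :write if (bits & WRITE_BIT) != 0
  names
end

# Waiting for either direction means OR-ing the two bits together.
BOTH_BITS = READ_BIT | WRITE_BIT
```

A result of 0 decodes to an empty list, which matches the "0 on timeout" convention described for io_wait.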

Class Method Summary

Class Method Details

.alive_count ⇒ Object



# File 'lib/tep/scheduler.rb', line 320

def self.alive_count
  n = 0
  i = 0
  total = Tep::APP.sched_fibers.length
  while i < total
    if Tep::APP.sched_fibers[i].f.alive?
      n += 1
    end
    i += 1
  end
  n
end

.any_io_waiter ⇒ Object



# File 'lib/tep/scheduler.rb', line 226

def self.any_io_waiter
  i = 0
  n = Tep::APP.sched_fibers.length
  while i < n
    if Tep::APP.sched_fibers[i].f.alive? &&
       Tep::APP.sched_io_fd[i] >= 0 &&
       Tep::APP.sched_io_ready[i] == 0
      return true
    end
    i += 1
  end
  false
end

.any_time_ready ⇒ Object

Is any alive fiber’s wake_at already <= now? Used by tick() to decide whether poll() can block: if anyone is time-due, the poll timeout collapses to 0 (non-blocking peek) so we don’t waste wall time idling when there’s runnable work.



# File 'lib/tep/scheduler.rb', line 244

def self.any_time_ready
  now = Time.now.to_i
  i = 0
  n = Tep::APP.sched_fibers.length
  while i < n
    if Tep::APP.sched_fibers[i].f.alive? && Tep::APP.sched_wake_at[i] <= now
      return true
    end
    i += 1
  end
  false
end
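
The timeout-collapse rule described for any_time_ready can be sketched as a pure function. `effective_poll_timeout` is a hypothetical helper written for illustration; it takes the wake_at values of the alive fibers directly instead of reading Tep::APP.

```ruby
# Hypothetical helper: pick the poll timeout for one scheduler pass.
# wake_ats     - wake_at values of the alive fibers (-1 means "ready now")
# now          - current time in whole seconds
# requested_ms - the timeout the caller asked for
def effective_poll_timeout(wake_ats, now, requested_ms)
  time_due = wake_ats.any? { |t| t <= now }
  # If anything is already runnable, poll must only peek, never block.
  time_due ? 0 : requested_ms
end
```

A freshly spawned fiber carries wake_at = -1, so its presence alone is enough to force a non-blocking peek.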

.clear ⇒ Object

Reset the schedulable set. Useful between worker-loop iterations or between tests.



# File 'lib/tep/scheduler.rb', line 309

def self.clear
  while Tep::APP.sched_fibers.length > 0
    Tep::APP.sched_fibers.delete_at(0)
    Tep::APP.sched_wake_at.delete_at(0)
    Tep::APP.sched_io_fd.delete_at(0)
    Tep::APP.sched_io_mode.delete_at(0)
    Tep::APP.sched_io_ready.delete_at(0)
  end
  0
end

.io_wait(fd, mode, timeout_seconds) ⇒ Object

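Park the current fiber until `fd` is ready for the given `mode` bits (1=READ, 2=WRITE, 3=both) or `timeout_seconds` elapses. Returns the observed-ready bits (0 on timeout). A negative `timeout_seconds` means "wait for I/O with no deadline"; inside a fiber this is implemented with a wake_at 86400 seconds in the future, so the wait is effectively capped at one day. When called from outside a fiber, falls back to a single poll() call so the same code works at top level.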



# File 'lib/tep/scheduler.rb', line 280

def self.io_wait(fd, mode, timeout_seconds)
  idx = Tep::APP.sched_current
  if idx < 0
    # No fiber context -- single-shot poll inline.
    Sock.sphttp_poll_reset
    slot = Sock.sphttp_poll_add(fd, mode)
    Sock.sphttp_poll_run(timeout_seconds * 1000)
    return Sock.sphttp_poll_ready(slot)
  end
  Tep::APP.sched_io_fd[idx]    = fd
  Tep::APP.sched_io_mode[idx]  = mode
  Tep::APP.sched_io_ready[idx] = 0
  if timeout_seconds < 0
    # "Wait forever for I/O": -1 would mean "ready now" to the
    # tick picker, so use a far-future wake_at as the sentinel.
    Tep::APP.sched_wake_at[idx] = Time.now.to_i + 86400
  else
    Tep::APP.sched_wake_at[idx] = Time.now.to_i + timeout_seconds
  end
  Fiber.yield
  ready = Tep::APP.sched_io_ready[idx]
  Tep::APP.sched_io_fd[idx]    = -1
  Tep::APP.sched_io_mode[idx]  = 0
  Tep::APP.sched_io_ready[idx] = 0
  ready
end
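
To make the return convention of io_wait concrete, here is a rough stand-in for its top-level (non-fiber) path. It assumes Ruby's standard `IO.select` in place of the sphttp poll wrappers and takes an IO object rather than a raw fd; `wait_ready`, `READ_BIT`, and `WRITE_BIT` are hypothetical names.

```ruby
READ_BIT  = 1
WRITE_BIT = 2

# Single readiness check with a timeout; returns observed-ready bits,
# or 0 if the timeout elapsed first. A negative timeout blocks until ready.
def wait_ready(io, mode, timeout_seconds)
  readers = (mode & READ_BIT)  != 0 ? [io] : []
  writers = (mode & WRITE_BIT) != 0 ? [io] : []
  timeout = timeout_seconds < 0 ? nil : timeout_seconds
  ready = IO.select(readers, writers, nil, timeout)
  return 0 if ready.nil?

  bits = 0
  bits |= READ_BIT  if ready[0].include?(io)
  bits |= WRITE_BIT if ready[1].include?(io)
  bits
end

r, w = IO.pipe
before_write = wait_ready(r, READ_BIT, 0)  # nothing to read yet
w.write("x")
after_write = wait_ready(r, READ_BIT, 1)   # one byte is now pending
```

The fiber path differs in mechanism (it records fd and mode, yields, and lets the scheduler poll on its behalf), but the caller sees the same bits-or-zero result.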

.next_wake ⇒ Object



# File 'lib/tep/scheduler.rb', line 208

def self.next_wake
  best = -1
  i = 0
  n = Tep::APP.sched_fibers.length
  while i < n
    if Tep::APP.sched_fibers[i].f.alive?
      if best < 0 || Tep::APP.sched_wake_at[i] < Tep::APP.sched_wake_at[best]
        best = i
      end
    end
    i += 1
  end
  if best < 0
    return -1
  end
  Tep::APP.sched_wake_at[best]
end

.pause(seconds) ⇒ Object

Called from within a fiber’s body to suspend until at or after `seconds` from now. Named `pause` rather than `sleep` to keep the semantics distinct from `Kernel#sleep`: this is a fiber-aware yield that returns control to the cooperative scheduler’s dispatch loop, not an OS-level sleep. Outside a fiber it falls through to bare `sleep(seconds)`.



# File 'lib/tep/scheduler.rb', line 263

def self.pause(seconds)
  idx = Tep::APP.sched_current
  if idx < 0
    # Called from outside any fiber -- fall back to POSIX sleep.
    sleep(seconds)
    return 0
  end
  Tep::APP.sched_wake_at[idx] = Time.now.to_i + seconds
  Fiber.yield
  0
end
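
The pause mechanism (record a wake_at, then yield back to the dispatch loop) can be modelled with a toy scheduler. Everything here is hypothetical: `ToySched` keeps its own arrays instead of Tep::APP and uses a fake clock that jumps forward, so the example runs instantly.

```ruby
require "fiber" # Fiber#alive? on older Rubies; harmless on newer ones

# Toy model: one wake_at per fiber, a "current" index, and a fake clock.
class ToySched
  attr_reader :log

  def initialize
    @fibers  = []
    @wake_at = []
    @current = -1
    @now     = 0
    @log     = []
  end

  def spawn_fiber(&body)
    @fibers << Fiber.new { body.call(self) }
    @wake_at << -1 # -1 means "runnable immediately"
  end

  # Fiber-side call: note when to wake up, then hand control back.
  def pause(seconds)
    @wake_at[@current] = @now + seconds
    Fiber.yield
  end

  def note(msg)
    @log << [@now, msg]
  end

  # Dispatch loop: always resume the alive fiber with the smallest wake_at.
  def run
    loop do
      alive = (0...@fibers.length).select { |i| @fibers[i].alive? }
      break if alive.empty?
      best = alive.min_by { |i| @wake_at[i] }
      @now = @wake_at[best] if @wake_at[best] > @now # jump the fake clock
      @current = best
      @wake_at[best] = -1
      @fibers[best].resume
      @current = -1
    end
  end
end

sched = ToySched.new
sched.spawn_fiber { |s| s.note("a1"); s.pause(5); s.note("a2") }
sched.spawn_fiber { |s| s.note("b1"); s.pause(2); s.note("b2") }
sched.run
```

Fiber b pauses for less time than fiber a, so it finishes first even though it was spawned second; neither pause blocks the other fiber.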

.poll_round(timeout_ms) ⇒ Object

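Build the poll set from fibers parked on I/O, call poll(2), and write the observed-ready bits back into the parallel arrays; a fiber that becomes ready also has its wake_at set to now. `timeout_ms` is the poll() timeout (-1 = block forever, 0 = non-blocking peek). Returns the number of fds polled; with an empty set it returns 0 without calling poll(2).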



# File 'lib/tep/scheduler.rb', line 123

def self.poll_round(timeout_ms)
  Sock.sphttp_poll_reset
  slots = [-1] # slot index parallel to sched_fibers; -1 = not polled
  slots.delete_at(0)
  added = 0
  i = 0
  n = Tep::APP.sched_fibers.length
  while i < n
    slot = -1
    if Tep::APP.sched_fibers[i].f.alive? &&
       Tep::APP.sched_io_fd[i] >= 0 &&
       Tep::APP.sched_io_ready[i] == 0
      slot = Sock.sphttp_poll_add(Tep::APP.sched_io_fd[i],
                                  Tep::APP.sched_io_mode[i])
      added += 1
    end
    slots.push(slot)
    i += 1
  end
  if added == 0
    return 0
  end
  Sock.sphttp_poll_run(timeout_ms)
  now = Time.now.to_i
  i = 0
  while i < n
    if slots[i] >= 0
      ready = Sock.sphttp_poll_ready(slots[i])
      if ready > 0
        Tep::APP.sched_io_ready[i] = ready
        Tep::APP.sched_wake_at[i]  = now
      end
    end
    i += 1
  end
  added
end
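
The shape of poll_round (collect waiters, poll once, write results back into parallel arrays) is sketched below under simplifying assumptions: `IO.select` replaces the sphttp wrappers, only read readiness is tracked, and slots hold IO objects (nil for "not waiting") rather than fds. `poll_round_sketch` is a hypothetical name.

```ruby
# ios, ready and wake_at are parallel arrays with one entry per fiber slot.
def poll_round_sketch(ios, ready, wake_at, timeout_s, now)
  # Only slots that are waiting on I/O and not yet marked ready get polled.
  waiting = (0...ios.length).select { |i| !ios[i].nil? && ready[i] == 0 }
  return 0 if waiting.empty?

  readable, = IO.select(waiting.map { |i| ios[i] }, nil, nil, timeout_s)
  readable ||= []
  waiting.each do |i|
    next unless readable.include?(ios[i])
    ready[i]   = 1   # observed-ready bit (READ only in this sketch)
    wake_at[i] = now # make the slot time-due so the picker resumes it
  end
  waiting.length
end

r1, _w1 = IO.pipe
r2, w2  = IO.pipe
w2.write("x")

ios     = [r1, nil, r2]
ready   = [0, 0, 0]
wake_at = [500, 500, 500]
polled  = poll_round_sketch(ios, ready, wake_at, 0, 100)
```

Setting wake_at to now is what connects I/O readiness to the time-based picker: the scheduler only ever resumes fibers whose wake_at is due.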

.run_for(seconds) ⇒ Object

Drain until `seconds` have elapsed or every fiber is done. Between empty passes, blocks in poll(2) (or in sleep, if no fiber is waiting on I/O) until the next wake-up.



# File 'lib/tep/scheduler.rb', line 176

def self.run_for(seconds)
  deadline = Time.now.to_i + seconds
  while Time.now.to_i < deadline
    if !Scheduler.tick(0)
      # Nothing ready this pass. Compute the next deadline:
      # min(next_wake, overall_deadline). If any fiber is
      # parked on I/O, block in poll() until that or the
      # timer hits.
      next_at = Scheduler.next_wake
      gap = deadline - Time.now.to_i
      if next_at >= 0
        tgap = next_at - Time.now.to_i
        if tgap < gap
          gap = tgap
        end
      end
      if gap < 0
        gap = 0
      end
      if Scheduler.any_io_waiter
        # Park in poll for up to `gap` seconds.
        Scheduler.poll_round(gap * 1000)
      elsif next_at < 0
        return 0
      elsif gap > 0
        sleep gap
      end
    end
  end
  0
end
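
The wait computation inside run_for reduces to "the smaller of the time until the overall deadline and the time until the next wake-up, clamped at zero". A minimal sketch, with `next_gap` as a hypothetical helper and all times in whole seconds:

```ruby
# now      - current time
# deadline - when run_for must return
# next_at  - earliest wake_at among alive fibers, or -1 if there are none
def next_gap(now, deadline, next_at)
  gap = deadline - now
  if next_at >= 0
    tgap = next_at - now
    gap = tgap if tgap < gap
  end
  gap < 0 ? 0 : gap
end
```

The result is in seconds; run_for multiplies it by 1000 before passing it to poll_round, which expects milliseconds.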

.run_until_empty ⇒ Object

Drain. Resumes everything ready until no fiber is runnable (every fiber has finished, or all remaining fibers are waiting for a future wake_at or for I/O). Returns the number of resumes performed. Purely non-blocking: no poll() wait between passes.



# File 'lib/tep/scheduler.rb', line 165

def self.run_until_empty
  n = 0
  while Scheduler.tick(0)
    n += 1
  end
  n
end
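
The drain loop counts resumes, not fibers. The sketch below shows the difference with two plain fibers and a hypothetical `tick` lambda that resumes the first alive fiber; unlike the real scheduler, it treats every yielded fiber as immediately runnable.

```ruby
require "fiber" # Fiber#alive? on older Rubies; harmless on newer ones

fibers = [
  Fiber.new { Fiber.yield; :done }, # needs two resumes to finish
  Fiber.new { :done }               # finishes on its first resume
]

# Hypothetical one-pass function: true if it resumed something.
tick = lambda do
  f = fibers.find(&:alive?)
  return false unless f
  f.resume
  true
end

# Same shape as run_until_empty: keep ticking while work was done.
def drain(tick)
  n = 0
  n += 1 while tick.call
  n
end

resumes = drain(tick)
```

Two fibers produce three resumes here because the first one yields once before finishing.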

.scheduled_context? ⇒ Boolean

True iff a Tep::Scheduler-managed fiber is currently executing. Set by tick() right before f.resume and reset right after, so this is the canonical “am I in cooperative context?” check for callers that want to pick a blocking vs. fiber-yielding path (e.g. Tep::Http – see lib/tep/http.rb#send_req).

Returns:

  • (Boolean)


# File 'lib/tep/scheduler.rb', line 338

def self.scheduled_context?
  Tep::APP.sched_current >= 0
end
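
A caller that wants to choose between a blocking and a fiber-yielding path might branch on this check roughly as follows. `ToyCtx` is a hypothetical module that models only the "current slot index" flag; its use of `ensure` to reset the flag is an assumption of this sketch, not something the source listing does.

```ruby
module ToyCtx
  @current = -1 # -1 means "no scheduler-managed fiber is running"

  class << self
    attr_accessor :current

    def scheduled_context?
      current >= 0
    end

    # Pick the waiting strategy appropriate for the current context.
    def wait_path
      scheduled_context? ? :fiber_yield : :blocking
    end

    # Model of "set before resume, reset right after".
    def run_slot(idx)
      self.current = idx
      yield
    ensure
      self.current = -1
    end
  end
end

outside = ToyCtx.wait_path
inside  = ToyCtx.run_slot(0) { ToyCtx.wait_path }
```

The same helper code can therefore be called both from a scheduled fiber and from top-level code without the caller passing a flag.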

.spawn_fiber(f) ⇒ Object



# File 'lib/tep/scheduler.rb', line 48

def self.spawn_fiber(f)
  Tep::APP.sched_fibers.push(Tep::FiberSlot.new(f))
  Tep::APP.sched_wake_at.push(-1)
  Tep::APP.sched_io_fd.push(-1)
  Tep::APP.sched_io_mode.push(0)
  Tep::APP.sched_io_ready.push(0)
  f
end

.tick(poll_timeout_ms) ⇒ Object

One scheduler pass. If any fibers are parked on I/O, build a poll set, run poll(2) for up to `poll_timeout_ms`, and mark the ready ones. Then resume the soonest-due fiber whose wake_at is <= now. Returns true if it resumed something.

If a fiber is already time-due (wake_at <= now – e.g. a newly spawned fiber with wake_at=-1, or a fiber that just called pause(0)), poll() must NOT block: we have runnable work and any wait is wasted wall time. This matters for the cooperative request path – when an outer handler parks on io_wait and the accept fiber spawns an inner connection-fiber, the next tick has a wake_at=-1 fiber ready; without this short-circuit each “hand off to the freshly-spawned fiber” step costs a full poll-timeout’s worth of latency.



# File 'lib/tep/scheduler.rb', line 71

def self.tick(poll_timeout_ms)
  # Reclaim trailing dead slots. Without this, the parallel
  # arrays grow once per accepted connection and never shrink --
  # a slow leak and per-tick iteration tax in a long-running
  # Scheduled server. Tail-only (stop at first alive) is
  # deliberate: it keeps every surviving slot's index stable,
  # so external captures of sched_current held across Fiber.yield
  # (e.g. pg.rb's PG::Pool @waiter_idxs) stay valid. Middle
  # dead slots aren't reclaimed until the tail catches up; for
  # FIFO request lifecycles that's the common case.
  i = Tep::APP.sched_fibers.length - 1
  while i >= 0 && !Tep::APP.sched_fibers[i].f.alive?
    Tep::APP.sched_fibers.delete_at(i)
    Tep::APP.sched_wake_at.delete_at(i)
    Tep::APP.sched_io_fd.delete_at(i)
    Tep::APP.sched_io_mode.delete_at(i)
    Tep::APP.sched_io_ready.delete_at(i)
    i -= 1
  end

  ms = poll_timeout_ms
  if Scheduler.any_time_ready
    ms = 0
  end
  Scheduler.poll_round(ms)

  now  = Time.now.to_i
  best = -1
  i = 0
  n = Tep::APP.sched_fibers.length
  while i < n
    if Tep::APP.sched_fibers[i].f.alive? && Tep::APP.sched_wake_at[i] <= now
      if best < 0 || Tep::APP.sched_wake_at[i] < Tep::APP.sched_wake_at[best]
        best = i
      end
    end
    i += 1
  end
  if best < 0
    return false
  end
  Tep::APP.sched_current = best
  Tep::APP.sched_wake_at[best] = -1
  Tep::APP.sched_fibers[best].f.resume
  Tep::APP.sched_current = -1
  true
end
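
The tail-only reclamation at the top of tick can be illustrated on plain arrays. In this sketch `reclaim_tail` is a hypothetical helper, and an array of booleans stands in for the fibers' alive? state.

```ruby
# Remove trailing dead slots from `alive` and from every parallel array.
# Stops at the first alive slot, so surviving indices never shift.
def reclaim_tail(alive, *parallel)
  i = alive.length - 1
  while i >= 0 && !alive[i]
    alive.delete_at(i)
    parallel.each { |arr| arr.delete_at(i) }
    i -= 1
  end
  alive.length
end

alive   = [true, false, true, false, false]
wake_at = [10, 20, 30, 40, 50]
remaining = reclaim_tail(alive, wake_at)
```

The dead slot at index 1 is deliberately left in place: removing it would shift index 2 down to 1 and invalidate any slot index a fiber captured before yielding.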