Class: Itsi::Scheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/itsi/scheduler.rb,
lib/itsi/scheduler/version.rb,
lib/itsi/scheduler/native_extension.rb

Defined Under Namespace

Modules: NativeExtension Classes: Error, WorkRequest

Constant Summary collapse

VERSION =
"0.2.27.rc1"

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Scheduler

Returns a new instance of Scheduler.



19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/itsi/scheduler.rb', line 19

# Builds an idle scheduler: empty bookkeeping tables, two empty unblock
# batches, cached resume callbacks, and finally the worker pool.
def initialize
  # Fiber/token lookup tables compare keys by object identity.
  @join_waiters, @token_map, @resume_tokens = Array.new(3) { {}.compare_by_identity }
  @timeout_requests = {}

  # Two batches of unblocked fibers; @unblock_idx selects the one currently
  # being written to, and @unblocked_mux guards both.
  @unblocked = Array.new(2) { [] }
  @unblock_idx = 0
  @unblocked_mux = Mutex.new

  # Convert the resume methods to procs once so they can be passed as blocks
  # without re-creating a proc on every use.
  @resume_fiber, @resume_fiber_with_readiness, @resume_blocked =
    %i[resume_fiber resume_fiber_with_readiness resume_blocked].map { |name| method(name).to_proc }

  setup_worker_pool
end

Class Method Details

.resume_token ⇒ Object



14
15
16
17
# File 'lib/itsi/scheduler.rb', line 14

# Hands out the next resume token.
#
# @return [Integer] 1 on the first call, incremented by one on each later call
def self.resume_token
  @resume_token = (@resume_token || 0) + 1
end

Instance Method Details

#address_resolve(hostname) ⇒ Object



206
207
208
# File 'lib/itsi/scheduler.rb', line 206

# Resolves +hostname+ by running +native_address_resolve+ through
# +blocking_operation_wait+.
#
# @param hostname [Object] passed unchanged to +native_address_resolve+
# @return [Object] whatever +blocking_operation_wait+ returns for the lookup
def address_resolve(hostname)
  lookup = lambda { native_address_resolve(hostname) }
  blocking_operation_wait(lookup)
end

#block(_, timeout, fiber = Fiber.current, token = Scheduler.resume_token) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/itsi/scheduler.rb', line 33

# Registers +fiber+ as waiting under +token+, then suspends the current fiber
# with +Fiber.yield+ until something resumes it.
#
# @param _ [Object] blocker; unused
# @param timeout [Object, nil] when truthy, a timer is started for +token+
# @param fiber [Fiber] fiber recorded in the bookkeeping tables
# @param token [Object] key under which the fiber can be resumed
# @return [Object] the value the fiber is resumed with
def block(_, timeout, fiber = Fiber.current, token = Scheduler.resume_token)
  @join_waiters.store(fiber, true)

  start_timer(timeout, token) if timeout
  @resume_tokens.store(token, fiber)
  @token_map.store(fiber, token)
  Fiber.yield
ensure
  # Runs on normal resume and when an exception is raised into the fiber:
  # cancel the wait first, then drop every registration made above.
  cancel_wait(token)
  @join_waiters.delete(fiber)
  @token_map.delete(fiber)
  @resume_tokens.delete(token)
end

#blocking_operation_wait(work) ⇒ Object



90
91
92
93
94
95
96
97
# File 'lib/itsi/scheduler.rb', line 90

# Queues +work+ on the worker queue and parks the current fiber until it is
# resumed, then reports the outcome recorded on the request.
#
# @param work [#call] operation wrapped in a WorkRequest
# @return [Object] the request's +result+
# @raise [Exception] the request's +error+, when one was recorded
def blocking_operation_wait(work)
  request = WorkRequest.new(fiber: Fiber.current, work: work)
  @worker_queue << request
  block(nil, nil, request.fiber)

  failure = request.error
  raise failure if failure

  request.result
end

#close ⇒ Object

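Hook invoked at the end of the thread. Runs our scheduler’s reactor loop (#run) until no work remains, then shuts down the worker pool, marks the scheduler as closed and freezes it.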



192
193
194
195
196
197
198
# File 'lib/itsi/scheduler.rb', line 192

# Runs the scheduler to completion, then tears it down.
#
# @return [Object] the value returned by +run+
def close
  run
ensure
  # Teardown happens even when +run+ raises.
  shutdown_worker_pool
  # Only assign when not already set, so nothing is written to an instance
  # that was closed (and frozen) earlier.
  @closed = true unless @closed
  freeze
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


216
217
218
# File 'lib/itsi/scheduler.rb', line 216

# Reports whether the scheduler has been closed.
#
# @return [Boolean] +true+ once +@closed+ has been set, +false+ otherwise
def closed?
  # @closed is unset until the scheduler is closed; coerce so the predicate
  # returns a real Boolean rather than nil.
  @closed ? true : false
end

#fiber(&blk) ⇒ Object

Spin up a new fiber and immediately resume it.



221
222
223
# File 'lib/itsi/scheduler.rb', line 221

# Creates a non-blocking fiber running +blk+ and resumes it straight away.
#
# @yield body of the new fiber
# @return [Fiber] the fiber, after its first resume has returned
def fiber(&blk)
  spawned = Fiber.new(blocking: false, &blk)
  spawned.resume
  spawned
end

#fiber_interrupt(fiber, exception) ⇒ Object



82
83
84
85
86
87
88
# File 'lib/itsi/scheduler.rb', line 82

# Raises +exception+ inside +fiber+, first cancelling the wait registered for
# it (if any).
#
# @param fiber [Fiber] fiber to interrupt
# @param exception [Exception] exception raised inside the fiber
# @return [Boolean] +true+ when the raise went through, +false+ when a
#   FiberError was raised instead
def fiber_interrupt(fiber, exception)
  if @token_map.key?(fiber)
    pending_token = @token_map[fiber]
    cancel_wait(pending_token)
  end

  fiber.raise(exception)
  true
rescue FiberError
  false
end

#io_select(readables, writables, exceptables, timeout) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/itsi/scheduler.rb', line 99

# Scheduler counterpart of +IO.select+.
#
# When exactly one distinct IO is involved the wait goes through +io_wait+;
# every other case (none, or several) is handed to +IO.select+ via
# +blocking_operation_wait+.
#
# @param readables [Array, nil] IOs to watch for readability
# @param writables [Array, nil] IOs to watch for writability
# @param exceptables [Array, nil] IOs to watch for priority events
# @param timeout [Object, nil] passed through to +io_wait+ / +IO.select+
# @return [Array<Array>, nil] three arrays (readable, writable, exceptable)
#   on the single-IO path, or +nil+ when +io_wait+ returns nothing truthy;
#   otherwise whatever +blocking_operation_wait+ returns
def io_select(readables, writables, exceptables, timeout)
  readables = Array(readables).compact
  writables = Array(writables).compact
  exceptables = Array(exceptables).compact
  distinct = (readables + writables + exceptables).uniq

  unless distinct.length == 1
    return blocking_operation_wait(-> { IO.select(readables, writables, exceptables, timeout) })
  end

  io = distinct.first
  # Each list paired with the event flag it stands for, in result order.
  interest = [
    [readables, IO::READABLE],
    [writables, IO::WRITABLE],
    [exceptables, IO::PRIORITY]
  ]

  events = interest.reduce(0) do |mask, (list, flag)|
    list.include?(io) ? mask | flag : mask
  end
  readiness = io_wait(io, events, timeout)
  return nil unless readiness

  interest.map do |list, flag|
    (readiness & flag).zero? ? [] : list.select { |entry| entry == io }
  end
end

#io_wait(io, events, duration) ⇒ Object

Register an IO waiter. The waiting fiber is resumed by our scheduler in #tick, once fetch_due_events reports readiness for its token.



50
51
52
53
54
55
56
57
# File 'lib/itsi/scheduler.rb', line 50

# Waits for +events+ on +io+.
#
# The wait is registered via +register_io_wait+; when that returns nothing
# truthy the current fiber is parked in +block+ until it is resumed.
#
# @param io [#fileno] IO whose file descriptor is registered
# @param events [Integer] event mask passed to +register_io_wait+
# @param duration [Object, nil] timeout passed to both helpers
# @return [Object] the readiness value from +register_io_wait+ or +block+
def io_wait(io, events, duration)
  token = Scheduler.resume_token
  waiter = Fiber.current
  ready = register_io_wait(io.fileno, events, duration, token) ||
          block(nil, duration, waiter, token)
  clear_timer(token)
  ready
end

#kernel_sleep(duration) ⇒ Object



66
67
68
# File 'lib/itsi/scheduler.rb', line 66

# Sleeps by parking the current fiber in +block+ with +duration+ as timeout.
#
# @param duration [Object, nil] timeout handed to +block+
# @return [Object] whatever +block+ returns
def kernel_sleep(duration)
  block(nil, duration)
end

#process_fork ⇒ Object



210
211
212
213
214
# File 'lib/itsi/scheduler.rb', line 210

# Rebuilds the worker pool: shuts the existing one down, then sets up a new one.
#
# @return [nil]
def process_fork
  %i[shutdown_worker_pool setup_worker_pool].each { |step| __send__(step) }
  nil
end

#process_wait(pid, flags) ⇒ Object

Need to defer to Process::Status rather than our extension as we don’t have a means of creating our own Process::Status.



202
203
204
# File 'lib/itsi/scheduler.rb', line 202

# Waits for a child process by running +Process::Status.wait+ through
# +blocking_operation_wait+.
#
# @param pid [Integer] process id passed to +Process::Status.wait+
# @param flags [Integer] wait flags passed to +Process::Status.wait+
# @return [Object] whatever +blocking_operation_wait+ returns
def process_wait(pid, flags)
  waiter = lambda { Process::Status.wait(pid, flags) }
  blocking_operation_wait(waiter)
end

#resume_blocked(fiber) ⇒ Object



156
157
158
159
160
161
162
# File 'lib/itsi/scheduler.rb', line 156

# Resumes a fiber that was handed to +unblock+.
#
# A fiber with a registered token is resumed through +resume_fiber+; one
# without a token is resumed directly, provided it is still alive.
#
# @param fiber [Fiber]
# @return [Object, nil] result of the resume, or +nil+ when nothing was resumed
def resume_blocked(fiber)
  token = @token_map[fiber]
  return resume_fiber(token) if token

  fiber.resume if fiber.alive?
end

#resume_fiber(token) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/itsi/scheduler.rb', line 134

# Acts on a token: either delivers the timeout registered for it, or resumes
# the fiber waiting on it.
#
# @param token [Object]
# @return [Object, nil] the resumed fiber's value; +nil+ on the timeout path,
#   for an unknown token, or after a StandardError was rescued and warned about
def resume_fiber(token)
  timed_out = @timeout_requests.delete(token)

  if timed_out
    # A timeout request takes precedence: interrupt its fiber and stop.
    fiber, exception = timed_out
    fiber_interrupt(fiber, exception)
    nil
  else
    fiber = @resume_tokens.delete(token)
    fiber.resume if fiber
  end
rescue StandardError => e
  warn "Fiber #{fiber} terminated on exception: #{e.message}"
end

#resume_fiber_with_readiness(token, readiness) ⇒ Object



148
149
150
151
152
153
154
# File 'lib/itsi/scheduler.rb', line 148

# Resumes the fiber waiting on +token+, passing +readiness+ into it.
#
# Takes a single +[token, readiness]+ pair, destructured in the signature.
#
# @return [Object, nil] the resumed fiber's value; +nil+ for an unknown token
#   or after a StandardError was rescued and warned about
def resume_fiber_with_readiness((token, readiness))
  fiber = @resume_tokens.delete(token)
  fiber.resume(readiness) if fiber
rescue StandardError => e
  warn "Fiber #{fiber} terminated on exception: #{e.message}"
end

#run ⇒ Object

Run until no more work needs doing.



185
186
187
188
# File 'lib/itsi/scheduler.rb', line 185

# Drives the scheduler: keeps ticking for as long as +work?+ reports pending
# work, then emits a debug message.
#
# @return [Object] whatever +debug+ returns
def run
  while work?
    tick
  end
  debug("Exit Scheduler")
end

#switch_unblock_batch ⇒ Object



164
165
166
167
168
169
170
# File 'lib/itsi/scheduler.rb', line 164

# Swaps the active unblock batch while holding the mutex.
#
# @return [Array] the batch that was active before the swap
def switch_unblock_batch
  @unblocked_mux.synchronize do
    drained_idx = @unblock_idx
    # Flip between the two batches (0 -> 1 -> 0).
    @unblock_idx = drained_idx.succ % 2
    @unblocked[drained_idx]
  end
end

#tick ⇒ Object



124
125
126
127
128
129
130
131
132
# File 'lib/itsi/scheduler.rb', line 124

# Performs one scheduler iteration: collects due events, due timers and the
# current batch of unblocked fibers, then resumes the fibers concerned.
def tick
  # Gather everything up front, before any fiber is resumed.
  events = fetch_due_events
  timers = fetch_due_timers
  unblocked = switch_unblock_batch
  # `&.` because the fetch helpers may hand back nil.
  events&.each(&@resume_fiber_with_readiness)
  unblocked.each(&@resume_blocked)
  # Empty the drained batch so it can be reused after the next switch.
  unblocked.clear
  timers&.each(&@resume_fiber)
end

#timeout_after(duration, klass = Timeout::Error, message = "execution expired") ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
# File 'lib/itsi/scheduler.rb', line 70

# Runs the given block with a timeout registered for the current fiber.
#
# @param duration [Object] timer duration; also yielded to the block
# @param klass [Class, Exception] exception class to instantiate with
#   +message+, or a ready-made exception object
# @param message [String] message used when +klass+ is a class
# @yieldparam duration [Object] the +duration+ argument
# @return [Object] the block's value
def timeout_after(duration, klass = Timeout::Error, message = "execution expired")
  token = Scheduler.resume_token
  exception =
    if klass.is_a?(Class)
      klass.new(message)
    else
      klass
    end

  @timeout_requests[token] = [Fiber.current, exception]
  start_timer(duration, token)
  yield duration
ensure
  # token stays nil when the failure happened before it was assigned.
  if token
    clear_timer(token)
    @timeout_requests.delete(token)
  end
end

#unblock(_blocker, fiber) ⇒ Object



59
60
61
62
63
64
# File 'lib/itsi/scheduler.rb', line 59

# Queues +fiber+ on the active unblock batch (under the mutex) and then
# calls +wake+.
#
# @param _blocker [Object] unused
# @param fiber [Fiber] fiber to resume on a later tick
# @return [Object] whatever +wake+ returns
def unblock(_blocker, fiber)
  @unblocked_mux.synchronize { @unblocked[@unblock_idx].push(fiber) }
  wake
end

#work? ⇒ Boolean

Keep running until we’ve got no timers we’re awaiting, no pending IO, no temporary yields, no pending unblocks.

Returns:

  • (Boolean)


180
181
182
# File 'lib/itsi/scheduler.rb', line 180

# Tells whether the scheduler still has anything to do.
#
# @return [Boolean, Object] +true+ when the active unblock batch or the set
#   of waiting fibers is non-empty; otherwise whatever +has_pending_io?+ returns
def work?
  return true unless @unblocked[@unblock_idx].empty?
  return true unless @join_waiters.empty?

  has_pending_io?
end

#yield ⇒ Object

Yields upwards to the scheduler, with an intention to resume the fiber that yielded ASAP.



174
175
176
# File 'lib/itsi/scheduler.rb', line 174

# Gives other fibers a turn by sleeping for zero seconds, but only when the
# scheduler has other work pending.
#
# @return [Object, nil] result of +kernel_sleep(0)+, or +nil+ when idle
def yield
  return unless work?

  kernel_sleep(0)
end