Class: Itsi::Scheduler
- Inherits:
-
Object
show all
- Defined in:
- lib/itsi/scheduler.rb,
lib/itsi/scheduler/version.rb,
lib/itsi/scheduler/native_extension.rb
Defined Under Namespace
Modules: NativeExtension
Classes: Error, WorkRequest
Constant Summary
collapse
- VERSION =
"0.2.27.rc1"
Class Method Summary
collapse
Instance Method Summary
collapse
-
#address_resolve(hostname) ⇒ Object
-
#block(_, timeout, fiber = Fiber.current, token = Scheduler.resume_token) ⇒ Object
-
#blocking_operation_wait(work) ⇒ Object
-
#close ⇒ Object
Hook invoked at the end of the thread.
-
#closed? ⇒ Boolean
-
#fiber(&blk) ⇒ Object
Spin up a new fiber and immediately resume it.
-
#fiber_interrupt(fiber, exception) ⇒ Object
-
#initialize ⇒ Scheduler
constructor
A new instance of Scheduler.
-
#io_select(readables, writables, exceptables, timeout) ⇒ Object
-
#io_wait(io, events, duration) ⇒ Object
-
#kernel_sleep(duration) ⇒ Object
-
#process_fork ⇒ Object
-
#process_wait(pid, flags) ⇒ Object
Defers to Process::Status.wait rather than our extension, because the extension has no means of creating its own Process::Status.
-
#resume_blocked(fiber) ⇒ Object
-
#resume_fiber(token) ⇒ Object
-
#resume_fiber_with_readiness(token, readiness) ⇒ Object
-
#run ⇒ Object
Run until no more work needs doing.
-
#switch_unblock_batch ⇒ Object
-
#tick ⇒ Object
-
#timeout_after(duration, klass = Timeout::Error, message = "execution expired") ⇒ Object
-
#unblock(_blocker, fiber) ⇒ Object
-
#work? ⇒ Boolean
Keep running until we’ve got no timers we’re awaiting, no pending IO, no temporary yields, no pending unblocks.
-
#yield ⇒ Object
Yields upwards to the scheduler, with an intention to resume the fiber that yielded ASAP.
Constructor Details
Returns a new instance of Scheduler.
19
20
21
22
23
24
25
26
27
28
29
30
31
|
# File 'lib/itsi/scheduler.rb', line 19
# Sets up the scheduler's bookkeeping and starts the worker pool.
#
# State created here:
# - @unblocked / @unblock_idx / @unblocked_mux: two fiber buffers, the index
#   of the one currently receiving #unblock calls, and the mutex guarding them.
# - @join_waiters, @token_map, @resume_tokens: identity-keyed tables filled in
#   by #block while a fiber is parked.
# - @timeout_requests: token => [fiber, exception], filled in by #timeout_after.
# - @resume_*: the resume methods converted to procs once, so #tick can pass
#   them with `&` on every iteration.
def initialize
  @unblocked_mux = Mutex.new
  @unblocked = Array.new(2) { [] }
  @unblock_idx = 0

  @timeout_requests = {}
  @join_waiters = {}.compare_by_identity
  @token_map = {}.compare_by_identity
  @resume_tokens = {}.compare_by_identity

  @resume_fiber, @resume_fiber_with_readiness, @resume_blocked =
    %i[resume_fiber resume_fiber_with_readiness resume_blocked].map { |name| method(name).to_proc }

  setup_worker_pool
end
|
Class Method Details
.resume_token ⇒ Object
14
15
16
17
|
# File 'lib/itsi/scheduler.rb', line 14
# Hands out the next resume token from a counter stored on the receiver.
#
# @return [Integer] 1 on the first call, then 2, 3, ...
def self.resume_token
  @resume_token = (@resume_token || 0) + 1
end
|
Instance Method Details
#address_resolve(hostname) ⇒ Object
206
207
208
|
# File 'lib/itsi/scheduler.rb', line 206
# Resolves +hostname+ by running native_address_resolve through
# #blocking_operation_wait, so the lookup does not run on the calling fiber.
#
# @param hostname [String] name to resolve
# @return [Object] whatever native_address_resolve returns
def address_resolve(hostname)
  lookup = -> { native_address_resolve(hostname) }
  blocking_operation_wait(lookup)
end
|
#block(_, timeout, fiber = Fiber.current, token = Scheduler.resume_token) ⇒ Object
33
34
35
36
37
38
39
40
41
42
43
44
45
|
# File 'lib/itsi/scheduler.rb', line 33
# Parks +fiber+ until it is resumed, then removes all bookkeeping for it.
#
# @param _ [Object] blocker; not used by this implementation
# @param timeout [Object, nil] when truthy, a timer is started for +token+
# @param fiber [Fiber] fiber recorded as waiting (defaults to the current one)
# @param token [Integer] token under which the fiber can be resumed
# @return [Object] the value Fiber.yield returns, i.e. whatever the fiber is
#   resumed with (#io_wait uses this as the readiness value)
def block(_, timeout, fiber = Fiber.current, token = Scheduler.resume_token)
# Recorded so #work? keeps reporting work while this fiber is parked.
@join_waiters[fiber] = true
start_timer(timeout, token) if timeout
# Two-way mapping: token => fiber for #resume_fiber, fiber => token for
# #resume_blocked and #fiber_interrupt.
@resume_tokens[token] = fiber
@token_map[fiber] = token
# Suspend here until something resumes this fiber.
Fiber.yield
ensure
# Runs on normal resume and when an exception is raised into the fiber.
cancel_wait(token)
@resume_tokens.delete(token)
@token_map.delete(fiber)
@join_waiters.delete(fiber)
end
|
#blocking_operation_wait(work) ⇒ Object
90
91
92
93
94
95
96
97
|
# File 'lib/itsi/scheduler.rb', line 90
# Queues +work+ on @worker_queue and parks the current fiber until it is
# resumed, then surfaces the outcome recorded on the request.
#
# @param work [#call] the blocking operation to run
# @return [Object] the request's result
# @raise [Exception] the request's error, if one was recorded
def blocking_operation_wait(work)
  current = Fiber.current
  request = WorkRequest.new(fiber: current, work: work)
  @worker_queue << request

  # No blocker and no timeout: wait until the fiber is resumed.
  block(nil, nil, request.fiber)

  failure = request.error
  raise failure if failure

  request.result
end
|
#close ⇒ Object
Hook invoked at the end of the thread. Will start our scheduler’s Reactor.
192
193
194
195
196
197
198
|
# File 'lib/itsi/scheduler.rb', line 192
# Hook invoked at the end of the thread. Runs the scheduler loop to
# completion, then - even if the loop raised - shuts the worker pool down,
# marks the scheduler closed and freezes it.
#
# @return [Object] the return value of #run
def close
  begin
    run
  ensure
    shutdown_worker_pool
    @closed = true unless @closed
    freeze
  end
end
|
#closed? ⇒ Boolean
216
217
218
|
# File 'lib/itsi/scheduler.rb', line 216
# Reports whether #close has marked this scheduler as closed.
#
# @return [Boolean, nil] the @closed flag; nil until #close has set it
def closed? = @closed
|
#fiber(&blk) ⇒ Object
Spin up a new fiber and immediately resume it.
221
222
223
|
# File 'lib/itsi/scheduler.rb', line 221
# Spins up a new non-blocking fiber for the given block and resumes it
# immediately.
#
# @yield the body of the new fiber
# @return [Fiber] the fiber that was created
def fiber(&blk)
  spawned = Fiber.new(blocking: false, &blk)
  spawned.resume
  spawned
end
|
#fiber_interrupt(fiber, exception) ⇒ Object
82
83
84
85
86
87
88
|
# File 'lib/itsi/scheduler.rb', line 82
# Raises +exception+ inside +fiber+, first cancelling the wait registered
# for the fiber if it has an entry in @token_map.
#
# @param fiber [Fiber] fiber to interrupt
# @param exception [Exception] exception raised inside the fiber
# @return [Boolean] true once the raise has been delivered, false when it
#   fails with FiberError
def fiber_interrupt(fiber, exception)
  begin
    if @token_map.key?(fiber)
      pending_token = @token_map[fiber]
      cancel_wait(pending_token)
    end

    fiber.raise(exception)
    true
  rescue FiberError
    false
  end
end
|
#io_select(readables, writables, exceptables, timeout) ⇒ Object
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
# File 'lib/itsi/scheduler.rb', line 99
# Scheduler version of IO.select.
#
# When the three lists together name exactly one distinct IO, the wait is
# expressed as a single #io_wait call. Every other combination is handed to
# IO.select via #blocking_operation_wait.
#
# @param readables [Array, Object, nil] IOs to watch for readability
# @param writables [Array, Object, nil] IOs to watch for writability
# @param exceptables [Array, Object, nil] IOs to watch for priority events
# @param timeout [Object, nil] passed through to #io_wait / IO.select
# @return [Array(Array, Array, Array), nil] ready IOs per list on the
#   single-IO path (nil when #io_wait reports nothing), otherwise the result
#   of #blocking_operation_wait
def io_select(readables, writables, exceptables, timeout)
  reads, writes, errors = [readables, writables, exceptables].map { |group| Array(group).compact }
  watched = (reads + writes + errors).uniq

  unless watched.length == 1
    return blocking_operation_wait(-> { IO.select(reads, writes, errors, timeout) })
  end

  io = watched.first
  # Event flag => list that requested it, in the order of the result triple.
  interest = { IO::READABLE => reads, IO::WRITABLE => writes, IO::PRIORITY => errors }
  events = interest.reduce(0) { |mask, (flag, group)| group.include?(io) ? mask | flag : mask }

  readiness = io_wait(io, events, timeout)
  return unless readiness

  interest.map do |flag, group|
    (readiness & flag).zero? ? [] : group.select { |entry| entry == io }
  end
end
|
#io_wait(io, events, duration) ⇒ Object
Register an IO waiter. The waiting fiber is resumed by the scheduler's #tick once fetch_due_events reports the IO as ready.
50
51
52
53
54
55
56
57
|
# File 'lib/itsi/scheduler.rb', line 50
# Waits for +events+ on +io+.
#
# The wait is registered under a fresh token. If registration already
# yields a readiness value it is used as is; otherwise the current fiber is
# parked with #block until it is resumed. The token's timer is cleared
# before returning.
#
# @param io [#fileno] IO whose descriptor is registered
# @param events [Integer] event mask to wait for
# @param duration [Object, nil] timeout passed to the registration and #block
# @return [Object] the readiness value from registration or from #block
def io_wait(io, events, duration)
  token = Scheduler.resume_token
  readiness = register_io_wait(io.fileno, events, duration, token) ||
              block(nil, duration, Fiber.current, token)
  clear_timer(token)
  readiness
end
|
#kernel_sleep(duration) ⇒ Object
66
67
68
|
# File 'lib/itsi/scheduler.rb', line 66
# Sleeps by parking the current fiber with #block, using +duration+ as the
# timeout and no blocker.
#
# @param duration [Object, nil] timeout handed to #block
# @return [Object] whatever #block returns
def kernel_sleep(duration)
  block(nil, duration)
end
|
#process_fork ⇒ Object
210
211
212
213
214
|
# File 'lib/itsi/scheduler.rb', line 210
# Replaces the worker pool: shuts the existing one down, then sets up a new
# one. Order matters - the old pool is torn down before the new one is built.
# NOTE(review): presumably invoked around Process.fork - confirm against caller.
#
# @return [nil]
def process_fork
shutdown_worker_pool
setup_worker_pool
nil
end
|
#process_wait(pid, flags) ⇒ Object
Defers to Process::Status.wait rather than our extension, because the extension has no means of creating its own Process::Status.
202
203
204
|
# File 'lib/itsi/scheduler.rb', line 202
# Waits for a child process by running Process::Status.wait through
# #blocking_operation_wait.
#
# @param pid [Integer] process id passed to Process::Status.wait
# @param flags [Integer] wait flags passed to Process::Status.wait
# @return [Object] the result of #blocking_operation_wait
def process_wait(pid, flags)
  reap = -> { Process::Status.wait(pid, flags) }
  blocking_operation_wait(reap)
end
|
#resume_blocked(fiber) ⇒ Object
156
157
158
159
160
161
162
|
# File 'lib/itsi/scheduler.rb', line 156
# Resumes a fiber that was handed to #unblock.
#
# A fiber with an entry in @token_map is resumed through #resume_fiber with
# its token; any other fiber is resumed directly if it is still alive.
#
# @param fiber [Fiber] fiber to resume
# @return [Object, nil] result of the resume, or nil when nothing was resumed
def resume_blocked(fiber)
  token = @token_map[fiber]
  return resume_fiber(token) if token

  fiber.resume if fiber.alive?
end
|
#resume_fiber(token) ⇒ Object
134
135
136
137
138
139
140
141
142
143
144
145
146
|
# File 'lib/itsi/scheduler.rb', line 134
# Handles a token that has come due.
#
# If the token belongs to a pending #timeout_after request, the stored
# exception is raised in the stored fiber via #fiber_interrupt. Otherwise
# the fiber registered under the token, if any, is resumed. A StandardError
# raised while doing so is reported with `warn` and not re-raised.
#
# @param token [Integer] resume token
# @return [Object, nil] result of the resume; nil on the timeout path, when
#   no fiber is registered, or after a rescued error
def resume_fiber(token)
  fiber = nil
  timed_out = @timeout_requests.delete(token)

  if timed_out
    fiber, exception = timed_out
    fiber_interrupt(fiber, exception)
    nil
  else
    fiber = @resume_tokens.delete(token)
    fiber.resume if fiber
  end
rescue StandardError => e
  warn "Fiber #{fiber} terminated on exception: #{e.message}"
end
|
#resume_fiber_with_readiness(token, readiness) ⇒ Object
148
149
150
151
152
153
154
|
# File 'lib/itsi/scheduler.rb', line 148
# Resumes the fiber registered under a token, passing it a readiness value.
# Takes a single [token, readiness] pair, destructured in the signature.
# A StandardError raised by the resume is reported with `warn` and not
# re-raised.
#
# @param token [Integer] resume token (first element of the pair)
# @param readiness [Object] value the fiber is resumed with (second element)
# @return [Object, nil] result of the resume; nil when no fiber is
#   registered or after a rescued error
def resume_fiber_with_readiness((token, readiness))
  fiber = @resume_tokens.delete(token)
  fiber.resume(readiness) if fiber
rescue StandardError => e
  warn "Fiber #{fiber} terminated on exception: #{e.message}"
end
|
#run ⇒ Object
Run until no more work needs doing.
185
186
187
188
|
# File 'lib/itsi/scheduler.rb', line 185
# Runs the scheduler loop: keeps calling #tick for as long as #work?
# reports outstanding work, then logs that the scheduler is exiting.
#
# @return [Object] the return value of the final `debug` call
def run
  while work?
    tick
  end

  debug("Exit Scheduler")
end
|
#switch_unblock_batch ⇒ Object
164
165
166
167
168
169
170
|
# File 'lib/itsi/scheduler.rb', line 164
# Swaps the two unblock buffers under the mutex: the buffer that has been
# collecting fibers is returned to the caller, and the other buffer becomes
# the target of subsequent #unblock calls.
#
# @return [Array] the buffer that was active before the swap
def switch_unblock_batch
  @unblocked_mux.synchronize do
    drained_idx = @unblock_idx
    @unblock_idx = (drained_idx + 1) % 2
    @unblocked[drained_idx]
  end
end
|
#tick ⇒ Object
124
125
126
127
128
129
130
131
132
|
# File 'lib/itsi/scheduler.rb', line 124
# One iteration of the scheduler loop.
#
# Collects due IO events, due timers and the current unblock batch first,
# then resumes fibers in that order: IO events, unblocked fibers, timers.
# The unblock batch is emptied once it has been processed so the buffer can
# be reused.
#
# @return [Object, nil] the due-timer collection, or nil when there is none
def tick
  due_events = fetch_due_events
  due_timers = fetch_due_timers
  batch = switch_unblock_batch

  due_events&.each(&@resume_fiber_with_readiness)
  batch.each(&@resume_blocked)
  batch.clear
  due_timers&.each(&@resume_fiber)
end
|
#timeout_after(duration, klass = Timeout::Error, message = "execution expired") ⇒ Object
70
71
72
73
74
75
76
77
78
79
80
|
# File 'lib/itsi/scheduler.rb', line 70
# Runs the given block with a timeout attached to the current fiber.
#
# A timer is started under a fresh token and the pair [fiber, exception] is
# stored in @timeout_requests, where #resume_fiber finds it if the timer
# comes due. The timer and the stored request are removed when the block
# finishes, whether normally or by raising.
#
# @param duration [Object] timer duration; also yielded to the block
# @param klass [Class, Object] exception class to instantiate with
#   +message+, or a ready-made object used as is
# @param message [String] message for the instantiated exception
# @yieldparam duration [Object] the +duration+ argument
# @return [Object] the block's return value
def timeout_after(duration, klass = Timeout::Error, message = "execution expired")
  fiber = Fiber.current
  token = Scheduler.resume_token
  exception = if klass.is_a?(Class)
                klass.new(message)
              else
                klass
              end

  @timeout_requests[token] = [fiber, exception]
  start_timer(duration, token)
  yield duration
ensure
  if token
    clear_timer(token)
    @timeout_requests.delete(token)
  end
end
|
#unblock(_blocker, fiber) ⇒ Object
59
60
61
62
63
64
|
# File 'lib/itsi/scheduler.rb', line 59
# Queues +fiber+ to be resumed on a later #tick and wakes the scheduler.
# The append happens under @unblocked_mux, the same mutex
# #switch_unblock_batch holds while swapping buffers.
#
# @param _blocker [Object] not used by this implementation
# @param fiber [Fiber] fiber to resume
# @return [Object] the return value of `wake`
def unblock(_blocker, fiber)
  @unblocked_mux.synchronize { @unblocked[@unblock_idx].push(fiber) }
  wake
end
|
#work? ⇒ Boolean
Keep running until we’ve got no timers we’re awaiting, no pending IO, no temporary yields, no pending unblocks.
180
181
182
|
# File 'lib/itsi/scheduler.rb', line 180
# Tells #run whether to keep looping: true while the active unblock buffer
# or @join_waiters is non-empty; otherwise whatever has_pending_io? reports.
#
# @return [Boolean, Object] true, or the value of has_pending_io?
def work?
  !(@unblocked[@unblock_idx].empty? && @join_waiters.empty?) || has_pending_io?
end
|
#yield ⇒ Object
Yields upwards to the scheduler, with an intention to resume the fiber that yielded ASAP.
174
175
176
|
# File 'lib/itsi/scheduler.rb', line 174
# Yields upwards to the scheduler so other work can run, intending to be
# resumed as soon as possible. Does nothing when #work? reports no
# outstanding work.
#
# @return [Object, nil] result of the zero-length sleep, or nil when idle
def yield
  return unless work?

  kernel_sleep(0)
end
|