Class: Itsi::Scheduler
- Inherits:
-
Object
show all
- Defined in:
- lib/itsi/scheduler.rb,
lib/itsi/scheduler/version.rb,
lib/itsi/scheduler/native_extension.rb
Defined Under Namespace
Modules: NativeExtension
Classes: Error
Constant Summary
collapse
- VERSION =
"0.2.23"
Class Method Summary
collapse
Instance Method Summary
collapse
-
#block(_, timeout, fiber = Fiber.current, token = Scheduler.resume_token) ⇒ Object
-
#close ⇒ Object
Hook invoked at the end of the thread.
-
#closed? ⇒ Boolean
-
#fiber(&blk) ⇒ Object
Spin up a new fiber and immediately resume it.
-
#initialize ⇒ Scheduler
constructor
A new instance of Scheduler.
-
#io_wait(io, events, duration) ⇒ Object
-
#kernel_sleep(duration) ⇒ Object
-
#process_wait(pid, flags) ⇒ Object
Need to defer to Process::Status rather than our extension as we don’t have a means of creating our own Process::Status.
-
#resume_blocked(fiber) ⇒ Object
-
#resume_fiber(token) ⇒ Object
-
#resume_fiber_with_readiness(token, readiness) ⇒ Object
-
#run ⇒ Object
Run until no more work needs doing.
-
#switch_unblock_batch ⇒ Object
-
#tick ⇒ Object
-
#unblock(_blocker, fiber) ⇒ Object
-
#work? ⇒ Boolean
Keep running until we’ve got no timers we’re awaiting, no pending IO, no temporary yields, no pending unblocks.
-
#yield ⇒ Object
Yields upwards to the scheduler, with an intention to resume the fiber that yielded ASAP.
Constructor Details
Returns a new instance of Scheduler.
16
17
18
19
20
21
22
23
24
25
26
|
# File 'lib/itsi/scheduler.rb', line 16
# Builds the scheduler's bookkeeping state.
#
# The three lookup tables compare keys by identity. @unblocked holds two
# batches: #unblock appends to the batch selected by @unblock_idx, and
# #switch_unblock_batch hands that batch out and flips the index. The resume
# callbacks are converted to procs once, so #tick can pass them straight to
# `each`.
def initialize
  @join_waiters = Hash.new.compare_by_identity  # fibers currently parked in #block
  @token_map = Hash.new.compare_by_identity     # fiber => resume token
  @resume_tokens = Hash.new.compare_by_identity # resume token => fiber
  @unblocked = Array.new(2) { [] }
  @unblock_idx = 0
  @unblocked_mux = Thread::Mutex.new
  @resume_fiber = method(:resume_fiber).to_proc
  @resume_fiber_with_readiness = method(:resume_fiber_with_readiness).to_proc
  @resume_blocked = method(:resume_blocked).to_proc
end
|
Class Method Details
.resume_token ⇒ Object
11
12
13
14
|
# File 'lib/itsi/scheduler.rb', line 11
# Hands out the next resume token: 1 on the first call, then 2, 3, ...
# The counter is kept in an instance variable on the receiver of this
# class-level method.
def self.resume_token
  @resume_token = (@resume_token || 0) + 1
end
|
Instance Method Details
#block(_, timeout, fiber = Fiber.current, token = Scheduler.resume_token) ⇒ Object
28
29
30
31
32
33
34
35
36
37
38
39
|
# File 'lib/itsi/scheduler.rb', line 28
# Parks +fiber+ until something resumes it.
#
# The fiber is recorded in @join_waiters, a timer is started when +timeout+ is
# truthy, and the token <-> fiber mappings are registered before yielding.
# All three registrations are removed again however the yield ends.
#
# @param _ ignored blocker object
# @param timeout passed to start_timer together with +token+; no timer is
#   started when it is nil or false
# @param fiber [Fiber] the fiber being parked (defaults to the current one)
# @param token the resume token identifying this wait
# @return whatever value the fiber is resumed with
def block(_, timeout, fiber = Fiber.current, token = Scheduler.resume_token)
  @join_waiters[fiber] = true
  timeout && start_timer(timeout, token)
  @token_map.store(fiber, token)
  @resume_tokens.store(token, fiber)
  Fiber.yield
ensure
  # Drop every trace of this wait, whether we were resumed or unwound.
  [@join_waiters, @token_map].each { |table| table.delete(fiber) }
  @resume_tokens.delete(token)
end
|
#close ⇒ Object
Hook invoked at the end of the thread. Will start our scheduler’s Reactor.
126
127
128
129
130
131
|
# File 'lib/itsi/scheduler.rb', line 126
# Hook invoked at the end of the thread: drives the scheduler via #run.
# Whether or not #run raises, the scheduler is then marked closed and frozen.
#
# @return whatever #run returns
def close
  begin
    run
  ensure
    @closed = true unless @closed
    freeze
  end
end
|
#closed? ⇒ Boolean
144
145
146
|
# File 'lib/itsi/scheduler.rb', line 144
# Reports whether #close has already run on this scheduler.
#
# @return [Boolean] true once @closed has been set by #close; false before
#   that (the raw instance variable is nil until then, so it is coerced to a
#   strict boolean to honour the predicate contract)
def closed?
  !!@closed
end
|
#fiber(&blk) ⇒ Object
Spin up a new fiber and immediately resume it.
149
150
151
|
# File 'lib/itsi/scheduler.rb', line 149
# Spins up a new non-blocking fiber running +blk+ and resumes it immediately.
#
# @return [Fiber] the fiber that was created (already resumed once)
def fiber(&blk)
  spawned = Fiber.new(blocking: false, &blk)
  spawned.resume
  spawned
end
|
#io_wait(io, events, duration) ⇒ Object
Register an IO waiter. The waiting fiber is resumed by our scheduler from #tick, once fetch_due_events reports the IO as ready.
44
45
46
47
48
49
50
51
|
# File 'lib/itsi/scheduler.rb', line 44
# Registers an IO waiter for the current fiber.
#
# A fresh resume token is registered through register_io_wait along with the
# file descriptor, the event mask and the duration. If that call already
# yields a truthy readiness value it is used directly; otherwise the fiber is
# parked with #block until it is resumed. The timer for the token is cleared
# in both cases before returning.
#
# @param io an object responding to #fileno
# @param events the event mask, forwarded unchanged to register_io_wait
# @param duration timeout forwarded to register_io_wait and #block
# @return the readiness value from register_io_wait or from #block
def io_wait(io, events, duration)
  token = Scheduler.resume_token
  waiter = Fiber.current
  ready = register_io_wait(io.fileno, events, duration, token) ||
          block(nil, duration, waiter, token)
  clear_timer(token)
  ready
end
|
#kernel_sleep(duration) ⇒ Object
60
61
62
|
# File 'lib/itsi/scheduler.rb', line 60
# Fiber scheduler hook behind Kernel#sleep: parks the current fiber via #block.
#
# +duration+ defaults to nil because Ruby forwards the caller's arguments
# as-is, so a bare `sleep` reaches this hook with no arguments at all. A nil
# duration starts no timer in #block, i.e. the fiber waits until it is
# explicitly resumed.
#
# @param duration [Numeric, nil] time to sleep, or nil to sleep until woken
# @return whatever #block returns
def kernel_sleep(duration = nil)
  block(nil, duration)
end
|
#process_wait(pid, flags) ⇒ Object
Need to defer to Process::Status rather than our extension as we don’t have a means of creating our own Process::Status.
135
136
137
138
139
140
141
142
|
# File 'lib/itsi/scheduler.rb', line 135
# Waits for a child process on a helper thread.
#
# We need to defer to Process::Status rather than our extension, as we don't
# have a means of creating our own Process::Status.
#
# @param pid [Integer] process id to wait for
# @param flags [Integer] flags forwarded to Process::Status.wait
# @return the result of Process::Status.wait(pid, flags)
def process_wait(pid, flags)
  # Thread#value joins the thread and hands back the block's result.
  Thread.new { Process::Status.wait(pid, flags) }.value
end
|
#resume_blocked(fiber) ⇒ Object
90
91
92
93
94
95
96
|
# File 'lib/itsi/scheduler.rb', line 90
# Resumes a fiber that was handed to #unblock.
#
# If the fiber still has a resume token in @token_map it is resumed through
# #resume_fiber; otherwise it is resumed directly, provided it is still alive.
#
# @param fiber [Fiber]
# @return the result of the resume, or nil when nothing was resumed
def resume_blocked(fiber)
  token = @token_map[fiber]
  return resume_fiber(token) if token

  fiber.resume if fiber.alive?
end
|
#resume_fiber(token) ⇒ Object
74
75
76
77
78
79
80
|
# File 'lib/itsi/scheduler.rb', line 74
# Resumes the fiber registered under +token+, if any, removing the
# registration first. A StandardError raised out of the resume is reported
# with `warn` instead of propagating.
#
# @param token the resume token
# @return the value produced by the resume, or nil when no fiber was
#   registered or an error was reported
def resume_fiber(token)
  fiber = @resume_tokens.delete(token)
  fiber.resume if fiber
rescue StandardError => e
  warn "Fiber #{fiber} terminated on exception: #{e.message}"
end
|
#resume_fiber_with_readiness(token, readiness) ⇒ Object
82
83
84
85
86
87
88
|
# File 'lib/itsi/scheduler.rb', line 82
# Resumes the fiber registered under +token+, passing +readiness+ as the
# resume value. Takes a single [token, readiness] pair, destructured in the
# signature. A StandardError raised out of the resume is reported with `warn`
# instead of propagating.
#
# @return the value produced by the resume, or nil when no fiber was
#   registered or an error was reported
def resume_fiber_with_readiness((token, readiness))
  fiber = @resume_tokens.delete(token)
  fiber.resume(readiness) if fiber
rescue StandardError => e
  warn "Fiber #{fiber} terminated on exception: #{e.message}"
end
|
#run ⇒ Object
Run until no more work needs doing.
119
120
121
122
|
# File 'lib/itsi/scheduler.rb', line 119
# Runs until no more work needs doing: calls #tick for as long as #work?
# reports pending work, then emits a debug message.
#
# @return whatever the final debug call returns
def run
  loop do
    break unless work?

    tick
  end
  debug("Exit Scheduler")
end
|
#switch_unblock_batch ⇒ Object
98
99
100
101
102
103
104
|
# File 'lib/itsi/scheduler.rb', line 98
# Hands out the batch that #unblock has been filling and points @unblock_idx
# at the other batch, all while holding @unblocked_mux.
#
# @return [Array] the batch that was selected before the switch
def switch_unblock_batch
  @unblocked_mux.synchronize do
    filled_idx = @unblock_idx
    @unblock_idx = filled_idx.succ % 2
    @unblocked[filled_idx]
  end
end
|
#tick ⇒ Object
64
65
66
67
68
69
70
71
72
|
# File 'lib/itsi/scheduler.rb', line 64
# Performs one scheduler iteration.
#
# Due IO events, due timers and the current unblock batch are all collected
# first; only then are fibers resumed, in the order IO events, unblocked
# fibers, timers. The unblock batch is emptied once its fibers have been
# resumed.
#
# @return the timers collection from fetch_due_timers (nil when it was nil)
def tick
  ready_io = fetch_due_events
  expired = fetch_due_timers
  batch = switch_unblock_batch

  ready_io.each(&@resume_fiber_with_readiness) unless ready_io.nil?

  batch.each(&@resume_blocked)
  batch.clear

  expired.each(&@resume_fiber) unless expired.nil?
end
|
#unblock(_blocker, fiber) ⇒ Object
53
54
55
56
57
58
|
# File 'lib/itsi/scheduler.rb', line 53
# Queues +fiber+ on the currently selected unblock batch while holding
# @unblocked_mux, then calls wake.
#
# @param _blocker ignored
# @param fiber [Fiber] the fiber to be resumed by a later #tick
# @return whatever wake returns
def unblock(_blocker, fiber)
  @unblocked_mux.synchronize { @unblocked[@unblock_idx].push(fiber) }
  wake
end
|
#work? ⇒ Boolean
Keep running until we’ve got no timers we’re awaiting, no pending IO, no temporary yields, no pending unblocks.
114
115
116
|
# File 'lib/itsi/scheduler.rb', line 114
# Tells #run whether to keep going: true while the current unblock batch is
# non-empty or any fiber is parked in @join_waiters; otherwise the answer
# comes from has_pending_io?.
#
# @return true, or whatever has_pending_io? returns
def work?
  return true unless @unblocked[@unblock_idx].empty?
  return true unless @join_waiters.empty?

  has_pending_io?
end
|
#yield ⇒ Object
Yields upwards to the scheduler, with an intention to resume the fiber that yielded ASAP.
108
109
110
|
# File 'lib/itsi/scheduler.rb', line 108
# Yields upwards to the scheduler, with the intention of resuming the
# yielding fiber ASAP: a zero-length sleep, performed only when #work?
# reports other pending work.
#
# @return the result of kernel_sleep(0), or nil when there was no other work
def yield
  return unless work?

  kernel_sleep(0)
end
|