Class: Core::Async::Scheduler
- Inherits:
-
Object
- Object
- Core::Async::Scheduler
- Extended by:
- Forwardable
- Includes:
- Is::Inspectable
- Defined in:
- lib/core/async/scheduler.rb
Overview
- public
-
The fiber scheduler.
Direct Known Subclasses
Instance Method Summary
-
#block(blocker, timeout = nil) ⇒ Object
Scheduler Interface.
-
#cancel(fiber = Thread.current[:__corerb_async_current_fiber__]) ⇒ Object
- public
-
Cancel the given fiber.
- #close ⇒ Object
- #fiber(&block) ⇒ Object
-
#initialize ⇒ Scheduler
constructor
A new instance of Scheduler.
- #io_wait(io, events, timeout = nil) ⇒ Object
- #kernel_sleep(duration = nil) ⇒ Object
-
#observe ⇒ Object
- public
-
Run until there's no more work to do, yielding before each tick.
- #process_wait(...) ⇒ Object
-
#run ⇒ Object
- public
-
Run until there's no more work to do.
-
#running? ⇒ Boolean
- public
-
Returns `true` if the scheduler is running.
-
#stop ⇒ Object
- public
-
Stop the scheduler, canceling each scheduled fiber.
-
#tick ⇒ Object
- public
-
Run one tick.
-
#timeout(seconds) ⇒ Object
- public
-
Yields to the given block, timing out after the given seconds.
- #unblock(blocker, fiber) ⇒ Object
Constructor Details
#initialize ⇒ Scheduler
Returns a new instance of Scheduler.
39 40 41 42 43 44 45 |
# File 'lib/core/async/scheduler.rb', line 39
#
# Builds an idle scheduler: no tracked fibers, not running, not closed,
# with a fresh timer group and an event selector bound to the fiber that
# is current at construction time.
def initialize
  @fibers = {}
  @running = false
  @closed = false
  @timers = Timers::Group.new
  @selector = IO::Event::Selector.new(Fiber.current)
end
Instance Method Details
#block(blocker, timeout = nil) ⇒ Object
Scheduler Interface
143 144 145 146 147 148 149 150 151 |
# File 'lib/core/async/scheduler.rb', line 143
#
# Scheduler interface hook: suspends the calling fiber by transferring
# control to the selector.
#
# @param blocker [Object] accepted for interface compatibility; not used here.
# @param timeout [Numeric, nil] when truthy, a timer is created through
#   `create_timer` (defined elsewhere; presumably it wakes the fiber once the
#   timeout elapses -- confirm against its definition).
# @return [Object] whatever `@selector.transfer` returns.
def block(blocker, timeout = nil)
  wakeup = timeout ? create_timer(timeout) : nil
  @selector.transfer
ensure
  # Always release the timer, whether we were resumed normally or by an error.
  wakeup&.cancel
end
#cancel(fiber = Thread.current[:__corerb_async_current_fiber__]) ⇒ Object
- public
-
Cancel the given fiber.
117 118 119 |
# File 'lib/core/async/scheduler.rb', line 117
#
# Cancels the given fiber by raising `Cancel` into it.
#
# @param fiber [Object, nil] the target; defaults to the value stored under
#   `Thread.current[:__corerb_async_current_fiber__]`. Nothing happens when it
#   is nil or false.
# @return [Object, nil] the result of the instance-level `raise` call, or nil
#   when there was no target.
def cancel(fiber = Thread.current[:__corerb_async_current_fiber__])
  return unless fiber

  # Explicit receiver: this is the scheduler's own `raise`, not Kernel#raise
  # (NOTE(review): presumably delegated to the selector -- confirm).
  self.raise(fiber, Cancel)
end
#close ⇒ Object
153 154 155 156 157 158 159 160 161 |
# File 'lib/core/async/scheduler.rb', line 153
#
# Runs the scheduler via `run`, then -- the first time only -- marks it as
# closed and clears the current fiber scheduler.
#
# @return [Object] the value returned by `run`.
def close
  begin
    run
  ensure
    first_close = !@closed

    if first_close
      # The flag is set before the scheduler is cleared.
      # NOTE(review): presumably this guards against re-entry, since replacing
      # the scheduler may invoke #close again -- confirm.
      @closed = true
      Fiber.set_scheduler(nil)
    end
  end
end
#fiber(&block) ⇒ Object
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
# File 'lib/core/async/scheduler.rb', line 163
#
# Creates a non-blocking fiber for the given block, registers it with the
# scheduler, and resumes it through the selector.
#
# The block is called with a new `Filament`. When the block finishes, the
# fiber is removed from the registry and the filament is resolved with:
# * the block's return value on success,
# * nil when `Cancel` was raised,
# * a `Failure` wrapping the error for any other StandardError.
#
# If the calling fiber is itself registered, the new filament is added as a
# child of the caller's filament.
#
# @return [Fiber] the newly created fiber.
def fiber(&block)
  filament = Filament.new

  spawned = Fiber.new(blocking: false) do
    Thread.current[:__corerb_async_current_fiber__] = filament
    filament.object = spawned
    outcome = block.call(filament)
  rescue Cancel
    outcome = nil
  rescue => failure
    outcome = Failure.new(failure)
  ensure
    # Runs for every exit path; `outcome` is nil if an unrescued exception
    # escaped before it was assigned.
    @fibers.delete(spawned)
    filament.resolve(outcome)
  end

  @fibers[spawned] = filament
  @fibers[Fiber.current]&.add_child(filament)
  @selector.resume(spawned)

  spawned
end
#io_wait(io, events, timeout = nil) ⇒ Object
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
# File 'lib/core/async/scheduler.rb', line 192
#
# Scheduler interface hook: waits, via the selector, for `events` on `io`.
#
# @param io [IO] the object to wait on.
# @param events [Integer] the events of interest, passed through to the selector.
# @param timeout [Numeric, nil] when truthy, a timer is armed that raises
#   `Timeout` into the waiting fiber after that many seconds.
# @return [Object, false] whatever `@selector.io_wait` returns, or `false`
#   when the wait was interrupted by `Timeout`.
def io_wait(io, events, timeout = nil)
  fiber = Fiber.current

  if timeout
    timer = @timers.after(timeout) do
      # Same guard as #timeout: raising into a fiber that has already
      # finished would raise FiberError from inside the timer callback.
      fiber.raise(Timeout) if fiber.alive?
    end
  end

  @selector.io_wait(fiber, io, events)
rescue Timeout
  false
ensure
  timer&.cancel
end
#kernel_sleep(duration = nil) ⇒ Object
208 209 210 211 212 213 214 215 216 |
# File 'lib/core/async/scheduler.rb', line 208
#
# Scheduler interface hook: suspends the calling fiber by transferring
# control to the selector.
#
# @param duration [Numeric, nil] when truthy, a timer is created through
#   `create_timer` (defined elsewhere; presumably it resumes the fiber after
#   `duration` -- confirm). With no duration, no timer is armed.
# @return [Object] whatever `@selector.transfer` returns.
def kernel_sleep(duration = nil)
  timer = create_timer(duration) if duration
  @selector.transfer
ensure
  # Release the timer on every exit path.
  timer&.cancel
end
#observe ⇒ Object
- public
-
Run until there's no more work to do, yielding before each tick.
67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/core/async/scheduler.rb', line 67
#
# Runs ticks until `tick` reports `:finished`, yielding the scheduler to the
# given block before each tick. The running flag is set for the duration of
# the call and cleared afterwards, even if an error is raised.
#
# @yieldparam scheduler [Scheduler] this scheduler.
# @return [nil]
def observe
  @running = true

  outcome = nil
  while outcome != :finished
    yield self
    outcome = tick
  end
ensure
  @running = false
end
#process_wait(...) ⇒ Object
218 219 220 |
# File 'lib/core/async/scheduler.rb', line 218
#
# Scheduler interface hook: forwards every argument to the selector's
# `process_wait`, prefixed with the calling fiber.
#
# @return [Object] whatever `@selector.process_wait` returns.
def process_wait(...)
  waiter = Fiber.current
  @selector.process_wait(waiter, ...)
end
#run ⇒ Object
- public
-
Run until there's no more work to do.
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/core/async/scheduler.rb', line 49
#
# Runs ticks until `tick` reports `:finished` or the thread has a pending
# interrupt. If a block is given it is yielded the scheduler once, before the
# first tick. The running flag is set for the duration of the call and
# cleared afterwards, even if an error is raised.
#
# @yieldparam scheduler [Scheduler] this scheduler.
# @return [nil]
def run
  @running = true
  yield self if block_given?

  # Interrupt delivery is deferred inside the loop; the loop itself polls for
  # a pending interrupt between ticks and exits when one is waiting.
  Thread.handle_interrupt(Interrupt => :never) do
    status = nil
    status = tick until Thread.pending_interrupt? || status == :finished
  end
ensure
  @running = false
end
#running? ⇒ Boolean
- public
-
Returns `true` if the scheduler is running.
81 82 83 |
# File 'lib/core/async/scheduler.rb', line 81
#
# Reports whether the scheduler is currently inside `run` or `observe`
# (both set the running flag on entry and clear it on exit).
#
# @return [Boolean] always a strict `true` or `false`.
def running?
  # Coerce rather than compare against `true`; the result is still a strict
  # boolean, including when the flag has not been initialized.
  @running ? true : false
end
#stop ⇒ Object
- public
-
Stop the scheduler, canceling each scheduled fiber.
107 108 109 110 111 112 113 |
# File 'lib/core/async/scheduler.rb', line 107
#
# Stops the scheduler by raising a single shared `Cancel` instance into each
# fiber that is registered when the call starts.
#
# Fibers registered while `stop` is in progress are not cancelled by this
# call.
#
# @return [Hash] the fiber registry (unchanged from the previous behavior).
def stop
  signal = Cancel.new

  # Iterate over a snapshot: a cancelled fiber removes itself from @fibers,
  # and its cleanup may register new fibers. Adding keys to a Hash while
  # iterating it raises RuntimeError.
  @fibers.keys.each do |fiber|
    # Skip fibers that were already removed as a side effect of an earlier
    # cancellation in this loop.
    next unless @fibers.key?(fiber)

    @selector.raise(fiber, signal)
  end

  @fibers
end
#tick ⇒ Object
- public
-
Run one tick.
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/core/async/scheduler.rb', line 87
#
# Runs one tick: waits on the selector for at most the interval until the
# next timer, then fires due timers.
#
# @return [Symbol, Object, nil] `:finished` when there is no pending timer
#   interval and no registered fiber; otherwise the result of `@timers.fire`;
#   nil when the wait was interrupted by `Errno::EINTR`.
def tick
  wait = @timers.wait_interval

  return :finished if wait.nil? && @fibers.empty?

  # A negative interval is replaced with zero before selecting. A nil
  # interval is passed through as-is.
  wait = 0 if !wait.nil? && wait < 0

  @selector.select(wait)
  @timers.fire
rescue Errno::EINTR
  # noop
end
#timeout(seconds) ⇒ Object
- public
-
Yields to the given block, timing out after the given seconds.
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/core/async/scheduler.rb', line 123
#
# Yields to the given block; when `seconds` is a positive number, a timer is
# armed that raises `Timeout` into the calling fiber after that many seconds.
# The timer is cancelled once the block exits.
#
# @param seconds [Numeric, nil] the time limit; nil, zero, or a negative
#   value means no timer is armed.
# @return [Object] the block's return value.
def timeout(seconds)
  if seconds && seconds > 0
    waiter = Fiber.current

    timer = @timers.after(seconds) do
      # Only interrupt a fiber that is still alive.
      waiter.raise(Timeout) if waiter.alive?
    end
  end

  yield
ensure
  timer&.cancel
end
#unblock(blocker, fiber) ⇒ Object
222 223 224 |
# File 'lib/core/async/scheduler.rb', line 222
#
# Scheduler interface hook: hands the given fiber to the selector via `push`.
#
# @param blocker [Object] accepted for interface compatibility; not used here.
# @param fiber [Fiber] the fiber to push onto the selector.
# @return [Object] whatever `@selector.push` returns.
def unblock(blocker, fiber)
  @selector.push(fiber)
end