Class: Async::Scheduler
Overview
Handles scheduling of fibers. Implements the fiber scheduler interface.
Direct Known Subclasses
Defined Under Namespace
Classes: ClosedError
Instance Attribute Summary
Attributes inherited from Node
#annotation, #children, #head, #parent, #tail
Class Method Summary
- .supported? ⇒ Boolean
  Whether the fiber scheduler is supported.
Instance Method Summary
- #address_resolve(hostname) ⇒ Object
- #async(*arguments, **options, &block) ⇒ Object
  Deprecated. With no replacement.
- #block(blocker, timeout) ⇒ Object
  Invoked when a fiber tries to perform a blocking operation which cannot continue.
- #close ⇒ Object
- #closed? ⇒ Boolean
- #fiber ⇒ Object
- #initialize(parent = nil, selector: nil) ⇒ Scheduler
  constructor
  A new instance of Scheduler.
- #interrupt ⇒ Object
  Interrupt the event loop and cause it to exit.
- #io_read(io, buffer, length, offset = 0) ⇒ Object
- #io_wait(io, events, timeout = nil) ⇒ Object
- #io_write(io, buffer, length, offset = 0) ⇒ Object
- #kernel_sleep(duration = nil) ⇒ Object
- #load ⇒ Object
  Compute the scheduler load according to the busy and idle times that are updated by the run loop.
- #process_wait(pid, flags) ⇒ Object
  Wait for the specified process ID to exit.
- #push(fiber) ⇒ Object
  Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor.
- #raise(*arguments) ⇒ Object
- #resume(fiber, *arguments) ⇒ Object
- #run ⇒ Object
  Run the reactor until all tasks are finished.
- #run_once(timeout = nil) ⇒ Object
  Run one iteration of the event loop.
- #scheduler_close ⇒ Object
- #terminate ⇒ Object
  Terminate the scheduler.
- #timeout_after(duration, exception, message, &block) ⇒ Object
- #to_s ⇒ Object
- #transfer ⇒ Object
  Transfer from the calling fiber to the event loop.
- #unblock(blocker, fiber) ⇒ Object
- #with_timeout(duration, exception = TimeoutError, message = "execution expired", &block) ⇒ Object
  Invoke the block, but after the specified timeout, raise TimeoutError in any currently blocking operation.
- #yield ⇒ Object
  Yield the current fiber and resume it on the next iteration of the event loop.
Methods inherited from Node
#parent=, #annotate, #backtrace, #children?, #consume, #description, #finished?, #print_hierarchy, #root, #stop, #stopped?, #transient?, #traverse
Constructor Details
#initialize(parent = nil, selector: nil) ⇒ Scheduler
Returns a new instance of Scheduler.
# File 'lib/async/scheduler.rb', line 31

def initialize(parent = nil, selector: nil)
  super(parent)

  @selector = selector || ::IO::Event::Selector.new(Fiber.current)
  @interrupted = false

  @blocked = 0

  @busy_time = 0.0
  @idle_time = 0.0

  @timers = ::IO::Event::Timers.new
end
Class Method Details
.supported? ⇒ Boolean
Whether the fiber scheduler is supported.
# File 'lib/async/scheduler.rb', line 27

def self.supported?
  true
end
Instance Method Details
#address_resolve(hostname) ⇒ Object
# File 'lib/async/scheduler.rb', line 191

def address_resolve(hostname)
  # On some platforms, hostnames may contain a device-specific suffix (e.g. %en0). We need to strip this before resolving.
  # See <https://github.com/socketry/async/issues/180> for more details.
  hostname = hostname.split("%", 2).first
  ::Resolv.getaddresses(hostname)
end
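The suffix stripping can be tried in isolation. The sketch below uses a hypothetical helper, `strip_zone_suffix`, which is not part of Async; it only mirrors the `split("%", 2).first` step from the method above and performs no resolution.

```ruby
# Hypothetical helper (not part of Async): removes a device-specific
# suffix such as "%en0" from a hostname, the same way #address_resolve
# does before passing the name to Resolv.
def strip_zone_suffix(hostname)
  hostname.split("%", 2).first
end

strip_zone_suffix("fe80::1%en0")  # => "fe80::1"
strip_zone_suffix("example.com")  # => "example.com"
```

A hostname without a `%` passes through unchanged, so the step is safe to apply unconditionally.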
#async(*arguments, **options, &block) ⇒ Object
Deprecated, with no replacement.
Start an asynchronous task within the specified reactor. The task will be executed until the first blocking call, at which point it will yield and this method will return.
This is the main entry point for scheduling asynchronous tasks.
# File 'lib/async/scheduler.rb', line 387

def async(*arguments, **options, &block)
  Kernel::raise ClosedError if @selector.nil?

  task = Task.new(Task.current? || self, **options, &block)

  # I want to take a moment to explain the logic of this.
  # When calling an async block, we deterministically execute it until the
  # first blocking operation. We don't *have* to do this - we could schedule
  # it for later execution, but it's useful to:
  # - Fail at the point of the method call where possible.
  # - Execute deterministically where possible.
  # - Avoid scheduler overhead if no blocking operation is performed.
  task.run(*arguments)

  # Console.debug "Initial execution of task #{fiber} complete (#{result} -> #{fiber.alive?})..."
  return task
end
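The "run until the first blocking operation" behaviour can be illustrated with core Ruby fibers alone. This is only an analogy under stated assumptions: a plain `Fiber` stands in for a task, `Fiber.yield` stands in for the first blocking operation, and the method name `run_until_first_yield` is hypothetical.

```ruby
# Sketch using plain fibers (no Async): the body runs immediately when
# resumed, and control returns to the caller at the first yield.
def run_until_first_yield
  log = []

  fiber = Fiber.new do
    log << :started
    Fiber.yield            # stands in for the first blocking operation
    log << :resumed
  end

  fiber.resume             # executes the body up to Fiber.yield
  log << :caller_continues
  fiber.resume             # an event loop would do this once the operation is ready

  log
end

run_until_first_yield  # => [:started, :caller_continues, :resumed]
```

Because the body starts executing before the caller continues, an error raised before the first yield surfaces at the call site, which matches the first rationale listed in the source comment.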
#block(blocker, timeout) ⇒ Object
Invoked when a fiber tries to perform a blocking operation which cannot continue. A corresponding call to #unblock must be performed to allow this fiber to continue.
# File 'lib/async/scheduler.rb', line 148

def block(blocker, timeout)
  # $stderr.puts "block(#{blocker}, #{Fiber.current}, #{timeout})"
  fiber = Fiber.current

  if timeout
    timer = @timers.after(timeout) do
      if fiber.alive?
        fiber.transfer(false)
      end
    end
  end

  begin
    @blocked += 1
    @selector.transfer
  ensure
    @blocked -= 1
  end
ensure
  timer&.cancel!
end
#close ⇒ Object
# File 'lib/async/scheduler.rb', line 83

def close
  # It's critical to stop all tasks. Otherwise they might be holding on to resources which are never closed/released correctly.
  until self.terminate
    self.run_once!
  end

  Kernel.raise "Closing scheduler with blocked operations!" if @blocked > 0

  # We depend on GVL for consistency:
  # @guard.synchronize do

  # We want `@selector = nil` to be a visible side effect from this point forward, specifically in `#interrupt` and `#unblock`. If the selector is closed, then we don't want to push any fibers to it.
  selector = @selector
  @selector = nil

  selector&.close

  # end

  consume
end
#closed? ⇒ Boolean
# File 'lib/async/scheduler.rb', line 107

def closed?
  @selector.nil?
end
#fiber ⇒ Object
# File 'lib/async/scheduler.rb', line 405

def fiber(...)
  return async(...).fiber
end
#interrupt ⇒ Object
Interrupt the event loop and cause it to exit.
# File 'lib/async/scheduler.rb', line 117

def interrupt
  @interrupted = true
  @selector&.wakeup
end
#io_read(io, buffer, length, offset = 0) ⇒ Object
# File 'lib/async/scheduler.rb', line 231

def io_read(io, buffer, length, offset = 0)
  fiber = Fiber.current

  if timeout = get_timeout(io)
    timer = @timers.after(timeout) do
      fiber.raise(::IO::TimeoutError, "Timeout while waiting for IO to become readable!")
    end
  end

  @selector.io_read(fiber, io, buffer, length, offset)
ensure
  timer&.cancel!
end
#io_wait(io, events, timeout = nil) ⇒ Object
# File 'lib/async/scheduler.rb', line 210

def io_wait(io, events, timeout = nil)
  fiber = Fiber.current

  if timeout
    # If an explicit timeout is specified, we expect that the user will handle it themselves:
    timer = @timers.after(timeout) do
      fiber.transfer
    end
  elsif timeout = get_timeout(io)
    # Otherwise, if we default to the io's timeout, we raise an exception:
    timer = @timers.after(timeout) do
      fiber.raise(::IO::TimeoutError, "Timeout while waiting for IO to become ready!")
    end
  end

  return @selector.io_wait(fiber, io, events)
ensure
  timer&.cancel!
end
#io_write(io, buffer, length, offset = 0) ⇒ Object
# File 'lib/async/scheduler.rb', line 246

def io_write(io, buffer, length, offset = 0)
  fiber = Fiber.current

  if timeout = get_timeout(io)
    timer = @timers.after(timeout) do
      fiber.raise(::IO::TimeoutError, "Timeout while waiting for IO to become writable!")
    end
  end

  @selector.io_write(fiber, io, buffer, length, offset)
ensure
  timer&.cancel!
end
#kernel_sleep(duration = nil) ⇒ Object
# File 'lib/async/scheduler.rb', line 182

def kernel_sleep(duration = nil)
  if duration
    self.block(nil, duration)
  else
    self.transfer
  end
end
#load ⇒ Object
Compute the scheduler load according to the busy and idle times that are updated by the run loop.
# File 'lib/async/scheduler.rb', line 47

def load
  total_time = @busy_time + @idle_time

  # If the total time is zero, then the load is zero:
  return 0.0 if total_time.zero?

  # We normalize to a 1 second window:
  if total_time > 1.0
    ratio = 1.0 / total_time
    @busy_time *= ratio
    @idle_time *= ratio

    # We don't need to divide here as we've already normalised it to a 1s window:
    return @busy_time
  else
    return @busy_time / total_time
  end
end
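To make the normalisation step concrete, here is a stand-alone sketch of the same arithmetic. The function name `scheduler_load` and its return shape are assumptions made for illustration; the real method reads and rewrites the `@busy_time` and `@idle_time` instance variables instead of returning them.

```ruby
# Hypothetical stand-alone version of the calculation in #load.
# Returns [load, busy_time, idle_time] so the rescaled totals are visible.
def scheduler_load(busy_time, idle_time)
  total_time = busy_time + idle_time

  # No recorded time means no load:
  return [0.0, busy_time, idle_time] if total_time.zero?

  if total_time > 1.0
    # Rescale both counters so that together they span exactly 1 second:
    ratio = 1.0 / total_time
    busy_time *= ratio
    idle_time *= ratio
    [busy_time, busy_time, idle_time]
  else
    [busy_time / total_time, busy_time, idle_time]
  end
end

scheduler_load(0.0, 0.0)  # load 0.0
scheduler_load(0.2, 0.2)  # load 0.5, counters unchanged
scheduler_load(3.0, 1.0)  # load 0.75, counters rescaled to 0.75 and 0.25
```

Once the counters have been rescaled to a 1 second window, the busy time is itself the load fraction, which is why the first branch returns it without dividing.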
#process_wait(pid, flags) ⇒ Object
Wait for the specified process ID to exit.
# File 'lib/async/scheduler.rb', line 267

def process_wait(pid, flags)
  return @selector.process_wait(Fiber.current, pid, flags)
end
#push(fiber) ⇒ Object
Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor.
# File 'lib/async/scheduler.rb', line 134

def push(fiber)
  @selector.push(fiber)
end
#raise(*arguments) ⇒ Object
# File 'lib/async/scheduler.rb', line 138

def raise(*arguments)
  @selector.raise(*arguments)
end
#resume(fiber, *arguments) ⇒ Object
# File 'lib/async/scheduler.rb', line 142

def resume(fiber, *arguments)
  @selector.resume(fiber, *arguments)
end
#run ⇒ Object
Run the reactor until all tasks are finished. Proxies arguments to #async immediately before entering the loop, if a block is provided.
# File 'lib/async/scheduler.rb', line 345

def run(...)
  Kernel::raise ClosedError if @selector.nil?

  initial_task = self.async(...) if block_given?
  interrupt = nil

  begin
    # In theory, we could use Exception here to be a little bit safer, but we've only shown the case for SignalException to be a problem, so let's not over-engineer this.
    Thread.handle_interrupt(::SignalException => :never) do
      while true
        # If we are interrupted, we need to exit:
        break if self.interrupted?

        # If we are finished, we need to exit:
        break unless self.run_once
      end
    end
  rescue Interrupt => interrupt
    Thread.handle_interrupt(::SignalException => :never) do
      self.stop
    end

    retry
  end

  # If the event loop was interrupted, and we finished exiting normally (due to the interrupt), we need to re-raise the interrupt so that the caller can handle it too.
  Kernel.raise interrupt if interrupt

  return initial_task
ensure
  Console.debug(self) {"Exiting run-loop because #{$! ? $! : 'finished'}."}
end
#run_once(timeout = nil) ⇒ Object
Run one iteration of the event loop. Does not handle interrupts.
# File 'lib/async/scheduler.rb', line 275

def run_once(timeout = nil)
  Kernel::raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking?

  # If we are finished, we stop the task tree and exit:
  if self.finished?
    return false
  end

  return run_once!(timeout)
end
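The guard at the top of #run_once relies on `Fiber.blocking?`. The sketch below shows how that predicate behaves using only core Ruby (Ruby 3.0 or later is assumed for the `blocking:` keyword). The helper name `blocking_in_new_fiber` is hypothetical.

```ruby
# Hypothetical helper: reports what Fiber.blocking? returns inside a
# freshly created fiber with the given blocking mode.
def blocking_in_new_fiber(blocking:)
  Fiber.new(blocking: blocking) { Fiber.blocking? }.resume
end

blocking_in_new_fiber(blocking: true)   # truthy: #run_once would proceed
blocking_in_new_fiber(blocking: false)  # false: #run_once would raise
```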
#scheduler_close ⇒ Object
# File 'lib/async/scheduler.rb', line 66

def scheduler_close
  # If the execution context (thread) was handling an exception, we want to exit as quickly as possible:
  unless $!
    self.run
  end
ensure
  self.close
end
#terminate ⇒ Object
Terminate the scheduler. We deliberately ignore interrupts here, as this code can be called from an interrupt, and we don’t want to be interrupted while cleaning up.
# File 'lib/async/scheduler.rb', line 76

def terminate
  Thread.handle_interrupt(::Interrupt => :never) do
    super
  end
end
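The effect of `Thread.handle_interrupt(::Interrupt => :never)` can be observed without Async. In the sketch below, a worker thread stands in for the cleanup code and `Thread#raise` stands in for a signal-driven interrupt. The method name `deferred_interrupt_demo` and both queues are illustrative assumptions.

```ruby
# Sketch: an Interrupt raised from another thread is held back until the
# protected block has finished, then delivered.
def deferred_interrupt_demo
  log = []
  ready = Queue.new
  proceed = Queue.new

  worker = Thread.new do
    begin
      Thread.handle_interrupt(::Interrupt => :never) do
        ready << true          # signal that the mask is in place
        proceed.pop            # the interrupt is raised while we wait here
        log << :cleanup_done
      end
    rescue Interrupt
      log << :interrupted
    end
  end

  ready.pop
  worker.raise(Interrupt)      # queued, not delivered, while masked
  proceed << true
  worker.join

  log
end

deferred_interrupt_demo  # => [:cleanup_done, :interrupted]
```

The interrupt is not lost. It is delivered as soon as the masked block exits, so callers still see it after cleanup has completed.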
#timeout_after(duration, exception, message, &block) ⇒ Object
# File 'lib/async/scheduler.rb', line 425

def timeout_after(duration, exception, message, &block)
  with_timeout(duration, exception, message) do |timer|
    yield duration
  end
end
#to_s ⇒ Object
# File 'lib/async/scheduler.rb', line 111

def to_s
  "\#<#{self.description} #{@children&.size || 0} children (#{stopped? ? 'stopped' : 'running'})>"
end
#transfer ⇒ Object
Transfer from the calling fiber to the event loop.
# File 'lib/async/scheduler.rb', line 123

def transfer
  @selector.transfer
end
#unblock(blocker, fiber) ⇒ Object
# File 'lib/async/scheduler.rb', line 171

def unblock(blocker, fiber)
  # $stderr.puts "unblock(#{blocker}, #{fiber})"

  # This operation is protected by the GVL:
  if selector = @selector
    selector.push(fiber)
    selector.wakeup
  end
end
#with_timeout(duration, exception = TimeoutError, message = "execution expired", &block) ⇒ Object
Invoke the block, but after the specified timeout, raise the given exception (TimeoutError by default) in any currently blocking operation. If the block runs to completion before the timeout occurs or there are no non-blocking operations after the timeout expires, the code will complete without any exception.
# File 'lib/async/scheduler.rb', line 411

def with_timeout(duration, exception = TimeoutError, message = "execution expired", &block)
  fiber = Fiber.current

  timer = @timers.after(duration) do
    if fiber.alive?
      fiber.raise(exception, message)
    end
  end

  yield timer
ensure
  timer&.cancel!
end
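The `yield timer ... ensure timer&.cancel!` shape guarantees that the timer is cancelled whether the block returns normally or raises. A minimal sketch of that pattern follows. `FakeTimer` and `with_fake_timer` are hypothetical stand-ins, not Async or io-event classes, and the timer is passed in so its state can be inspected afterwards.

```ruby
# Hypothetical stand-in for the handle returned by @timers.after.
class FakeTimer
  def initialize
    @cancelled = false
  end

  def cancel!
    @cancelled = true
  end

  def cancelled?
    @cancelled
  end
end

# Mirrors the shape of #with_timeout: yield the timer to the block and
# always cancel it on the way out.
def with_fake_timer(timer)
  yield timer
ensure
  timer&.cancel!
end

timer = FakeTimer.new
with_fake_timer(timer) { :done }  # returns :done
timer.cancelled?                  # => true
```

The safe-navigation call (`&.`) matters in the real method because `timer` is still `nil` if an error occurs before the timer is created.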
#yield ⇒ Object
Yield the current fiber and resume it on the next iteration of the event loop.
# File 'lib/async/scheduler.rb', line 128

def yield
  @selector.yield
end