Class: Async::Scheduler

Inherits:
Node
  • Object
show all
Defined in:
lib/async/scheduler.rb

Overview

Handles scheduling of fibers. Implements the fiber scheduler interface.

Direct Known Subclasses

Reactor

Defined Under Namespace

Classes: ClosedError

Instance Attribute Summary

Attributes inherited from Node

#annotation, #children, #head, #parent, #tail

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Node

#The parent node.=, #annotate, #backtrace, #children?, #consume, #description, #finished?, #print_hierarchy, #root, #stop, #stopped?, #transient?, #traverse

Constructor Details

#initialize(parent = nil, selector: nil) ⇒ Scheduler

Returns a new instance of Scheduler.



31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/async/scheduler.rb', line 31

# Create a scheduler.
#
# @param parent [Object, nil] forwarded unchanged to the superclass constructor.
# @param selector [Object, nil] event selector to use; when omitted, a new
#   `IO::Event::Selector` bound to the current fiber is created.
def initialize(parent = nil, selector: nil)
	super(parent)
	
	# Event loop machinery:
	@selector = selector || ::IO::Event::Selector.new(Fiber.current)
	@timers = ::IO::Event::Timers.new
	
	# Run state: the interrupt flag, and the number of fibers currently suspended in `#block`:
	@interrupted = false
	@blocked = 0
	
	# Accumulated busy/idle durations, read by `#load`:
	@busy_time = @idle_time = 0.0
end

Class Method Details

.supported? ⇒ Boolean

Whether the fiber scheduler is supported.

Returns:

  • (Boolean)


27
28
29
# File 'lib/async/scheduler.rb', line 27

# Whether the fiber scheduler is supported.
#
# @return [Boolean] always `true`.
def self.supported?
	return true
end

Instance Method Details

#address_resolve(hostname) ⇒ Object



191
192
193
194
195
196
# File 'lib/async/scheduler.rb', line 191

# Resolve a hostname to its addresses using `Resolv`.
#
# @param hostname [String] the name to resolve, optionally carrying a `%device` suffix.
# @return [Array] the result of `Resolv.getaddresses` for the name without the suffix.
def address_resolve(hostname)
	# On some platforms, hostnames may contain a device-specific suffix (e.g. %en0). We need to strip this before resolving.
	# See <https://github.com/socketry/async/issues/180> for more details.
	# `partition` always yields a String head, even for an empty hostname, whereas `split("%", 2).first` returns `nil` for "" and would hand `nil` to the resolver.
	hostname = hostname.partition("%").first
	::Resolv.getaddresses(hostname)
end

#async(*arguments, **options, &block) ⇒ Object

Deprecated.

With no replacement.

Start an asynchronous task within the specified reactor. The task will be executed until the first blocking call, at which point it will yield and this method will return.

This is the main entry point for scheduling asynchronous tasks.



387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
# File 'lib/async/scheduler.rb', line 387

# Start an asynchronous task and run it immediately, up to its first blocking operation.
#
# @deprecated With no replacement.
# @param arguments [Array] forwarded to the task when it is first run.
# @param options [Hash] forwarded to `Task.new`.
# @return [Task] the task that was created.
# @raise [ClosedError] if the scheduler no longer has a selector.
def async(*arguments, **options, &block)
	Kernel.raise(ClosedError) if @selector.nil?
	
	# New tasks are nested under the current task when there is one, otherwise directly under the scheduler:
	parent = Task.current? || self
	
	# The task body is executed eagerly rather than queued for later, so that:
	# - failures surface at the point of the method call where possible,
	# - execution is deterministic where possible,
	# - scheduler overhead is avoided if no blocking operation is performed.
	Task.new(parent, **options, &block).tap do |task|
		task.run(*arguments)
	end
end

#block(blocker, timeout) ⇒ Object

Invoked when a fiber tries to perform a blocking operation which cannot continue. A corresponding call to #unblock must be performed to allow this fiber to continue.



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/async/scheduler.rb', line 148

# Suspend the current fiber by transferring to the selector, with an optional timeout.
#
# @param blocker [Object] not used by this implementation (it only appears in the disabled debug line).
# @param timeout [Numeric, nil] when given, a timer is scheduled which transfers `false` to the fiber if it is still alive when the timer fires.
# @return [Object] the value produced by `@selector.transfer` once control comes back to this fiber (presumably `false` when the timer fired - confirm against the selector).
def block(blocker, timeout)
	# $stderr.puts "block(#{blocker}, #{Fiber.current}, #{timeout})"
	fiber = Fiber.current
	
	if timeout
		timer = @timers.after(timeout) do
			# Only wake the fiber if it has not already finished:
			if fiber.alive?
				fiber.transfer(false)
			end
		end
	end
	
	begin
		# Track how many fibers are suspended here; `#close` refuses to complete while this is non-zero:
		@blocked += 1
		@selector.transfer
	ensure
		@blocked -= 1
	end
ensure
	# Whether we were resumed or an exception was raised, the pending timer (if any) is no longer needed:
	timer&.cancel!
end

#close ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/async/scheduler.rb', line 83

# Terminate all tasks, then release the selector and consume this node.
#
# After this method completes, `@selector` is `nil`.
#
# @raise [RuntimeError] if fibers are still counted as blocked once termination has finished.
def close
	# It's critical to stop all tasks. Otherwise they might be holding on to resources which are never closed/released correctly.
	# Keep running event loop iterations until `terminate` reports a truthy result:
	until self.terminate
		self.run_once!
	end
	
	Kernel.raise "Closing scheduler with blocked operations!" if @blocked > 0
	
	# We depend on GVL for consistency:
	# @guard.synchronize do
	
	# We want `@selector = nil` to be a visible side effect from this point forward, specifically in `#interrupt` and `#unblock`. If the selector is closed, then we don't want to push any fibers to it.
	selector = @selector
	@selector = nil
	
	# The instance variable is cleared before the selector itself is closed:
	selector&.close
	
	# end
	
	consume
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


107
108
109
# File 'lib/async/scheduler.rb', line 107

# Whether the scheduler has been closed, i.e. it no longer holds a selector.
#
# @return [Boolean] `true` when `@selector` is `nil`.
def closed?
	return @selector.nil?
end

#fiber ⇒ Object



405
406
407
# File 'lib/async/scheduler.rb', line 405

# Start a task via `#async`, forwarding all arguments, and return that task's fiber.
#
# @return [Object] the `fiber` of the task returned by `#async`.
def fiber(...)
	task = async(...)
	task.fiber
end

#interrupt ⇒ Object

Interrupt the event loop and cause it to exit.



117
118
119
120
# File 'lib/async/scheduler.rb', line 117

# Interrupt the event loop and cause it to exit.
#
# Sets the interrupted flag, then wakes the selector if there still is one.
def interrupt
	@interrupted = true
	
	selector = @selector
	selector.wakeup unless selector.nil?
end

#io_read(io, buffer, length, offset = 0) ⇒ Object



231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/async/scheduler.rb', line 231

# Read from an IO through the selector, bounded by the timeout reported by `get_timeout(io)` when there is one.
#
# @return [Object] the result of `@selector.io_read`.
def io_read(io, buffer, length, offset = 0)
	current = Fiber.current
	limit = get_timeout(io)
	
	if limit
		# When the timer fires, the waiting fiber is failed from the outside:
		timer = @timers.after(limit) {current.raise(::IO::TimeoutError, "Timeout while waiting for IO to become readable!")}
	end
	
	@selector.io_read(current, io, buffer, length, offset)
ensure
	# The timer must not outlive the operation, whether it completed or raised:
	timer&.cancel!
end

#io_wait(io, events, timeout = nil) ⇒ Object



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/async/scheduler.rb', line 210

# Wait through the selector for the given events on an IO.
#
# @param timeout [Numeric, nil] explicit timeout; when it elapses the fiber is simply resumed. When omitted, the timeout reported by `get_timeout(io)` (if any) applies instead, and elapsing raises `IO::TimeoutError` in the fiber.
# @return [Object] the result of `@selector.io_wait`.
def io_wait(io, events, timeout = nil)
	current = Fiber.current
	
	timer =
		if timeout
			# If an explicit timeout is specified, we expect that the user will handle it themselves:
			@timers.after(timeout) {current.transfer}
		elsif (io_timeout = get_timeout(io))
			# Otherwise, if we default to the io's timeout, we raise an exception:
			@timers.after(io_timeout) {current.raise(::IO::TimeoutError, "Timeout while waiting for IO to become ready!")}
		end
	
	@selector.io_wait(current, io, events)
ensure
	# The timer must not outlive the wait, whether it completed or raised:
	timer&.cancel!
end

#io_write(io, buffer, length, offset = 0) ⇒ Object



246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/async/scheduler.rb', line 246

# Write to an IO through the selector, bounded by the timeout reported by `get_timeout(io)` when there is one.
#
# @return [Object] the result of `@selector.io_write`.
def io_write(io, buffer, length, offset = 0)
	current = Fiber.current
	limit = get_timeout(io)
	
	if limit
		# When the timer fires, the waiting fiber is failed from the outside:
		timer = @timers.after(limit) {current.raise(::IO::TimeoutError, "Timeout while waiting for IO to become writable!")}
	end
	
	@selector.io_write(current, io, buffer, length, offset)
ensure
	# The timer must not outlive the operation, whether it completed or raised:
	timer&.cancel!
end

#kernel_sleep(duration = nil) ⇒ Object



182
183
184
185
186
187
188
# File 'lib/async/scheduler.rb', line 182

# Suspend the current fiber: for `duration` when one is given, otherwise by transferring to the event loop with no timer.
#
# @param duration [Numeric, nil] passed to `#block` as its timeout.
def kernel_sleep(duration = nil)
	# Without a duration there is no timer to arm, so hand control straight to the event loop:
	return self.transfer unless duration
	
	self.block(nil, duration)
end

#load ⇒ Object

Compute the scheduler load according to the busy and idle times that are updated by the run loop.



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/async/scheduler.rb', line 47

# Compute the scheduler load from the accumulated busy and idle times.
#
# When more than one second has been accumulated, both counters are rescaled in place so that together they sum to one second.
#
# @return [Float] the busy fraction of the accumulated time, or `0.0` when nothing has been accumulated.
def load
	elapsed = @busy_time + @idle_time
	
	# No samples yet means no load:
	return 0.0 if elapsed.zero?
	
	# Within the 1 second window, the load is simply the busy fraction:
	return @busy_time / elapsed unless elapsed > 1.0
	
	# Otherwise, shrink both counters back into a 1 second window:
	scale = 1.0 / elapsed
	@busy_time *= scale
	@idle_time *= scale
	
	# After rescaling, the busy time already is the fraction, so no division is needed:
	@busy_time
end

#process_wait(pid, flags) ⇒ Object

Wait for the specified process ID to exit.



267
268
269
# File 'lib/async/scheduler.rb', line 267

# Wait for the specified process ID to exit, on behalf of the current fiber.
#
# @return [Object] the result of `@selector.process_wait`.
def process_wait(pid, flags)
	current = Fiber.current
	@selector.process_wait(current, pid, flags)
end

#push(fiber) ⇒ Object

Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor.



134
135
136
# File 'lib/async/scheduler.rb', line 134

# Schedule a fiber (or equivalent object) to be resumed on the next loop through the reactor.
#
# @return [Object] the result of `@selector.push`.
def push(fiber)
	return @selector.push(fiber)
end

#raise(*arguments) ⇒ Object



138
139
140
# File 'lib/async/scheduler.rb', line 138

# Forward all arguments to `@selector.raise`.
#
# Because this method shadows `Kernel#raise` for the scheduler, other methods spell out `Kernel.raise` when they need to raise an exception.
#
# @return [Object] the result of `@selector.raise`.
def raise(*arguments)
	selector = @selector
	selector.raise(*arguments)
end

#resume(fiber, *arguments) ⇒ Object



142
143
144
# File 'lib/async/scheduler.rb', line 142

# Forward the fiber and any extra arguments to `@selector.resume`.
#
# @return [Object] the result of `@selector.resume`.
def resume(fiber, *arguments)
	return @selector.resume(fiber, *arguments)
end

#run ⇒ Object

Run the reactor until all tasks are finished. Proxies arguments to #async immediately before entering the loop, if a block is provided.



345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
# File 'lib/async/scheduler.rb', line 345

# Run the event loop until it is interrupted or `run_once` reports there is nothing left to do.
#
# When a block is given, all arguments are forwarded to `#async` to start an initial task before the loop is entered.
#
# @return [Object, nil] the initial task, or `nil` when no block was given.
# @raise [ClosedError] if the scheduler no longer has a selector.
# @raise [Interrupt] re-raised after the loop has exited, if one was received while running.
def run(...)
	Kernel::raise ClosedError if @selector.nil?
	
	initial_task = self.async(...) if block_given?
	# Holds the most recent `Interrupt` rescued below, so it can be re-raised once the loop has finished:
	interrupt = nil
	
	begin
		# In theory, we could use Exception here to be a little bit safer, but we've only shown the case for SignalException to be a problem, so let's not over-engineer this.
		Thread.handle_interrupt(::SignalException => :never) do
			while true
				# If we are interrupted, we need to exit:
				break if self.interrupted?
				
				# If we are finished, we need to exit:
				break unless self.run_once
			end
		end
	rescue Interrupt => interrupt
		# Stop the task tree with signals masked, then re-enter the loop above so the stopped tasks can finish:
		Thread.handle_interrupt(::SignalException => :never) do
			self.stop
		end
		
		retry
	end
	
	# If the event loop was interrupted, and we finished exiting normally (due to the interrupt), we need to re-raise the interrupt so that the caller can handle it too.
	Kernel.raise interrupt if interrupt
	
	return initial_task
ensure
	# `$!` is the exception currently propagating out of this method, if any:
	Console.debug(self) {"Exiting run-loop because #{$! ? $! : 'finished'}."}
end

#run_once(timeout = nil) ⇒ Object

Run one iteration of the event loop. Does not handle interrupts.



275
276
277
278
279
280
281
282
283
284
# File 'lib/async/scheduler.rb', line 275

# Run one iteration of the event loop. Does not handle interrupts.
#
# @param timeout [Numeric, nil] forwarded to `run_once!`.
# @return [Object] `false` when `finished?` is true, otherwise the result of `run_once!`.
# @raise [RuntimeError] if called from a non-blocking fiber.
def run_once(timeout = nil)
	unless Fiber.blocking?
		Kernel.raise("Running scheduler on non-blocking fiber!")
	end
	
	# Once finished there is nothing left to run, so report that instead of iterating:
	self.finished? ? false : run_once!(timeout)
end

#scheduler_close ⇒ Object



66
67
68
69
70
71
72
73
# File 'lib/async/scheduler.rb', line 66

# Run the scheduler (unless an exception is already in flight) and then always close it.
def scheduler_close
	# If the execution context (thread) was handling an exception, we want to exit as quickly as possible, so the run loop is skipped:
	self.run if $!.nil?
ensure
	self.close
end

#terminate ⇒ Object

Terminate the scheduler. We deliberately ignore interrupts here, as this code can be called from an interrupt, and we don’t want to be interrupted while cleaning up.



76
77
78
79
80
# File 'lib/async/scheduler.rb', line 76

# Terminate the scheduler by delegating to the superclass implementation with `Interrupt` delivery disabled.
#
# This code can be called from an interrupt, and must not be interrupted again while cleaning up.
#
# @return [Object] the result of the superclass implementation.
def terminate
	mask = {::Interrupt => :never}
	
	Thread.handle_interrupt(mask) {super}
end

#timeout_after(duration, exception, message, &block) ⇒ Object



425
426
427
428
429
# File 'lib/async/scheduler.rb', line 425

# Run the given block under `#with_timeout`, yielding the duration to it.
#
# @param duration [Numeric] forwarded to `#with_timeout` and yielded to the block.
# @param exception [Class] forwarded to `#with_timeout`.
# @param message [String] forwarded to `#with_timeout`.
# @return [Object] the result of `#with_timeout`.
def timeout_after(duration, exception, message, &block)
	# The timer that `#with_timeout` yields is not needed here; the caller's block receives the duration instead:
	with_timeout(duration, exception, message) {yield duration}
end

#to_s ⇒ Object



111
112
113
# File 'lib/async/scheduler.rb', line 111

# A short human-readable summary: the description, the number of children and whether the scheduler is stopped.
#
# @return [String]
def to_s
	name = self.description
	# There may be no children collection at all, which counts as zero:
	count = @children&.size || 0
	state = stopped? ? "stopped" : "running"
	
	"\#<#{name} #{count} children (#{state})>"
end

#transfer ⇒ Object

Transfer from the calling fiber to the event loop.



123
124
125
# File 'lib/async/scheduler.rb', line 123

# Transfer from the calling fiber to the event loop.
#
# @return [Object] the result of `@selector.transfer`.
def transfer
	return @selector.transfer
end

#unblock(blocker, fiber) ⇒ Object



171
172
173
174
175
176
177
178
179
# File 'lib/async/scheduler.rb', line 171

# Queue a fiber on the selector and wake the selector up. Does nothing when there is no selector.
#
# @param blocker [Object] not used by this implementation.
# @param fiber [Object] the fiber to push onto the selector.
def unblock(blocker, fiber)
	# Read the instance variable exactly once, so the check and both calls act on the same selector.
	# This operation is protected by the GVL:
	selector = @selector
	return unless selector
	
	selector.push(fiber)
	selector.wakeup
end

#with_timeout(duration, exception = TimeoutError, message = "execution expired", &block) ⇒ Object

Invoke the block, but after the specified timeout, raise TimeoutError in any currently blocking operation. If the block runs to completion before the timeout occurs or there are no non-blocking operations after the timeout expires, the code will complete without any exception.



411
412
413
414
415
416
417
418
419
420
421
422
423
# File 'lib/async/scheduler.rb', line 411

# Invoke the block with a timer armed: if the timer fires while the calling fiber is still alive, the exception is raised in that fiber.
#
# @param duration [Numeric] delay passed to `@timers.after`.
# @param exception [Class] exception class raised in the fiber when the timer fires.
# @param message [String] message for that exception.
# @yield [timer] the timer returned by `@timers.after`.
# @return [Object] the result of the block.
def with_timeout(duration, exception = TimeoutError, message = "execution expired", &block)
	current = Fiber.current
	
	timer = @timers.after(duration) do
		# A fiber which has already finished has nothing left to interrupt:
		current.raise(exception, message) if current.alive?
	end
	
	yield timer
ensure
	# Whether the block finished or raised, the timer is no longer needed:
	timer&.cancel!
end

#yield ⇒ Object

Yield the current fiber and resume it on the next iteration of the event loop.



128
129
130
# File 'lib/async/scheduler.rb', line 128

# Yield the current fiber and resume it on the next iteration of the event loop.
#
# @return [Object] the result of `@selector.yield`.
def yield
	return @selector.yield
end