Class: IO::Event::Selector::Select

Inherits:
Object
  • Object
show all
Defined in:
lib/io/event/selector/select.rb

Overview

A pure-Ruby implementation of the event selector.

Defined Under Namespace

Classes: Optional, Waiter

Constant Summary collapse

EAGAIN =
-Errno::EAGAIN::Errno
EWOULDBLOCK =
-Errno::EWOULDBLOCK::Errno

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(loop) ⇒ Select

Initialize the selector with the given event loop fiber.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/io/event/selector/select.rb', line 14

def initialize(loop)
	@loop = loop
	
	@waiting = Hash.new.compare_by_identity
	
	# Flag indicating whether the selector is currently blocked in a system call.
	# Set to true when blocked in ::IO.select, false otherwise.
	# Used by wakeup() to determine if an interrupt signal is needed.
	@blocked = false
	
	@ready = Queue.new
	@interrupt = Interrupt.attach(self)
	
	@idle_duration = 0.0
end

Instance Attribute Details

#idle_durationObject (readonly)

Returns the value of attribute idle_duration.



34
35
36
# File 'lib/io/event/selector/select.rb', line 34

def idle_duration
  @idle_duration
end

#loopObject (readonly)

Returns the value of attribute loop.



31
32
33
# File 'lib/io/event/selector/select.rb', line 31

def loop
  @loop
end

#The event loop fiber.(eventloopfiber.) ⇒ Object (readonly)



31
# File 'lib/io/event/selector/select.rb', line 31

attr :loop

Instance Method Details

#closeObject

Close the selector and release any resources.



48
49
50
51
52
53
# File 'lib/io/event/selector/select.rb', line 48

def close
	@interrupt.close
	
	@loop = nil
	@waiting = nil
end

#io_read(fiber, io, buffer, length, offset = 0) ⇒ Object

Read from the given IO to the buffer.



198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/io/event/selector/select.rb', line 198

def io_read(fiber, io, buffer, length, offset = 0)
	# Ensure offset is within the bounds of the buffer to avoid ArgumentError
	if offset > buffer.size
		return -Errno::EINVAL::Errno
	end
	
	total = 0
	
	Selector.nonblock(io) do
		while true
			result = Fiber.blocking{buffer.read(io, 0, offset)}
			
			if result < 0
				if length > 0 and again?(result)
					self.io_wait(fiber, io, IO::READABLE)
				else
					return result
				end
			elsif result == 0
				break
			else
				total += result
				break if total >= length
				offset += result
			end
		end
	end
	
	return total
end

#io_select(readable, writable, priority, timeout) ⇒ Object

Wait for multiple IO objects to become readable or writable.



180
181
182
183
184
# File 'lib/io/event/selector/select.rb', line 180

def io_select(readable, writable, priority, timeout)
	Thread.new do
		IO.select(readable, writable, priority, timeout)
	end.value
end

#io_wait(fiber, io, events) ⇒ Object

Wait for the given IO to become readable or writable.



167
168
169
170
171
172
173
# File 'lib/io/event/selector/select.rb', line 167

def io_wait(fiber, io, events)
	waiter = @waiting[io] = Waiter.new(fiber, events, @waiting[io])
	
	@loop.transfer || false
ensure
	waiter&.invalidate
end

#io_write(fiber, io, buffer, length, offset = 0) ⇒ Object

Write to the given IO from the buffer.



233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/io/event/selector/select.rb', line 233

def io_write(fiber, io, buffer, length, offset = 0)
	# Ensure offset is within the bounds of the buffer to avoid ArgumentError
	if offset > buffer.size
		return -Errno::EINVAL::Errno
	end
	
	total = 0
	
	Selector.nonblock(io) do
		while true
			result = Fiber.blocking{buffer.write(io, 0, offset)}
			
			if result < 0
				if length > 0 and again?(result)
					self.io_wait(fiber, io, IO::WRITABLE)
				else
					return result
				end
			elsif result == 0
				break result
			else
				total += result
				break if total >= length
				offset += result
			end
		end
	end
	
	return total
end

#process_wait(fiber, pid, flags) ⇒ Object

Wait for a process to change state.



270
271
272
273
274
# File 'lib/io/event/selector/select.rb', line 270

def process_wait(fiber, pid, flags)
	Thread.new do
		Process::Status.wait(pid, flags)
	end.value
end

#push(fiber) ⇒ Object

Append the given fiber into the ready list.



99
100
101
# File 'lib/io/event/selector/select.rb', line 99

def push(fiber)
	@ready.push(fiber)
end

#raise(fiber, *arguments, **options) ⇒ Object

Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.



104
105
106
107
108
109
110
111
# File 'lib/io/event/selector/select.rb', line 104

def raise(fiber, *arguments, **options)
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	fiber.raise(*arguments, **options)
ensure
	optional.nullify
end

#ready?Boolean

Returns:

  • (Boolean)


114
115
116
# File 'lib/io/event/selector/select.rb', line 114

def ready?
	!@ready.empty?
end

#resume(fiber, *arguments) ⇒ Object

Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.



79
80
81
82
83
84
85
86
# File 'lib/io/event/selector/select.rb', line 79

def resume(fiber, *arguments)
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	fiber.transfer(*arguments)
ensure
	optional.nullify
end

#select(duration = nil) ⇒ Object

Wait for IO events or a timeout.



293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
# File 'lib/io/event/selector/select.rb', line 293

def select(duration = nil)
	if pop_ready
		# If we have popped items from the ready list, they may influence the duration calculation, so we don't delay the event loop:
		duration = 0
	end
	
	readable = Array.new
	writable = Array.new
	priority = Array.new
	
	@waiting.delete_if do |io, waiter|
		if io.closed?
			# When an IO is closed, we silently drop it. Ruby 4's `rb_thread_io_close_interrupt` will take care of interrupting any fibers waiting on the closed IO, so we don't need to do anything here.
			true
		else
			waiter.each do |fiber, events|
				if (events & IO::READABLE) > 0
					readable << io
				end
				
				if (events & IO::WRITABLE) > 0
					writable << io
				end
				
				if (events & IO::PRIORITY) > 0
					priority << io
				end
			end
			
			false
		end
	end
	
	duration = 0 unless @ready.empty?
	error = nil
	
	if duration&.>(0)
		start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
	else
		@idle_duration = 0.0
	end
	
	# We need to handle interrupts on blocking IO. Every other implementation uses EINTR, but that doesn't work with `::IO.select` as it will retry the call on EINTR.
	Thread.handle_interrupt(::Exception => :on_blocking) do
		@blocked = true
		readable, writable, priority = ::IO.select(readable, writable, priority, duration)
	rescue ::Exception => error
		# Requeue below...
	ensure
		@blocked = false
		if start_time
			end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
			@idle_duration = end_time - start_time
		end
	end
	
	if error
		if error.is_a?(IOError) || error.is_a?(Errno::EBADF)
			# This can happen if an IO is closed while we're blocked in ::IO.select. Ruby 4's `rb_thread_io_close_interrupt` will take care of interrupting any fibers waiting on the closed IO, so we don't need to do anything here, except try again:
			return 0
		end
		
		# For all other errors (e.g. thread interrupts), re-queue on the scheduler thread:
		Thread.current.raise(error)
		return 0
	end
	
	ready = Hash.new(0).compare_by_identity
	
	readable&.each do |io|
		# Skip any IO that was closed/reused after IO.select returned - its fd number
		# may now belong to a different file, so resuming the waiter would be wrong:
		ready[io] |= IO::READABLE unless io.closed?
	end
	
	writable&.each do |io|
		ready[io] |= IO::WRITABLE unless io.closed?
	end
	
	priority&.each do |io|
		ready[io] |= IO::PRIORITY unless io.closed?
	end
	
	ready.each do |io, events|
		@waiting.delete(io).dispatch(events) do |waiter|
			# Re-schedule the waiting IO:
			waiter.tail = @waiting[io]
			@waiting[io] = waiter
		end
	end
	
	return ready.size
end

#This is the amount of time the event loop was idle during the last select call.=(istheamountoftimetheeventloopwasidleduringthelastselectcall. = (value)) ⇒ Object



34
# File 'lib/io/event/selector/select.rb', line 34

attr :idle_duration

#transferObject

Transfer from the current fiber to the event loop.



74
75
76
# File 'lib/io/event/selector/select.rb', line 74

def transfer
	@loop.transfer
end

#wakeupObject

Wake up the event loop if it is currently sleeping.



37
38
39
40
41
42
43
44
45
# File 'lib/io/event/selector/select.rb', line 37

def wakeup
	if @blocked
		@interrupt.signal
		
		return true
	end
	
	return false
end

#yieldObject

Yield from the current fiber back to the event loop. Put the current fiber into the ready list.



89
90
91
92
93
94
95
96
# File 'lib/io/event/selector/select.rb', line 89

def yield
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	@loop.transfer
ensure
	optional.nullify
end