Class: IO::Event::Selector::Select

Inherits:
Object
  • Object
show all
Defined in:
lib/io/event/selector/select.rb

Overview

A pure-Ruby implementation of the event selector.

Defined Under Namespace

Classes: Optional, Waiter

Constant Summary collapse

EAGAIN =
-Errno::EAGAIN::Errno
EWOULDBLOCK =
-Errno::EWOULDBLOCK::Errno

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(loop) ⇒ Select

Initialize the selector with the given event loop fiber.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/io/event/selector/select.rb', line 14

def initialize(loop)
	@loop = loop
	
	@waiting = Hash.new.compare_by_identity
	
	# Flag indicating whether the selector is currently blocked in a system call.
	# Set to true when blocked in ::IO.select, false otherwise.
	# Used by wakeup() to determine if an interrupt signal is needed.
	@blocked = false
	
	@ready = Queue.new
	@interrupt = Interrupt.attach(self)
	
	@idle_duration = 0.0
end

Instance Attribute Details

#idle_durationObject (readonly)

Returns the value of attribute idle_duration.



34
35
36
# File 'lib/io/event/selector/select.rb', line 34

def idle_duration
  @idle_duration
end

#loopObject (readonly)

Returns the value of attribute loop.



31
32
33
# File 'lib/io/event/selector/select.rb', line 31

def loop
  @loop
end

#The event loop fiber.(eventloopfiber.) ⇒ Object (readonly)



31
# File 'lib/io/event/selector/select.rb', line 31

attr :loop

Instance Method Details

#closeObject

Close the selector and release any resources.



48
49
50
51
52
53
# File 'lib/io/event/selector/select.rb', line 48

def close
	@interrupt.close
	
	@loop = nil
	@waiting = nil
end

#io_read(fiber, io, buffer, length, offset = 0) ⇒ Object

Read from the given IO to the buffer.



190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/io/event/selector/select.rb', line 190

def io_read(fiber, io, buffer, length, offset = 0)
	# Ensure offset is within the bounds of the buffer to avoid ArgumentError
	if offset > buffer.size
		return -Errno::EINVAL::Errno
	end
	
	total = 0
	
	Selector.nonblock(io) do
		while true
			result = Fiber.blocking{buffer.read(io, 0, offset)}
			
			if result < 0
				if length > 0 and again?(result)
					self.io_wait(fiber, io, IO::READABLE)
				else
					return result
				end
			elsif result == 0
				break
			else
				total += result
				break if total >= length
				offset += result
			end
		end
	end
	
	return total
end

#io_select(readable, writable, priority, timeout) ⇒ Object

Wait for multiple IO objects to become readable or writable.



172
173
174
175
176
# File 'lib/io/event/selector/select.rb', line 172

def io_select(readable, writable, priority, timeout)
	Thread.new do
		IO.select(readable, writable, priority, timeout)
	end.value
end

#io_wait(fiber, io, events) ⇒ Object

Wait for the given IO to become readable or writable.



159
160
161
162
163
164
165
# File 'lib/io/event/selector/select.rb', line 159

def io_wait(fiber, io, events)
	waiter = @waiting[io] = Waiter.new(fiber, events, @waiting[io])
	
	@loop.transfer || false
ensure
	waiter&.invalidate
end

#io_write(fiber, io, buffer, length, offset = 0) ⇒ Object

Write to the given IO from the buffer.



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/io/event/selector/select.rb', line 225

def io_write(fiber, io, buffer, length, offset = 0)
	# Ensure offset is within the bounds of the buffer to avoid ArgumentError
	if offset > buffer.size
		return -Errno::EINVAL::Errno
	end
	
	total = 0
	
	Selector.nonblock(io) do
		while true
			result = Fiber.blocking{buffer.write(io, 0, offset)}
			
			if result < 0
				if length > 0 and again?(result)
					self.io_wait(fiber, io, IO::WRITABLE)
				else
					return result
				end
			elsif result == 0
				break result
			else
				total += result
				break if total >= length
				offset += result
			end
		end
	end
	
	return total
end

#process_wait(fiber, pid, flags) ⇒ Object

Wait for a process to change state.



262
263
264
265
266
# File 'lib/io/event/selector/select.rb', line 262

def process_wait(fiber, pid, flags)
	Thread.new do
		Process::Status.wait(pid, flags)
	end.value
end

#push(fiber) ⇒ Object

Append the given fiber into the ready list.



95
96
97
# File 'lib/io/event/selector/select.rb', line 95

def push(fiber)
	@ready.push(fiber)
end

#raise(fiber, *arguments, **options) ⇒ Object

Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.



100
101
102
103
104
105
106
107
# File 'lib/io/event/selector/select.rb', line 100

def raise(fiber, *arguments, **options)
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	fiber.raise(*arguments, **options)
ensure
	optional.nullify
end

#ready?Boolean

Returns:

  • (Boolean)


110
111
112
# File 'lib/io/event/selector/select.rb', line 110

def ready?
	!@ready.empty?
end

#resume(fiber, *arguments) ⇒ Object

Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.



75
76
77
78
79
80
81
82
# File 'lib/io/event/selector/select.rb', line 75

def resume(fiber, *arguments)
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	fiber.transfer(*arguments)
ensure
	optional.nullify
end

#select(duration = nil) ⇒ Object

Wait for IO events or a timeout.



285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
# File 'lib/io/event/selector/select.rb', line 285

def select(duration = nil)
	if pop_ready
		# If we have popped items from the ready list, they may influence the duration calculation, so we don't delay the event loop:
		duration = 0
	end
	
	readable = Array.new
	writable = Array.new
	priority = Array.new
	
	@waiting.delete_if do |io, waiter|
		if io.closed?
			# When an IO is closed, we silently drop it. Ruby 4's `rb_thread_io_close_interrupt` will take care of interrupting any fibers waiting on the closed IO, so we don't need to do anything here.
			true
		else
			waiter.each do |fiber, events|
				if (events & IO::READABLE) > 0
					readable << io
				end
				
				if (events & IO::WRITABLE) > 0
					writable << io
				end
				
				if (events & IO::PRIORITY) > 0
					priority << io
				end
			end
			
			false
		end
	end
	
	duration = 0 unless @ready.empty?
	error = nil
	
	if duration&.>(0)
		start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
	else
		@idle_duration = 0.0
	end
	
	# We need to handle interrupts on blocking IO. Every other implementation uses EINTR, but that doesn't work with `::IO.select` as it will retry the call on EINTR.
	Thread.handle_interrupt(::Exception => :on_blocking) do
		@blocked = true
		readable, writable, priority = ::IO.select(readable, writable, priority, duration)
	rescue ::Exception => error
		# Requeue below...
	ensure
		@blocked = false
		if start_time
			end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
			@idle_duration = end_time - start_time
		end
	end
	
	if error
		if error.is_a?(IOError) || error.is_a?(Errno::EBADF)
			# This can happen if an IO is closed while we're blocked in ::IO.select. Ruby 4's `rb_thread_io_close_interrupt` will take care of interrupting any fibers waiting on the closed IO, so we don't need to do anything here, except try again:
			return 0
		end
		
		# For all other errors (e.g. thread interrupts), re-queue on the scheduler thread:
		Thread.current.raise(error)
		return 0
	end
	
	ready = Hash.new(0).compare_by_identity
	
	readable&.each do |io|
		# Skip any IO that was closed/reused after IO.select returned - its fd number
		# may now belong to a different file, so resuming the waiter would be wrong:
		ready[io] |= IO::READABLE unless io.closed?
	end
	
	writable&.each do |io|
		ready[io] |= IO::WRITABLE unless io.closed?
	end
	
	priority&.each do |io|
		ready[io] |= IO::PRIORITY unless io.closed?
	end
	
	ready.each do |io, events|
		@waiting.delete(io).dispatch(events) do |waiter|
			# Re-schedule the waiting IO:
			waiter.tail = @waiting[io]
			@waiting[io] = waiter
		end
	end
	
	return ready.size
end

#This is the amount of time the event loop was idle during the last select call.=(istheamountoftimetheeventloopwasidleduringthelastselectcall. = (value)) ⇒ Object



34
# File 'lib/io/event/selector/select.rb', line 34

attr :idle_duration

#transferObject

Transfer from the current fiber to the event loop.



70
71
72
# File 'lib/io/event/selector/select.rb', line 70

def transfer
	@loop.transfer
end

#wakeupObject

Wake up the event loop if it is currently sleeping.



37
38
39
40
41
42
43
44
45
# File 'lib/io/event/selector/select.rb', line 37

def wakeup
	if @blocked
		@interrupt.signal
		
		return true
	end
	
	return false
end

#yieldObject

Yield from the current fiber back to the event loop. Put the current fiber into the ready list.



85
86
87
88
89
90
91
92
# File 'lib/io/event/selector/select.rb', line 85

def yield
	optional = Optional.new(Fiber.current)
	@ready.push(optional)
	
	@loop.transfer
ensure
	optional.nullify
end