Class: IO::Event::Selector::Select
- Inherits:
-
Object
- Object
- IO::Event::Selector::Select
- Defined in:
- lib/io/event/selector/select.rb
Overview
A pure-Ruby implementation of the event selector.
Defined Under Namespace
Constant Summary collapse
- EAGAIN =
-Errno::EAGAIN::Errno
- EWOULDBLOCK =
-Errno::EWOULDBLOCK::Errno
Instance Attribute Summary collapse
-
#idle_duration ⇒ Object
readonly
Returns the value of attribute idle_duration.
-
#loop ⇒ Object
readonly
Returns the value of attribute loop.
- #loop ⇒ Object readonly
The event loop fiber.
Instance Method Summary collapse
-
#close ⇒ Object
Close the selector and release any resources.
-
#initialize(loop) ⇒ Select
constructor
Initialize the selector with the given event loop fiber.
-
#io_read(fiber, io, buffer, length, offset = 0) ⇒ Object
Read from the given IO to the buffer.
-
#io_select(readable, writable, priority, timeout) ⇒ Object
Wait for multiple IO objects to become readable or writable.
-
#io_wait(fiber, io, events) ⇒ Object
Wait for the given IO to become readable or writable.
-
#io_write(fiber, io, buffer, length, offset = 0) ⇒ Object
Write to the given IO from the buffer.
-
#process_wait(fiber, pid, flags) ⇒ Object
Wait for a process to change state.
-
#push(fiber) ⇒ Object
Append the given fiber into the ready list.
-
#raise(fiber, *arguments, **options) ⇒ Object
Transfer to the given fiber and raise an exception.
- #ready? ⇒ Boolean
-
#resume(fiber, *arguments) ⇒ Object
Transfer from the current fiber to the specified fiber.
-
#select(duration = nil) ⇒ Object
Wait for IO events or a timeout.
- #idle_duration ⇒ Object readonly
This is the amount of time the event loop was idle during the last select call.
-
#transfer ⇒ Object
Transfer from the current fiber to the event loop.
-
#wakeup ⇒ Object
Wake up the event loop if it is currently sleeping.
-
#yield ⇒ Object
Yield from the current fiber back to the event loop.
Constructor Details
#initialize(loop) ⇒ Select
Initialize the selector with the given event loop fiber.
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/io/event/selector/select.rb', line 14

# Initialize the selector with the given event loop fiber.
#
# @param loop [Object] the event loop fiber; stored and exposed through #loop.
def initialize(loop)
  @loop = loop

  # Waiters are looked up by object identity of the IO, not by #hash / #eql?.
  @waiting = Hash.new.compare_by_identity

  # Flag indicating whether the selector is currently blocked in a system call.
  # Set to true when blocked in ::IO.select, false otherwise.
  # Used by wakeup() to determine if an interrupt signal is needed.
  @blocked = false

  @ready = Queue.new
  @interrupt = Interrupt.attach(self)

  @idle_duration = 0.0
end
Instance Attribute Details
#idle_duration ⇒ Object (readonly)
Returns the value of attribute idle_duration.
34 35 36 |
# File 'lib/io/event/selector/select.rb', line 34

# Returns the value of attribute idle_duration.
#
# @return [Object] the current value of @idle_duration.
def idle_duration
  @idle_duration
end
#loop ⇒ Object (readonly)
Returns the value of attribute loop.
31 32 33 |
# File 'lib/io/event/selector/select.rb', line 31

# Returns the value of attribute loop.
#
# NOTE: because this method is named `loop`, a bare `loop do ... end` inside
# this class calls this reader rather than Kernel#loop.
#
# @return [Object] the current value of @loop.
def loop
  @loop
end
#loop ⇒ Object (readonly)
The event loop fiber.
31 |
# File 'lib/io/event/selector/select.rb', line 31

# The event loop fiber.
attr :loop
Instance Method Details
#close ⇒ Object
Close the selector and release any resources.
48 49 50 51 52 53 |
# File 'lib/io/event/selector/select.rb', line 48

# Close the selector and release any resources.
#
# Closes the interrupt, then drops the references to the event loop fiber and
# the table of waiters.
def close
  @interrupt.close

  @loop = nil
  @waiting = nil
end
#io_read(fiber, io, buffer, length, offset = 0) ⇒ Object
Read from the given IO to the buffer.
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
# File 'lib/io/event/selector/select.rb', line 198

# Read from the given IO to the buffer.
#
# @param fiber [Object] the fiber to suspend while waiting for readability.
# @param io [IO] the IO to read from.
# @param buffer [Object] the destination buffer; must respond to #size and #read(io, length, offset).
# @param length [Integer] the minimum number of bytes to read before returning.
# @param offset [Integer] the offset into the buffer at which to start storing data.
# @return [Integer] the total number of bytes read, or a negative errno value on failure.
def io_read(fiber, io, buffer, length, offset = 0)
  # Ensure offset is within the bounds of the buffer to avoid ArgumentError
  return -Errno::EINVAL::Errno if offset > buffer.size

  total = 0

  Selector.nonblock(io) do
    # `while true` rather than `loop do`: this class defines its own #loop
    # reader, which a bare `loop` call would resolve to.
    while true
      result = Fiber.blocking{buffer.read(io, 0, offset)}

      if result < 0
        # A negative result is an error code. Only wait and retry when the
        # caller asked for at least one byte and the error is a "try again".
        if length > 0 && again?(result)
          self.io_wait(fiber, io, IO::READABLE)
        else
          return result
        end
      elsif result == 0
        # Nothing was read; stop and report what has been read so far.
        break
      else
        total += result
        break if total >= length
        offset += result
      end
    end
  end

  return total
end
#io_select(readable, writable, priority, timeout) ⇒ Object
Wait for multiple IO objects to become readable or writable.
180 181 182 183 184 |
# File 'lib/io/event/selector/select.rb', line 180

# Wait for multiple IO objects to become readable or writable.
#
# The blocking IO.select call runs on a separate thread, and this method
# returns that thread's value (Thread#value joins the thread).
#
# @param readable [Array<IO>, nil] IOs to watch for readability.
# @param writable [Array<IO>, nil] IOs to watch for writability.
# @param priority [Array<IO>, nil] IOs to watch for priority events.
# @param timeout [Numeric, nil] maximum time to wait, in seconds.
# @return [Array, nil] whatever IO.select returns.
def io_select(readable, writable, priority, timeout)
  Thread.new do
    IO.select(readable, writable, priority, timeout)
  end.value
end
#io_wait(fiber, io, events) ⇒ Object
Wait for the given IO to become readable or writable.
167 168 169 170 171 172 173 |
# File 'lib/io/event/selector/select.rb', line 167

# Wait for the given IO to become readable or writable.
#
# @param fiber [Object] the fiber that is waiting.
# @param io [IO] the IO to wait on.
# @param events [Integer] bit mask of events to wait for (e.g. IO::READABLE).
# @return [Object, false] the value passed back when this fiber is resumed, or false if that value is nil/false.
def io_wait(fiber, io, events)
  # Any waiter already registered for this IO is handed to the new one
  # (presumably linked as its tail — see Waiter), and the new waiter replaces it.
  waiter = @waiting[io] = Waiter.new(fiber, events, @waiting[io])

  @loop.transfer || false
ensure
  # `waiter` is nil if Waiter.new itself raised, hence the safe navigation.
  waiter&.invalidate
end
#io_write(fiber, io, buffer, length, offset = 0) ⇒ Object
Write to the given IO from the buffer.
233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 |
# File 'lib/io/event/selector/select.rb', line 233

# Write to the given IO from the buffer.
#
# @param fiber [Object] the fiber to suspend while waiting for writability.
# @param io [IO] the IO to write to.
# @param buffer [Object] the source buffer; must respond to #size and #write(io, length, offset).
# @param length [Integer] the minimum number of bytes to write before returning.
# @param offset [Integer] the offset into the buffer at which to start reading data.
# @return [Integer] the total number of bytes written, or a negative errno value on failure.
def io_write(fiber, io, buffer, length, offset = 0)
  # Ensure offset is within the bounds of the buffer to avoid ArgumentError
  return -Errno::EINVAL::Errno if offset > buffer.size

  total = 0

  Selector.nonblock(io) do
    # `while true` rather than `loop do`: this class defines its own #loop
    # reader, which a bare `loop` call would resolve to.
    while true
      result = Fiber.blocking{buffer.write(io, 0, offset)}

      if result < 0
        # A negative result is an error code. Only wait and retry when the
        # caller asked for at least one byte and the error is a "try again".
        if length > 0 && again?(result)
          self.io_wait(fiber, io, IO::WRITABLE)
        else
          return result
        end
      elsif result == 0
        # Nothing was written; stop and report what has been written so far.
        break
      else
        total += result
        break if total >= length
        offset += result
      end
    end
  end

  return total
end
#process_wait(fiber, pid, flags) ⇒ Object
Wait for a process to change state.
270 271 272 273 274 |
# File 'lib/io/event/selector/select.rb', line 270

# Wait for a process to change state.
#
# The blocking wait runs on a separate thread, and this method returns that
# thread's value (Thread#value joins the thread).
#
# @param fiber [Object] the fiber that is waiting (not used by this implementation).
# @param pid [Integer] the process ID to wait for.
# @param flags [Integer] flags passed through to Process::Status.wait.
# @return [Process::Status] whatever Process::Status.wait returns.
def process_wait(fiber, pid, flags)
  Thread.new do
    Process::Status.wait(pid, flags)
  end.value
end
#push(fiber) ⇒ Object
Append the given fiber into the ready list.
99 100 101 |
# File 'lib/io/event/selector/select.rb', line 99

# Append the given fiber into the ready list.
#
# @param fiber [Object] the fiber to append.
def push(fiber)
  @ready.push(fiber)
end
#raise(fiber, *arguments, **options) ⇒ Object
Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.
104 105 106 107 108 109 110 111 |
# File 'lib/io/event/selector/select.rb', line 104

# Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.
#
# NOTE: because this method is named `raise`, a bare `raise` inside this class
# calls this method rather than Kernel#raise.
#
# @param fiber [Fiber] the fiber to raise the exception in.
# @param arguments [Array] positional arguments forwarded to fiber.raise.
# Keyword arguments are forwarded to fiber.raise unchanged.
def raise(fiber, *arguments, **)
  optional = Optional.new(Fiber.current)
  @ready.push(optional)

  fiber.raise(*arguments, **)
ensure
  # Safe navigation: `optional` is still nil if Optional.new itself failed,
  # and that failure should propagate rather than a NoMethodError from here.
  optional&.nullify
end
#ready? ⇒ Boolean
114 115 116 |
# File 'lib/io/event/selector/select.rb', line 114

# Whether the ready list contains any entries.
#
# @return [Boolean] true if the ready list is not empty.
def ready?
  !@ready.empty?
end
#resume(fiber, *arguments) ⇒ Object
Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.
79 80 81 82 83 84 85 86 |
# File 'lib/io/event/selector/select.rb', line 79

# Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.
#
# @param fiber [Fiber] the fiber to transfer to.
# @param arguments [Array] arguments forwarded to fiber.transfer.
# @return [Object] whatever fiber.transfer returns.
def resume(fiber, *arguments)
  optional = Optional.new(Fiber.current)
  @ready.push(optional)

  fiber.transfer(*arguments)
ensure
  # Safe navigation: `optional` is still nil if Optional.new itself failed,
  # and that failure should propagate rather than a NoMethodError from here.
  optional&.nullify
end
#select(duration = nil) ⇒ Object
Wait for IO events or a timeout.
293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 |
# File 'lib/io/event/selector/select.rb', line 293

# Wait for IO events or a timeout.
#
# @param duration [Numeric, nil] the maximum time to block in ::IO.select, in seconds; nil means no timeout.
# @return [Integer] the number of distinct IO objects that were dispatched; 0 if ::IO.select raised.
def select(duration = nil)
  if pop_ready
    # If we have popped items from the ready list, they may influence the
    # duration calculation, so we don't delay the event loop:
    duration = 0
  end

  readable = []
  writable = []
  priority = []

  @waiting.delete_if do |io, waiter|
    if io.closed?
      # When an IO is closed, we silently drop it. Ruby 4's
      # `rb_thread_io_close_interrupt` will take care of interrupting any
      # fibers waiting on the closed IO, so we don't need to do anything here.
      true
    else
      waiter.each do |_fiber, events|
        readable << io if (events & IO::READABLE) > 0
        writable << io if (events & IO::WRITABLE) > 0
        priority << io if (events & IO::PRIORITY) > 0
      end

      false
    end
  end

  duration = 0 unless @ready.empty?
  error = nil

  # Idle time is only measured when we may actually block (duration > 0);
  # a nil duration is not measured and resets the idle duration to zero.
  if duration&.>(0)
    start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  else
    @idle_duration = 0.0
  end

  # We need to handle interrupts on blocking IO. Every other implementation
  # uses EINTR, but that doesn't work with `::IO.select` as it will retry the
  # call on EINTR.
  Thread.handle_interrupt(::Exception => :on_blocking) do
    @blocked = true
    readable, writable, priority = ::IO.select(readable, writable, priority, duration)
  rescue ::Exception => error
    # Requeue below...
  ensure
    @blocked = false
    if start_time
      end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      @idle_duration = end_time - start_time
    end
  end

  if error
    if error.is_a?(IOError) || error.is_a?(Errno::EBADF)
      # This can happen if an IO is closed while we're blocked in ::IO.select.
      # Ruby 4's `rb_thread_io_close_interrupt` will take care of interrupting
      # any fibers waiting on the closed IO, so we don't need to do anything
      # here, except try again:
      return 0
    end

    # For all other errors (e.g. thread interrupts), re-queue on the scheduler thread:
    Thread.current.raise(error)
    return 0
  end

  # Maps each ready IO (by identity) to the bit mask of events that fired.
  ready = Hash.new(0).compare_by_identity

  # ::IO.select returns nil on timeout, hence the safe navigation below.
  readable&.each do |io|
    # Skip any IO that was closed/reused after IO.select returned - its fd number
    # may now belong to a different file, so resuming the waiter would be wrong:
    ready[io] |= IO::READABLE unless io.closed?
  end

  writable&.each do |io|
    ready[io] |= IO::WRITABLE unless io.closed?
  end

  priority&.each do |io|
    ready[io] |= IO::PRIORITY unless io.closed?
  end

  ready.each do |io, events|
    @waiting.delete(io).dispatch(events) do |waiter|
      # Re-schedule the waiting IO:
      waiter.tail = @waiting[io]
      @waiting[io] = waiter
    end
  end

  return ready.size
end
#idle_duration ⇒ Object (readonly)
This is the amount of time the event loop was idle during the last select call.
34 |
# File 'lib/io/event/selector/select.rb', line 34

# This is the amount of time the event loop was idle during the last select call.
attr :idle_duration
#transfer ⇒ Object
Transfer from the current fiber to the event loop.
74 75 76 |
# File 'lib/io/event/selector/select.rb', line 74

# Transfer from the current fiber to the event loop.
#
# @return [Object] whatever @loop.transfer returns.
def transfer
  @loop.transfer
end
#wakeup ⇒ Object
Wake up the event loop if it is currently sleeping.
37 38 39 40 41 42 43 44 45 |
# File 'lib/io/event/selector/select.rb', line 37

# Wake up the event loop if it is currently sleeping.
#
# @return [Boolean] true if the interrupt was signalled, false if the selector was not blocked.
def wakeup
  # Only signal while the selector is flagged as blocked.
  return false unless @blocked

  @interrupt.signal
  true
end