Class: IO::Event::Selector::Select
- Inherits:
-
Object
- Object
- IO::Event::Selector::Select
- Defined in:
- lib/io/event/selector/select.rb
Overview
A pure-Ruby implementation of the event selector.
Defined Under Namespace
Constant Summary collapse
- EAGAIN = -Errno::EAGAIN::Errno
- EWOULDBLOCK = -Errno::EWOULDBLOCK::Errno
Instance Attribute Summary collapse
-
#idle_duration ⇒ Object
readonly
Returns the value of attribute idle_duration.
-
#loop ⇒ Object
readonly
Returns the value of attribute loop.
- #loop ⇒ Object readonly The event loop fiber.
Instance Method Summary collapse
-
#close ⇒ Object
Close the selector and release any resources.
-
#initialize(loop) ⇒ Select
constructor
Initialize the selector with the given event loop fiber.
-
#io_read(fiber, io, buffer, length, offset = 0) ⇒ Object
Read from the given IO to the buffer.
-
#io_select(readable, writable, priority, timeout) ⇒ Object
Wait for multiple IO objects to become readable or writable.
-
#io_wait(fiber, io, events) ⇒ Object
Wait for the given IO to become readable or writable.
-
#io_write(fiber, io, buffer, length, offset = 0) ⇒ Object
Write to the given IO from the buffer.
-
#process_wait(fiber, pid, flags) ⇒ Object
Wait for a process to change state.
-
#push(fiber) ⇒ Object
Append the given fiber into the ready list.
-
#raise(fiber, *arguments, **options) ⇒ Object
Transfer to the given fiber and raise an exception.
- #ready? ⇒ Boolean
-
#resume(fiber, *arguments) ⇒ Object
Transfer from the current fiber to the specified fiber.
-
#select(duration = nil) ⇒ Object
Wait for IO events or a timeout.
- #idle_duration ⇒ Object readonly This is the amount of time the event loop was idle during the last select call.
-
#transfer ⇒ Object
Transfer from the current fiber to the event loop.
-
#wakeup ⇒ Object
Wake up the event loop if it is currently sleeping.
-
#yield ⇒ Object
Yield from the current fiber back to the event loop.
Constructor Details
#initialize(loop) ⇒ Select
Initialize the selector with the given event loop fiber.
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/io/event/selector/select.rb', line 14
# Initialize the selector with the given event loop fiber.
#
# @param loop [Object] stored in `@loop`; documented as the event loop fiber.
def initialize(loop)
  @loop = loop

  # Waiters are looked up by object identity rather than by `eql?`/`hash`.
  @waiting = {}.compare_by_identity

  # Flag indicating whether the selector is currently blocked in a system call.
  # Set to true when blocked in ::IO.select, false otherwise.
  # Used by wakeup() to determine if an interrupt signal is needed.
  @blocked = false

  @ready = Queue.new

  # NOTE(review): kept after the assignments above in case attaching the
  # interrupt calls back into this selector — confirm against Interrupt.
  @interrupt = Interrupt.attach(self)

  @idle_duration = 0.0
end
Instance Attribute Details
#idle_duration ⇒ Object (readonly)
Returns the value of attribute idle_duration.
34 35 36 |
# File 'lib/io/event/selector/select.rb', line 34
# Returns the value of attribute idle_duration.
#
# @return [Object] whatever is currently stored in `@idle_duration`.
def idle_duration = @idle_duration
#loop ⇒ Object (readonly)
Returns the value of attribute loop.
31 32 33 |
# File 'lib/io/event/selector/select.rb', line 31
# Returns the value of attribute loop.
#
# @return [Object] whatever is currently stored in `@loop`.
def loop = @loop
#loop ⇒ Object (readonly)
The event loop fiber.
31 |
# File 'lib/io/event/selector/select.rb', line 31 attr :loop |
Instance Method Details
#close ⇒ Object
Close the selector and release any resources.
48 49 50 51 52 53 |
# File 'lib/io/event/selector/select.rb', line 48
# Close the selector and release any resources.
#
# Closes the interrupt first, then drops the references held in `@loop` and
# `@waiting`.
#
# @return [nil]
def close
  @interrupt.close

  @loop = @waiting = nil
end
#io_read(fiber, io, buffer, length, offset = 0) ⇒ Object
Read from the given IO to the buffer.
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/io/event/selector/select.rb', line 190
# Read from the given IO to the buffer.
#
# Repeatedly calls `buffer.read(io, 0, offset)` inside `Selector.nonblock(io)`,
# advancing `offset` by each positive result, until at least `length` bytes
# have been read, a read returns 0, or a negative result is returned.
#
# @param fiber [Object] handed to {#io_wait} when the read has to be retried.
# @param io [Object] the IO to read from.
# @param buffer [#read, #size] destination buffer.
# @param length [Integer] stop once the running total reaches this value. When
#   it is not positive, a negative result is returned immediately instead of
#   waiting for readability.
# @param offset [Integer] position in the buffer at which to start writing.
# @return [Integer] the total number of bytes read; `-Errno::EINVAL::Errno` when
#   `offset` is negative or past the end of the buffer; or the negative result
#   of `buffer.read` when it is not retried.
def io_read(fiber, io, buffer, length, offset = 0)
  # Ensure offset is within the bounds of the buffer to avoid ArgumentError.
  # Negative offsets are rejected as well as offsets past the end.
  return -Errno::EINVAL::Errno if offset.negative? || offset > buffer.size

  total = 0

  Selector.nonblock(io) do
    while true
      result = Fiber.blocking { buffer.read(io, 0, offset) }

      if result < 0
        # Only wait and retry when the caller asked for data and `again?`
        # (defined elsewhere) accepts the result; otherwise hand it back.
        return result unless length > 0 && again?(result)

        self.io_wait(fiber, io, IO::READABLE)
      elsif result == 0
        break
      else
        total += result
        break if total >= length
        offset += result
      end
    end
  end

  total
end
#io_select(readable, writable, priority, timeout) ⇒ Object
Wait for multiple IO objects to become readable or writable.
172 173 174 175 176 |
# File 'lib/io/event/selector/select.rb', line 172
# Wait for multiple IO objects to become readable or writable.
#
# Runs `IO.select` on a separate thread and returns that thread's value.
#
# @param readable [Array, nil] forwarded to `IO.select`.
# @param writable [Array, nil] forwarded to `IO.select`.
# @param priority [Array, nil] forwarded to `IO.select`.
# @param timeout [Numeric, nil] forwarded to `IO.select`.
# @return [Array, nil] whatever `IO.select` returned.
def io_select(readable, writable, priority, timeout)
  selecting = Thread.new { IO.select(readable, writable, priority, timeout) }
  selecting.value
end
#io_wait(fiber, io, events) ⇒ Object
Wait for the given IO to become readable or writable.
159 160 161 162 163 164 165 |
# File 'lib/io/event/selector/select.rb', line 159
# Wait for the given IO to become readable or writable.
#
# Registers a new `Waiter` for `io` in `@waiting`, then transfers to `@loop`.
# The waiter is invalidated on the way out, whether or not an error occurred.
#
# @param fiber [Object] passed to `Waiter.new`.
# @param io [Object] key under which the waiter is registered.
# @param events [Integer] passed to `Waiter.new`.
# @return [Object, false] the value handed back by the transfer, or `false`
#   when that value is falsy.
def io_wait(fiber, io, events)
  # Whatever was already registered for this IO becomes the new waiter's third
  # argument (presumably its tail — confirm against Waiter).
  previous = @waiting[io]
  waiter = @waiting[io] = Waiter.new(fiber, events, previous)

  @loop.transfer || false
ensure
  # `waiter` is still nil if construction itself failed.
  waiter&.invalidate
end
#io_write(fiber, io, buffer, length, offset = 0) ⇒ Object
Write to the given IO from the buffer.
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 |
# File 'lib/io/event/selector/select.rb', line 225
# Write to the given IO from the buffer.
#
# Repeatedly calls `buffer.write(io, 0, offset)` inside `Selector.nonblock(io)`,
# advancing `offset` by each positive result, until at least `length` bytes
# have been written, a write returns 0, or a negative result is returned.
#
# @param fiber [Object] handed to {#io_wait} when the write has to be retried.
# @param io [Object] the IO to write to.
# @param buffer [#write, #size] source buffer.
# @param length [Integer] stop once the running total reaches this value. When
#   it is not positive, a negative result is returned immediately instead of
#   waiting for writability.
# @param offset [Integer] position in the buffer at which to start reading.
# @return [Integer] the total number of bytes written; `-Errno::EINVAL::Errno`
#   when `offset` is negative or past the end of the buffer; or the negative
#   result of `buffer.write` when it is not retried.
def io_write(fiber, io, buffer, length, offset = 0)
  # Ensure offset is within the bounds of the buffer to avoid ArgumentError.
  # Negative offsets are rejected as well as offsets past the end.
  return -Errno::EINVAL::Errno if offset.negative? || offset > buffer.size

  total = 0

  Selector.nonblock(io) do
    while true
      result = Fiber.blocking { buffer.write(io, 0, offset) }

      if result < 0
        # Only wait and retry when the caller asked for data and `again?`
        # (defined elsewhere) accepts the result; otherwise hand it back.
        return result unless length > 0 && again?(result)

        self.io_wait(fiber, io, IO::WRITABLE)
      elsif result == 0
        # The loop's value is never used (the method returns `total`), so a
        # plain break matches io_read.
        break
      else
        total += result
        break if total >= length
        offset += result
      end
    end
  end

  total
end
#process_wait(fiber, pid, flags) ⇒ Object
Wait for a process to change state.
262 263 264 265 266 |
# File 'lib/io/event/selector/select.rb', line 262
# Wait for a process to change state.
#
# Runs `Process::Status.wait` on a separate thread and returns that thread's
# value.
#
# @param fiber [Object] not used by this implementation.
# @param pid [Integer] forwarded to `Process::Status.wait`.
# @param flags [Integer] forwarded to `Process::Status.wait`.
# @return [Process::Status, nil] whatever `Process::Status.wait` returned.
def process_wait(fiber, pid, flags)
  waiting = Thread.new { Process::Status.wait(pid, flags) }
  waiting.value
end
#push(fiber) ⇒ Object
Append the given fiber into the ready list.
95 96 97 |
# File 'lib/io/event/selector/select.rb', line 95
# Append the given fiber into the ready list.
#
# @param fiber [Object] the entry to enqueue on `@ready`.
# @return [Object] the ready queue itself.
def push(fiber)
  @ready << fiber
end
#raise(fiber, *arguments, **options) ⇒ Object
Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.
100 101 102 103 104 105 106 107 |
# File 'lib/io/event/selector/select.rb', line 100
# Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.
#
# The current fiber is wrapped in an `Optional` and queued on `@ready` before
# the exception is raised in `fiber`; the wrapper is nullified on the way out.
#
# @param fiber [Fiber] the fiber in which to raise.
# @param arguments [Array] forwarded to `fiber.raise`.
# @param options [Hash] keyword arguments forwarded to `fiber.raise`.
# @return [Object] whatever `fiber.raise` returns.
def raise(fiber, *arguments, **options)
  optional = Optional.new(Fiber.current)
  @ready.push(optional)

  fiber.raise(*arguments, **options)
ensure
  # `optional` is still nil if the wrapper could not be built; safe navigation
  # keeps the original error from being replaced by a NoMethodError.
  optional&.nullify
end
#ready? ⇒ Boolean
110 111 112 |
# File 'lib/io/event/selector/select.rb', line 110
# Whether the ready list currently holds any entries.
#
# @return [Boolean] `true` when `@ready` is not empty.
def ready?
  return false if @ready.empty?

  true
end
#resume(fiber, *arguments) ⇒ Object
Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.
75 76 77 78 79 80 81 82 |
# File 'lib/io/event/selector/select.rb', line 75
# Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.
#
# The current fiber is wrapped in an `Optional` and queued on `@ready` before
# the transfer; the wrapper is nullified on the way out.
#
# @param fiber [Fiber] the fiber to transfer to.
# @param arguments [Array] forwarded to `fiber.transfer`.
# @return [Object] whatever `fiber.transfer` returns.
def resume(fiber, *arguments)
  optional = Optional.new(Fiber.current)
  @ready.push(optional)

  fiber.transfer(*arguments)
ensure
  # `optional` is still nil if the wrapper could not be built; safe navigation
  # keeps the original error from being replaced by a NoMethodError.
  optional&.nullify
end
#select(duration = nil) ⇒ Object
Wait for IO events or a timeout.
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 |
# File 'lib/io/event/selector/select.rb', line 285
# Wait for IO events or a timeout.
#
# Steps, in order:
# 1. Calls `pop_ready` (defined elsewhere); a truthy result forces a zero wait.
# 2. Builds the readable/writable/priority arrays from `@waiting`, dropping
#    entries whose IO is closed.
# 3. Calls `::IO.select` with `@blocked` set, capturing any exception.
# 4. Dispatches the reported events to the waiters registered for each IO.
#
# @param duration [Numeric, nil] timeout passed to `::IO.select`; forced to 0
#   when there is ready work.
# @return [Integer] the number of distinct IOs that had events dispatched, or 0
#   when `::IO.select` raised.
def select(duration = nil)
  if pop_ready
    # If we have popped items from the ready list, they may influence the duration calculation, so we don't delay the event loop:
    duration = 0
  end

  readable = Array.new
  writable = Array.new
  priority = Array.new

  @waiting.delete_if do |io, waiter|
    if io.closed?
      # When an IO is closed, we silently drop it. Ruby 4's `rb_thread_io_close_interrupt` will take care of interrupting any fibers waiting on the closed IO, so we don't need to do anything here.
      true
    else
      # An IO is appended once per waiter interested in a given event, so it may appear more than once in an array.
      waiter.each do |fiber, events|
        if (events & IO::READABLE) > 0
          readable << io
        end

        if (events & IO::WRITABLE) > 0
          writable << io
        end

        if (events & IO::PRIORITY) > 0
          priority << io
        end
      end

      false
    end
  end

  # Work may have been queued since `pop_ready` ran; don't block if so.
  duration = 0 unless @ready.empty?
  error = nil

  # NOTE(review): when `duration` is nil, `start_time` is never set, so
  # `@idle_duration` is 0.0 for that call — confirm this is intended.
  if duration&.>(0)
    start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  else
    @idle_duration = 0.0
  end

  # We need to handle interrupts on blocking IO. Every other implementation uses EINTR, but that doesn't work with `::IO.select` as it will retry the call on EINTR.
  Thread.handle_interrupt(::Exception => :on_blocking) do
    @blocked = true

    # On timeout `::IO.select` returns nil, leaving all three variables nil; the `&.each` calls below rely on that.
    readable, writable, priority = ::IO.select(readable, writable, priority, duration)
  rescue ::Exception => error
    # Requeue below...
  ensure
    @blocked = false

    if start_time
      end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      @idle_duration = end_time - start_time
    end
  end

  if error
    if error.is_a?(IOError) || error.is_a?(Errno::EBADF)
      # This can happen if an IO is closed while we're blocked in ::IO.select. Ruby 4's `rb_thread_io_close_interrupt` will take care of interrupting any fibers waiting on the closed IO, so we don't need to do anything here, except try again:
      return 0
    end

    # For all other errors (e.g. thread interrupts), re-queue on the scheduler thread:
    Thread.current.raise(error)
    return 0
  end

  # Per-IO bitmask of reported events, keyed by identity with a default of 0 so `|=` works on first use.
  ready = Hash.new(0).compare_by_identity

  readable&.each do |io|
    # Skip any IO that was closed/reused after IO.select returned - its fd number
    # may now belong to a different file, so resuming the waiter would be wrong:
    ready[io] |= IO::READABLE unless io.closed?
  end

  writable&.each do |io|
    ready[io] |= IO::WRITABLE unless io.closed?
  end

  priority&.each do |io|
    ready[io] |= IO::PRIORITY unless io.closed?
  end

  ready.each do |io, events|
    # The chain is removed before dispatching; the block puts yielded waiters back.
    @waiting.delete(io).dispatch(events) do |waiter|
      # Re-schedule the waiting IO:
      waiter.tail = @waiting[io]
      @waiting[io] = waiter
    end
  end

  return ready.size
end
#idle_duration ⇒ Object (readonly)
This is the amount of time the event loop was idle during the last select call.
34 |
# File 'lib/io/event/selector/select.rb', line 34 attr :idle_duration |
#transfer ⇒ Object
Transfer from the current fiber to the event loop.
70 71 72 |
# File 'lib/io/event/selector/select.rb', line 70
# Transfer from the current fiber to the event loop.
#
# @return [Object] whatever `@loop.transfer` returns.
def transfer = @loop.transfer
#wakeup ⇒ Object
Wake up the event loop if it is currently sleeping.
37 38 39 40 41 42 43 44 45 |
# File 'lib/io/event/selector/select.rb', line 37
# Wake up the event loop if it is currently sleeping.
#
# The interrupt is only signalled while `@blocked` is set.
#
# @return [Boolean] `true` when the interrupt was signalled, `false` otherwise.
def wakeup
  return false unless @blocked

  @interrupt.signal
  true
end