Class: Rage::FiberScheduler
- Inherits:
-
Object
- Object
- Rage::FiberScheduler
- Defined in:
- lib/rage/fiber_scheduler.rb
Defined Under Namespace
Modules: BlockingOperationWait
Constant Summary collapse
- MAX_READ =
65536
Instance Method Summary collapse
-
#address_resolve(hostname) ⇒ Object
Resolve a hostname to IP addresses, caching results for 60 seconds.
-
#block(_blocker, timeout = nil) ⇒ Object
Block the current fiber until unblocked or timeout.
-
#close ⇒ Object
Clean up by closing the worker pool and Iodine scheduler.
-
#fiber(&block) ⇒ Object
Create and schedule a new non-blocking fiber, handling request and user-spawned fibers differently.
-
#fiber_interrupt(fiber, exception) ⇒ Object
Interrupt a fiber by incrementing its generation and raising an exception.
-
#initialize ⇒ FiberScheduler
constructor
Initialize the scheduler, storing the root fiber and an empty DNS cache.
-
#io_read(io, buffer, length, offset = 0) ⇒ Object
Read data from an I/O object into a buffer, pausing the fiber between reads.
-
#io_wait(io, events, timeout = nil) ⇒ Object
Wait for I/O events on a file descriptor, yielding the fiber until ready or timeout.
-
#io_write(io, buffer, length, offset = 0) ⇒ Object
Write data from a buffer to an I/O object.
-
#kernel_sleep(duration = nil) ⇒ Object
Pause the current fiber for the specified duration.
-
#unblock(_blocker, fiber) ⇒ Object
Unblock a fiber by publishing to its block channel.
Constructor Details
#initialize ⇒ FiberScheduler
Initialize the scheduler, storing the root fiber and an empty DNS cache.
9 10 11 12 |
# File 'lib/rage/fiber_scheduler.rb', line 9 def initialize @root_fiber = Fiber.current @dns_cache = {} end |
Instance Method Details
#address_resolve(hostname) ⇒ Object
Resolve a hostname to IP addresses, caching results for 60 seconds.
93 94 95 96 97 98 99 100 101 |
# File 'lib/rage/fiber_scheduler.rb', line 93 def address_resolve(hostname) @dns_cache[hostname] ||= begin ::Iodine.run_after(60_000) do @dns_cache[hostname] = nil end Resolv.getaddresses(hostname) end end |
#block(_blocker, timeout = nil) ⇒ Object
Block the current fiber until unblocked or timeout.
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/rage/fiber_scheduler.rb', line 104 def block(_blocker, timeout = nil) f, fulfilled = Fiber.current, false gen = (f.__wait_generation += 1) channel = f.__block_channel = "block:#{f.object_id}:#{gen}" resume_fiber_block = proc do unless fulfilled fulfilled = true ::Iodine.defer { ::Iodine.unsubscribe(channel) } f.resume if f.alive? && gen == f.__wait_generation end end ::Iodine.subscribe(channel, &resume_fiber_block) if timeout ::Iodine.run_after((timeout * 1000).to_i, &resume_fiber_block) end Fiber.yield end |
#close ⇒ Object
Clean up by closing the worker pool and Iodine scheduler.
190 191 192 193 |
# File 'lib/rage/fiber_scheduler.rb', line 190 def close @worker_pool&.close ::Iodine::Scheduler.close end |
#fiber(&block) ⇒ Object
Create and schedule a new non-blocking fiber, handling request and user-spawned fibers differently.
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/rage/fiber_scheduler.rb', line 158 def fiber(&block) parent = Fiber.current fiber = if parent == @root_fiber # the fiber to wrap a request in Fiber.new(blocking: false) do Fiber.current.__set_id Rage::Telemetry.tracer.span_core_fiber_dispatch do Fiber.current.__set_result(block.call) end end else # the fiber was created in the user code Fiber.new(blocking: false) do Rage::Telemetry.tracer.span_core_fiber_spawn(parent:) do Fiber.current.__set_result(block.call) end # send a message for `Fiber.await` to work Iodine.publish(parent.__await_channel, "", Iodine::PubSub::PROCESS) if parent.__await_channel rescue Exception => e Fiber.current.__set_err(e) Iodine.publish(parent.__await_channel, Fiber::AWAIT_ERROR_MESSAGE, Iodine::PubSub::PROCESS) if parent.__await_channel end end fiber.__wait_generation = 0 fiber.resume fiber end |
#fiber_interrupt(fiber, exception) ⇒ Object
Interrupt a fiber by incrementing its generation and raising an exception.
132 133 134 135 |
# File 'lib/rage/fiber_scheduler.rb', line 132 def fiber_interrupt(fiber, exception) fiber.__wait_generation += 1 fiber.raise(exception) end |
#io_read(io, buffer, length, offset = 0) ⇒ Object
Read data from an I/O object into a buffer, pausing the fiber between reads.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/rage/fiber_scheduler.rb', line 32 def io_read(io, buffer, length, offset = 0) length_to_read = if length == 0 buffer.size > MAX_READ ? MAX_READ : buffer.size else length end while true string = ::Iodine::Scheduler.read(io.fileno, length_to_read, offset) if string.nil? return offset end if string.empty? return -Errno::EAGAIN::Errno end buffer.set_string(string, offset) size = string.bytesize offset += size return offset if size < length_to_read || size >= buffer.size Fiber.pause end end |
#io_wait(io, events, timeout = nil) ⇒ Object
Wait for I/O events on a file descriptor, yielding the fiber until ready or timeout.
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/rage/fiber_scheduler.rb', line 15 def io_wait(io, events, timeout = nil) f = Fiber.current gen = (f.__wait_generation += 1) ::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil) do |err| f.resume(err) if f.alive? && gen == f.__wait_generation end err = Fiber.defer(io.fileno) if err == false || (err && err < 0) err else events end end |
#io_write(io, buffer, length, offset = 0) ⇒ Object
Write data from a buffer to an I/O object.
62 63 64 65 66 67 68 69 |
# File 'lib/rage/fiber_scheduler.rb', line 62 def io_write(io, buffer, length, offset = 0) bytes_to_write = length bytes_to_write = buffer.size if length == 0 ::Iodine::Scheduler.write(io.fileno, buffer.get_string, bytes_to_write, offset) bytes_to_write - offset end |
#kernel_sleep(duration = nil) ⇒ Object
Pause the current fiber for the specified duration.
73 74 75 76 |
# File 'lib/rage/fiber_scheduler.rb', line 73 def kernel_sleep(duration = nil) block(nil, duration || 0) Fiber.pause if duration.nil? || duration < 1 end |
#unblock(_blocker, fiber) ⇒ Object
Unblock a fiber by publishing to its block channel.
127 128 129 |
# File 'lib/rage/fiber_scheduler.rb', line 127 def unblock(_blocker, fiber) ::Iodine.publish(fiber.__block_channel, "", Iodine::PubSub::PROCESS) if fiber.__block_channel end |