Class: Rage::FiberScheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/rage/fiber_scheduler.rb

Defined Under Namespace

Modules: BlockingOperationWait

Constant Summary collapse

MAX_READ =
65536

Instance Method Summary collapse

Constructor Details

#initializeFiberScheduler

Initialize the scheduler, storing the root fiber and an empty DNS cache.



9
10
11
12
# File 'lib/rage/fiber_scheduler.rb', line 9

def initialize
  @root_fiber = Fiber.current
  @dns_cache = {}
end

Instance Method Details

#address_resolve(hostname) ⇒ Object

Resolve a hostname to IP addresses, caching results for 60 seconds.



93
94
95
96
97
98
99
100
101
# File 'lib/rage/fiber_scheduler.rb', line 93

def address_resolve(hostname)
  @dns_cache[hostname] ||= begin
    ::Iodine.run_after(60_000) do
      @dns_cache[hostname] = nil
    end

    Resolv.getaddresses(hostname)
  end
end

#block(_blocker, timeout = nil) ⇒ Object

Block the current fiber until unblocked or timeout.



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/rage/fiber_scheduler.rb', line 104

def block(_blocker, timeout = nil)
  f, fulfilled = Fiber.current, false

  gen = (f.__wait_generation += 1)
  channel = f.__block_channel = "block:#{f.object_id}:#{gen}"

  resume_fiber_block = proc do
    unless fulfilled
      fulfilled = true
      ::Iodine.defer { ::Iodine.unsubscribe(channel) }
      f.resume if f.alive? && gen == f.__wait_generation
    end
  end

  ::Iodine.subscribe(channel, &resume_fiber_block)
  if timeout
    ::Iodine.run_after((timeout * 1000).to_i, &resume_fiber_block)
  end

  Fiber.yield
end

#closeObject

Clean up by closing the worker pool and Iodine scheduler.



190
191
192
193
# File 'lib/rage/fiber_scheduler.rb', line 190

def close
  @worker_pool&.close
  ::Iodine::Scheduler.close
end

#fiber(&block) ⇒ Object

Create and schedule a new non-blocking fiber, handling request and user-spawned fibers differently.



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/rage/fiber_scheduler.rb', line 158

def fiber(&block)
  parent = Fiber.current

  fiber = if parent == @root_fiber
    # the fiber to wrap a request in
    Fiber.new(blocking: false) do
      Fiber.current.__set_id
      Rage::Telemetry.tracer.span_core_fiber_dispatch do
        Fiber.current.__set_result(block.call)
      end
    end
  else
    # the fiber was created in the user code
    Fiber.new(blocking: false) do
      Rage::Telemetry.tracer.span_core_fiber_spawn(parent:) do
        Fiber.current.__set_result(block.call)
      end
      # send a message for `Fiber.await` to work
      Iodine.publish(parent.__await_channel, "", Iodine::PubSub::PROCESS) if parent.__await_channel
    rescue Exception => e
      Fiber.current.__set_err(e)
      Iodine.publish(parent.__await_channel, Fiber::AWAIT_ERROR_MESSAGE, Iodine::PubSub::PROCESS) if parent.__await_channel
    end
  end

  fiber.__wait_generation = 0
  fiber.resume

  fiber
end

#fiber_interrupt(fiber, exception) ⇒ Object

Interrupt a fiber by incrementing its generation and raising an exception.



132
133
134
135
# File 'lib/rage/fiber_scheduler.rb', line 132

def fiber_interrupt(fiber, exception)
  fiber.__wait_generation += 1
  fiber.raise(exception)
end

#io_read(io, buffer, length, offset = 0) ⇒ Object

Read data from an I/O object into a buffer, pausing the fiber between reads.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/rage/fiber_scheduler.rb', line 32

def io_read(io, buffer, length, offset = 0)
  length_to_read = if length == 0
    buffer.size > MAX_READ ? MAX_READ : buffer.size
  else
    length
  end

  while true
    string = ::Iodine::Scheduler.read(io.fileno, length_to_read, offset)

    if string.nil?
      return offset
    end

    if string.empty?
      return -Errno::EAGAIN::Errno
    end

    buffer.set_string(string, offset)

    size = string.bytesize
    offset += size
    return offset if size < length_to_read || size >= buffer.size

    Fiber.pause
  end
end

#io_wait(io, events, timeout = nil) ⇒ Object

Wait for I/O events on a file descriptor, yielding the fiber until ready or timeout.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/rage/fiber_scheduler.rb', line 15

def io_wait(io, events, timeout = nil)
  f = Fiber.current
  gen = (f.__wait_generation += 1)

  ::Iodine::Scheduler.attach(io.fileno, events, timeout&.ceil) do |err|
    f.resume(err) if f.alive? && gen == f.__wait_generation
  end

  err = Fiber.defer(io.fileno)
  if err == false || (err && err < 0)
    err
  else
    events
  end
end

#io_write(io, buffer, length, offset = 0) ⇒ Object

Write data from a buffer to an I/O object.



62
63
64
65
66
67
68
69
# File 'lib/rage/fiber_scheduler.rb', line 62

def io_write(io, buffer, length, offset = 0)
  bytes_to_write = length
  bytes_to_write = buffer.size if length == 0

  ::Iodine::Scheduler.write(io.fileno, buffer.get_string, bytes_to_write, offset)

  bytes_to_write - offset
end

#kernel_sleep(duration = nil) ⇒ Object

Pause the current fiber for the specified duration.



73
74
75
76
# File 'lib/rage/fiber_scheduler.rb', line 73

def kernel_sleep(duration = nil)
  block(nil, duration || 0)
  Fiber.pause if duration.nil? || duration < 1
end

#unblock(_blocker, fiber) ⇒ Object

Unblock a fiber by publishing to its block channel.



127
128
129
# File 'lib/rage/fiber_scheduler.rb', line 127

def unblock(_blocker, fiber)
  ::Iodine.publish(fiber.__block_channel, "", Iodine::PubSub::PROCESS) if fiber.__block_channel
end