Class: UringMachine::FiberScheduler

Inherits:
Object
Defined in:
lib/uringmachine/fiber_scheduler.rb

Overview

Implements the `Fiber::Scheduler` interface for creating fiber-based concurrent applications in Ruby, in tight integration with the standard Ruby I/O and locking APIs.

Constant Summary

BLOCKING_OP_SUPPORT =

DEFAULT_THREAD_POOL =

    The blocking operation thread pool is shared by all fiber schedulers.

    BLOCKING_OP_SUPPORT && BlockingOperationThreadPool.new

Constructor Details

#initialize(machine = nil, thread_pool = DEFAULT_THREAD_POOL) ⇒ void

Instantiates a scheduler with the given UringMachine instance. If no machine is given, a new one is created.

machine = UM.new
scheduler = UM::FiberScheduler.new(machine)
Fiber.set_scheduler(scheduler)

Parameters:

  • machine (UringMachine, nil) (defaults to: nil)

    UringMachine instance

  • thread_pool (BlockingOperationThreadPool, nil) (defaults to: DEFAULT_THREAD_POOL)

    thread pool used to run blocking operations



# File 'lib/uringmachine/fiber_scheduler.rb', line 100

def initialize(machine = nil, thread_pool = DEFAULT_THREAD_POOL)
  @machine = machine || UM.new
  @thread_pool = thread_pool
  @fiber_map = ObjectSpace::WeakMap.new
  @thread = Thread.current
end

Instance Attribute Details

#fiber_map ⇒ Object (readonly)

WeakMap holding references to the scheduler's fibers as keys.



# File 'lib/uringmachine/fiber_scheduler.rb', line 90

def fiber_map
  @fiber_map
end

#machine ⇒ Object (readonly)

UringMachine instance associated with the scheduler.



# File 'lib/uringmachine/fiber_scheduler.rb', line 87

def machine
  @machine
end

Instance Method Details

#address_resolve(hostname) ⇒ Array<String>

Resolves a hostname.

Parameters:

  • hostname (String)

    hostname to resolve

Returns:

  • (Array<String>)

    array of resolved IP addresses, as strings



# File 'lib/uringmachine/fiber_scheduler.rb', line 399

def address_resolve(hostname)
  # p address_resolve: [hostname]
  Resolv.getaddresses(hostname)
end
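
For reference, a minimal sketch of the standard library call this hook delegates to. `Resolv.getaddresses` returns plain address strings. The hostname `localhost` is only an illustrative choice, and the result depends on the local resolver configuration.

```ruby
require 'resolv'

# Resolve a hostname the same way the hook does.
# "localhost" is an illustrative hostname, not something the library prescribes.
addresses = Resolv.getaddresses('localhost')

# Each entry is an IP address string, such as "127.0.0.1" or "::1".
addresses.each { |addr| puts addr }
```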

#block(blocker, timeout = nil) ⇒ bool

Blocks the current fiber by yielding to the machine. This hook is called when a synchronization mechanism blocks, e.g. a mutex, a queue, etc.

Parameters:

  • blocker (any)

    blocker object

  • timeout (Number, nil) (defaults to: nil)

    optional timeout

Returns:

  • (bool)

    true if the fiber was resumed, false if the timeout expired



# File 'lib/uringmachine/fiber_scheduler.rb', line 170

def block(blocker, timeout = nil)
  # p block: [blocker, timeout]
  if timeout
    @machine.timeout(timeout, Timeout::Error) { @machine.yield }
  else
    @machine.yield
  end
  true
rescue Timeout::Error
  false
end
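
The true/false contract can be illustrated without UringMachine. The sketch below substitutes the standard library's `Timeout.timeout` for `@machine.timeout` and an arbitrary block for `@machine.yield`. `wait_with_timeout` is a hypothetical helper, not part of the library.

```ruby
require 'timeout'

# Hypothetical stand-in for the hook: runs the given wait, returning
# true if it completed and false if the timeout fired first.
def wait_with_timeout(timeout = nil)
  if timeout
    Timeout.timeout(timeout, Timeout::Error) { yield }
  else
    yield
  end
  true
rescue Timeout::Error
  false
end

completed = wait_with_timeout(1) { :done }      # finishes in time
timed_out = wait_with_timeout(0.05) { sleep 2 } # timeout fires first

puts completed
puts timed_out
```

In application code the hook is not called directly. Under a scheduler it is expected to be reached through blocking primitives such as `Mutex#lock` or `Queue#pop`.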

#blocking_operation_wait(op) ⇒ void

This method returns an undefined value.

Runs the given operation in a separate thread, so as not to block other fibers.

Parameters:

  • op (callable)

    blocking operation



# File 'lib/uringmachine/fiber_scheduler.rb', line 157

def blocking_operation_wait(op)
  # p blocking_operation_wait: [op]
  @thread_pool.process(@machine, op)
end

#fiber(&block) ⇒ Fiber

Creates a new fiber with the given block. The created fiber is added to the fiber map, scheduled on the scheduler machine, and started before this method returns (by calling snooze).

Parameters:

  • block (Proc)

    fiber block

Returns:

  • (Fiber)

    the created fiber



# File 'lib/uringmachine/fiber_scheduler.rb', line 118

def fiber(&block)
  # p fiber: [block]
  fiber = Fiber.new(blocking: false) { @machine.run(fiber, &block) }

  @fiber_map[fiber] = true
  @machine.schedule(fiber, nil)
  @machine.snooze
  fiber
end

#fiber_interrupt(fiber, exception) ⇒ void

This method returns an undefined value.

Interrupts the given fiber with an exception.

Parameters:

  • fiber (Fiber)

    fiber to interrupt

  • exception (Exception)

    exception to interrupt the fiber with



# File 'lib/uringmachine/fiber_scheduler.rb', line 389

def fiber_interrupt(fiber, exception)
  # p fiber_interrupt: [fiber, exception]
  @machine.schedule(fiber, exception)
  @machine.wakeup
end

#instance_variables_to_inspect ⇒ Object

Returns the instance variables shown by #inspect (only @machine).



# File 'lib/uringmachine/fiber_scheduler.rb', line 108

def instance_variables_to_inspect
  [:@machine]
end

#io_close(fd) ⇒ Integer

Closes the given fd.

Parameters:

  • fd (Integer)

    file descriptor

Returns:

  • (Integer)

    file descriptor



# File 'lib/uringmachine/fiber_scheduler.rb', line 364

def io_close(fd)
  # p io_close: [fd]
  @machine.close_async(fd)
rescue SystemCallError => e # Errno is a namespace module and never matches an exception
  -e.errno
end
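
The error convention used by the I/O hooks can be sketched in isolation. This assumes the intent is the one described by Ruby's `Fiber::Scheduler` interface, where I/O hooks report failure as a negated errno value. `negated_errno` is a hypothetical helper. Note that `Errno` is a namespace module, so the sketch rescues `SystemCallError`, the common superclass of the `Errno::*` exceptions.

```ruby
# Hypothetical helper: run a block and convert any system call error
# into a negated errno value.
def negated_errno
  yield
rescue SystemCallError => e
  -e.errno
end

ok     = negated_errno { 0 }                  # success passes through unchanged
failed = negated_errno { raise Errno::EBADF } # failure becomes a negative integer

puts ok
puts failed
```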

#io_pread(io, buffer, from, length, offset) ⇒ Integer

Reads from the given IO at the given file offset.

Parameters:

  • io (IO)

    IO object

  • buffer (IO::Buffer)

    read buffer

  • from (Integer)

    file offset

  • length (Integer)

    read length

  • offset (Integer)

    buffer offset

Returns:

  • (Integer)

    bytes read



# File 'lib/uringmachine/fiber_scheduler.rb', line 286

def io_pread(io, buffer, from, length, offset)
  # p io_pread: [io, buffer, from, length, offset]
  length = buffer.size if length == 0

  if (timeout = io.timeout)
    @machine.timeout(timeout, Timeout::Error) do
      @machine.read(io.fileno, buffer, length, offset, from)
    rescue Errno::EINTR
      retry
    end
  else
    @machine.read(io.fileno, buffer, length, offset, from)
  end
rescue Errno::EINTR
  retry
rescue SystemCallError => e # Errno is a namespace module and never matches an exception
  -e.errno
end

#io_pwrite(io, buffer, from, length, offset) ⇒ Integer

Writes to the given IO at the given file offset.

Parameters:

  • io (IO)

    IO object

  • buffer (IO::Buffer)

    write buffer

  • from (Integer)

    file offset

  • length (Integer)

    write length

  • offset (Integer)

    buffer offset

Returns:

  • (Integer)

    bytes written



# File 'lib/uringmachine/fiber_scheduler.rb', line 340

def io_pwrite(io, buffer, from, length, offset)
  # p io_pwrite: [io, buffer, from, length, offset]
  length = buffer.size if length == 0
  buffer = buffer.slice(offset) if offset > 0

  if (timeout = io.timeout)
    @machine.timeout(timeout, Timeout::Error) do
      @machine.write(io.fileno, buffer, length, from)
    rescue Errno::EINTR
      retry
    end
  else
    @machine.write(io.fileno, buffer, length, from)
  end
rescue Errno::EINTR
  retry
rescue SystemCallError => e # Errno is a namespace module and never matches an exception
  -e.errno
end
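
From application code, the positional hooks are reached through Ruby's positional I/O methods. The sketch below uses only `IO#pwrite` and `IO#pread` on a temporary file. It runs with or without a scheduler, and assumes that under a fiber scheduler Ruby routes these calls to `io_pwrite` and `io_pread` when the hooks are defined. The file name prefix is arbitrary.

```ruby
require 'tempfile'

# Without a block, Tempfile.create returns a plain File that we must clean up.
file = Tempfile.create('pread-demo')
begin
  # Write 11 bytes at file offset 0; the file position is not moved.
  written = file.pwrite('hello world', 0)

  # Read 5 bytes starting at file offset 6.
  chunk = file.pread(5, 6)
ensure
  file.close
  File.unlink(file.path)
end

puts written
puts chunk
```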

#io_read(io, buffer, length, offset) ⇒ Integer

Reads from the given IO.

Parameters:

  • io (IO)

    IO object

  • buffer (IO::Buffer)

    read buffer

  • length (Integer)

    read length

  • offset (Integer)

    buffer offset

Returns:

  • (Integer)

    bytes read



# File 'lib/uringmachine/fiber_scheduler.rb', line 259

def io_read(io, buffer, length, offset)
  # p io_read: [io, buffer, length, offset]
  length = buffer.size if length == 0

  if (timeout = io.timeout)
    @machine.timeout(timeout, Timeout::Error) do
      @machine.read(io.fileno, buffer, length, offset)
    rescue Errno::EINTR
      retry
    end
  else
    @machine.read(io.fileno, buffer, length, offset)
  end
rescue Errno::EINTR
  retry
rescue SystemCallError => e # Errno is a namespace module and never matches an exception
  -e.errno
end

#io_select(rios, wios, eios, timeout = nil) ⇒ Object

Selects the first ready IOs from the given sets of IOs.

Parameters:

  • rios (Array<IO>)

    readable IOs

  • wios (Array<IO>)

    writable IOs

  • eios (Array<IO>)

    IOs to check for exceptional conditions

  • timeout (Number, nil) (defaults to: nil)

    optional timeout



# File 'lib/uringmachine/fiber_scheduler.rb', line 234

def io_select(rios, wios, eios, timeout = nil)
  # p io_select: [rios, wios, eios, timeout]
  map_r = map_fds(rios)
  map_w = map_fds(wios)
  map_e = map_fds(eios)

  r, w, e = nil
  if timeout
    @machine.timeout(timeout, Timeout::Error) {
      r, w, e = @machine.select(map_r.keys, map_w.keys, map_e.keys)
    }
  else
    r, w, e = @machine.select(map_r.keys, map_w.keys, map_e.keys)
  end

  [unmap_fds(r, map_r), unmap_fds(w, map_w), unmap_fds(e, map_e)]
end
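
`map_fds` and `unmap_fds` are helpers that are not shown on this page. One plausible shape for them, offered purely as an assumption, is a hash from file descriptor to IO object, so that the descriptors reported by the machine can be translated back to the IOs the caller passed in:

```ruby
# Hypothetical reconstruction; the library's actual helpers may differ.

# Builds a { fd => io } hash from an array of IOs (nil yields an empty hash).
def map_fds(ios)
  (ios || []).to_h { |io| [io.fileno, io] }
end

# Translates ready file descriptors back to their IO objects.
def unmap_fds(fds, map)
  (fds || []).map { |fd| map[fd] }
end

r, w = IO.pipe
map_r = map_fds([r])

# Pretend the machine reported r's descriptor as ready.
ready = unmap_fds([r.fileno], map_r)
puts ready.first.equal?(r)

r.close
w.close
```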

#io_wait(io, events, timeout = nil) ⇒ void

This method returns an undefined value.

Waits for the given io to become ready.

Parameters:

  • io (IO)

    IO object

  • events (Number)

    readiness bitmask

  • timeout (Number, nil) (defaults to: nil)

    optional timeout



# File 'lib/uringmachine/fiber_scheduler.rb', line 216

def io_wait(io, events, timeout = nil)
  # p io_wait: [io, events, timeout]
  timeout ||= io.timeout
  if timeout
    @machine.timeout(timeout, Timeout::Error) {
      @machine.poll(io.fileno, events)
    }
  else
    @machine.poll(io.fileno, events)
  end
end
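
The `events` argument is a bitmask built from the `IO::READABLE`, `IO::WRITABLE` and `IO::PRIORITY` constants. The short sketch below, using only core Ruby, shows how such a mask is composed and tested, and how a readiness wait with a timeout behaves on a pipe. It does not involve the scheduler itself.

```ruby
require 'io/wait'

# Compose a mask and test individual bits.
events = IO::READABLE | IO::WRITABLE
wants_read = (events & IO::READABLE) != 0
wants_prio = (events & IO::PRIORITY) != 0

r, w = IO.pipe
before = r.wait_readable(0.05) # nothing written yet: times out and returns nil
w.write('x')
after = r.wait_readable(1)     # data is available: returns a truthy value

puts wants_read
puts wants_prio
puts before.inspect
puts !!after

r.close
w.close
```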

#io_write(io, buffer, length, offset) ⇒ Integer

Writes to the given IO.

Parameters:

  • io (IO)

    IO object

  • buffer (IO::Buffer)

    write buffer

  • length (Integer)

    write length

  • offset (Integer)

    buffer offset

Returns:

  • (Integer)

    bytes written



# File 'lib/uringmachine/fiber_scheduler.rb', line 312

def io_write(io, buffer, length, offset)
  # p io_write: [io, buffer, length, offset]
  length = buffer.size if length == 0
  buffer = buffer.slice(offset) if offset > 0

  if (timeout = io.timeout)
    @machine.timeout(timeout, Timeout::Error) do
      @machine.write(io.fileno, buffer, length)
    rescue Errno::EINTR
      retry
    end
  else
    @machine.write(io.fileno, buffer, length)
  end
rescue Errno::EINTR
  retry
rescue SystemCallError => e # Errno is a namespace module and never matches an exception
  -e.errno
end

#join(*fibers) ⇒ void

This method returns an undefined value.

Waits for the given fibers to terminate. If no fibers are given, waits for all fibers to terminate.

Parameters:

  • fibers (Array<Fiber>)

    fibers to wait for



# File 'lib/uringmachine/fiber_scheduler.rb', line 141

def join(*fibers)
  if fibers.empty?
    fibers = @fiber_map.keys
    @fiber_map = ObjectSpace::WeakMap.new
  end

  @machine.await(fibers)
end
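
The bookkeeping in `join` relies on `ObjectSpace::WeakMap#keys` returning the tracked fibers that are still alive as objects. A small sketch of that snapshot-and-reset pattern follows, using plain fibers and no machine. Resuming each fiber is only a stand-in for `@machine.await`.

```ruby
fiber_map = ObjectSpace::WeakMap.new

# Keep strong references in an array so the fibers cannot be
# garbage-collected while the example runs.
fibers = 3.times.map { |i| Fiber.new { i * 2 } }
fibers.each { |f| fiber_map[f] = true }

# Snapshot the tracked fibers, then start over with an empty map,
# mirroring what join does when called without arguments.
pending = fiber_map.keys
fiber_map = ObjectSpace::WeakMap.new

# Stand-in for @machine.await: run each fiber to completion.
results = pending.map(&:resume).sort
puts results.inspect
```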

#kernel_sleep(duration = nil) ⇒ void

This method returns an undefined value.

Sleeps for the given duration. If no duration is given, yields to the machine until the fiber is rescheduled.

Parameters:

  • duration (Number, nil) (defaults to: nil)

    sleep duration



# File 'lib/uringmachine/fiber_scheduler.rb', line 199

def kernel_sleep(duration = nil)
  # p kernel_sleep: duration
  duration ? @machine.sleep(duration) : @machine.yield
end

#process_wait(pid, flags) ⇒ Process::Status

Waits for a process to terminate.

Parameters:

  • pid (Integer)

    process pid (0 for any child process)

  • flags (Integer)

    waitpid flags

Returns:

  • (Process::Status)

    terminated process status



# File 'lib/uringmachine/fiber_scheduler.rb', line 378

def process_wait(pid, flags)
  flags = UM::WEXITED if flags == 0
  @machine.waitid_status(UM::P_PID, pid, flags)
end
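
Under a fiber scheduler, `Process.wait` and related calls made from non-blocking fibers are expected to reach this hook. The sketch below shows the calling side only, using `Process.spawn` and `Process.wait2`. The child command (a Ruby one-liner exiting with status 3) is an arbitrary choice for illustration.

```ruby
require 'rbconfig'

# Spawn a child process that exits with a known status.
pid = Process.spawn(RbConfig.ruby, '-e', 'exit 3')

# Wait for that specific child; returns [pid, Process::Status].
waited_pid, status = Process.wait2(pid)

puts waited_pid == pid
puts status.exitstatus
```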

#scheduler_close ⇒ void

This method returns an undefined value.

Waits for all fibers to terminate. Called upon thread termination or when the thread’s fiber scheduler is changed.



# File 'lib/uringmachine/fiber_scheduler.rb', line 132

def scheduler_close
  join()
end

#timeout_after(duration, exception, message, &block) ⇒ any

Runs the given block with a timeout.

Parameters:

  • duration (Number)

    timeout duration

  • exception (Class)

    exception class

  • message (String)

    exception message

  • block (Proc)

    block to run

Returns:

  • (any)

    block return value



# File 'lib/uringmachine/fiber_scheduler.rb', line 411

def timeout_after(duration, exception, message, &block)
  # p timeout_after:  [duration, exception, message, block]
  @machine.timeout(duration, exception, &block)
end
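
This hook is expected to back `Timeout.timeout` when a scheduler is set. The calling side can be sketched with the standard library alone. `SlowOperation` is a hypothetical exception class introduced for the example.

```ruby
require 'timeout'

# Hypothetical exception class used only for this example.
class SlowOperation < StandardError; end

begin
  Timeout.timeout(0.05, SlowOperation, 'took too long') { sleep 2 }
  outcome = :finished
rescue SlowOperation => e
  outcome = e.message
end

puts outcome
```

In the implementation shown above, the `message` argument is not passed on to the machine, so the message seen under this scheduler may differ from the one the standard library produces here.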

#unblock(blocker, fiber) ⇒ void

This method returns an undefined value.

Unblocks the given fiber by scheduling it. This hook is called when a synchronization mechanism unblocks, e.g. a mutex, a queue, etc.

Parameters:

  • blocker (any)

    blocker object

  • fiber (Fiber)

    fiber to resume



# File 'lib/uringmachine/fiber_scheduler.rb', line 189

def unblock(blocker, fiber)
  # p unblock: [blocker, fiber]
  @machine.schedule(fiber, nil)
  @machine.wakeup if Thread.current != @thread
end

#yield ⇒ Object

Yields to the next runnable fiber.



# File 'lib/uringmachine/fiber_scheduler.rb', line 205

def yield
  # p yield: []
  @machine.snooze
end