Module: OMQ::Reactor

Defined in:
lib/omq/reactor.rb

Overview

Shared IO reactor for the Ruby backend.

When user code already runs inside an Async reactor, engine tasks are spawned directly under the caller’s Async task. When no reactor is available (for example, on a plain thread created with Thread.new), a single shared IO thread hosts all engine tasks, mirroring libzmq’s IO thread.

Engines obtain the IO thread’s root task via Reactor.root_task and use it as their @parent_task. Blocking operations from the main thread are dispatched to the IO thread via Reactor.run.

Constant Summary

THREAD_NAME =
'omq-io'

Class Method Details

.root_task ⇒ Async::Task

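Returns the root Async task inside the shared IO thread. The first call starts the thread, exactly once, using double-checked locking, and registers an at_exit hook that calls stop!. Later calls return the cached task without taking the lock.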

Returns:

  • (Async::Task)


# File 'lib/omq/reactor.rb', line 33

def root_task
  return @root_task if @root_task

  @mutex.synchronize do
    return @root_task if @root_task

    ready        = Thread::Queue.new
    @work_queue  = Async::Queue.new
    @thread      = Thread.new { run_reactor(ready) }
    @thread.name = THREAD_NAME
    @root_task   = ready.pop

    at_exit { stop! }
  end

  @root_task
end
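
The once-only start can be illustrated without Async. The following is a minimal stdlib-only sketch of the same double-checked lock plus ready-queue handshake; LazyWorker, handle, and starts are hypothetical names used only for illustration and are not part of OMQ, and a sleeping thread stands in for the reactor loop.

```ruby
# Stdlib-only sketch of a lazy, once-only worker start.
# LazyWorker is a hypothetical class, not part of OMQ.
class LazyWorker
  attr_reader :starts

  def initialize
    @mutex  = Mutex.new
    @handle = nil
    @starts = 0
  end

  def handle
    # Fast path: skip the lock once the worker has been published.
    return @handle if @handle

    @mutex.synchronize do
      # Second check: another thread may have started the worker
      # while this one was waiting for the mutex.
      return @handle if @handle

      ready   = Thread::Queue.new
      @thread = Thread.new do
        @starts += 1
        ready.push(Thread.current) # publish a value to the starter
        sleep                      # stand-in for the reactor's event loop
      end
      @handle = ready.pop          # block until the worker is ready
    end

    @handle
  end
end

worker  = LazyWorker.new
handles = Array.new(8) { Thread.new { worker.handle } }.map(&:value)
```

Every caller ends up with the same handle, and the worker thread is created only once even when several threads race on the first call.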

.run(timeout: nil, &block) ⇒ Object

Runs a block inside the Async reactor.

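When called inside an Async reactor, the block runs directly on the current task; if timeout is given, the block is wrapped in with_timeout and raises IO::TimeoutError on expiry. When called outside a reactor, the block is pushed, together with its timeout, onto the shared IO thread’s work queue, and the calling thread blocks until the result is available.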

Returns:

  • (Object)

    the block’s return value



# File 'lib/omq/reactor.rb', line 60

def run(timeout: nil, &block)
  task = Async::Task.current?

  if task
    if timeout
      task.with_timeout(timeout, IO::TimeoutError) { yield }
    else
      yield
    end
  else
    result = Async::Promise.new
    root_task # ensure started
    @work_queue.push([block, result, timeout])
    result.wait
  end
end
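
The outside-a-reactor branch is a hand-off: push the block and a result slot onto a queue, then wait on the slot. A rough stdlib-only model of that hand-off is sketched below. It uses Thread::Queue in place of Async::Queue and Async::Promise, the dispatch helper is hypothetical, and the error propagation shown is an assumption, since the consuming side of the work queue is not listed on this page.

```ruby
# Stdlib-only model of "dispatch a block to a worker thread and wait".
work = Thread::Queue.new

worker = Thread.new do
  # A nil job is the shutdown sentinel, as pushed by stop!.
  while (job = work.pop)
    block, reply = job
    begin
      reply.push([:ok, block.call])
    rescue StandardError => e
      reply.push([:error, e]) # assumption: errors travel back to the caller
    end
  end
end

# Hypothetical helper: run a block on the worker and block for its result.
def dispatch(work, &block)
  reply = Thread::Queue.new
  work.push([block, reply])
  status, value = reply.pop # blocks the calling thread
  raise value if status == :error

  value
end

sum           = dispatch(work) { 1 + 2 }
ran_on_worker = (dispatch(work) { Thread.current }) == worker

work.push(nil) # ask the worker to exit
worker.join
```

The caller sees an ordinary return value even though the block ran on another thread, which is the property the real method provides to code that is not inside a reactor.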

.stop! ⇒ void

This method returns an undefined value.

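Stops the shared IO thread. Does nothing if the thread is not running. Otherwise it pushes a nil sentinel onto the work queue, waits for the thread for up to the largest tracked linger value plus one second, and then resets the reactor state.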



# File 'lib/omq/reactor.rb', line 102

def stop!
  return unless @thread&.alive?

  max_linger = @lingers.empty? ? 0 : @lingers.keys.max
  @work_queue&.push(nil)
  @thread&.join(max_linger + 1)

  @thread     = nil
  @root_task  = nil
  @work_queue = nil
  @lingers    = Hash.new(0)
end

.track_linger(seconds) ⇒ Object

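Registers a socket’s linger value. stop! uses the largest registered value to bound how long it waits for the IO thread. A nil value is counted as 0.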

Parameters:

  • seconds (Numeric, nil)

    linger value



# File 'lib/omq/reactor.rb', line 82

def track_linger(seconds)
  @lingers[seconds || 0] += 1
end

.untrack_linger(seconds) ⇒ Object

Unregisters a socket’s linger value, removing the entry once its count drops to zero. A nil value is counted as 0.

Parameters:

  • seconds (Numeric, nil)

    linger value



# File 'lib/omq/reactor.rb', line 91

def untrack_linger(seconds)
  key = seconds || 0
  @lingers[key] -= 1
  @lingers.delete(key) if @lingers[key] <= 0
end
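
To see how the linger bookkeeping feeds the join timeout in stop!, here is a small standalone sketch. It reproduces the counting logic with a local hash and lambdas (lingers, track, and untrack are illustrative stand-ins for the module's instance state and methods), and the sample linger values are made up.

```ruby
# Standalone model of the linger counter: value => number of sockets.
lingers = Hash.new(0)

track   = ->(seconds) { lingers[seconds || 0] += 1 }
untrack = lambda do |seconds|
  key = seconds || 0
  lingers[key] -= 1
  lingers.delete(key) if lingers[key] <= 0
end

track.call(5)    # two sockets with a 5 s linger
track.call(5)
track.call(nil)  # nil is counted under key 0
track.call(0.5)
untrack.call(5)  # one 5 s socket closes; the other keeps the key alive

# Same computation as stop!: wait for the longest linger plus one second.
max_linger   = lingers.empty? ? 0 : lingers.keys.max
join_timeout = max_linger + 1
```

Counting per value, rather than storing a single maximum, means the maximum shrinks again once the last socket with the longest linger is unregistered.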