Module: OMQ::Reactor

Defined in:
lib/omq/reactor.rb

Overview

Shared IO reactor for the Ruby backend.

When user code runs inside an Async reactor, engine tasks are spawned directly under the caller’s Async task. When no reactor is available (e.g. bare Thread.new), a single shared IO thread hosts all engine tasks — mirroring libzmq’s IO thread.

Engines obtain the IO thread’s root task via Reactor.root_task and use it as their @parent_task. Blocking operations from the main thread are dispatched to the IO thread via Reactor.run.

Constant Summary collapse

THREAD_NAME =
'omq-io'

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.lingersHash{Numeric => Integer} (readonly)

Returns linger value → active socket count.

Returns:

  • (Hash{Numeric => Integer})

    linger value → active socket count



30
31
32
# File 'lib/omq/reactor.rb', line 30

def lingers
  @lingers
end

Class Method Details

.root_taskAsync::Task

Returns the root Async task inside the shared IO thread. Starts the thread exactly once (double-checked lock).

Returns:

  • (Async::Task)


38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/omq/reactor.rb', line 38

def root_task
  return @root_task if @root_task

  @mutex.synchronize do
    return @root_task if @root_task

    ready        = Thread::Queue.new
    @work_queue  = Async::Queue.new
    @thread      = Thread.new { run_reactor(ready) }
    @thread.name = THREAD_NAME
    @root_task   = ready.pop

    at_exit { stop! }
  end

  @root_task
end

.run(timeout: nil, &block) ⇒ Object

Runs a block inside the Async reactor.

Inside an Async reactor: runs directly. Outside: dispatches to the shared IO thread and blocks the calling thread until the result is available.

Returns:

  • (Object)

    the block’s return value



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/omq/reactor.rb', line 65

def run(timeout: nil, &block)
  task = Async::Task.current?

  if task
    if timeout
      task.with_timeout(timeout, IO::TimeoutError) { yield }
    else
      yield
    end
  else
    result = Async::Promise.new
    root_task # ensure started
    @work_queue << [block, result, timeout]
    result.wait
  end
end

.stop!void

This method returns an undefined value.

Stops the shared IO thread.



110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/omq/reactor.rb', line 110

def stop!
  return unless @thread&.alive?

  max_linger = @lingers.empty? ? 0 : @lingers.keys.max

  @work_queue << nil if @work_queue
  @thread&.join(max_linger + 1)

  @thread     = nil
  @root_task  = nil
  @work_queue = nil
  @lingers.clear
end

.track_linger(seconds) ⇒ Object

Registers a socket’s linger value.

Parameters:

  • seconds (Numeric, nil)

    linger value



87
88
89
# File 'lib/omq/reactor.rb', line 87

def track_linger(seconds)
  @lingers[seconds || 0] += 1
end

.untrack_linger(seconds) ⇒ Object

Unregisters a socket’s linger value.

Parameters:

  • seconds (Numeric, nil)

    linger value



96
97
98
99
100
101
102
103
# File 'lib/omq/reactor.rb', line 96

def untrack_linger(seconds)
  key            = seconds || 0
  @lingers[key] -= 1

  if @lingers[key] <= 0
    @lingers.delete(key)
  end
end