Module: OMQ::Reactor
- Defined in:
- lib/omq/reactor.rb
Overview
Shared IO reactor for the Ruby backend.
When user code runs inside an Async reactor, engine tasks are spawned directly under the caller’s Async task. When no reactor is available (e.g. bare Thread.new), a single shared IO thread hosts all engine tasks — mirroring libzmq’s IO thread.
Engines obtain the IO thread’s root task via Reactor.root_task and use it as their @parent_task. Blocking operations from the main thread are dispatched to the IO thread via Reactor.run.
Constant Summary collapse
- THREAD_NAME =
'omq-io'
Class Attribute Summary collapse
-
.lingers ⇒ Hash{Numeric => Integer}
readonly
Linger value → active socket count.
Class Method Summary collapse
-
.root_task ⇒ Async::Task
Returns the root Async task inside the shared IO thread.
-
.run(timeout: nil, &block) ⇒ Object
Runs a block inside the Async reactor.
-
.stop! ⇒ void
Stops the shared IO thread.
-
.track_linger(seconds) ⇒ Object
Registers a socket’s linger value.
-
.untrack_linger(seconds) ⇒ Object
Unregisters a socket’s linger value.
Class Attribute Details
.lingers ⇒ Hash{Numeric => Integer} (readonly)
Returns linger value → active socket count.
30 31 32 |
# File 'lib/omq/reactor.rb', line 30 def lingers @lingers end |
Class Method Details
.root_task ⇒ Async::Task
Returns the root Async task inside the shared IO thread. Starts the thread exactly once (double-checked lock).
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/omq/reactor.rb', line 38 def root_task return @root_task if @root_task @mutex.synchronize do return @root_task if @root_task ready = Thread::Queue.new @work_queue = Async::Queue.new @thread = Thread.new { run_reactor(ready) } @thread.name = THREAD_NAME @root_task = ready.pop at_exit { stop! } end @root_task end |
.run(timeout: nil, &block) ⇒ Object
Runs a block inside the Async reactor.
Inside an Async reactor: runs directly. Outside: dispatches to the shared IO thread and blocks the calling thread until the result is available.
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/omq/reactor.rb', line 65 def run(timeout: nil, &block) task = Async::Task.current? if task if timeout task.with_timeout(timeout, IO::TimeoutError) { yield } else yield end else result = Async::Promise.new root_task # ensure started @work_queue << [block, result, timeout] result.wait end end |
.stop! ⇒ void
This method returns an undefined value.
Stops the shared IO thread.
110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/omq/reactor.rb', line 110 def stop! return unless @thread&.alive? max_linger = @lingers.empty? ? 0 : @lingers.keys.max @work_queue << nil if @work_queue @thread&.join(max_linger + 1) @thread = nil @root_task = nil @work_queue = nil @lingers.clear end |
.track_linger(seconds) ⇒ Object
Registers a socket’s linger value.
87 88 89 |
# File 'lib/omq/reactor.rb', line 87 def track_linger(seconds) @lingers[seconds || 0] += 1 end |
.untrack_linger(seconds) ⇒ Object
Unregisters a socket’s linger value.
96 97 98 99 100 101 102 103 |
# File 'lib/omq/reactor.rb', line 96 def untrack_linger(seconds) key = seconds || 0 @lingers[key] -= 1 if @lingers[key] <= 0 @lingers.delete(key) end end |