Module: OMQ::Reactor
Defined in: lib/omq/reactor.rb
Overview
Shared IO reactor for the Ruby backend.
When user code runs inside an Async reactor, engine tasks are spawned directly under the caller’s Async task. When no reactor is available (e.g. bare Thread.new), a single shared IO thread hosts all engine tasks — mirroring libzmq’s IO thread.
Engines obtain the IO thread’s root task via Reactor.root_task and use it as their @parent_task. Blocking operations from the main thread are dispatched to the IO thread via Reactor.run.
Constant Summary
- THREAD_NAME = 'omq-io'
Class Method Summary
- .root_task ⇒ Async::Task
  Returns the root Async task inside the shared IO thread.
- .run(timeout: nil, &block) ⇒ Object
  Runs a block inside the Async reactor.
- .stop! ⇒ void
  Stops the shared IO thread.
- .track_linger(seconds) ⇒ Object
  Registers a socket’s linger value.
- .untrack_linger(seconds) ⇒ Object
  Unregisters a socket’s linger value.
Class Method Details
.root_task ⇒ Async::Task
Returns the root Async task inside the shared IO thread. The thread is started lazily on the first call and exactly once (double-checked lock); the same call registers an at_exit hook that invokes stop!.
# File 'lib/omq/reactor.rb', line 33

def root_task
  return @root_task if @root_task

  @mutex.synchronize do
    return @root_task if @root_task

    ready = Thread::Queue.new
    @work_queue = Async::Queue.new
    @thread = Thread.new { run_reactor(ready) }
    @thread.name = THREAD_NAME
    @root_task = ready.pop
    at_exit { stop! }
  end
  @root_task
end
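The double-checked lock used above can be illustrated in isolation. The following is a minimal sketch using only the Ruby standard library, not the OMQ implementation: the class name LazyWorker, its handle method, and the starts counter are hypothetical and exist only for this example. The background thread hands its own Thread object back through a queue, loosely mirroring how root_task waits on ready.pop.

```ruby
# Hypothetical example class; not part of OMQ.
class LazyWorker
  attr_reader :starts

  def initialize
    @mutex  = Mutex.new
    @handle = nil
    @starts = 0
  end

  # Returns the worker thread, starting it on first use.
  def handle
    return @handle if @handle            # fast path: no lock once initialized

    @mutex.synchronize do
      return @handle if @handle          # re-check: another caller may have won the race

      ready = Thread::Queue.new
      @starts += 1
      Thread.new { ready.push(Thread.current) }
      @handle = ready.pop                # block until the new thread reports in
    end
    @handle
  end
end

worker  = LazyWorker.new
results = 8.times.map { Thread.new { worker.handle } }.map(&:value)
```

All eight callers receive the same thread object, and the start-up code runs once even when the calls race.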
.run(timeout: nil, &block) ⇒ Object
Runs a block inside the Async reactor.
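Inside an Async reactor: runs the block directly; if timeout is given, the block is wrapped in the current task's with_timeout and IO::TimeoutError is raised on expiry. Outside: pushes the block and the timeout onto the shared IO thread's work queue and blocks the calling thread until the result is available.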
# File 'lib/omq/reactor.rb', line 60

def run(timeout: nil, &block)
  task = Async::Task.current?

  if task
    if timeout
      task.with_timeout(timeout, IO::TimeoutError) { yield }
    else
      yield
    end
  else
    result = Async::Promise.new
    root_task # ensure started
    @work_queue.push([block, result, timeout])
    result.wait
  end
end
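The "outside a reactor" branch follows a dispatch-and-wait pattern: hand the block to a long-lived thread, then block on a reply. Below is a rough sketch of that pattern using only standard-library threads and queues. It does not use the Async gem, omits the timeout handling, and the names MiniReactor, ensure_started and the thread name 'mini-io' are assumptions made for this example, not OMQ API.

```ruby
# Hypothetical stand-in for a shared IO thread; stdlib only, no Async.
module MiniReactor
  @mutex      = Mutex.new
  @thread     = nil
  @work_queue = nil

  class << self
    # Runs the block on the shared worker thread and blocks until it finishes.
    def run(&block)
      reply = Thread::Queue.new
      ensure_started.push([block, reply])
      status, value = reply.pop
      raise value if status == :error    # re-raise worker errors in the caller
      value
    end

    # Sends a nil sentinel so the worker loop exits, then waits for it.
    def stop!
      @mutex.synchronize do
        if @thread&.alive?
          @work_queue.push(nil)
          @thread.join
        end
        @thread = nil
        @work_queue = nil
      end
    end

    private

    # Starts the worker thread if needed and returns its work queue.
    def ensure_started
      @mutex.synchronize do
        unless @thread&.alive?
          queue = @work_queue = Thread::Queue.new
          @thread = Thread.new do
            while (job = queue.pop)      # nil ends the loop
              blk, reply = job
              begin
                reply.push([:ok, blk.call])
              rescue StandardError => e
                reply.push([:error, e])
              end
            end
          end
          @thread.name = "mini-io"
        end
        @work_queue
      end
    end
  end
end

worker_name = MiniReactor.run { Thread.current.name }
sum         = MiniReactor.run { 1 + 2 }
MiniReactor.stop!
```

The per-call reply queue plays the role that Async::Promise plays in the listing above: the caller sleeps on it until the worker publishes a result or an error.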
.stop! ⇒ void
This method returns an undefined value.
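Stops the shared IO thread. Does nothing if the thread is not alive. Otherwise it pushes nil onto the work queue, waits for the thread to exit for at most the largest tracked linger value plus one second, and then resets the thread, root task, work queue and linger registry.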
# File 'lib/omq/reactor.rb', line 102

def stop!
  return unless @thread&.alive?

  max_linger = @lingers.empty? ? 0 : @lingers.keys.max
  @work_queue&.push(nil)
  @thread&.join(max_linger + 1)
  @thread = nil
  @root_task = nil
  @work_queue = nil
  @lingers = Hash.new(0)
end
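The bounded wait in stop! relies on Thread#join accepting a time limit: it returns nil if the thread is still running when the limit expires, and the thread itself otherwise. A small sketch of that behaviour, with hypothetical thread names and a shortened limit for the slow case so the example finishes quickly:

```ruby
slow = Thread.new { sleep 5 }    # still running when the limit expires
fast = Thread.new { :done }      # finishes almost immediately

max_linger = 0                   # no lingers tracked

timed_out = slow.join(max_linger + 0.05)   # limit expires first
finished  = fast.join(max_linger + 1)      # thread exits well within the limit

slow.kill                        # clean up the example thread
```

Because join with a limit does not raise on expiry, stop! as listed proceeds to reset its state whether or not the IO thread has exited.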
.track_linger(seconds) ⇒ Object
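Registers a socket’s linger value. A nil value is counted as 0. Each distinct value keeps a count of the sockets using it, which stop! consults to find the longest linger.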
# File 'lib/omq/reactor.rb', line 82

def track_linger(seconds)
  @lingers[seconds || 0] += 1
end
.untrack_linger(seconds) ⇒ Object
Unregisters a socket’s linger value. A nil value is counted as 0. The entry is removed once its count drops to zero or below.
# File 'lib/omq/reactor.rb', line 91

def untrack_linger(seconds)
  key = seconds || 0
  @lingers[key] -= 1
  @lingers.delete(key) if @lingers[key] <= 0
end
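To see how the two linger methods interact with the max_linger computation in stop!, here is a standalone sketch of the same counting scheme. The class LingerRegistry and its method names are hypothetical; only the hash logic is copied from the listings above.

```ruby
# Hypothetical standalone copy of the counting scheme, for illustration only.
class LingerRegistry
  def initialize
    @lingers = Hash.new(0)       # linger seconds => number of sockets using it
  end

  def track(seconds)
    @lingers[seconds || 0] += 1
  end

  def untrack(seconds)
    key = seconds || 0
    @lingers[key] -= 1
    @lingers.delete(key) if @lingers[key] <= 0
  end

  # Same expression stop! uses to pick its join timeout base.
  def max
    @lingers.empty? ? 0 : @lingers.keys.max
  end
end

registry = LingerRegistry.new
registry.track(5)
registry.track(5)
registry.track(nil)              # counted under key 0
registry.track(2)

registry.untrack(5)
after_one = registry.max         # one socket with linger 5 remains

registry.untrack(5)
after_two = registry.max         # key 5 is gone, so 2 is now the largest
```

Counting per value rather than storing a single maximum means the maximum can shrink correctly when the last socket with the longest linger is unregistered.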