Class: OMQ::FFI::Engine
- Inherits: Object
  - Object
  - OMQ::FFI::Engine
- Defined in: lib/omq/ffi/engine.rb
Overview
FFI Engine — wraps a libzmq socket to implement the OMQ Engine contract.
A dedicated I/O thread owns the zmq_socket exclusively (libzmq sockets are not thread-safe). Send and recv flow through queues, with an IO pipe to wake the Async fiber scheduler.
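The threading model described above can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the engine's implementation: the worker simply upper-cases each message instead of talking to libzmq, and all variable names and the :stop marker are hypothetical.

```ruby
# Pattern sketch only: a worker thread owns the "socket", callers talk to it
# through queues, and pipes carry wakeups in both directions.
send_queue = Thread::Queue.new    # caller -> worker
recv_queue = Thread::Queue.new    # worker -> caller
wake_r, wake_w = IO.pipe          # caller -> worker: "work was enqueued"
signal_r, signal_w = IO.pipe      # worker -> caller: "a message is ready"

worker = Thread.new do
  stop = false
  until stop
    IO.select([wake_r])                          # sleep until woken
    wake_r.read_nonblock(64, exception: false)   # drain pending wake bytes
    until send_queue.empty?
      msg = send_queue.pop
      if msg == :stop
        stop = true
        break
      end
      # Stand-in for the socket round trip (assumption): echo in upper case.
      recv_queue.push(msg.map(&:upcase))
      signal_w.write_nonblock(".", exception: false)
    end
  end
end

# Caller side: enqueue first, then wake the worker.
send_queue.push(["hello", "world"])
wake_w.write_nonblock(".", exception: false)

# A fiber scheduler would wait on signal_r; here we block with IO.select.
IO.select([signal_r])
signal_r.read_nonblock(64, exception: false)
reply = recv_queue.pop

send_queue.push(:stop)
wake_w.write_nonblock(".", exception: false)
worker.join
```

Enqueueing before writing the wake byte matters: the worker either sees the item during its current pass over the queue or finds the wake byte pending on its next select.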
Defined Under Namespace
Classes: RoutingStub
Constant Summary
- L = Libzmq
Instance Attribute Summary
- #all_peers_gone ⇒ Async::Promise (readonly)
  Resolved when all peers have disconnected.
- #connections ⇒ Array (readonly)
  Active connections.
- #monitor_queue ⇒ Object (writeonly)
- #on_io_thread ⇒ Boolean (also: #on_io_thread?) (readonly)
  True when the engine’s parent task lives on the shared Reactor IO thread (i.e. not created under an Async task).
- #options ⇒ Options (readonly)
  Socket options.
- #parent_task ⇒ Async::Task? (readonly)
  Root of the engine’s task tree.
- #peer_connected ⇒ Async::Promise (readonly)
  Resolved when the first peer connects.
- #reconnect_enabled ⇒ Object (writeonly)
- #routing ⇒ RoutingStub (readonly)
  Subscription/group routing interface.
- #verbose_monitor ⇒ Object (writeonly)
Class Method Summary
- .linger_to_zmq_ms(linger) ⇒ Integer
  Maps an OMQ linger value (seconds, or nil/Float::INFINITY for “wait forever”) to libzmq’s ZMQ_LINGER int milliseconds (-1 = infinite, 0 = drop, N = N ms).
Instance Method Summary
- #bind(endpoint) ⇒ URI::Generic
  Binds the socket to the given endpoint.
- #capture_parent_task(parent: nil) ⇒ void
  Captures the current Async task as the parent for I/O scheduling.
- #close ⇒ void
  Closes the socket and shuts down the I/O thread.
- #connect(endpoint) ⇒ URI::Generic
  Connects the socket to the given endpoint.
- #dequeue_recv ⇒ Array<String>
  Dequeues the next received message, blocking until one is available.
- #dequeue_recv_sentinel ⇒ void
  Pushes a nil sentinel into the recv queue to unblock a waiting consumer.
- #disconnect(endpoint) ⇒ void
  Disconnects from the given endpoint.
- #enqueue_send(parts) ⇒ void
  Enqueues a multipart message for sending via the I/O thread.
- #initialize(socket_type, options) ⇒ Engine (constructor)
  A new instance of Engine.
- #send_cmd(cmd, *args) ⇒ Object (private)
  Send a control command to the I/O thread.
- #subscribe(prefix) ⇒ void
  Subscribes to a topic prefix (SUB/XSUB).
- #subscriber_joined ⇒ Async::Promise
  Resolved when a subscriber joins (PUB/XPUB).
- #unbind(endpoint) ⇒ void
  Unbinds from the given endpoint.
- #unsubscribe(prefix) ⇒ void
  Unsubscribes from a topic prefix (SUB/XSUB).
- #wake_io_thread ⇒ void
  Wakes the I/O thread via the internal pipe.
Constructor Details
#initialize(socket_type, options) ⇒ Engine
Returns a new instance of Engine.
    # File 'lib/omq/ffi/engine.rb', line 109
    def initialize(socket_type, options)
      @socket_type = socket_type
      @options = options
      @peer_connected = Async::Promise.new
      @all_peers_gone = Async::Promise.new
      @connections = []
      @closed = false
      @parent_task = nil
      @on_io_thread = false

      @zmq_socket = L.zmq_socket(OMQ::FFI.context, L::SOCKET_TYPES.fetch(@socket_type))
      raise "zmq_socket failed: #{L.zmq_strerror(L.zmq_errno)}" if @zmq_socket.null?

      @routing = RoutingStub.new(self)

      # Queues for cross-thread communication
      @send_queue = Thread::Queue.new # main → io thread
      @recv_queue = Thread::Queue.new # io thread → main
      @cmd_queue = Thread::Queue.new # control commands → io thread

      # Signal pipe: io thread → Async fiber (message received)
      @recv_signal_r, @recv_signal_w = IO.pipe

      # Wake pipe: main thread → io thread (send/cmd enqueued)
      @wake_r, @wake_w = IO.pipe

      @io_thread = nil
    end
Instance Attribute Details
#all_peers_gone ⇒ Async::Promise (readonly)
Returns a promise that is resolved when all peers have disconnected.
    # File 'lib/omq/ffi/engine.rb', line 27
    def all_peers_gone
      @all_peers_gone
    end
#connections ⇒ Array (readonly)
Returns active connections.
    # File 'lib/omq/ffi/engine.rb', line 21
    def connections
      @connections
    end
#monitor_queue=(value) ⇒ Object (writeonly)
Monitor events are not yet emitted by the FFI backend; these writers exist so Socket#monitor can attach without raising. Wiring libzmq’s zmq_socket_monitor is a TODO.
    # File 'lib/omq/ffi/engine.rb', line 40
    def monitor_queue=(value)
      @monitor_queue = value
    end
#on_io_thread ⇒ Boolean (readonly) Also known as: on_io_thread?
Returns true when the engine’s parent task lives on the shared Reactor IO thread (i.e. not created under an Async task). Writable/Readable check this to pick the fast path.
    # File 'lib/omq/ffi/engine.rb', line 33
    def on_io_thread
      @on_io_thread
    end
#options ⇒ Options (readonly)
Returns socket options.
    # File 'lib/omq/ffi/engine.rb', line 19
    def options
      @options
    end
#parent_task ⇒ Async::Task? (readonly)
Returns root of the engine’s task tree.
    # File 'lib/omq/ffi/engine.rb', line 29
    def parent_task
      @parent_task
    end
#peer_connected ⇒ Async::Promise (readonly)
Returns a promise that is resolved when the first peer connects.
    # File 'lib/omq/ffi/engine.rb', line 25
    def peer_connected
      @peer_connected
    end
#reconnect_enabled=(value) ⇒ Object (writeonly)
    # File 'lib/omq/ffi/engine.rb', line 36
    def reconnect_enabled=(value)
      @reconnect_enabled = value
    end
#routing ⇒ RoutingStub (readonly)
Returns subscription/group routing interface.
    # File 'lib/omq/ffi/engine.rb', line 23
    def routing
      @routing
    end
#verbose_monitor=(value) ⇒ Object (writeonly)
Monitor events are not yet emitted by the FFI backend; these writers exist so Socket#monitor can attach without raising. Wiring libzmq’s zmq_socket_monitor is a TODO.
    # File 'lib/omq/ffi/engine.rb', line 40
    def verbose_monitor=(value)
      @verbose_monitor = value
    end
Class Method Details
.linger_to_zmq_ms(linger) ⇒ Integer
Maps an OMQ linger value (seconds, or nil/Float::INFINITY for “wait forever”) to libzmq’s ZMQ_LINGER int milliseconds (-1 = infinite, 0 = drop, N = N ms).
    # File 'lib/omq/ffi/engine.rb', line 100
    def self.linger_to_zmq_ms(linger)
      return -1 if linger.nil? || linger == Float::INFINITY
      (linger * 1000).to_i
    end
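A few sample conversions may make the mapping concrete. The snippet below uses a standalone top-level copy of the method body shown above so it runs without OMQ loaded; the input values are illustrative only.

```ruby
# Standalone copy of the documented method body, for illustration.
def linger_to_zmq_ms(linger)
  return -1 if linger.nil? || linger == Float::INFINITY
  (linger * 1000).to_i
end

linger_to_zmq_ms(nil)              # => -1   (wait forever)
linger_to_zmq_ms(Float::INFINITY)  # => -1   (wait forever)
linger_to_zmq_ms(0)                # => 0    (drop pending messages)
linger_to_zmq_ms(2)                # => 2000
linger_to_zmq_ms(0.25)             # => 250
linger_to_zmq_ms(0.0004)           # => 0    (to_i truncates sub-millisecond values)
```

Because to_i truncates rather than rounds, a linger shorter than one millisecond maps to 0, which libzmq treats as "drop" rather than "wait briefly".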
Instance Method Details
#bind(endpoint) ⇒ URI::Generic
Binds the socket to the given endpoint.
    # File 'lib/omq/ffi/engine.rb', line 146
    def bind(endpoint)
      sync_identity
      send_cmd(:bind, endpoint)
      resolved = get_string_option(L::ZMQ_LAST_ENDPOINT)
      @connections << :ffi
      @peer_connected.resolve(:ffi) unless @peer_connected.resolved?
      URI.parse(resolved)
    end
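Since bind returns URI.parse(resolved), the caller gets a URI::Generic built from the endpoint string libzmq reports as ZMQ_LAST_ENDPOINT. The sketch below shows what such an object looks like for a TCP endpoint; the address is a made-up example, not output from a real bind.

```ruby
require "uri"

# Hypothetical resolved endpoint, as libzmq might report it after a bind.
uri = URI.parse("tcp://127.0.0.1:5555")

uri.class   # => URI::Generic ("tcp" has no dedicated URI subclass)
uri.scheme  # => "tcp"
uri.host    # => "127.0.0.1"
uri.port    # => 5555
```

Reading uri.port from the return value is presumably the convenient way to discover the actual port when the requested endpoint left it to the system to choose.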
#capture_parent_task(parent: nil) ⇒ void
This method returns an undefined value.
Captures the parent task for I/O scheduling; a no-op once a parent has been captured. An explicit parent: is accepted for API compatibility with the pure-Ruby engine and is stored as parent_task, but it has no further effect: the FFI backend runs its own I/O thread and doesn’t participate in the Async barrier tree. Without parent:, the current Async task is used; outside any Async task the engine falls back to Reactor.root_task, sets on_io_thread, and registers options.linger via Reactor.track_linger.
    # File 'lib/omq/ffi/engine.rb', line 253
    def capture_parent_task(parent: nil)
      return if @parent_task
      if parent
        @parent_task = parent
      elsif Async::Task.current?
        @parent_task = Async::Task.current
      else
        @parent_task = Reactor.root_task
        @on_io_thread = true
        Reactor.track_linger(@options.linger)
      end
    end
#close ⇒ void
This method returns an undefined value.
Closes the socket and shuts down the I/O thread.
Honors options.linger:
nil → wait forever for the Ruby-side queue to drain into libzmq
      and for libzmq's own LINGER to flush to the network
0 → drop anything not yet in libzmq's kernel buffers, close fast
      (the I/O thread join is capped at 0.5 s)
N → up to N seconds for drain; the I/O thread join is capped at N + 1 s,
      after which the thread is killed
    # File 'lib/omq/ffi/engine.rb', line 221
    def close
      return if @closed
      @closed = true
      if @io_thread
        @cmd_queue.push([:stop])
        wake_io_thread
        linger = @options.linger
        if linger.nil?
          @io_thread.join
        elsif linger.zero?
          @io_thread.join(0.5) # fast path: zmq_close is non-blocking with LINGER=0
        else
          @io_thread.join(linger + 1.0)
        end
        @io_thread.kill if @io_thread.alive? # hard stop if deadline exceeded
      else
        # IO thread never started; close socket directly
        L.zmq_close(@zmq_socket)
      end
      @recv_signal_r&.close rescue nil
      @recv_signal_w&.close rescue nil
      @wake_r&.close rescue nil
      @wake_w&.close rescue nil
    end
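The join-with-deadline logic in close can be tried in isolation. The following is a sketch using plain threads, assuming the same three linger cases as the listing above; join_timeout_for and stop_worker are hypothetical helper names, not part of OMQ.

```ruby
# Maps a linger value to the join deadline used in the listing above.
def join_timeout_for(linger)
  return nil if linger.nil?    # nil: wait forever
  return 0.5 if linger.zero?   # 0: short fixed deadline
  linger + 1.0                 # N: N seconds plus 1 s grace
end

# Joins the thread within the deadline; kills it if it is still alive.
def stop_worker(thread, linger)
  timeout = join_timeout_for(linger)
  timeout ? thread.join(timeout) : thread.join
  return :joined unless thread.alive?
  thread.kill
  thread.join
  :killed
end

fast  = Thread.new { sleep 0.01 }  # finishes well inside its deadline
stuck = Thread.new { sleep }       # never finishes on its own

fast_result  = stop_worker(fast, 1)   # joins cleanly
stuck_result = stop_worker(stuck, 0)  # exceeds the 0.5 s deadline, gets killed
```

Thread#join with a timeout returns nil when the deadline passes, so checking alive? afterwards, as the original does, is what distinguishes a clean exit from a timeout.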
#connect(endpoint) ⇒ URI::Generic
Connects the socket to the given endpoint.
    # File 'lib/omq/ffi/engine.rb', line 160
    def connect(endpoint)
      sync_identity
      send_cmd(:connect, endpoint)
      @connections << :ffi
      @peer_connected.resolve(:ffi) unless @peer_connected.resolved?
      URI.parse(endpoint)
    end
#dequeue_recv ⇒ Array<String>
Dequeues the next received message, blocking until one is available.
    # File 'lib/omq/ffi/engine.rb', line 285
    def dequeue_recv
      ensure_io_thread
    end
#dequeue_recv_sentinel ⇒ void
This method returns an undefined value.
Pushes a nil sentinel into the recv queue to unblock a waiting consumer.
    # File 'lib/omq/ffi/engine.rb', line 294
    def dequeue_recv_sentinel
      @recv_queue.push(nil)
      @recv_signal_w.write_nonblock(".", exception: false) rescue nil
    end
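The nil-sentinel idea can be shown with a plain queue and a consumer thread. This is a sketch of the general technique, not the engine's consumer loop; the frame contents and variable names are invented for the example.

```ruby
recv_queue = Thread::Queue.new
received = []

consumer = Thread.new do
  # pop blocks until something arrives; a nil value ends the loop.
  while (msg = recv_queue.pop)
    received << msg
  end
  :unblocked
end

recv_queue.push(["frame-1", "frame-2"])  # a normal multipart message
recv_queue.push(nil)                     # sentinel: release the consumer

result = consumer.value  # waits for the thread and returns its block value
```

Messages in this scheme are arrays of frames and therefore never nil, which is what makes nil safe to use as an out-of-band marker.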
#disconnect(endpoint) ⇒ void
This method returns an undefined value.
Disconnects from the given endpoint.
    # File 'lib/omq/ffi/engine.rb', line 173
    def disconnect(endpoint)
      send_cmd(:disconnect, endpoint)
    end
#enqueue_send(parts) ⇒ void
This method returns an undefined value.
Enqueues a multipart message for sending via the I/O thread.
    # File 'lib/omq/ffi/engine.rb', line 273
    def enqueue_send(parts)
      ensure_io_thread
      @send_queue.push(parts)
      wake_io_thread
    end
#send_cmd(cmd, *args) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Send a control command to the I/O thread.
    # File 'lib/omq/ffi/engine.rb', line 303
    def send_cmd(cmd, *args)
      ensure_io_thread
      result = Thread::Queue.new
      @cmd_queue.push([cmd, args, result])
      wake_io_thread
      r = result.pop
      raise r if r.is_a?(Exception)
      r
    end
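send_cmd follows a request/reply pattern: each call carries its own reply queue, and the worker answers with either a value or an exception object that the caller re-raises. The sketch below reproduces that shape with a toy worker; the :add and :bogus commands and the worker loop are assumptions made for illustration, since the I/O thread's command handling is not shown on this page.

```ruby
cmd_queue = Thread::Queue.new

# Toy worker standing in for the I/O thread (assumption).
worker = Thread.new do
  loop do
    cmd, args, reply = cmd_queue.pop
    break if cmd == :stop
    begin
      result =
        case cmd
        when :add then args.sum
        else raise ArgumentError, "unknown command: #{cmd}"
        end
      reply.push(result)
    rescue => e
      reply.push(e)  # ship the exception back instead of dying
    end
  end
end

# Caller side, mirroring the structure of send_cmd above.
send_cmd = lambda do |cmd, *args|
  reply = Thread::Queue.new
  cmd_queue.push([cmd, args, reply])
  r = reply.pop
  raise r if r.is_a?(Exception)
  r
end

sum = send_cmd.call(:add, 1, 2)
error =
  begin
    send_cmd.call(:bogus)
  rescue ArgumentError => e
    e
  end

cmd_queue.push([:stop])
worker.join
```

A fresh reply queue per call keeps concurrent callers from receiving each other's results, at the cost of one small allocation per command.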
#subscribe(prefix) ⇒ void
This method returns an undefined value.
Subscribes to a topic prefix (SUB/XSUB). Delegates to the routing stub for API parity with the pure-Ruby Engine.
    # File 'lib/omq/ffi/engine.rb', line 192
    def subscribe(prefix)
      @routing.subscribe(prefix)
    end
#subscriber_joined ⇒ Async::Promise
Returns a promise that is resolved when a subscriber joins (PUB/XPUB).
    # File 'lib/omq/ffi/engine.rb', line 207
    def subscriber_joined
      @routing.subscriber_joined
    end
#unbind(endpoint) ⇒ void
This method returns an undefined value.
Unbinds from the given endpoint.
    # File 'lib/omq/ffi/engine.rb', line 182
    def unbind(endpoint)
      send_cmd(:unbind, endpoint)
    end
#unsubscribe(prefix) ⇒ void
This method returns an undefined value.
Unsubscribes from a topic prefix (SUB/XSUB).
    # File 'lib/omq/ffi/engine.rb', line 201
    def unsubscribe(prefix)
      @routing.unsubscribe(prefix)
    end
#wake_io_thread ⇒ void
This method returns an undefined value.
Wakes the I/O thread via the internal pipe.
    # File 'lib/omq/ffi/engine.rb', line 317
    def wake_io_thread
      @wake_w.write_nonblock(".", exception: false)
    end