Class: OMQ::FFI::Engine
- Inherits:
-
Object
- Object
- OMQ::FFI::Engine
- Defined in:
- lib/omq/ffi/engine.rb
Overview
FFI Engine — wraps a libzmq socket to implement the OMQ Engine contract.
A dedicated I/O thread owns the zmq_socket exclusively (libzmq sockets are not thread-safe). Send and recv flow through queues, with an IO pipe to wake the Async fiber scheduler.
Defined Under Namespace
Classes: RoutingStub
Constant Summary collapse
- L =
Libzmq
Instance Attribute Summary collapse
-
#all_peers_gone ⇒ Async::Promise
readonly
Resolved when all peers have disconnected.
-
#connections ⇒ Array
readonly
Active connections.
- #monitor_queue ⇒ Object writeonly
-
#options ⇒ Options
readonly
Socket options.
-
#parent_task ⇒ Async::Task?
readonly
Root of the engine’s task tree.
-
#peer_connected ⇒ Async::Promise
readonly
Resolved when the first peer connects.
- #reconnect_enabled ⇒ Object writeonly
-
#routing ⇒ RoutingStub
readonly
Subscription/group routing interface.
- #verbose_monitor ⇒ Object writeonly
Class Method Summary collapse
-
.linger_to_zmq_ms(linger) ⇒ Integer
Maps an OMQ
lingervalue (seconds, ornil/Float::INFINITYfor “wait forever”) to libzmq’s ZMQ_LINGER int milliseconds (-1 = infinite, 0 = drop, N = N ms).
Instance Method Summary collapse
-
#bind(endpoint) ⇒ URI::Generic
Binds the socket to the given endpoint.
-
#capture_parent_task(parent: nil) ⇒ void
Captures the current Async task as the parent for I/O scheduling.
-
#close ⇒ void
Closes the socket and shuts down the I/O thread.
-
#connect(endpoint) ⇒ URI::Generic
Connects the socket to the given endpoint.
-
#dequeue_recv ⇒ Array<String>
Dequeues the next received message, blocking until one is available.
-
#dequeue_recv_sentinel ⇒ void
Pushes a nil sentinel into the recv queue to unblock a waiting consumer.
-
#disconnect(endpoint) ⇒ void
Disconnects from the given endpoint.
-
#enqueue_send(parts) ⇒ void
Enqueues a multipart message for sending via the I/O thread.
-
#initialize(socket_type, options) ⇒ Engine
constructor
A new instance of Engine.
-
#send_cmd(cmd, *args) ⇒ Object
private
Send a control command to the I/O thread.
-
#subscribe(prefix) ⇒ void
Subscribes to a topic prefix (SUB/XSUB).
-
#subscriber_joined ⇒ Async::Promise
Resolved when a subscriber joins (PUB/XPUB).
-
#unbind(endpoint) ⇒ void
Unbinds from the given endpoint.
-
#unsubscribe(prefix) ⇒ void
Unsubscribes from a topic prefix (SUB/XSUB).
-
#wake_io_thread ⇒ void
Wakes the I/O thread via the internal pipe.
Constructor Details
#initialize(socket_type, options) ⇒ Engine
Returns a new instance of Engine.
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/omq/ffi/engine.rb', line 104 def initialize(socket_type, ) @socket_type = socket_type @options = @peer_connected = Async::Promise.new @all_peers_gone = Async::Promise.new @connections = [] @closed = false @parent_task = nil @on_io_thread = false @zmq_socket = L.zmq_socket(OMQ::FFI.context, L::SOCKET_TYPES.fetch(@socket_type)) raise "zmq_socket failed: #{L.zmq_strerror(L.zmq_errno)}" if @zmq_socket.null? @routing = RoutingStub.new(self) # Queues for cross-thread communication @send_queue = Thread::Queue.new # main → io thread @recv_queue = Thread::Queue.new # io thread → main @cmd_queue = Thread::Queue.new # control commands → io thread # Signal pipe: io thread → Async fiber (message received) @recv_signal_r, @recv_signal_w = IO.pipe # Wake pipe: main thread → io thread (send/cmd enqueued) @wake_r, @wake_w = IO.pipe @io_thread = nil end |
Instance Attribute Details
#all_peers_gone ⇒ Async::Promise (readonly)
Returns resolved when all peers have disconnected.
27 28 29 |
# File 'lib/omq/ffi/engine.rb', line 27 def all_peers_gone @all_peers_gone end |
#connections ⇒ Array (readonly)
Returns active connections.
21 22 23 |
# File 'lib/omq/ffi/engine.rb', line 21 def connections @connections end |
#monitor_queue=(value) ⇒ Object (writeonly)
Monitor events are not yet emitted by the FFI backend; these writers exist so Socket#monitor can attach without raising. Wiring libzmq’s zmq_socket_monitor is a TODO.
35 36 37 |
# File 'lib/omq/ffi/engine.rb', line 35 def monitor_queue=(value) @monitor_queue = value end |
#options ⇒ Options (readonly)
Returns socket options.
19 20 21 |
# File 'lib/omq/ffi/engine.rb', line 19 def @options end |
#parent_task ⇒ Async::Task? (readonly)
Returns root of the engine’s task tree.
29 30 31 |
# File 'lib/omq/ffi/engine.rb', line 29 def parent_task @parent_task end |
#peer_connected ⇒ Async::Promise (readonly)
Returns resolved when the first peer connects.
25 26 27 |
# File 'lib/omq/ffi/engine.rb', line 25 def peer_connected @peer_connected end |
#reconnect_enabled=(value) ⇒ Object (writeonly)
31 32 33 |
# File 'lib/omq/ffi/engine.rb', line 31 def reconnect_enabled=(value) @reconnect_enabled = value end |
#routing ⇒ RoutingStub (readonly)
Returns subscription/group routing interface.
23 24 25 |
# File 'lib/omq/ffi/engine.rb', line 23 def routing @routing end |
#verbose_monitor=(value) ⇒ Object (writeonly)
Monitor events are not yet emitted by the FFI backend; these writers exist so Socket#monitor can attach without raising. Wiring libzmq’s zmq_socket_monitor is a TODO.
35 36 37 |
# File 'lib/omq/ffi/engine.rb', line 35 def verbose_monitor=(value) @verbose_monitor = value end |
Class Method Details
.linger_to_zmq_ms(linger) ⇒ Integer
Maps an OMQ linger value (seconds, or nil/Float::INFINITY for “wait forever”) to libzmq’s ZMQ_LINGER int milliseconds (-1 = infinite, 0 = drop, N = N ms).
95 96 97 98 |
# File 'lib/omq/ffi/engine.rb', line 95 def self.linger_to_zmq_ms(linger) return -1 if linger.nil? || linger == Float::INFINITY (linger * 1000).to_i end |
Instance Method Details
#bind(endpoint) ⇒ URI::Generic
Binds the socket to the given endpoint.
141 142 143 144 145 146 147 148 |
# File 'lib/omq/ffi/engine.rb', line 141 def bind(endpoint) sync_identity send_cmd(:bind, endpoint) resolved = get_string_option(L::ZMQ_LAST_ENDPOINT) @connections << :ffi @peer_connected.resolve(:ffi) unless @peer_connected.resolved? URI.parse(resolved) end |
#capture_parent_task(parent: nil) ⇒ void
This method returns an undefined value.
Captures the current Async task as the parent for I/O scheduling. parent: is accepted for API compatibility with the pure-Ruby engine but has no effect: the FFI backend runs its own I/O thread and doesn’t participate in the Async barrier tree.
248 249 250 251 252 253 254 255 256 257 258 259 |
# File 'lib/omq/ffi/engine.rb', line 248 def capture_parent_task(parent: nil) return if @parent_task if parent @parent_task = parent elsif Async::Task.current? @parent_task = Async::Task.current else @parent_task = Reactor.root_task @on_io_thread = true Reactor.track_linger(@options.linger) end end |
#close ⇒ void
This method returns an undefined value.
Closes the socket and shuts down the I/O thread.
Honors ‘options.linger`:
nil → wait forever for Ruby-side queue to drain into libzmq
and for libzmq's own LINGER to flush to the network
0 → drop anything not yet in libzmq's kernel buffers, close fast
N → up to N seconds for drain + N + 1s grace for join
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 |
# File 'lib/omq/ffi/engine.rb', line 216 def close return if @closed @closed = true if @io_thread @cmd_queue.push([:stop]) wake_io_thread linger = @options.linger if linger.nil? @io_thread.join elsif linger.zero? @io_thread.join(0.5) # fast path: zmq_close is non-blocking with LINGER=0 else @io_thread.join(linger + 1.0) end @io_thread.kill if @io_thread.alive? # hard stop if deadline exceeded else # IO thread never started — close socket directly L.zmq_close(@zmq_socket) end @recv_signal_r&.close rescue nil @recv_signal_w&.close rescue nil @wake_r&.close rescue nil @wake_w&.close rescue nil end |
#connect(endpoint) ⇒ URI::Generic
Connects the socket to the given endpoint.
155 156 157 158 159 160 161 |
# File 'lib/omq/ffi/engine.rb', line 155 def connect(endpoint) sync_identity send_cmd(:connect, endpoint) @connections << :ffi @peer_connected.resolve(:ffi) unless @peer_connected.resolved? URI.parse(endpoint) end |
#dequeue_recv ⇒ Array<String>
Dequeues the next received message, blocking until one is available.
280 281 282 283 |
# File 'lib/omq/ffi/engine.rb', line 280 def dequeue_recv ensure_io_thread end |
#dequeue_recv_sentinel ⇒ void
This method returns an undefined value.
Pushes a nil sentinel into the recv queue to unblock a waiting consumer.
289 290 291 292 |
# File 'lib/omq/ffi/engine.rb', line 289 def dequeue_recv_sentinel @recv_queue.push(nil) @recv_signal_w.write_nonblock(".", exception: false) rescue nil end |
#disconnect(endpoint) ⇒ void
This method returns an undefined value.
Disconnects from the given endpoint.
168 169 170 |
# File 'lib/omq/ffi/engine.rb', line 168 def disconnect(endpoint) send_cmd(:disconnect, endpoint) end |
#enqueue_send(parts) ⇒ void
This method returns an undefined value.
Enqueues a multipart message for sending via the I/O thread.
268 269 270 271 272 |
# File 'lib/omq/ffi/engine.rb', line 268 def enqueue_send(parts) ensure_io_thread @send_queue.push(parts) wake_io_thread end |
#send_cmd(cmd, *args) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Send a control command to the I/O thread.
298 299 300 301 302 303 304 305 306 |
# File 'lib/omq/ffi/engine.rb', line 298 def send_cmd(cmd, *args) ensure_io_thread result = Thread::Queue.new @cmd_queue.push([cmd, args, result]) wake_io_thread r = result.pop raise r if r.is_a?(Exception) r end |
#subscribe(prefix) ⇒ void
This method returns an undefined value.
Subscribes to a topic prefix (SUB/XSUB). Delegates to the routing stub for API parity with the pure-Ruby Engine.
187 188 189 |
# File 'lib/omq/ffi/engine.rb', line 187 def subscribe(prefix) @routing.subscribe(prefix) end |
#subscriber_joined ⇒ Async::Promise
Returns resolved when a subscriber joins (PUB/XPUB).
202 203 204 |
# File 'lib/omq/ffi/engine.rb', line 202 def subscriber_joined @routing.subscriber_joined end |
#unbind(endpoint) ⇒ void
This method returns an undefined value.
Unbinds from the given endpoint.
177 178 179 |
# File 'lib/omq/ffi/engine.rb', line 177 def unbind(endpoint) send_cmd(:unbind, endpoint) end |
#unsubscribe(prefix) ⇒ void
This method returns an undefined value.
Unsubscribes from a topic prefix (SUB/XSUB).
196 197 198 |
# File 'lib/omq/ffi/engine.rb', line 196 def unsubscribe(prefix) @routing.unsubscribe(prefix) end |
#wake_io_thread ⇒ void
This method returns an undefined value.
Wakes the I/O thread via the internal pipe.
312 313 314 |
# File 'lib/omq/ffi/engine.rb', line 312 def wake_io_thread @wake_w.write_nonblock(".", exception: false) end |