Class: OMQ::FFI::Engine

Inherits:
Object
Defined in:
lib/omq/ffi/engine.rb

Overview

FFI Engine — wraps a libzmq socket to implement the OMQ Engine contract.

A dedicated I/O thread owns the zmq_socket exclusively (libzmq sockets are not thread-safe). Send and recv flow through queues, with an IO pipe to wake the Async fiber scheduler.
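The ownership pattern described above can be sketched with the Ruby standard library alone. This is an illustration under stated assumptions, not the engine's actual code: `OwnedWorker`, `submit`, `take`, and `stop` are hypothetical names, and a plain Array stands in for the libzmq socket.

```ruby
# Sketch only: one thread exclusively owns a resource; other threads reach it
# through queues, and a pipe signals that a result is ready.
# OwnedWorker and its methods are hypothetical names, not part of OMQ.
class OwnedWorker
  def initialize
    @send_queue = Thread::Queue.new   # callers -> worker
    @recv_queue = Thread::Queue.new   # worker -> callers
    @signal_r, @signal_w = IO.pipe    # worker -> caller: "a result is ready"
    @thread = Thread.new { run }
  end

  # Hand work to the worker; the caller never touches the owned resource.
  def submit(item)
    @send_queue.push(item)
  end

  # Wait for the readiness signal, consume one signal byte, fetch the result.
  def take
    IO.select([@signal_r])            # a fiber scheduler can hook this wait
    @signal_r.read_nonblock(1, exception: false)
    @recv_queue.pop
  end

  def stop
    @send_queue.push(:stop)
    @thread.join
    [@signal_r, @signal_w].each(&:close)
  end

  private

  def run
    owned = []                        # stands in for the non-thread-safe socket
    while (item = @send_queue.pop) != :stop
      owned << item
      @recv_queue.push(item.upcase)   # publish the result first...
      @signal_w.write_nonblock(".", exception: false) # ...then signal it
    end
  end
end
```

The result is pushed before the signal byte is written, so a caller woken by the pipe always finds something in the queue.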

Defined Under Namespace

Classes: RoutingStub

Constant Summary

L = Libzmq

Constructor Details

#initialize(socket_type, options) ⇒ Engine

Returns a new instance of Engine.

Parameters:

  • socket_type (Symbol)

    e.g. :REQ, :PAIR

  • options (Options)


# File 'lib/omq/ffi/engine.rb', line 104

def initialize(socket_type, options)
  @socket_type    = socket_type
  @options        = options
  @peer_connected = Async::Promise.new
  @all_peers_gone = Async::Promise.new
  @connections    = []
  @closed         = false
  @parent_task    = nil
  @on_io_thread   = false

  @zmq_socket = L.zmq_socket(OMQ::FFI.context, L::SOCKET_TYPES.fetch(@socket_type))
  raise "zmq_socket failed: #{L.zmq_strerror(L.zmq_errno)}" if @zmq_socket.null?

  apply_options

  @routing = RoutingStub.new(self)

  # Queues for cross-thread communication
  @send_queue = Thread::Queue.new   # main → io thread
  @recv_queue = Thread::Queue.new   # io thread → main
  @cmd_queue  = Thread::Queue.new   # control commands → io thread

  # Signal pipe: io thread → Async fiber (message received)
  @recv_signal_r, @recv_signal_w = IO.pipe
  # Wake pipe: main thread → io thread (send/cmd enqueued)
  @wake_r, @wake_w = IO.pipe

  @io_thread = nil
end

Instance Attribute Details

#all_peers_gone ⇒ Async::Promise (readonly)

Returns resolved when all peers have disconnected.

Returns:

  • (Async::Promise)

    resolved when all peers have disconnected



# File 'lib/omq/ffi/engine.rb', line 27

def all_peers_gone
  @all_peers_gone
end

#connections ⇒ Array (readonly)

Returns active connections.

Returns:

  • (Array)

    active connections



# File 'lib/omq/ffi/engine.rb', line 21

def connections
  @connections
end

#monitor_queue=(value) ⇒ Object (writeonly)

Note:

Monitor events are not yet emitted by the FFI backend; these writers exist so Socket#monitor can attach without raising. Wiring libzmq’s zmq_socket_monitor is a TODO.



# File 'lib/omq/ffi/engine.rb', line 35

def monitor_queue=(value)
  @monitor_queue = value
end

#options ⇒ Options (readonly)

Returns socket options.

Returns:

  • (Options)

    socket options



# File 'lib/omq/ffi/engine.rb', line 19

def options
  @options
end

#parent_task ⇒ Async::Task? (readonly)

Returns root of the engine’s task tree.

Returns:

  • (Async::Task, nil)

    root of the engine’s task tree



# File 'lib/omq/ffi/engine.rb', line 29

def parent_task
  @parent_task
end

#peer_connected ⇒ Async::Promise (readonly)

Returns resolved when the first peer connects.

Returns:

  • (Async::Promise)

    resolved when the first peer connects



# File 'lib/omq/ffi/engine.rb', line 25

def peer_connected
  @peer_connected
end

#reconnect_enabled=(value) ⇒ Object (writeonly)

Parameters:

  • value (Boolean)

    enables or disables automatic reconnection



# File 'lib/omq/ffi/engine.rb', line 31

def reconnect_enabled=(value)
  @reconnect_enabled = value
end

#routing ⇒ RoutingStub (readonly)

Returns subscription/group routing interface.

Returns:

  • (RoutingStub)

    subscription/group routing interface



# File 'lib/omq/ffi/engine.rb', line 23

def routing
  @routing
end

#verbose_monitor=(value) ⇒ Object (writeonly)

Note:

Monitor events are not yet emitted by the FFI backend; these writers exist so Socket#monitor can attach without raising. Wiring libzmq’s zmq_socket_monitor is a TODO.



# File 'lib/omq/ffi/engine.rb', line 35

def verbose_monitor=(value)
  @verbose_monitor = value
end

Class Method Details

.linger_to_zmq_ms(linger) ⇒ Integer

Maps an OMQ linger value (seconds, or nil/Float::INFINITY for “wait forever”) to the integer milliseconds libzmq expects for ZMQ_LINGER (-1 = infinite, 0 = drop, N = N ms). Fractional milliseconds are truncated.

Parameters:

  • linger (Numeric, nil)

Returns:

  • (Integer)


# File 'lib/omq/ffi/engine.rb', line 95

def self.linger_to_zmq_ms(linger)
  return -1 if linger.nil? || linger == Float::INFINITY
  (linger * 1000).to_i
end
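
A few sample inputs make the mapping concrete. The block below is a standalone copy of the method body, defined at top level so that it runs without OMQ or libzmq loaded; only the sample values are new.

```ruby
# Standalone copy of the mapping shown above (no OMQ or libzmq required).
def linger_to_zmq_ms(linger)
  return -1 if linger.nil? || linger == Float::INFINITY
  (linger * 1000).to_i
end

linger_to_zmq_ms(nil)             # => -1   (wait forever)
linger_to_zmq_ms(Float::INFINITY) # => -1
linger_to_zmq_ms(0)               # => 0    (drop pending messages)
linger_to_zmq_ms(2)               # => 2000
linger_to_zmq_ms(0.25)            # => 250
```

Because of the `to_i` truncation, a positive linger below one millisecond maps to 0, which libzmq treats as "drop".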

Instance Method Details

#bind(endpoint) ⇒ URI::Generic

Binds the socket to the given endpoint.

Parameters:

  • endpoint (String)

    ZMQ endpoint URL (e.g. “tcp://*:5555”)

Returns:

  • (URI::Generic)

    resolved endpoint URI (with auto-selected port for “tcp://host:0”)



# File 'lib/omq/ffi/engine.rb', line 141

def bind(endpoint)
  sync_identity
  send_cmd(:bind, endpoint)
  resolved = get_string_option(L::ZMQ_LAST_ENDPOINT)
  @connections << :ffi
  @peer_connected.resolve(:ffi) unless @peer_connected.resolved?
  URI.parse(resolved)
end
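
The return value is an ordinary URI, so an auto-selected port can be read from it. The sketch below skips libzmq entirely: the `resolved` string is a hypothetical example of what ZMQ_LAST_ENDPOINT might report after binding to "tcp://127.0.0.1:0".

```ruby
require "uri"

# Hypothetical endpoint string; in the engine this value comes from
# get_string_option(L::ZMQ_LAST_ENDPOINT) after the bind command completes.
resolved = "tcp://127.0.0.1:49152"

uri = URI.parse(resolved)
uri.scheme # => "tcp"
uri.host   # => "127.0.0.1"
uri.port   # => 49152
```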

#capture_parent_task(parent: nil) ⇒ void

This method returns an undefined value.

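Captures the current Async task as the parent for I/O scheduling. parent: is accepted for API compatibility with the pure-Ruby engine; when given, it is stored as #parent_task, but it has no effect on I/O, because the FFI backend runs its own I/O thread and doesn’t participate in the Async barrier tree. Outside an Async task, the reactor’s root task is used instead.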



# File 'lib/omq/ffi/engine.rb', line 248

def capture_parent_task(parent: nil)
  return if @parent_task
  if parent
    @parent_task = parent
  elsif Async::Task.current?
    @parent_task = Async::Task.current
  else
    @parent_task  = Reactor.root_task
    @on_io_thread = true
    Reactor.track_linger(@options.linger)
  end
end

#close ⇒ void

This method returns an undefined value.

Closes the socket and shuts down the I/O thread.

Honors options.linger:

nil → wait forever for the Ruby-side queue to drain into libzmq
      and for libzmq's own LINGER to flush to the network
0   → drop anything not yet in libzmq's kernel buffers, close fast
      (the I/O thread is joined for at most 0.5 s)
N   → up to N seconds for the drain; the I/O thread is joined for at
      most N + 1 seconds, then killed if it is still alive


# File 'lib/omq/ffi/engine.rb', line 216

def close
  return if @closed
  @closed = true
  if @io_thread
    @cmd_queue.push([:stop])
    wake_io_thread
    linger = @options.linger
    if linger.nil?
      @io_thread.join
    elsif linger.zero?
      @io_thread.join(0.5) # fast path: zmq_close is non-blocking with LINGER=0
    else
      @io_thread.join(linger + 1.0)
    end
    @io_thread.kill if @io_thread.alive? # hard stop if deadline exceeded
  else
    # IO thread never started — close socket directly
    L.zmq_close(@zmq_socket)
  end
  @recv_signal_r&.close rescue nil
  @recv_signal_w&.close rescue nil
  @wake_r&.close rescue nil
  @wake_w&.close rescue nil
end
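
The three linger branches reduce to choosing a join timeout and then hard-stopping a thread that misses it. A minimal sketch of that logic in isolation follows; `join_timeout_for` and `stop_thread` are hypothetical helpers, not methods of the engine.

```ruby
# Hypothetical helper mirroring the three branches of #close.
# Returns the join timeout in seconds, or nil for "no timeout".
def join_timeout_for(linger)
  return nil if linger.nil?
  return 0.5 if linger.zero?
  linger + 1.0
end

# Join within the deadline; kill the thread if it is still alive afterwards.
def stop_thread(thread, linger)
  timeout = join_timeout_for(linger)
  timeout ? thread.join(timeout) : thread.join
  thread.kill if thread.alive?
  thread
end

join_timeout_for(nil) # => nil (join without a deadline)
join_timeout_for(0)   # => 0.5
join_timeout_for(2)   # => 3.0
```

`Thread#kill` is a last resort here: the killed thread gets no chance to finish whatever it was doing, so it only fires once the deadline has passed.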

#connect(endpoint) ⇒ URI::Generic

Connects the socket to the given endpoint.

Parameters:

  • endpoint (String)

    ZMQ endpoint URL

Returns:

  • (URI::Generic)

    parsed endpoint URI



# File 'lib/omq/ffi/engine.rb', line 155

def connect(endpoint)
  sync_identity
  send_cmd(:connect, endpoint)
  @connections << :ffi
  @peer_connected.resolve(:ffi) unless @peer_connected.resolved?
  URI.parse(endpoint)
end

#dequeue_recv ⇒ Array<String>

Dequeues the next received message, blocking until one is available.

Returns:

  • (Array<String>)

    multipart message



# File 'lib/omq/ffi/engine.rb', line 280

def dequeue_recv
  ensure_io_thread
  wait_for_message
end

#dequeue_recv_sentinel ⇒ void

This method returns an undefined value.

Pushes a nil sentinel into the recv queue to unblock a waiting consumer.



# File 'lib/omq/ffi/engine.rb', line 289

def dequeue_recv_sentinel
  @recv_queue.push(nil)
  @recv_signal_w.write_nonblock(".", exception: false) rescue nil
end
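
The sentinel idea can be shown with a bare Thread::Queue: a consumer blocked in `pop` wakes up on nil and treats it as "stop". This is a sketch of the pattern only; `drain_until_sentinel` is a hypothetical helper, and the signal pipe used by the engine is left out.

```ruby
# Hypothetical consumer: collects messages until a nil sentinel arrives.
def drain_until_sentinel(queue)
  received = []
  while (msg = queue.pop)   # blocks; a nil sentinel ends the loop
    received << msg
  end
  received
end

queue    = Thread::Queue.new
consumer = Thread.new { drain_until_sentinel(queue) }

queue.push(["hello", "world"])   # one multipart message
queue.push(nil)                  # sentinel: unblock and stop the consumer
consumer.value # => [["hello", "world"]]
```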

#disconnect(endpoint) ⇒ void

This method returns an undefined value.

Disconnects from the given endpoint.

Parameters:

  • endpoint (String)

    ZMQ endpoint URL



# File 'lib/omq/ffi/engine.rb', line 168

def disconnect(endpoint)
  send_cmd(:disconnect, endpoint)
end

#enqueue_send(parts) ⇒ void

This method returns an undefined value.

Enqueues a multipart message for sending via the I/O thread.

Parameters:

  • parts (Array<String>)

    message frames



# File 'lib/omq/ffi/engine.rb', line 268

def enqueue_send(parts)
  ensure_io_thread
  @send_queue.push(parts)
  wake_io_thread
end

#send_cmd(cmd, *args) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

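Sends a control command to the I/O thread and blocks until it replies. If the reply is an Exception, it is re-raised in the caller; otherwise the reply is returned.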



# File 'lib/omq/ffi/engine.rb', line 298

def send_cmd(cmd, *args)
  ensure_io_thread
  result = Thread::Queue.new
  @cmd_queue.push([cmd, args, result])
  wake_io_thread
  r = result.pop
  raise r if r.is_a?(Exception)
  r
end
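
The method relies on a per-call reply queue: each command carries its own Thread::Queue, and the worker answers on it with either a value or an exception. A self-contained sketch of that round trip is below. `CommandLoop`, `call`, and the `:echo` command are hypothetical; the real I/O thread's command handling is not shown in this page.

```ruby
# Sketch of the per-call reply-queue pattern. CommandLoop is a hypothetical
# stand-in for the engine's I/O thread, with a single made-up :echo command.
class CommandLoop
  def initialize
    @cmd_queue = Thread::Queue.new
    @thread    = Thread.new { run }
  end

  # Blocks the caller until the worker replies; re-raises worker-side errors.
  def call(cmd, *args)
    reply = Thread::Queue.new
    @cmd_queue.push([cmd, args, reply])
    r = reply.pop
    raise r if r.is_a?(Exception)
    r
  end

  def stop
    @cmd_queue.push([:stop])
    @thread.join
  end

  private

  def run
    loop do
      cmd, args, reply = @cmd_queue.pop
      break if cmd == :stop
      begin
        reply.push(handle(cmd, args))
      rescue => e
        reply.push(e)   # errors travel back as values and are raised by #call
      end
    end
  end

  def handle(cmd, args)
    case cmd
    when :echo then args.first
    else raise ArgumentError, "unknown command: #{cmd}"
    end
  end
end
```

Passing the exception back as a value keeps the worker thread alive after a failed command and surfaces the error in the thread that issued it.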

#subscribe(prefix) ⇒ void

This method returns an undefined value.

Subscribes to a topic prefix (SUB/XSUB). Delegates to the routing stub for API parity with the pure-Ruby Engine.

Parameters:

  • prefix (String)


# File 'lib/omq/ffi/engine.rb', line 187

def subscribe(prefix)
  @routing.subscribe(prefix)
end
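
For readers new to ZMQ subscriptions: a message matches when it begins with a subscribed prefix, compared byte by byte, and an empty prefix matches every message. The sketch below only illustrates that rule; `matches_subscription?` is a hypothetical helper, and in the FFI backend the filtering is done by libzmq, not in Ruby.

```ruby
# Hypothetical illustration of prefix matching; not part of OMQ.
# Compares on raw bytes, so string encodings do not affect the result.
def matches_subscription?(prefixes, topic)
  prefixes.any? { |prefix| topic.b.start_with?(prefix.b) }
end

matches_subscription?(["weather."], "weather.berlin") # => true
matches_subscription?(["weather."], "sports.berlin")  # => false
matches_subscription?([""], "anything")               # => true
matches_subscription?([], "anything")                 # => false
```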

#subscriber_joined ⇒ Async::Promise

Returns resolved when a subscriber joins (PUB/XPUB).

Returns:

  • (Async::Promise)

    resolved when a subscriber joins (PUB/XPUB).



# File 'lib/omq/ffi/engine.rb', line 202

def subscriber_joined
  @routing.subscriber_joined
end

#unbind(endpoint) ⇒ void

This method returns an undefined value.

Unbinds from the given endpoint.

Parameters:

  • endpoint (String)

    ZMQ endpoint URL



# File 'lib/omq/ffi/engine.rb', line 177

def unbind(endpoint)
  send_cmd(:unbind, endpoint)
end

#unsubscribe(prefix) ⇒ void

This method returns an undefined value.

Unsubscribes from a topic prefix (SUB/XSUB).

Parameters:

  • prefix (String)


# File 'lib/omq/ffi/engine.rb', line 196

def unsubscribe(prefix)
  @routing.unsubscribe(prefix)
end

#wake_io_thread ⇒ void

This method returns an undefined value.

Wakes the I/O thread via the internal pipe.



# File 'lib/omq/ffi/engine.rb', line 312

def wake_io_thread
  @wake_w.write_nonblock(".", exception: false)
end
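
The receiving side of a wake pipe typically waits for readability and then drains every pending byte, so several wake-ups collapse into one. The sketch below shows that side under stated assumptions: `wait_for_wake` is a hypothetical helper, and the engine's actual I/O loop is not shown in this page.

```ruby
# Hypothetical helper: block until woken (or until the timeout elapses),
# then drain all pending wake bytes. Returns true if a wake-up was seen.
def wait_for_wake(wake_r, timeout = nil)
  ready = IO.select([wake_r], nil, nil, timeout)
  return false unless ready
  loop do
    chunk = wake_r.read_nonblock(256, exception: false)
    break if chunk == :wait_readable || chunk.nil?  # drained, or writer closed
  end
  true
end

wake_r, wake_w = IO.pipe

# Three wake-ups before the reader runs collapse into a single wake.
3.times { wake_w.write_nonblock(".", exception: false) }

wait_for_wake(wake_r, 1)    # => true
wait_for_wake(wake_r, 0.01) # => false (pipe already drained)
```

With `exception: false`, a full pipe makes `write_nonblock` return `:wait_writable` instead of raising. Ignoring that result is safe for a wake signal, because a full pipe already guarantees the reader will wake.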