Class: OMQ::FFI::Engine

Inherits:
Object
  • Object
show all
Defined in:
lib/omq/ffi/engine.rb

Overview

FFI Engine — wraps a libzmq socket to implement the OMQ Engine contract.

A dedicated I/O thread owns the zmq_socket exclusively (libzmq sockets are not thread-safe). Send and recv flow through queues, with an IO pipe to wake the Async fiber scheduler.

Defined Under Namespace

Classes: RoutingStub

Constant Summary collapse

L =
Libzmq

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(socket_type, options) ⇒ Engine

Returns a new instance of Engine.

Parameters:

  • socket_type (Symbol)

    e.g. :REQ, :PAIR

  • options (Options)


109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/omq/ffi/engine.rb', line 109

def initialize(socket_type, options)
  @socket_type    = socket_type
  @options        = options
  @peer_connected = Async::Promise.new
  @all_peers_gone = Async::Promise.new
  @connections    = []
  @closed         = false
  @parent_task    = nil
  @on_io_thread   = false

  @zmq_socket = L.zmq_socket(OMQ::FFI.context, L::SOCKET_TYPES.fetch(@socket_type))
  raise "zmq_socket failed: #{L.zmq_strerror(L.zmq_errno)}" if @zmq_socket.null?

  apply_options

  @routing = RoutingStub.new(self)

  # Queues for cross-thread communication
  @send_queue = Thread::Queue.new   # main → io thread
  @recv_queue = Thread::Queue.new   # io thread → main
  @cmd_queue  = Thread::Queue.new   # control commands → io thread

  # Signal pipe: io thread → Async fiber (message received)
  @recv_signal_r, @recv_signal_w = IO.pipe
  # Wake pipe: main thread → io thread (send/cmd enqueued)
  @wake_r, @wake_w = IO.pipe

  @io_thread = nil
end

Instance Attribute Details

#all_peers_goneAsync::Promise (readonly)

Returns resolved when all peers have disconnected.

Returns:

  • (Async::Promise)

    resolved when all peers have disconnected



27
28
29
# File 'lib/omq/ffi/engine.rb', line 27

def all_peers_gone
  @all_peers_gone
end

#connectionsArray (readonly)

Returns active connections.

Returns:

  • (Array)

    active connections



21
22
23
# File 'lib/omq/ffi/engine.rb', line 21

def connections
  @connections
end

#monitor_queue=(value) ⇒ Object (writeonly)

Note:

Monitor events are not yet emitted by the FFI backend; these writers exist so Socket#monitor can attach without raising. Wiring libzmq’s zmq_socket_monitor is a TODO.



40
41
42
# File 'lib/omq/ffi/engine.rb', line 40

def monitor_queue=(value)
  @monitor_queue = value
end

#on_io_threadBoolean (readonly) Also known as: on_io_thread?

Returns true when the engine’s parent task lives on the shared Reactor IO thread (i.e. not created under an Async task). Writable/Readable check this to pick the fast path.

Returns:

  • (Boolean)

    true when the engine’s parent task lives on the shared Reactor IO thread (i.e. not created under an Async task). Writable/Readable check this to pick the fast path.



33
34
35
# File 'lib/omq/ffi/engine.rb', line 33

def on_io_thread
  @on_io_thread
end

#optionsOptions (readonly)

Returns socket options.

Returns:

  • (Options)

    socket options



19
20
21
# File 'lib/omq/ffi/engine.rb', line 19

def options
  @options
end

#parent_taskAsync::Task? (readonly)

Returns root of the engine’s task tree.

Returns:

  • (Async::Task, nil)

    root of the engine’s task tree



29
30
31
# File 'lib/omq/ffi/engine.rb', line 29

def parent_task
  @parent_task
end

#peer_connectedAsync::Promise (readonly)

Returns resolved when the first peer connects.

Returns:

  • (Async::Promise)

    resolved when the first peer connects



25
26
27
# File 'lib/omq/ffi/engine.rb', line 25

def peer_connected
  @peer_connected
end

#reconnect_enabled=(value) ⇒ Object (writeonly)

Parameters:

  • value (Boolean)

    enables or disables automatic reconnection



36
37
38
# File 'lib/omq/ffi/engine.rb', line 36

def reconnect_enabled=(value)
  @reconnect_enabled = value
end

#routingRoutingStub (readonly)

Returns subscription/group routing interface.

Returns:

  • (RoutingStub)

    subscription/group routing interface



23
24
25
# File 'lib/omq/ffi/engine.rb', line 23

def routing
  @routing
end

#verbose_monitor=(value) ⇒ Object (writeonly)

Note:

Monitor events are not yet emitted by the FFI backend; these writers exist so Socket#monitor can attach without raising. Wiring libzmq’s zmq_socket_monitor is a TODO.



40
41
42
# File 'lib/omq/ffi/engine.rb', line 40

def verbose_monitor=(value)
  @verbose_monitor = value
end

Class Method Details

.linger_to_zmq_ms(linger) ⇒ Integer

Maps an OMQ linger value (seconds, or nil/Float::INFINITY for “wait forever”) to libzmq’s ZMQ_LINGER int milliseconds (-1 = infinite, 0 = drop, N = N ms).

Parameters:

  • linger (Numeric, nil)

Returns:

  • (Integer)


100
101
102
103
# File 'lib/omq/ffi/engine.rb', line 100

def self.linger_to_zmq_ms(linger)
  return -1 if linger.nil? || linger == Float::INFINITY
  (linger * 1000).to_i
end

Instance Method Details

#bind(endpoint) ⇒ URI::Generic

Binds the socket to the given endpoint.

Parameters:

  • endpoint (String)

    ZMQ endpoint URL (e.g. “tcp://*:5555”)

Returns:

  • (URI::Generic)

    resolved endpoint URI (with auto-selected port for “tcp://host:0”)



146
147
148
149
150
151
152
153
# File 'lib/omq/ffi/engine.rb', line 146

def bind(endpoint)
  sync_identity
  send_cmd(:bind, endpoint)
  resolved = get_string_option(L::ZMQ_LAST_ENDPOINT)
  @connections << :ffi
  @peer_connected.resolve(:ffi) unless @peer_connected.resolved?
  URI.parse(resolved)
end

#capture_parent_task(parent: nil) ⇒ void

This method returns an undefined value.

Captures the current Async task as the parent for I/O scheduling. parent: is accepted for API compatibility with the pure-Ruby engine but has no effect: the FFI backend runs its own I/O thread and doesn’t participate in the Async barrier tree.



253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/omq/ffi/engine.rb', line 253

def capture_parent_task(parent: nil)
  return if @parent_task
  if parent
    @parent_task = parent
  elsif Async::Task.current?
    @parent_task = Async::Task.current
  else
    @parent_task  = Reactor.root_task
    @on_io_thread = true
    Reactor.track_linger(@options.linger)
  end
end

#closevoid

This method returns an undefined value.

Closes the socket and shuts down the I/O thread.

Honors ‘options.linger`:

nil → wait forever for Ruby-side queue to drain into libzmq
      and for libzmq's own LINGER to flush to the network
0   → drop anything not yet in libzmq's kernel buffers, close fast
N   → up to N seconds for drain + N + 1s grace for join


221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/omq/ffi/engine.rb', line 221

def close
  return if @closed
  @closed = true
  if @io_thread
    @cmd_queue.push([:stop])
    wake_io_thread
    linger = @options.linger
    if linger.nil?
      @io_thread.join
    elsif linger.zero?
      @io_thread.join(0.5) # fast path: zmq_close is non-blocking with LINGER=0
    else
      @io_thread.join(linger + 1.0)
    end
    @io_thread.kill if @io_thread.alive? # hard stop if deadline exceeded
  else
    # IO thread never started — close socket directly
    L.zmq_close(@zmq_socket)
  end
  @recv_signal_r&.close rescue nil
  @recv_signal_w&.close rescue nil
  @wake_r&.close rescue nil
  @wake_w&.close rescue nil
end

#connect(endpoint) ⇒ URI::Generic

Connects the socket to the given endpoint.

Parameters:

  • endpoint (String)

    ZMQ endpoint URL

Returns:

  • (URI::Generic)

    parsed endpoint URI



160
161
162
163
164
165
166
# File 'lib/omq/ffi/engine.rb', line 160

def connect(endpoint)
  sync_identity
  send_cmd(:connect, endpoint)
  @connections << :ffi
  @peer_connected.resolve(:ffi) unless @peer_connected.resolved?
  URI.parse(endpoint)
end

#dequeue_recvArray<String>

Dequeues the next received message, blocking until one is available.

Returns:

  • (Array<String>)

    multipart message



285
286
287
288
# File 'lib/omq/ffi/engine.rb', line 285

def dequeue_recv
  ensure_io_thread
  wait_for_message
end

#dequeue_recv_sentinelvoid

This method returns an undefined value.

Pushes a nil sentinel into the recv queue to unblock a waiting consumer.



294
295
296
297
# File 'lib/omq/ffi/engine.rb', line 294

def dequeue_recv_sentinel
  @recv_queue.push(nil)
  @recv_signal_w.write_nonblock(".", exception: false) rescue nil
end

#disconnect(endpoint) ⇒ void

This method returns an undefined value.

Disconnects from the given endpoint.

Parameters:

  • endpoint (String)

    ZMQ endpoint URL



173
174
175
# File 'lib/omq/ffi/engine.rb', line 173

def disconnect(endpoint)
  send_cmd(:disconnect, endpoint)
end

#enqueue_send(parts) ⇒ void

This method returns an undefined value.

Enqueues a multipart message for sending via the I/O thread.

Parameters:

  • parts (Array<String>)

    message frames



273
274
275
276
277
# File 'lib/omq/ffi/engine.rb', line 273

def enqueue_send(parts)
  ensure_io_thread
  @send_queue.push(parts)
  wake_io_thread
end

#send_cmd(cmd, *args) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Send a control command to the I/O thread.



303
304
305
306
307
308
309
310
311
# File 'lib/omq/ffi/engine.rb', line 303

def send_cmd(cmd, *args)
  ensure_io_thread
  result = Thread::Queue.new
  @cmd_queue.push([cmd, args, result])
  wake_io_thread
  r = result.pop
  raise r if r.is_a?(Exception)
  r
end

#subscribe(prefix) ⇒ void

This method returns an undefined value.

Subscribes to a topic prefix (SUB/XSUB). Delegates to the routing stub for API parity with the pure-Ruby Engine.

Parameters:

  • prefix (String)


192
193
194
# File 'lib/omq/ffi/engine.rb', line 192

def subscribe(prefix)
  @routing.subscribe(prefix)
end

#subscriber_joinedAsync::Promise

Returns resolved when a subscriber joins (PUB/XPUB).

Returns:

  • (Async::Promise)

    resolved when a subscriber joins (PUB/XPUB).



207
208
209
# File 'lib/omq/ffi/engine.rb', line 207

def subscriber_joined
  @routing.subscriber_joined
end

#unbind(endpoint) ⇒ void

This method returns an undefined value.

Unbinds from the given endpoint.

Parameters:

  • endpoint (String)

    ZMQ endpoint URL



182
183
184
# File 'lib/omq/ffi/engine.rb', line 182

def unbind(endpoint)
  send_cmd(:unbind, endpoint)
end

#unsubscribe(prefix) ⇒ void

This method returns an undefined value.

Unsubscribes from a topic prefix (SUB/XSUB).

Parameters:

  • prefix (String)


201
202
203
# File 'lib/omq/ffi/engine.rb', line 201

def unsubscribe(prefix)
  @routing.unsubscribe(prefix)
end

#wake_io_threadvoid

This method returns an undefined value.

Wakes the I/O thread via the internal pipe.



317
318
319
# File 'lib/omq/ffi/engine.rb', line 317

def wake_io_thread
  @wake_w.write_nonblock(".", exception: false)
end