Class: OMQ::Engine
- Inherits:
-
Object
- Object
- OMQ::Engine
- Defined in:
- lib/omq/engine.rb,
lib/omq/engine/heartbeat.rb,
lib/omq/engine/reconnect.rb,
lib/omq/engine/recv_pump.rb,
lib/omq/engine/maintenance.rb,
lib/omq/engine/socket_lifecycle.rb,
lib/omq/engine/connection_lifecycle.rb
Overview
Per-socket orchestrator.
Manages connections, transports, and the routing strategy for one OMQ::Socket instance. Each socket type creates one Engine.
Defined Under Namespace
Modules: Heartbeat, Maintenance Classes: ConnectionLifecycle, Reconnect, RecvPump, SocketLifecycle
Class Attribute Summary collapse
-
.transports ⇒ Hash{String => Module}
readonly
Registered transports.
Instance Attribute Summary collapse
-
#connection_wrapper ⇒ Object
Optional proc that wraps new connections (e.g. for serialization).
-
#connections ⇒ Hash{Connection => ConnectionLifecycle}
readonly
Active connections.
-
#lifecycle ⇒ SocketLifecycle
readonly
Socket-level state + signaling.
-
#listeners ⇒ Hash{String => Listener}
readonly
Active listeners keyed by resolved endpoint.
-
#monitor_queue ⇒ Object
writeonly
Sets the attribute monitor_queue.
-
#options ⇒ Options
readonly
Socket options.
-
#socket_type ⇒ Symbol
readonly
Socket type (e.g. :REQ, :PAIR).
-
#verbose_monitor ⇒ Boolean
When true, every monitor event is also printed to stderr for debugging.
Instance Method Summary collapse
- #all_peers_gone ⇒ Object
- #barrier ⇒ Object
-
#bind(endpoint, parent: nil, **opts) ⇒ URI::Generic
Binds to an endpoint.
-
#build_fatal_error(error) ⇒ Object
Constructs a SocketDeadError whose
causeiserror. -
#capture_parent_task(parent: nil) ⇒ Object
Captures the socket’s task tree root and starts the socket-level maintenance task.
-
#close ⇒ void
Closes all connections and listeners gracefully.
- #closed? ⇒ Boolean
-
#connect(endpoint, parent: nil, **opts) ⇒ URI::Generic
Connects to an endpoint.
-
#connection_lost(connection) ⇒ void
Called when a connection is lost.
-
#connection_ready(pipe, endpoint: nil) ⇒ void
Called by inproc transport with a pre-validated DirectPipe.
-
#dequeue_recv ⇒ Array<String>
Dequeues the next received message.
-
#dequeue_recv_sentinel ⇒ Object
Pushes a nil sentinel into the recv queue, unblocking a pending #dequeue_recv with a nil return value.
-
#disconnect(endpoint) ⇒ void
Disconnects from an endpoint.
-
#emit_monitor_event(type, endpoint: nil, detail: nil) ⇒ void
Emits a lifecycle event to the monitor queue, if one is attached.
-
#emit_verbose_monitor_event(type, **detail) ⇒ void
Emits a verbose-only monitor event (e.g. message traces).
-
#emit_verbose_msg_received(conn, parts) ⇒ Object
Emits a :message_received verbose event and enriches it with the on-wire (pre-decompression) byte size if
connexposeslast_wire_size_in. -
#emit_verbose_msg_sent(conn, parts) ⇒ Object
Emits a :message_sent verbose event and enriches it with the on-wire (post-compression) byte size if
connexposeslast_wire_size_out(installed by ZMTP-Zstd etc.). -
#enqueue_send(parts) ⇒ void
Enqueues a message for sending.
-
#handle_accepted(io, endpoint: nil) ⇒ void
Called by a transport when an incoming connection is accepted.
-
#handle_connected(io, endpoint: nil) ⇒ void
Called by a transport when an outgoing connection is established.
-
#initialize(socket_type, options) ⇒ Engine
constructor
A new instance of Engine.
-
#maybe_reconnect(endpoint) ⇒ Object
Schedules a reconnect for
endpointif auto-reconnect is enabled and the endpoint is still dialed. -
#maybe_resolve_all_peers_gone ⇒ Object
Resolves ‘all_peers_gone` if we had peers and now have none.
- #parent_task ⇒ Object
-
#peer_connected ⇒ Object
Delegated to SocketLifecycle.
-
#reconnect_enabled=(value) ⇒ Object
Enables or disables auto-reconnect for dropped connections.
-
#record_disconnect_reason(conn, error) ⇒ Object
Records the disconnect reason on the ConnectionLifecycle for
conn, if any. -
#routing ⇒ Routing
Routing strategy (created lazily on first access).
-
#signal_fatal_error(error) ⇒ Object
Wraps an unexpected pump error as SocketDeadError and unblocks any callers waiting on the recv queue.
-
#spawn_conn_pump_task(conn, annotation:, &block) ⇒ Object
Spawns a per-connection pump task on the connection’s own lifecycle barrier.
-
#spawn_inproc_retry(endpoint) {|interval| ... } ⇒ Object
Spawns an inproc reconnect retry task under the socket’s parent task.
-
#spawn_pump_task(annotation:, parent: Async::Task.current) { ... } ⇒ Async::Task
Spawns a transient pump task with error propagation.
-
#start_recv_pump(conn, recv_queue) {|msg| ... } ⇒ Async::Task?
Starts a recv pump for a connection, or wires the inproc fast path.
-
#stop ⇒ void
Immediate hard stop: skips the linger drain and cascades teardown through the socket-level barrier.
-
#subscribe(prefix) ⇒ Object
Subscribes to a topic prefix on SUB/XSUB sockets.
-
#subscriber_joined ⇒ Object
Delegated to the routing strategy.
-
#transport_for(endpoint) ⇒ Module
Looks up the transport module for an endpoint URI.
-
#transport_object_for(endpoint) ⇒ Dialer, ...
Returns the transport object (Dialer or Listener) for an endpoint.
-
#unbind(endpoint) ⇒ void
Unbinds from an endpoint.
-
#unsubscribe(prefix) ⇒ Object
Unsubscribes from a topic prefix on SUB/XSUB sockets.
Constructor Details
#initialize(socket_type, options) ⇒ Engine
Returns a new instance of Engine.
83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/omq/engine.rb', line 83 def initialize(socket_type, ) @socket_type = socket_type @options = @routing = nil @connections = {} # connection => ConnectionLifecycle @dialers = {} # endpoint => Dialer (reconnect intent + connect logic) @listeners = {} # endpoint => Listener @lifecycle = SocketLifecycle.new @fatal_error = nil @monitor_queue = nil @verbose_monitor = false end |
Class Attribute Details
.transports ⇒ Hash{String => Module} (readonly)
Returns registered transports.
27 28 29 |
# File 'lib/omq/engine.rb', line 27 def transports @transports end |
Instance Attribute Details
#connection_wrapper ⇒ Object
Optional proc that wraps new connections (e.g. for serialization). Called with the raw connection; must return the (possibly wrapped) connection.
54 55 56 |
# File 'lib/omq/engine.rb', line 54 def connection_wrapper @connection_wrapper end |
#connections ⇒ Hash{Connection => ConnectionLifecycle} (readonly)
Returns active connections.
48 49 50 |
# File 'lib/omq/engine.rb', line 48 def connections @connections end |
#lifecycle ⇒ SocketLifecycle (readonly)
Returns socket-level state + signaling.
59 60 61 |
# File 'lib/omq/engine.rb', line 59 def lifecycle @lifecycle end |
#listeners ⇒ Hash{String => Listener} (readonly)
Returns active listeners keyed by resolved endpoint.
43 44 45 |
# File 'lib/omq/engine.rb', line 43 def listeners @listeners end |
#monitor_queue=(value) ⇒ Object (writeonly)
Sets the attribute monitor_queue
65 66 67 |
# File 'lib/omq/engine.rb', line 65 def monitor_queue=(value) @monitor_queue = value end |
#options ⇒ Options (readonly)
Returns socket options.
38 39 40 |
# File 'lib/omq/engine.rb', line 38 def @options end |
#socket_type ⇒ Symbol (readonly)
Returns socket type (e.g. :REQ, :PAIR).
33 34 35 |
# File 'lib/omq/engine.rb', line 33 def socket_type @socket_type end |
#verbose_monitor ⇒ Boolean
Returns when true, every monitor event is also printed to stderr for debugging. Set via Socket#monitor.
70 71 72 |
# File 'lib/omq/engine.rb', line 70 def verbose_monitor @verbose_monitor end |
Instance Method Details
#all_peers_gone ⇒ Object
99 |
# File 'lib/omq/engine.rb', line 99 def all_peers_gone = @lifecycle.all_peers_gone |
#barrier ⇒ Object
101 |
# File 'lib/omq/engine.rb', line 101 def = @lifecycle. |
#bind(endpoint, parent: nil, **opts) ⇒ URI::Generic
Binds to an endpoint.
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
# File 'lib/omq/engine.rb', line 191 def bind(endpoint, parent: nil, **opts) OMQ.freeze_for_ractors! capture_parent_task(parent: parent) transport = transport_for(endpoint) listener = transport.listener(endpoint, self, **opts) start_accept_loops(listener) @listeners[listener.endpoint] = listener emit_monitor_event(:listening, endpoint: listener.endpoint) URI.parse(listener.endpoint) rescue => error emit_monitor_event(:bind_failed, endpoint: endpoint, detail: { error: error }) raise end |
#build_fatal_error(error) ⇒ Object
Constructs a SocketDeadError whose cause is error. Uses the raise-in-rescue idiom because Ruby only sets cause on an exception when it is raised from inside a rescue block – works regardless of the original caller’s $! state.
507 508 509 510 511 512 513 514 515 |
# File 'lib/omq/engine.rb', line 507 def build_fatal_error(error) raise error rescue begin raise SocketDeadError, "#{@socket_type} socket killed: #{error.}" rescue SocketDeadError => wrapped wrapped end end |
#capture_parent_task(parent: nil) ⇒ Object
Captures the socket’s task tree root and starts the socket-level maintenance task. If parent is given, it is used as the parent for every task spawned under this socket (connection supervisors, reconnect loops, maintenance, monitor). Otherwise the current Async task (or the shared Reactor root, for non-Async callers) is captured automatically.
Idempotent: first call wins. Subsequent calls (including from later bind/connect invocations) with a different parent are silently ignored.
531 532 533 534 535 536 537 |
# File 'lib/omq/engine.rb', line 531 def capture_parent_task(parent: nil) task = @lifecycle.capture_parent_task(parent: parent, linger: @options.linger) return unless task Maintenance.start(@lifecycle., @options.mechanism) end |
#close ⇒ void
This method returns an undefined value.
Closes all connections and listeners gracefully. Drains pending sends up to linger seconds, then cascades teardown through the socket-level OMQ::Engine::SocketLifecycle#barrier — every per-connection barrier is stopped as a side effect, cancelling every pump.
384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 |
# File 'lib/omq/engine.rb', line 384 def close return unless @lifecycle.open? @lifecycle.start_closing! stop_listeners unless @connections.empty? if @options.linger.nil? || @options.linger > 0 drain_send_queues(@options.linger) end @lifecycle.finish_closing! if @lifecycle.on_io_thread Reactor.untrack_linger(@options.linger) end stop_listeners emit_monitor_event(:closed) close_monitor_queue end |
#closed? ⇒ Boolean
102 |
# File 'lib/omq/engine.rb', line 102 def closed? = @lifecycle.closed? |
#connect(endpoint, parent: nil, **opts) ⇒ URI::Generic
Connects to an endpoint.
213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/omq/engine.rb', line 213 def connect(endpoint, parent: nil, **opts) OMQ.freeze_for_ractors! capture_parent_task(parent: parent) validate_endpoint!(endpoint) if endpoint.start_with?("inproc://") # Inproc connect is synchronous and instant — no Dialer transport = transport_for(endpoint) transport.connect(endpoint, self, **opts) @dialers[endpoint] = :inproc # sentinel for reconnect intent else transport = transport_for(endpoint) @dialers[endpoint] = transport.dialer(endpoint, self, **opts) emit_monitor_event(:connect_delayed, endpoint: endpoint) schedule_reconnect(endpoint, delay: 0) end URI.parse(endpoint) end |
#connection_lost(connection) ⇒ void
This method returns an undefined value.
Called when a connection is lost.
351 352 353 |
# File 'lib/omq/engine.rb', line 351 def connection_lost(connection) @connections[connection]&.lost! end |
#connection_ready(pipe, endpoint: nil) ⇒ void
This method returns an undefined value.
Called by inproc transport with a pre-validated DirectPipe. Skips ZMTP handshake — just registers with routing strategy.
289 290 291 |
# File 'lib/omq/engine.rb', line 289 def connection_ready(pipe, endpoint: nil) ConnectionLifecycle.new(self, endpoint: endpoint).ready_direct!(pipe) end |
#dequeue_recv ⇒ Array<String>
Dequeues the next received message. Blocks until available.
299 300 301 302 303 304 305 306 |
# File 'lib/omq/engine.rb', line 299 def dequeue_recv raise @fatal_error if @fatal_error msg = routing.dequeue_recv raise @fatal_error if msg.nil? && @fatal_error msg end |
#dequeue_recv_sentinel ⇒ Object
Pushes a nil sentinel into the recv queue, unblocking a pending #dequeue_recv with a nil return value.
312 313 314 |
# File 'lib/omq/engine.rb', line 312 def dequeue_recv_sentinel routing.unblock_recv end |
#disconnect(endpoint) ⇒ void
This method returns an undefined value.
Disconnects from an endpoint. Closes connections to that endpoint and stops auto-reconnection for it.
240 241 242 243 |
# File 'lib/omq/engine.rb', line 240 def disconnect(endpoint) @dialers.delete(endpoint) close_connections_at(endpoint) end |
#emit_monitor_event(type, endpoint: nil, detail: nil) ⇒ void
This method returns an undefined value.
Emits a lifecycle event to the monitor queue, if one is attached.
547 548 549 550 551 552 553 554 |
# File 'lib/omq/engine.rb', line 547 def emit_monitor_event(type, endpoint: nil, detail: nil) return unless @monitor_queue event = MonitorEvent.new type: type, endpoint: endpoint, detail: detail @monitor_queue << event rescue Async::Stop, ClosedQueueError end |
#emit_verbose_monitor_event(type, **detail) ⇒ void
This method returns an undefined value.
Emits a verbose-only monitor event (e.g. message traces). Only emitted when Socket#monitor was called with verbose: true. Uses **detail to avoid Hash allocation when verbose is off.
565 566 567 568 |
# File 'lib/omq/engine.rb', line 565 def emit_verbose_monitor_event(type, **detail) return unless @verbose_monitor emit_monitor_event(type, detail: detail) end |
#emit_verbose_msg_received(conn, parts) ⇒ Object
Emits a :message_received verbose event and enriches it with the on-wire (pre-decompression) byte size if conn exposes last_wire_size_in.
590 591 592 593 594 595 596 597 598 599 600 |
# File 'lib/omq/engine.rb', line 590 def emit_verbose_msg_received(conn, parts) return unless @verbose_monitor detail = { parts: parts } if conn.respond_to? :last_wire_size_in detail[:wire_size] = conn.last_wire_size_in end emit_monitor_event :message_received, detail: detail end |
#emit_verbose_msg_sent(conn, parts) ⇒ Object
Emits a :message_sent verbose event and enriches it with the on-wire (post-compression) byte size if conn exposes last_wire_size_out (installed by ZMTP-Zstd etc.).
574 575 576 577 578 579 580 581 582 583 584 |
# File 'lib/omq/engine.rb', line 574 def emit_verbose_msg_sent(conn, parts) return unless @verbose_monitor detail = { parts: parts } if conn.respond_to? :last_wire_size_out detail[:wire_size] = conn.last_wire_size_out end emit_monitor_event :message_sent, detail: detail end |
#enqueue_send(parts) ⇒ void
This method returns an undefined value.
Enqueues a message for sending. Blocks at HWM.
323 324 325 326 |
# File 'lib/omq/engine.rb', line 323 def enqueue_send(parts) raise @fatal_error if @fatal_error routing.enqueue(parts) end |
#handle_accepted(io, endpoint: nil) ⇒ void
This method returns an undefined value.
Called by a transport when an incoming connection is accepted.
266 267 268 269 |
# File 'lib/omq/engine.rb', line 266 def handle_accepted(io, endpoint: nil) emit_monitor_event(:accepted, endpoint: endpoint) spawn_connection(io, as_server: true, endpoint: endpoint) end |
#handle_connected(io, endpoint: nil) ⇒ void
This method returns an undefined value.
Called by a transport when an outgoing connection is established.
277 278 279 280 |
# File 'lib/omq/engine.rb', line 277 def handle_connected(io, endpoint: nil) emit_monitor_event(:connected, endpoint: endpoint) spawn_connection(io, as_server: false, endpoint: endpoint) end |
#maybe_reconnect(endpoint) ⇒ Object
Schedules a reconnect for endpoint if auto-reconnect is enabled and the endpoint is still dialed.
367 368 369 370 371 372 373 374 |
# File 'lib/omq/engine.rb', line 367 def maybe_reconnect(endpoint) return unless endpoint && @dialers.key?(endpoint) return unless @lifecycle.open? && @lifecycle.reconnect_enabled dialer = @dialers[endpoint] Reconnect.schedule(dialer, @options, @lifecycle.parent_task, self) end |
#maybe_resolve_all_peers_gone ⇒ Object
Resolves ‘all_peers_gone` if we had peers and now have none. Called by ConnectionLifecycle during teardown.
359 360 361 |
# File 'lib/omq/engine.rb', line 359 def maybe_resolve_all_peers_gone @lifecycle.maybe_resolve_all_peers_gone(@connections) end |
#parent_task ⇒ Object
100 |
# File 'lib/omq/engine.rb', line 100 def parent_task = @lifecycle.parent_task |
#peer_connected ⇒ Object
Delegated to SocketLifecycle.
98 |
# File 'lib/omq/engine.rb', line 98 def peer_connected = @lifecycle.peer_connected |
#reconnect_enabled=(value) ⇒ Object
Enables or disables auto-reconnect for dropped connections. Delegated to SocketLifecycle. Close paths flip this to false so a lost connection doesn’t schedule a new retry after linger.
111 112 113 |
# File 'lib/omq/engine.rb', line 111 def reconnect_enabled=(value) @lifecycle.reconnect_enabled = value end |
#record_disconnect_reason(conn, error) ⇒ Object
Records the disconnect reason on the ConnectionLifecycle for conn, if any. Called by the recv pump rescue so the upcoming ‘:disconnected` monitor event carries an error reason, without exposing the internal connection map.
152 153 154 |
# File 'lib/omq/engine.rb', line 152 def record_disconnect_reason(conn, error) @connections[conn]&.record_disconnect_reason(error) end |
#routing ⇒ Routing
Returns routing strategy (created lazily on first access).
75 76 77 |
# File 'lib/omq/engine.rb', line 75 def routing @routing ||= Routing.for(@socket_type).new(self) end |
#signal_fatal_error(error) ⇒ Object
Wraps an unexpected pump error as SocketDeadError and unblocks any callers waiting on the recv queue. The original error is preserved as #cause so callers can surface the real reason.
494 495 496 497 498 499 500 |
# File 'lib/omq/engine.rb', line 494 def signal_fatal_error(error) return unless @lifecycle.open? @fatal_error = build_fatal_error(error) routing.unblock_recv rescue nil @lifecycle.peer_connected.resolve(nil) rescue nil end |
#spawn_conn_pump_task(conn, annotation:, &block) ⇒ Object
Spawns a per-connection pump task on the connection’s own lifecycle barrier. When any pump on the barrier exits (e.g. the send pump sees EPIPE and calls #connection_lost), OMQ::Engine::ConnectionLifecycle#tear_down! calls ‘barrier.stop` which cancels every sibling pump for that connection — so a dead peer can no longer leave orphan send pumps blocked on `dequeue` waiting for messages that will never be written.
466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 |
# File 'lib/omq/engine.rb', line 466 def spawn_conn_pump_task(conn, annotation:, &block) lifecycle = @connections[conn] unless lifecycle return spawn_pump_task(annotation: annotation, &block) end lifecycle..async(transient: true, annotation: annotation) do yield rescue Async::Stop, Async::Cancel # normal shutdown / sibling tore us down rescue Protocol::ZMTP::Error, *CONNECTION_LOST => error # expected disconnect — stash reason for the :disconnected # monitor event, then let the lifecycle reconnect as usual lifecycle.record_disconnect_reason(error) rescue => error signal_fatal_error(error) end end |
#spawn_inproc_retry(endpoint) {|interval| ... } ⇒ Object
Spawns an inproc reconnect retry task under the socket’s parent task.
173 174 175 176 177 178 179 180 181 182 |
# File 'lib/omq/engine.rb', line 173 def spawn_inproc_retry(endpoint) ri = @options.reconnect_interval ivl = ri.is_a?(Range) ? ri.begin : ri ann = "inproc reconnect #{endpoint}" @lifecycle..async(transient: true, annotation: ann) do yield ivl rescue Async::Stop, Async::Cancel end end |
#spawn_pump_task(annotation:, parent: Async::Task.current) { ... } ⇒ Async::Task
Spawns a transient pump task with error propagation.
Unexpected exceptions are caught and forwarded to #signal_fatal_error so blocked callers (send/recv) see the real error instead of deadlocking.
444 445 446 447 448 449 450 451 452 |
# File 'lib/omq/engine.rb', line 444 def spawn_pump_task(annotation:, parent: Async::Task.current, &block) parent.async(transient: true, annotation: annotation) do yield rescue Async::Stop, Async::Cancel, Protocol::ZMTP::Error, *CONNECTION_LOST # normal shutdown / expected disconnect rescue => error signal_fatal_error(error) end end |
#start_recv_pump(conn, recv_queue) {|msg| ... } ⇒ Async::Task?
Starts a recv pump for a connection, or wires the inproc fast path.
336 337 338 339 340 341 342 343 |
# File 'lib/omq/engine.rb', line 336 def start_recv_pump(conn, recv_queue, &transform) # Spawn on the connection's lifecycle barrier so the recv pump is # torn down together with the rest of its sibling per-connection # pumps when the connection is lost. parent = @connections[conn]&. || @lifecycle. RecvPump.start(parent, conn, recv_queue, self, transform) end |
#stop ⇒ void
This method returns an undefined value.
Immediate hard stop: skips the linger drain and cascades teardown through the socket-level barrier. Intended for crash-path cleanup where #close‘s drain is either unsafe or undesired.
413 414 415 416 417 418 419 420 421 422 423 424 425 426 |
# File 'lib/omq/engine.rb', line 413 def stop return unless @lifecycle.alive? @lifecycle.force_close! if @lifecycle.on_io_thread Reactor.untrack_linger(@options.linger) end stop_listeners emit_monitor_event(:closed) close_monitor_queue end |
#subscribe(prefix) ⇒ Object
Subscribes to a topic prefix on SUB/XSUB sockets. Delegates to the routing strategy so callers don’t have to chain through ‘engine.routing`.
131 132 133 |
# File 'lib/omq/engine.rb', line 131 def subscribe(prefix) routing.subscribe(prefix) end |
#subscriber_joined ⇒ Object
Delegated to the routing strategy. Forwards the PUB/XPUB/SUB/XSUB subscription surface so callers don’t have to chain through ‘engine.routing`.
120 121 122 |
# File 'lib/omq/engine.rb', line 120 def subscriber_joined routing.subscriber_joined end |
#transport_for(endpoint) ⇒ Module
Looks up the transport module for an endpoint URI.
609 610 611 612 613 614 615 616 617 618 |
# File 'lib/omq/engine.rb', line 609 def transport_for(endpoint) scheme = endpoint[/\A([^:]+):\/\//, 1] transport = self.class.transports[scheme] unless transport raise ArgumentError, "unsupported transport: #{endpoint}" end transport end |
#transport_object_for(endpoint) ⇒ Dialer, ...
Returns the transport object (Dialer or Listener) for an endpoint. Used by OMQ::Engine::ConnectionLifecycle#ready! to call #wrap_connection.
163 164 165 |
# File 'lib/omq/engine.rb', line 163 def transport_object_for(endpoint) @dialers[endpoint] || @listeners[endpoint] end |
#unbind(endpoint) ⇒ void
This method returns an undefined value.
Unbinds from an endpoint. Stops the listener and closes all connections that were accepted on it.
252 253 254 255 256 257 |
# File 'lib/omq/engine.rb', line 252 def unbind(endpoint) listener = @listeners.delete(endpoint) or return listener.stop close_connections_at(endpoint) end |
#unsubscribe(prefix) ⇒ Object
Unsubscribes from a topic prefix on SUB/XSUB sockets. Delegates to the routing strategy so callers don’t have to chain through ‘engine.routing`.
142 143 144 |
# File 'lib/omq/engine.rb', line 142 def unsubscribe(prefix) routing.unsubscribe(prefix) end |