Class: OMQ::Engine
- Inherits: Object
- Defined in:
  lib/omq/engine.rb,
  lib/omq/engine/heartbeat.rb,
  lib/omq/engine/reconnect.rb,
  lib/omq/engine/recv_pump.rb,
  lib/omq/engine/maintenance.rb,
  lib/omq/engine/socket_lifecycle.rb,
  lib/omq/engine/connection_lifecycle.rb
Overview
Per-socket orchestrator.
Manages connections, transports, and the routing strategy for one OMQ::Socket instance. Each socket type creates one Engine.
Defined Under Namespace
Modules: Heartbeat, Maintenance
Classes: ConnectionLifecycle, Reconnect, RecvPump, SocketLifecycle
Class Attribute Summary
-
.transports ⇒ Hash{String => Module}
readonly
Registered transports.
Instance Attribute Summary
-
#connection_wrapper ⇒ Object
Optional proc that wraps new connections (e.g. for serialization).
-
#connections ⇒ Hash{Connection => ConnectionLifecycle}
readonly
Active connections.
-
#lifecycle ⇒ SocketLifecycle
readonly
Socket-level state + signaling.
-
#listeners ⇒ Hash{String => Listener}
readonly
Active listeners keyed by resolved endpoint.
-
#monitor_queue ⇒ Object
writeonly
Sets the attribute monitor_queue.
-
#options ⇒ Options
readonly
Socket options.
-
#socket_type ⇒ Symbol
readonly
Socket type (e.g. :REQ, :PAIR).
-
#verbose_monitor ⇒ Boolean
When true, every monitor event is also printed to stderr for debugging.
Instance Method Summary
- #all_peers_gone ⇒ Object
- #barrier ⇒ Object
-
#bind(endpoint, parent: nil, **opts) ⇒ URI::Generic
Binds to an endpoint.
-
#build_fatal_error(error) ⇒ Object
Constructs a SocketDeadError whose cause is error.
-
#capture_parent_task(parent: nil) ⇒ Object
Captures the socket’s task tree root and starts the socket-level maintenance task.
-
#close ⇒ void
Closes all connections and listeners gracefully.
- #closed? ⇒ Boolean
-
#connect(endpoint, parent: nil, **opts) ⇒ URI::Generic
Connects to an endpoint.
-
#connection_lost(connection) ⇒ void
Called when a connection is lost.
-
#connection_ready(pipe, endpoint: nil) ⇒ void
Called by inproc transport with a pre-validated Pipe.
-
#dequeue_recv ⇒ Array<String>
Dequeues the next received message.
-
#dequeue_recv_sentinel ⇒ Object
Pushes a nil sentinel into the recv queue, unblocking a pending #dequeue_recv with a nil return value.
-
#disconnect(endpoint) ⇒ void
Disconnects from an endpoint.
-
#emit_monitor_event(type, endpoint: nil, detail: nil) ⇒ void
Emits a lifecycle event to the monitor queue, if one is attached.
-
#emit_verbose_monitor_event(type, **detail) ⇒ void
Emits a verbose-only monitor event (e.g. message traces).
-
#emit_verbose_msg_received(conn, parts) ⇒ Object
Emits a :message_received verbose event and enriches it with the on-wire (pre-decompression) byte size if conn exposes last_wire_size_in.
-
#emit_verbose_msg_sent(conn, parts) ⇒ Object
Emits a :message_sent verbose event and enriches it with the on-wire (post-compression) byte size if conn exposes last_wire_size_out (installed by ZMTP-Zstd etc.).
-
#enqueue_send(parts) ⇒ void
Enqueues a message for sending.
-
#handle_accepted(io, endpoint: nil) ⇒ void
Called by a transport when an incoming connection is accepted.
-
#handle_connected(io, endpoint: nil) ⇒ void
Called by a transport when an outgoing connection is established.
-
#initialize(socket_type, options) ⇒ Engine
constructor
A new instance of Engine.
-
#maybe_reconnect(endpoint) ⇒ Object
Schedules a reconnect for endpoint if auto-reconnect is enabled and the endpoint is still dialed.
-
#maybe_resolve_all_peers_gone ⇒ Object
Resolves `all_peers_gone` if we had peers and now have none.
- #on_io_thread? ⇒ Boolean
- #parent_task ⇒ Object
-
#peer_connected ⇒ Object
Delegated to SocketLifecycle.
-
#reconnect_enabled=(value) ⇒ Object
Enables or disables auto-reconnect for dropped connections.
-
#record_disconnect_reason(conn, error) ⇒ Object
Records the disconnect reason on the ConnectionLifecycle for conn, if any.
-
#routing ⇒ Routing
Routing strategy (created lazily on first access).
-
#signal_fatal_error(error) ⇒ Object
Wraps an unexpected pump error as SocketDeadError and unblocks any callers waiting on the recv queue.
-
#spawn_conn_pump_task(conn, annotation:, &block) ⇒ Object
Spawns a per-connection pump task on the connection’s own lifecycle barrier.
-
#spawn_inproc_retry(endpoint) {|interval| ... } ⇒ Object
Spawns an inproc reconnect retry task under the socket’s parent task.
-
#spawn_pump_task(annotation:, parent: Async::Task.current) { ... } ⇒ Async::Task
Spawns a transient pump task with error propagation.
-
#start_recv_pump(conn, recv_queue) {|msg| ... } ⇒ Async::Task?
Starts a recv pump for a connection, or wires the inproc fast path.
-
#stop ⇒ void
Immediate hard stop: skips the linger drain and cascades teardown through the socket-level barrier.
-
#subscribe(prefix) ⇒ Object
Subscribes to a topic prefix on SUB/XSUB sockets.
-
#subscriber_joined ⇒ Object
Delegated to the routing strategy.
-
#transport_for(endpoint) ⇒ Module
Looks up the transport module for an endpoint URI.
-
#transport_object_for(endpoint) ⇒ Dialer, ...
Returns the transport object (Dialer or Listener) for an endpoint.
-
#unbind(endpoint) ⇒ void
Unbinds from an endpoint.
-
#unsubscribe(prefix) ⇒ Object
Unsubscribes from a topic prefix on SUB/XSUB sockets.
Constructor Details
#initialize(socket_type, options) ⇒ Engine
Returns a new instance of Engine.
# File 'lib/omq/engine.rb', line 83
def initialize(socket_type, options)
  @socket_type = socket_type
  @options = options
  @routing = nil
  @connections = {} # connection => ConnectionLifecycle
  @dialers = {} # endpoint => Dialer (reconnect intent + connect logic)
  @listeners = {} # endpoint => Listener
  @lifecycle = SocketLifecycle.new
  @fatal_error = nil
  @monitor_queue = nil
  @verbose_monitor = false
end
Class Attribute Details
.transports ⇒ Hash{String => Module} (readonly)
Returns registered transports.
# File 'lib/omq/engine.rb', line 27
def transports
  @transports
end
Instance Attribute Details
#connection_wrapper ⇒ Object
Optional proc that wraps new connections (e.g. for serialization). Called with the raw connection; must return the (possibly wrapped) connection.
# File 'lib/omq/engine.rb', line 54
def connection_wrapper
  @connection_wrapper
end
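The wrapper contract described above (called with the raw connection, must return the possibly wrapped connection) can be illustrated with a small decorator. This is only a sketch: RawConnSketch, JsonConnSketch, and the send_message method are hypothetical names invented for the example and are not part of OMQ's connection API.

```ruby
require "delegate"
require "json"

# Hypothetical raw connection: records whatever frames it is asked to send.
class RawConnSketch
  attr_reader :sent

  def initialize
    @sent = []
  end

  def send_message(parts)
    @sent << parts
  end
end

# Hypothetical decorator that serializes a Ruby object to one JSON frame
# and forwards everything else to the wrapped connection.
class JsonConnSketch < SimpleDelegator
  def send_message(object)
    __getobj__.send_message([JSON.generate(object)])
  end
end

# The wrapper proc: called with the raw connection, returns the wrapped one.
connection_wrapper = ->(conn) { JsonConnSketch.new(conn) }

raw = RawConnSketch.new
connection_wrapper.call(raw).send_message({ "temp" => 21 })
p raw.sent
```

Returning the argument unchanged is also a valid wrapper under this contract, which is why the documentation says "possibly wrapped".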
#connections ⇒ Hash{Connection => ConnectionLifecycle} (readonly)
Returns active connections.
# File 'lib/omq/engine.rb', line 48
def connections
  @connections
end
#lifecycle ⇒ SocketLifecycle (readonly)
Returns socket-level state + signaling.
# File 'lib/omq/engine.rb', line 59
def lifecycle
  @lifecycle
end
#listeners ⇒ Hash{String => Listener} (readonly)
Returns active listeners keyed by resolved endpoint.
# File 'lib/omq/engine.rb', line 43
def listeners
  @listeners
end
#monitor_queue=(value) ⇒ Object (writeonly)
Sets the attribute monitor_queue
# File 'lib/omq/engine.rb', line 65
def monitor_queue=(value)
  @monitor_queue = value
end
#options ⇒ Options (readonly)
Returns socket options.
# File 'lib/omq/engine.rb', line 38
def options
  @options
end
#socket_type ⇒ Symbol (readonly)
Returns socket type (e.g. :REQ, :PAIR).
# File 'lib/omq/engine.rb', line 33
def socket_type
  @socket_type
end
#verbose_monitor ⇒ Boolean
When true, every monitor event is also printed to stderr for debugging. Set via Socket#monitor.
# File 'lib/omq/engine.rb', line 70
def verbose_monitor
  @verbose_monitor
end
Instance Method Details
#all_peers_gone ⇒ Object
# File 'lib/omq/engine.rb', line 99
def all_peers_gone = @lifecycle.all_peers_gone
#barrier ⇒ Object
# File 'lib/omq/engine.rb', line 101
def barrier = @lifecycle.barrier
#bind(endpoint, parent: nil, **opts) ⇒ URI::Generic
Binds to an endpoint.
# File 'lib/omq/engine.rb', line 192
def bind(endpoint, parent: nil, **opts)
  OMQ.freeze_for_ractors!
  capture_parent_task(parent: parent)
  transport = transport_for(endpoint)
  listener = transport.listener(endpoint, self, **opts)
  start_accept_loops(listener)
  @listeners[listener.endpoint] = listener
  emit_monitor_event(:listening, endpoint: listener.endpoint)
  URI.parse(listener.endpoint)
rescue => error
  emit_monitor_event(:bind_failed, endpoint: endpoint, detail: { error: error })
  raise
end
#build_fatal_error(error) ⇒ Object
Constructs a SocketDeadError whose cause is error. Uses the raise-in-rescue idiom because Ruby only sets cause on an exception when it is raised from inside a rescue block – works regardless of the original caller’s $! state.
# File 'lib/omq/engine.rb', line 508
def build_fatal_error(error)
  raise error
rescue
  begin
    raise SocketDeadError, "#{@socket_type} socket killed: #{error.message}"
  rescue SocketDeadError => wrapped
    wrapped
  end
end
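The raise-in-rescue idiom can be tried in isolation. The sketch below assumes a stand-in SocketDeadError class and passes the socket type as an argument instead of reading an instance variable; both are simplifications made for the example.

```ruby
# Stand-in for OMQ's SocketDeadError (assumption: a StandardError subclass).
class SocketDeadError < StandardError; end

# Builds, without propagating, a SocketDeadError whose #cause is +error+.
def build_fatal_error(socket_type, error)
  raise error
rescue
  begin
    # Raised while $! is +error+, so Ruby records +error+ as the cause.
    raise SocketDeadError, "#{socket_type} socket killed: #{error.message}"
  rescue SocketDeadError => wrapped
    wrapped
  end
end

original = IOError.new("stream closed")
wrapped  = build_fatal_error(:REQ, original)

puts wrapped.message
puts wrapped.cause.equal?(original)
```

Calling SocketDeadError.new directly would leave #cause as nil unless the caller happened to be inside a rescue block already, which is the situation the idiom avoids.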
#capture_parent_task(parent: nil) ⇒ Object
Captures the socket’s task tree root and starts the socket-level maintenance task. If parent is given, it is used as the parent for every task spawned under this socket (connection supervisors, reconnect loops, maintenance, monitor). Otherwise the current Async task (or the shared Reactor root, for non-Async callers) is captured automatically.
Idempotent: first call wins. Subsequent calls (including from later bind/connect invocations) with a different parent are silently ignored.
# File 'lib/omq/engine.rb', line 532
def capture_parent_task(parent: nil)
  task = @lifecycle.capture_parent_task(parent: parent, linger: @options.linger)
  return unless task

  Maintenance.start(@lifecycle.barrier, @options.mechanism)
end
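The "first call wins" behaviour can be sketched with plain objects. LifecycleSketch and EngineSketch are hypothetical stand-ins, and the sketch assumes the lifecycle returns nil on repeated captures, which is what the early return in the method body suggests; a counter stands in for Maintenance.start.

```ruby
# Hypothetical stand-in for SocketLifecycle's capture logic.
class LifecycleSketch
  attr_reader :parent_task

  def initialize
    @parent_task = nil
  end

  # Returns the captured task on the first call and nil afterwards.
  def capture_parent_task(parent:)
    return nil if @parent_task

    @parent_task = parent
  end
end

# Hypothetical stand-in for the engine side of the handshake.
class EngineSketch
  attr_reader :maintenance_starts

  def initialize
    @lifecycle = LifecycleSketch.new
    @maintenance_starts = 0
  end

  def capture_parent_task(parent:)
    task = @lifecycle.capture_parent_task(parent: parent)
    return unless task

    @maintenance_starts += 1 # stands in for Maintenance.start
  end

  def parent_task
    @lifecycle.parent_task
  end
end

engine = EngineSketch.new
engine.capture_parent_task(parent: :first)
engine.capture_parent_task(parent: :second) # silently ignored
p engine.parent_task
p engine.maintenance_starts
```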
#close ⇒ void
This method returns an undefined value.
Closes all connections and listeners gracefully. Drains pending sends up to linger seconds, then cascades teardown through the socket-level OMQ::Engine::SocketLifecycle#barrier — every per-connection barrier is stopped as a side effect, cancelling every pump.
# File 'lib/omq/engine.rb', line 385
def close
  return unless @lifecycle.open?

  @lifecycle.start_closing!
  stop_listeners unless @connections.empty?

  if @options.linger.nil? || @options.linger > 0
    drain_send_queues(@options.linger)
  end

  @lifecycle.finish_closing!

  if @lifecycle.on_io_thread
    Reactor.untrack_linger(@options.linger)
  end

  stop_listeners
  emit_monitor_event(:closed)
  close_monitor_queue
end
#closed? ⇒ Boolean
# File 'lib/omq/engine.rb', line 102
def closed? = @lifecycle.closed?
#connect(endpoint, parent: nil, **opts) ⇒ URI::Generic
Connects to an endpoint.
# File 'lib/omq/engine.rb', line 214
def connect(endpoint, parent: nil, **opts)
  OMQ.freeze_for_ractors!
  capture_parent_task(parent: parent)
  validate_endpoint!(endpoint)

  if endpoint.start_with?("inproc://")
    # Inproc connect is synchronous and instant — no Dialer
    transport = transport_for(endpoint)
    transport.connect(endpoint, self, **opts)
    @dialers[endpoint] = :inproc # sentinel for reconnect intent
  else
    transport = transport_for(endpoint)
    @dialers[endpoint] = transport.dialer(endpoint, self, **opts)
    emit_monitor_event(:connect_delayed, endpoint: endpoint)
    schedule_reconnect(endpoint, delay: 0)
  end

  URI.parse(endpoint)
end
#connection_lost(connection) ⇒ void
This method returns an undefined value.
Called when a connection is lost.
# File 'lib/omq/engine.rb', line 352
def connection_lost(connection)
  @connections[connection]&.lost!
end
#connection_ready(pipe, endpoint: nil) ⇒ void
This method returns an undefined value.
Called by inproc transport with a pre-validated Pipe. Skips ZMTP handshake — just registers with routing strategy.
# File 'lib/omq/engine.rb', line 290
def connection_ready(pipe, endpoint: nil)
  ConnectionLifecycle.new(self, endpoint: endpoint).ready_direct!(pipe)
end
#dequeue_recv ⇒ Array<String>
Dequeues the next received message. Blocks until available.
# File 'lib/omq/engine.rb', line 300
def dequeue_recv
  raise @fatal_error if @fatal_error

  msg = routing.dequeue_recv
  raise @fatal_error if msg.nil? && @fatal_error

  msg
end
#dequeue_recv_sentinel ⇒ Object
Pushes a nil sentinel into the recv queue, unblocking a pending #dequeue_recv with a nil return value.
# File 'lib/omq/engine.rb', line 313
def dequeue_recv_sentinel
  routing.unblock_recv
end
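How a nil sentinel unblocks a pending reader, and how #dequeue_recv turns it into an exception when a fatal error is recorded, can be sketched with Ruby's built-in Thread::Queue. RecvSketch, deliver, and fail! are hypothetical names; the real recv queue lives behind the routing strategy and is not necessarily a Thread::Queue.

```ruby
# Hypothetical model of the recv path: a queue plus a recorded fatal error.
class RecvSketch
  def initialize
    @queue = Thread::Queue.new
    @fatal_error = nil
  end

  def deliver(parts)
    @queue << parts
  end

  # Blocks until a message (or the nil sentinel) arrives.
  def dequeue_recv
    raise @fatal_error if @fatal_error

    msg = @queue.pop
    raise @fatal_error if msg.nil? && @fatal_error

    msg
  end

  # Records the error first, then wakes any blocked reader with nil.
  def fail!(error)
    @fatal_error = error
    @queue << nil
  end
end

sketch = RecvSketch.new
sketch.deliver(["hello"])
p sketch.dequeue_recv

reader = Thread.new do
  sketch.dequeue_recv
rescue => e
  e
end
sketch.fail!(RuntimeError.new("pump died"))
p reader.value.message
```

Recording the error before pushing the sentinel matters: the reader checks the error only after it wakes up, so the reverse order could hand a bare nil to the caller.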
#disconnect(endpoint) ⇒ void
This method returns an undefined value.
Disconnects from an endpoint. Closes connections to that endpoint and stops auto-reconnection for it.
# File 'lib/omq/engine.rb', line 241
def disconnect(endpoint)
  @dialers.delete(endpoint)
  close_connections_at(endpoint)
end
#emit_monitor_event(type, endpoint: nil, detail: nil) ⇒ void
This method returns an undefined value.
Emits a lifecycle event to the monitor queue, if one is attached.
# File 'lib/omq/engine.rb', line 548
def emit_monitor_event(type, endpoint: nil, detail: nil)
  return unless @monitor_queue

  event = MonitorEvent.new type: type, endpoint: endpoint, detail: detail
  @monitor_queue << event
rescue Async::Stop, ClosedQueueError
end
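The "emit only if a monitor is attached" pattern can be sketched without Async. The example below assumes a Thread::Queue as the monitor queue and uses hypothetical MonitorEventSketch and MonitorEmitterSketch names; it omits the Async::Stop rescue because Async is not loaded here.

```ruby
# Hypothetical event record with the same three fields as the source.
MonitorEventSketch = Struct.new(:type, :endpoint, :detail, keyword_init: true)

class MonitorEmitterSketch
  attr_writer :monitor_queue

  def initialize
    @monitor_queue = nil
  end

  def emit_monitor_event(type, endpoint: nil, detail: nil)
    return unless @monitor_queue

    @monitor_queue << MonitorEventSketch.new(type: type, endpoint: endpoint, detail: detail)
  rescue ClosedQueueError
    # The consumer closed the queue; dropping the event is intentional.
  end
end

emitter = MonitorEmitterSketch.new
emitter.emit_monitor_event(:listening) # no queue attached: no-op

queue = Thread::Queue.new
emitter.monitor_queue = queue
emitter.emit_monitor_event(:listening, endpoint: "tcp://127.0.0.1:5555")
p queue.pop.type

queue.close
emitter.emit_monitor_event(:closed) # swallowed, does not raise
```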
#emit_verbose_monitor_event(type, **detail) ⇒ void
This method returns an undefined value.
Emits a verbose-only monitor event (e.g. message traces). Only emitted when Socket#monitor was called with verbose: true. Uses **detail to avoid Hash allocation when verbose is off.
# File 'lib/omq/engine.rb', line 566
def emit_verbose_monitor_event(type, **detail)
  return unless @verbose_monitor
  emit_monitor_event(type, detail: detail)
end
#emit_verbose_msg_received(conn, parts) ⇒ Object
Emits a :message_received verbose event and enriches it with the on-wire (pre-decompression) byte size if conn exposes last_wire_size_in.
# File 'lib/omq/engine.rb', line 591
def emit_verbose_msg_received(conn, parts)
  return unless @verbose_monitor

  detail = { parts: parts }

  if conn.respond_to? :last_wire_size_in
    detail[:wire_size] = conn.last_wire_size_in
  end

  emit_monitor_event :message_received, detail: detail
end
#emit_verbose_msg_sent(conn, parts) ⇒ Object
Emits a :message_sent verbose event and enriches it with the on-wire (post-compression) byte size if conn exposes last_wire_size_out (installed by ZMTP-Zstd etc.).
# File 'lib/omq/engine.rb', line 575
def emit_verbose_msg_sent(conn, parts)
  return unless @verbose_monitor

  detail = { parts: parts }

  if conn.respond_to? :last_wire_size_out
    detail[:wire_size] = conn.last_wire_size_out
  end

  emit_monitor_event :message_sent, detail: detail
end
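The optional enrichment relies on duck typing: the wire size is added only when the connection object responds to the accessor. A minimal sketch, using two hypothetical Struct-based connections in place of real ones:

```ruby
# Hypothetical connections: only the second exposes last_wire_size_out.
PlainConnSketch      = Struct.new(:name)
CompressedConnSketch = Struct.new(:name, :last_wire_size_out)

# Builds the detail hash the way the method above does.
def sent_detail(conn, parts)
  detail = { parts: parts }

  if conn.respond_to?(:last_wire_size_out)
    detail[:wire_size] = conn.last_wire_size_out
  end

  detail
end

p sent_detail(PlainConnSketch.new("plain"), ["hi"])
p sent_detail(CompressedConnSketch.new("zstd", 12), ["hi"])
```

The first call yields a hash with only :parts; the second also carries :wire_size.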
#enqueue_send(parts) ⇒ void
This method returns an undefined value.
Enqueues a message for sending. Blocks at HWM.
# File 'lib/omq/engine.rb', line 324
def enqueue_send(parts)
  raise @fatal_error if @fatal_error
  routing.enqueue(parts)
end
#handle_accepted(io, endpoint: nil) ⇒ void
This method returns an undefined value.
Called by a transport when an incoming connection is accepted.
# File 'lib/omq/engine.rb', line 267
def handle_accepted(io, endpoint: nil)
  emit_monitor_event(:accepted, endpoint: endpoint)
  spawn_connection(io, as_server: true, endpoint: endpoint)
end
#handle_connected(io, endpoint: nil) ⇒ void
This method returns an undefined value.
Called by a transport when an outgoing connection is established.
# File 'lib/omq/engine.rb', line 278
def handle_connected(io, endpoint: nil)
  emit_monitor_event(:connected, endpoint: endpoint)
  spawn_connection(io, as_server: false, endpoint: endpoint)
end
#maybe_reconnect(endpoint) ⇒ Object
Schedules a reconnect for endpoint if auto-reconnect is enabled and the endpoint is still dialed.
# File 'lib/omq/engine.rb', line 368
def maybe_reconnect(endpoint)
  return unless endpoint && @dialers.key?(endpoint)
  return unless @lifecycle.open? && @lifecycle.reconnect_enabled

  dialer = @dialers[endpoint]
  Reconnect.schedule(dialer, @options, @lifecycle.parent_task, self)
end
#maybe_resolve_all_peers_gone ⇒ Object
Resolves `all_peers_gone` if we had peers and now have none. Called by ConnectionLifecycle during teardown.
# File 'lib/omq/engine.rb', line 360
def maybe_resolve_all_peers_gone
  @lifecycle.maybe_resolve_all_peers_gone(@connections)
end
#on_io_thread? ⇒ Boolean
# File 'lib/omq/engine.rb', line 103
def on_io_thread? = @lifecycle.on_io_thread
#parent_task ⇒ Object
# File 'lib/omq/engine.rb', line 100
def parent_task = @lifecycle.parent_task
#peer_connected ⇒ Object
Delegated to SocketLifecycle.
# File 'lib/omq/engine.rb', line 98
def peer_connected = @lifecycle.peer_connected
#reconnect_enabled=(value) ⇒ Object
Enables or disables auto-reconnect for dropped connections. Delegated to SocketLifecycle. Close paths flip this to false so a lost connection doesn’t schedule a new retry after linger.
# File 'lib/omq/engine.rb', line 112
def reconnect_enabled=(value)
  @lifecycle.reconnect_enabled = value
end
#record_disconnect_reason(conn, error) ⇒ Object
Records the disconnect reason on the ConnectionLifecycle for conn, if any. Called by the recv pump rescue so the upcoming `:disconnected` monitor event carries an error reason, without exposing the internal connection map.
# File 'lib/omq/engine.rb', line 153
def record_disconnect_reason(conn, error)
  @connections[conn]&.record_disconnect_reason(error)
end
#routing ⇒ Routing
Returns routing strategy (created lazily on first access).
# File 'lib/omq/engine.rb', line 75
def routing
  @routing ||= Routing.for(@socket_type).new(self)
end
#signal_fatal_error(error) ⇒ Object
Wraps an unexpected pump error as SocketDeadError and unblocks any callers waiting on the recv queue. The original error is preserved as #cause so callers can surface the real reason.
# File 'lib/omq/engine.rb', line 495
def signal_fatal_error(error)
  return unless @lifecycle.open?

  @fatal_error = build_fatal_error(error)
  routing.unblock_recv rescue nil
  @lifecycle.peer_connected.resolve(nil) rescue nil
end
#spawn_conn_pump_task(conn, annotation:, &block) ⇒ Object
Spawns a per-connection pump task on the connection’s own lifecycle barrier. When any pump on the barrier exits (e.g. the send pump sees EPIPE and calls #connection_lost), OMQ::Engine::ConnectionLifecycle#tear_down! calls `barrier.stop`, which cancels every sibling pump for that connection, so a dead peer can no longer leave orphan send pumps blocked on `dequeue` waiting for messages that will never be written.
# File 'lib/omq/engine.rb', line 467
def spawn_conn_pump_task(conn, annotation:, &block)
  lifecycle = @connections[conn]

  unless lifecycle
    return spawn_pump_task(annotation: annotation, &block)
  end

  lifecycle.barrier.async(transient: true, annotation: annotation) do
    yield
  rescue Async::Stop, Async::Cancel
    # normal shutdown / sibling tore us down
  rescue Protocol::ZMTP::Error, *CONNECTION_LOST => error
    # expected disconnect — stash reason for the :disconnected
    # monitor event, then let the lifecycle reconnect as usual
    lifecycle.record_disconnect_reason(error)
  rescue => error
    signal_fatal_error(error)
  end
end
#spawn_inproc_retry(endpoint) {|interval| ... } ⇒ Object
Spawns an inproc reconnect retry task under the socket’s parent task.
# File 'lib/omq/engine.rb', line 174
def spawn_inproc_retry(endpoint)
  ri = @options.reconnect_interval
  ivl = ri.is_a?(Range) ? ri.begin : ri
  ann = "inproc reconnect #{endpoint}"

  @lifecycle.barrier.async(transient: true, annotation: ann) do
    yield ivl
  rescue Async::Stop, Async::Cancel
  end
end
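The interval yielded to the block is derived from reconnect_interval, which the method treats as either a plain number or a Range. The helper below isolates that one expression; initial_retry_interval is a hypothetical name used only for this sketch.

```ruby
# Returns the lower bound when given a Range, otherwise the value itself.
def initial_retry_interval(reconnect_interval)
  reconnect_interval.is_a?(Range) ? reconnect_interval.begin : reconnect_interval
end

p initial_retry_interval(0.1)
p initial_retry_interval(0.1..5.0)
```

Both calls yield 0.1, so an inproc retry starts from the smallest configured interval in either form.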
#spawn_pump_task(annotation:, parent: Async::Task.current) { ... } ⇒ Async::Task
Spawns a transient pump task with error propagation.
Unexpected exceptions are caught and forwarded to #signal_fatal_error so blocked callers (send/recv) see the real error instead of deadlocking.
# File 'lib/omq/engine.rb', line 445
def spawn_pump_task(annotation:, parent: Async::Task.current, &block)
  parent.async(transient: true, annotation: annotation) do
    yield
  rescue Async::Stop, Async::Cancel, Protocol::ZMTP::Error, *CONNECTION_LOST
    # normal shutdown / expected disconnect
  rescue => error
    signal_fatal_error(error)
  end
end
#start_recv_pump(conn, recv_queue) {|msg| ... } ⇒ Async::Task?
Starts a recv pump for a connection, or wires the inproc fast path.
# File 'lib/omq/engine.rb', line 337
def start_recv_pump(conn, recv_queue, &transform)
  # Spawn on the connection's lifecycle barrier so the recv pump is
  # torn down together with the rest of its sibling per-connection
  # pumps when the connection is lost.
  parent = @connections[conn]&.barrier || @lifecycle.barrier
  RecvPump.start(parent, conn, recv_queue, self, transform)
end
#stop ⇒ void
This method returns an undefined value.
Immediate hard stop: skips the linger drain and cascades teardown through the socket-level barrier. Intended for crash-path cleanup where #close's drain is either unsafe or undesired.
# File 'lib/omq/engine.rb', line 414
def stop
  return unless @lifecycle.alive?

  @lifecycle.force_close!

  if @lifecycle.on_io_thread
    Reactor.untrack_linger(@options.linger)
  end

  stop_listeners
  emit_monitor_event(:closed)
  close_monitor_queue
end
#subscribe(prefix) ⇒ Object
Subscribes to a topic prefix on SUB/XSUB sockets. Delegates to the routing strategy so callers don’t have to chain through `engine.routing`.
# File 'lib/omq/engine.rb', line 132
def subscribe(prefix)
  routing.subscribe(prefix)
end
#subscriber_joined ⇒ Object
Delegated to the routing strategy. Forwards the PUB/XPUB/SUB/XSUB subscription surface so callers don’t have to chain through `engine.routing`.
# File 'lib/omq/engine.rb', line 121
def subscriber_joined
  routing.subscriber_joined
end
#transport_for(endpoint) ⇒ Module
Looks up the transport module for an endpoint URI.
# File 'lib/omq/engine.rb', line 610
def transport_for(endpoint)
  scheme = endpoint[/\A([^:]+):\/\//, 1]
  transport = self.class.transports[scheme]

  unless transport
    raise ArgumentError, "unsupported transport: #{endpoint}"
  end

  transport
end
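The scheme lookup can be exercised on its own. This sketch assumes a hypothetical registry constant with two placeholder modules; the real registry is the .transports class attribute, and its contents are not shown in this page.

```ruby
# Placeholder transport modules (hypothetical, for illustration only).
module FakeTcpTransport; end
module FakeInprocTransport; end

TRANSPORTS_SKETCH = {
  "tcp"    => FakeTcpTransport,
  "inproc" => FakeInprocTransport,
}.freeze

# Extracts the scheme before "://" and looks it up in the registry.
def transport_for(endpoint)
  scheme = endpoint[/\A([^:]+):\/\//, 1] # nil when no "scheme://" prefix
  TRANSPORTS_SKETCH.fetch(scheme) do
    raise ArgumentError, "unsupported transport: #{endpoint}"
  end
end

p transport_for("tcp://127.0.0.1:5555")
p transport_for("inproc://workers")

begin
  transport_for("udp://127.0.0.1:5555")
rescue ArgumentError => e
  puts e.message
end
```

An endpoint without a "scheme://" prefix yields a nil scheme and therefore takes the same ArgumentError path as an unregistered scheme.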
#transport_object_for(endpoint) ⇒ Dialer, ...
Returns the transport object (Dialer or Listener) for an endpoint. Used by OMQ::Engine::ConnectionLifecycle#ready! to call #wrap_connection.
# File 'lib/omq/engine.rb', line 164
def transport_object_for(endpoint)
  @dialers[endpoint] || @listeners[endpoint]
end
#unbind(endpoint) ⇒ void
This method returns an undefined value.
Unbinds from an endpoint. Stops the listener and closes all connections that were accepted on it.
# File 'lib/omq/engine.rb', line 253
def unbind(endpoint)
  listener = @listeners.delete(endpoint) or return

  listener.stop
  close_connections_at(endpoint)
end
#unsubscribe(prefix) ⇒ Object
Unsubscribes from a topic prefix on SUB/XSUB sockets. Delegates to the routing strategy so callers don’t have to chain through `engine.routing`.
# File 'lib/omq/engine.rb', line 143
def unsubscribe(prefix)
  routing.unsubscribe(prefix)
end