Class: OMQ::Socket
Overview
Socket base class.
Instance Attribute Summary collapse
-
#engine ⇒ Engine
readonly
The socket’s engine.
-
#last_tcp_port ⇒ Integer?
readonly
Last auto-selected TCP port.
- #options ⇒ Options readonly
Class Method Summary collapse
-
.bind(endpoint, **opts) ⇒ Socket
Creates a new socket and binds it to the given endpoint.
-
.connect(endpoint, **opts) ⇒ Socket
Creates a new socket and connects it to the given endpoint.
Instance Method Summary collapse
-
#all_peers_gone ⇒ Async::Promise
Resolves when all peers disconnect (after having had peers).
-
#attach_endpoints(endpoints, default:) ⇒ Object
Connects or binds based on endpoint prefix convention.
-
#bind(endpoint, parent: nil) ⇒ void
Binds to an endpoint.
-
#close ⇒ nil
Closes the socket and releases all resources.
-
#close_read ⇒ void
Signals end-of-stream on the receive side.
-
#connect(endpoint, parent: nil) ⇒ void
Connects to an endpoint.
-
#connection_count ⇒ Integer
Current number of peer connections.
-
#disconnect(endpoint) ⇒ void
Disconnects from an endpoint.
-
#init_engine(socket_type, linger:, send_hwm: nil, recv_hwm: nil, send_timeout: nil, recv_timeout: nil, conflate: false, on_mute: nil, backend: nil) ⇒ Object
Initializes engine and options for a socket type.
-
#initialize(endpoints = nil, linger: 0) ⇒ Socket
constructor
A new instance of Socket.
- #inspect ⇒ String
-
#last_endpoint ⇒ String?
Last bound endpoint.
-
#monitor(verbose: false) {|event| ... } ⇒ Async::Task
Yields lifecycle events for this socket.
-
#peer_connected ⇒ Async::Promise
Resolves when first peer completes handshake.
-
#reconnect_enabled=(val) ⇒ void
Disable auto-reconnect for connected endpoints.
-
#set_unbounded ⇒ nil
Set socket to use unbounded pipes (HWM=0).
-
#stop ⇒ nil
Immediate hard stop.
-
#subscriber_joined ⇒ Async::Promise
Resolves when first subscriber joins (PUB/XPUB only).
-
#unbind(endpoint) ⇒ void
Unbinds from an endpoint.
Constructor Details
#initialize(endpoints = nil, linger: 0) ⇒ Socket
Returns a new instance of Socket.
78 79 |
# File 'lib/omq/socket.rb', line 78 def initialize(endpoints = nil, linger: 0) end |
Instance Attribute Details
#engine ⇒ Engine (readonly)
Returns the socket’s engine. Exposed for peer tooling (omq-cli, omq-ffi, omq-ractor) that needs to reach into the socket’s internals — not part of the stable user API.
47 48 49 |
# File 'lib/omq/socket.rb', line 47 def engine @engine end |
#last_tcp_port ⇒ Integer? (readonly)
Returns last auto-selected TCP port.
40 41 42 |
# File 'lib/omq/socket.rb', line 40 def last_tcp_port @last_tcp_port end |
Class Method Details
.bind(endpoint, **opts) ⇒ Socket
Creates a new socket and binds it to the given endpoint.
17 18 19 |
# File 'lib/omq/socket.rb', line 17 def self.bind(endpoint, **opts) new("@#{endpoint}", **opts) end |
.connect(endpoint, **opts) ⇒ Socket
Creates a new socket and connects it to the given endpoint.
28 29 30 |
# File 'lib/omq/socket.rb', line 28 def self.connect(endpoint, **opts) new(">#{endpoint}", **opts) end |
Instance Method Details
#all_peers_gone ⇒ Async::Promise
Returns resolves when all peers disconnect (after having had peers).
156 157 158 |
# File 'lib/omq/socket.rb', line 156 def all_peers_gone @engine.all_peers_gone end |
#attach_endpoints(endpoints, default:) ⇒ Object
Connects or binds based on endpoint prefix convention. Called from subclass initializers (including out-of-tree socket types).
279 280 281 282 283 284 285 286 287 288 289 290 |
# File 'lib/omq/socket.rb', line 279 def attach_endpoints(endpoints, default:) return unless endpoints case endpoints when /\A@(.+)\z/ bind($1) when /\A>(.+)\z/ connect($1) else __send__(default, endpoints) end end |
#bind(endpoint, parent: nil) ⇒ void
This method returns an undefined value.
Binds to an endpoint.
95 96 97 98 99 100 101 |
# File 'lib/omq/socket.rb', line 95 def bind(endpoint, parent: nil) ensure_parent_task(parent: parent) Reactor.run do @engine.bind(endpoint) # TODO: use timeout? @last_tcp_port = @engine.last_tcp_port end end |
#close ⇒ nil
Closes the socket and releases all resources. Drains pending sends up to linger seconds, then cascades teardown through the socket-level Async::Barrier — every connection’s per-connection barrier is stopped, cancelling every pump.
237 238 239 240 |
# File 'lib/omq/socket.rb', line 237 def close Reactor.run { @engine.close } # TODO: use timeout? nil end |
#close_read ⇒ void
This method returns an undefined value.
Signals end-of-stream on the receive side. A subsequent #receive call that would otherwise block returns nil.
172 173 174 |
# File 'lib/omq/socket.rb', line 172 def close_read @engine.dequeue_recv_sentinel end |
#connect(endpoint, parent: nil) ⇒ void
This method returns an undefined value.
Connects to an endpoint.
110 111 112 113 |
# File 'lib/omq/socket.rb', line 110 def connect(endpoint, parent: nil) ensure_parent_task(parent: parent) Reactor.run { @engine.connect(endpoint) } # TODO: use timeout? end |
#connection_count ⇒ Integer
Returns current number of peer connections.
162 163 164 |
# File 'lib/omq/socket.rb', line 162 def connection_count @engine.connections.size end |
#disconnect(endpoint) ⇒ void
This method returns an undefined value.
Disconnects from an endpoint.
121 122 123 |
# File 'lib/omq/socket.rb', line 121 def disconnect(endpoint) Reactor.run { @engine.disconnect(endpoint) } # TODO: use timeout? end |
#init_engine(socket_type, linger:, send_hwm: nil, recv_hwm: nil, send_timeout: nil, recv_timeout: nil, conflate: false, on_mute: nil, backend: nil) ⇒ Object
Initializes engine and options for a socket type. Called from subclass initializers (including out-of-tree socket types).
299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 |
# File 'lib/omq/socket.rb', line 299 def init_engine(socket_type, linger:, send_hwm: nil, recv_hwm: nil, send_timeout: nil, recv_timeout: nil, conflate: false, on_mute: nil, backend: nil) @options = Options.new(linger: linger) @options.send_hwm = send_hwm if send_hwm @options.recv_hwm = recv_hwm if recv_hwm @options.send_timeout = send_timeout if send_timeout @options.recv_timeout = recv_timeout if recv_timeout @options.conflate = conflate @options.on_mute = on_mute if on_mute @engine = case backend when nil, :ruby Engine.new(socket_type, @options) when :ffi FFI::Engine.new(socket_type, @options) else raise ArgumentError, "unknown backend: #{backend}" end end |
#inspect ⇒ String
268 269 270 |
# File 'lib/omq/socket.rb', line 268 def inspect format("#<%s last_endpoint=%p>", self.class, last_endpoint) end |
#last_endpoint ⇒ String?
Returns last bound endpoint.
138 139 140 |
# File 'lib/omq/socket.rb', line 138 def last_endpoint @engine.last_endpoint end |
#monitor(verbose: false) {|event| ... } ⇒ Async::Task
Yields lifecycle events for this socket.
Spawns a background fiber that reads from an internal event queue. The block receives MonitorEvent instances until the socket is closed or the returned task is stopped.
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
# File 'lib/omq/socket.rb', line 199 def monitor(verbose: false, &block) ensure_parent_task queue = Async::LimitedQueue.new(64) @engine.monitor_queue = queue @engine.verbose_monitor = verbose Reactor.run do @engine.parent_task.async(transient: true, annotation: "monitor") do while (event = queue.dequeue) block.call(event) end rescue Async::Stop ensure @engine.monitor_queue = nil block.call(MonitorEvent.new(type: :monitor_stopped)) end end end |
#peer_connected ⇒ Async::Promise
Returns resolves when first peer completes handshake.
144 145 146 |
# File 'lib/omq/socket.rb', line 144 def peer_connected @engine.peer_connected end |
#reconnect_enabled=(val) ⇒ void
This method returns an undefined value.
Disable auto-reconnect for connected endpoints.
225 226 227 |
# File 'lib/omq/socket.rb', line 225 def reconnect_enabled=(val) @engine.reconnect_enabled = val end |
#set_unbounded ⇒ nil
Set socket to use unbounded pipes (HWM=0).
259 260 261 262 263 |
# File 'lib/omq/socket.rb', line 259 def set_unbounded @options.send_hwm = 0 @options.recv_hwm = 0 nil end |
#stop ⇒ nil
Immediate hard stop. Skips the linger drain and cascades teardown through the socket-level Async::Barrier. Intended for crash-path cleanup or when the caller already knows no pending sends matter.
249 250 251 252 |
# File 'lib/omq/socket.rb', line 249 def stop Reactor.run { @engine.stop } # TODO: use timeout? nil end |
#subscriber_joined ⇒ Async::Promise
Returns resolves when first subscriber joins (PUB/XPUB only).
150 151 152 |
# File 'lib/omq/socket.rb', line 150 def subscriber_joined @engine.routing.subscriber_joined end |