Class: OMQ::Socket
Overview
Socket base class.
Instance Attribute Summary
-
#engine ⇒ Engine
readonly
The socket’s engine.
-
#last_tcp_port ⇒ Integer?
readonly
Last auto-selected TCP port.
- #options ⇒ Options readonly
Class Method Summary
-
.bind(endpoint, **opts) ⇒ Socket
Creates a new socket and binds it to the given endpoint.
-
.connect(endpoint, **opts) ⇒ Socket
Creates a new socket and connects it to the given endpoint.
Instance Method Summary
-
#all_peers_gone ⇒ Async::Promise
Resolves when all peers disconnect (after having had peers).
-
#attach_endpoints(endpoints, default:) ⇒ Object
Connects or binds based on endpoint prefix convention.
-
#bind(endpoint, parent: nil) ⇒ void
Binds to an endpoint.
-
#close ⇒ nil
Closes the socket and releases all resources.
-
#close_read ⇒ void
Signals end-of-stream on the receive side.
-
#connect(endpoint, parent: nil) ⇒ void
Connects to an endpoint.
-
#connection_count ⇒ Integer
Current number of peer connections.
-
#disconnect(endpoint) ⇒ void
Disconnects from an endpoint.
-
#finalize_init ⇒ Object
Yields self if a block was given, then closes.
-
#init_engine(socket_type, send_hwm: nil, recv_hwm: nil, send_timeout: nil, recv_timeout: nil, conflate: false, on_mute: nil, backend: nil) ⇒ Object
Initializes engine and options for a socket type.
-
#initialize(endpoints = nil) {|the| ... } ⇒ Socket
constructor
Use option accessors (e.g. socket.linger = 0) to configure post-construction.
- #inspect ⇒ String
-
#last_endpoint ⇒ String?
Last bound endpoint.
-
#monitor(verbose: false) {|event| ... } ⇒ Async::Task
Yields lifecycle events for this socket.
-
#peer_connected ⇒ Async::Promise
Resolves when first peer completes handshake.
-
#reconnect_enabled=(val) ⇒ void
Enables or disables auto-reconnect for connected endpoints.
-
#set_unbounded ⇒ nil
Set socket to use unbounded pipes (HWM=0).
-
#stop ⇒ nil
Immediate hard stop.
-
#subscriber_joined ⇒ Async::Promise
Resolves when first subscriber joins (PUB/XPUB only).
-
#unbind(endpoint) ⇒ void
Unbinds from an endpoint.
Constructor Details
#initialize(endpoints = nil) {|the| ... } ⇒ Socket
Use option accessors (e.g. socket.linger = 0) to configure post-construction.
# File 'lib/omq/socket.rb', line 83
def initialize(endpoints = nil)
end
Instance Attribute Details
#engine ⇒ Engine (readonly)
Returns the socket’s engine. Exposed for peer tooling (omq-cli, omq-ffi, omq-ractor) that needs to reach into the socket’s internals — not part of the stable user API.
# File 'lib/omq/socket.rb', line 47
def engine
  @engine
end
#last_tcp_port ⇒ Integer? (readonly)
Returns the last auto-selected TCP port.
# File 'lib/omq/socket.rb', line 40
def last_tcp_port
  @last_tcp_port
end
Class Method Details
.bind(endpoint, **opts) ⇒ Socket
Creates a new socket and binds it to the given endpoint.
# File 'lib/omq/socket.rb', line 17
def self.bind(endpoint, **opts)
  new("@#{endpoint}", **opts)
end
.connect(endpoint, **opts) ⇒ Socket
Creates a new socket and connects it to the given endpoint.
# File 'lib/omq/socket.rb', line 28
def self.connect(endpoint, **opts)
  new(">#{endpoint}", **opts)
end
Instance Method Details
#all_peers_gone ⇒ Async::Promise
Returns a promise that resolves when all peers disconnect (after having had peers).
# File 'lib/omq/socket.rb', line 175
def all_peers_gone
  @engine.all_peers_gone
end
#attach_endpoints(endpoints, default:) ⇒ Object
Connects or binds based on endpoint prefix convention. Called from subclass initializers (including out-of-tree socket types).
# File 'lib/omq/socket.rb', line 298
def attach_endpoints(endpoints, default:)
  return unless endpoints

  if endpoints.is_a?(Array)
    endpoints.each { |ep| attach_endpoints(ep, default: default) }
    return
  end

  case endpoints
  when /\A@(.+)\z/
    bind($1)
  when /\A>(.+)\z/
    connect($1)
  else
    __send__(default, endpoints)
  end
end
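The prefix convention can be tried out without any networking. What follows is a minimal sketch, not OMQ code: RecordingSocket is a hypothetical stand-in that copies the dispatch logic shown above and records which call each endpoint would trigger. The endpoint strings are illustrative only.

```ruby
# Hypothetical stand-in for a socket: records calls instead of doing I/O.
class RecordingSocket
  attr_reader :calls

  def initialize
    @calls = []
  end

  def bind(endpoint)
    @calls << [:bind, endpoint]
  end

  def connect(endpoint)
    @calls << [:connect, endpoint]
  end

  # Same dispatch as the documented method: "@" binds, ">" connects,
  # anything else falls back to the method named by +default+.
  def attach_endpoints(endpoints, default:)
    return unless endpoints

    if endpoints.is_a?(Array)
      endpoints.each { |ep| attach_endpoints(ep, default: default) }
      return
    end

    case endpoints
    when /\A@(.+)\z/ then bind($1)
    when /\A>(.+)\z/ then connect($1)
    else __send__(default, endpoints)
    end
  end
end

socket = RecordingSocket.new
socket.attach_endpoints(
  ["@tcp://127.0.0.1:5555", ">tcp://127.0.0.1:5556", "ipc:///tmp/demo.sock"],
  default: :connect
)
socket.calls.each { |verb, endpoint| puts "#{verb} #{endpoint}" }
```

The unprefixed endpoint goes through the default, so a socket type that usually connects can pass default: :connect and still accept an explicit "@" to bind.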
#bind(endpoint, parent: nil) ⇒ void
This method returns an undefined value.
Binds to an endpoint.
# File 'lib/omq/socket.rb', line 114
def bind(endpoint, parent: nil)
  ensure_parent_task(parent: parent)
  Reactor.run do
    @engine.bind(endpoint) # TODO: use timeout?
    @last_tcp_port = @engine.last_tcp_port
  end
end
#close ⇒ nil
Closes the socket and releases all resources. Drains pending sends for up to linger seconds, then cascades teardown through the socket-level Async::Barrier: every connection’s per-connection barrier is stopped, which cancels every pump.
# File 'lib/omq/socket.rb', line 256
def close
  Reactor.run { @engine.close } # TODO: use timeout?
  nil
end
#close_read ⇒ void
This method returns an undefined value.
Signals end-of-stream on the receive side. A subsequent #receive call that would otherwise block returns nil.
# File 'lib/omq/socket.rb', line 191
def close_read
  @engine.dequeue_recv_sentinel
end
#connect(endpoint, parent: nil) ⇒ void
This method returns an undefined value.
Connects to an endpoint.
# File 'lib/omq/socket.rb', line 129
def connect(endpoint, parent: nil)
  ensure_parent_task(parent: parent)
  Reactor.run { @engine.connect(endpoint) } # TODO: use timeout?
end
#connection_count ⇒ Integer
Returns the current number of peer connections.
# File 'lib/omq/socket.rb', line 181
def connection_count
  @engine.connections.size
end
#disconnect(endpoint) ⇒ void
This method returns an undefined value.
Disconnects from an endpoint.
# File 'lib/omq/socket.rb', line 140
def disconnect(endpoint)
  Reactor.run { @engine.disconnect(endpoint) } # TODO: use timeout?
end
#finalize_init ⇒ Object
Yields self if a block was given, then closes. Called by every subclass initializer so OMQ::PUSH.new { |p| … } behaves like File.open: configure, use, auto-close.
# File 'lib/omq/socket.rb', line 91
def finalize_init
  return unless block_given?

  begin
    yield self
  ensure
    close
  end
end
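The File.open-style behavior can be sketched in isolation. DemoSocket below is a hypothetical class, not part of OMQ, and the way its initializer forwards the block to finalize_init is an assumption about how a subclass might wire it up. The sketch shows that the socket stays open without a block, and is closed after the block returns or raises.

```ruby
# Hypothetical stand-in for a socket subclass; not part of OMQ.
class DemoSocket
  attr_reader :closed

  def initialize(&block)
    @closed = false
    # Assumed wiring: forward the constructor's block to finalize_init.
    finalize_init(&block)
  end

  def close
    @closed = true
    nil
  end

  private

  # Same shape as the documented method: yield, then always close.
  def finalize_init
    return unless block_given?

    begin
      yield self
    ensure
      close
    end
  end
end

plain = DemoSocket.new               # no block: caller owns the lifetime

scoped = nil
DemoSocket.new { |s| scoped = s }    # block: closed when the block returns

failed = nil
begin
  DemoSocket.new do |s|
    failed = s
    raise "boom"                     # block raises: ensure still closes
  end
rescue RuntimeError
  # the exception propagates out of the constructor after close runs
end

puts "plain closed?  #{plain.closed}"
puts "scoped closed? #{scoped.closed}"
puts "failed closed? #{failed.closed}"
```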
#init_engine(socket_type, send_hwm: nil, recv_hwm: nil, send_timeout: nil, recv_timeout: nil, conflate: false, on_mute: nil, backend: nil) ⇒ Object
Initializes engine and options for a socket type. Called from subclass initializers (including out-of-tree socket types).
# File 'lib/omq/socket.rb', line 322
def init_engine(socket_type, send_hwm: nil, recv_hwm: nil,
                send_timeout: nil, recv_timeout: nil,
                conflate: false, on_mute: nil, backend: nil)
  @options = Options.new
  @options.send_hwm = send_hwm if send_hwm
  @options.recv_hwm = recv_hwm if recv_hwm
  @options.send_timeout = send_timeout if send_timeout
  @options.recv_timeout = recv_timeout if recv_timeout
  @options.conflate = conflate
  @options.on_mute = on_mute if on_mute
  @engine = case backend
            when nil, :ruby
              Engine.new(socket_type, @options)
            when :ffi
              FFI::Engine.new(socket_type, @options)
            else
              raise ArgumentError, "unknown backend: #{backend}"
            end
end
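The option handling above has two details worth seeing in action: a nil keyword leaves the option at its default, while 0 is truthy in Ruby and is therefore applied. The sketch below reproduces a subset of that logic with hypothetical stand-ins (DemoOptions, DemoEngine, demo_init_engine). The default value of 1000 is invented for the demonstration and is not taken from OMQ.

```ruby
# Hypothetical stand-ins; the 1000 defaults are made up for this demo.
DemoOptions = Struct.new(:send_hwm, :recv_hwm, :conflate) do
  def initialize
    super(1000, 1000, false)
  end
end

DemoEngine = Struct.new(:socket_type, :options, :backend)

# Mirrors the guard-and-dispatch shape of the documented method.
def demo_init_engine(socket_type, send_hwm: nil, recv_hwm: nil,
                     conflate: false, backend: nil)
  options = DemoOptions.new
  options.send_hwm = send_hwm if send_hwm   # nil keeps the default
  options.recv_hwm = recv_hwm if recv_hwm
  options.conflate = conflate               # always assigned

  case backend
  when nil, :ruby then DemoEngine.new(socket_type, options, :ruby)
  when :ffi       then DemoEngine.new(socket_type, options, :ffi)
  else raise ArgumentError, "unknown backend: #{backend}"
  end
end

default_engine = demo_init_engine(:PUSH)
tuned_engine   = demo_init_engine(:PUSH, send_hwm: 0, backend: :ffi)

backend_error = begin
  demo_init_engine(:PUSH, backend: :jruby)
rescue ArgumentError => e
  e.message
end

puts default_engine.options.send_hwm
puts tuned_engine.options.send_hwm
puts backend_error
```

Because 0 passes the `if send_hwm` guard, an unbounded high-water mark can be requested directly through the keyword, and only nil means "use the default".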
#inspect ⇒ String
# File 'lib/omq/socket.rb', line 287
def inspect
  format("#<%s last_endpoint=%p>", self.class, last_endpoint)
end
#last_endpoint ⇒ String?
Returns the last bound endpoint.
# File 'lib/omq/socket.rb', line 157
def last_endpoint
  @engine.last_endpoint
end
#monitor(verbose: false) {|event| ... } ⇒ Async::Task
Yields lifecycle events for this socket.
Spawns a background fiber that reads from an internal event queue. The block receives MonitorEvent instances until the socket is closed or the returned task is stopped.
# File 'lib/omq/socket.rb', line 218
def monitor(verbose: false, &block)
  ensure_parent_task

  queue = Async::LimitedQueue.new(64)
  @engine.monitor_queue = queue
  @engine.verbose_monitor = verbose

  Reactor.run do
    @engine.parent_task.async(transient: true, annotation: "monitor") do
      while (event = queue.dequeue)
        block.call(event)
      end
    rescue Async::Stop
    ensure
      @engine.monitor_queue = nil
      block.call(MonitorEvent.new(type: :monitor_stopped))
    end
  end
end
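The consumer loop above follows a common pattern: drain a bounded queue until it yields nil, then emit a final "stopped" event from ensure. The sketch below illustrates that pattern only. It swaps in Ruby's standard-library Thread::SizedQueue and a Thread for Async::LimitedQueue and the Async task, and DemoEvent plus the :listening and :connected event types are hypothetical names chosen for illustration.

```ruby
# Hypothetical event type; OMQ's MonitorEvent is not used here.
DemoEvent = Struct.new(:type)

# Bounded queue, mirroring the 64-slot limit in the source above.
queue = Thread::SizedQueue.new(64)

seen = []
on_event = ->(event) { seen << event.type }

consumer = Thread.new do
  # pop returns nil once the queue is closed and drained, ending the loop.
  while (event = queue.pop)
    on_event.call(event)
  end
ensure
  # Always tell the callback that monitoring has ended.
  on_event.call(DemoEvent.new(:monitor_stopped))
end

queue << DemoEvent.new(:listening)
queue << DemoEvent.new(:connected)
queue.close
consumer.join

p seen # => [:listening, :connected, :monitor_stopped]
```

The callback always receives a final :monitor_stopped event, so consumers can use it as a reliable end-of-stream marker rather than polling the task state.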
#peer_connected ⇒ Async::Promise
Returns a promise that resolves when the first peer completes its handshake.
# File 'lib/omq/socket.rb', line 163
def peer_connected
  @engine.peer_connected
end
#reconnect_enabled=(val) ⇒ void
This method returns an undefined value.
Enables or disables auto-reconnect for connected endpoints.
# File 'lib/omq/socket.rb', line 244
def reconnect_enabled=(val)
  @engine.reconnect_enabled = val
end
#set_unbounded ⇒ nil
Sets the socket to use unbounded pipes by setting both send_hwm and recv_hwm to 0.
# File 'lib/omq/socket.rb', line 278
def set_unbounded
  @options.send_hwm = 0
  @options.recv_hwm = 0
  nil
end
#stop ⇒ nil
Immediate hard stop. Skips the linger drain and cascades teardown through the socket-level Async::Barrier. Intended for crash-path cleanup or when the caller already knows no pending sends matter.
# File 'lib/omq/socket.rb', line 268
def stop
  Reactor.run { @engine.stop } # TODO: use timeout?
  nil
end
#subscriber_joined ⇒ Async::Promise
Returns a promise that resolves when the first subscriber joins (PUB/XPUB only).
# File 'lib/omq/socket.rb', line 169
def subscriber_joined
  @engine.routing.subscriber_joined
end