Class: OMQ::Socket
Overview
Socket base class.
Direct Known Subclasses
CHANNEL, CLIENT, DEALER, DISH, GATHER, PAIR, PEER, PUB, PULL, PUSH, RADIO, REP, REQ, ROUTER, SCATTER, SERVER, SUB, XPUB, XSUB
Instance Attribute Summary
-
#engine ⇒ Engine readonly
The socket’s engine.
- #options ⇒ Options readonly
Class Method Summary
-
.bind(endpoint, **opts) ⇒ Socket
Creates a new socket and binds it to the given endpoint.
-
.connect(endpoint, **opts) ⇒ Socket
Creates a new socket and connects it to the given endpoint.
Instance Method Summary
-
#all_peers_gone ⇒ Async::Promise
Resolves when all peers disconnect (after having had peers).
-
#attach_endpoints(endpoints, default:) ⇒ Object
Connects or binds based on endpoint prefix convention.
-
#bind(endpoint, parent: nil, **opts) ⇒ URI::Generic
Binds to an endpoint.
-
#close ⇒ nil
Closes the socket and releases all resources.
-
#close_read ⇒ void
Signals end-of-stream on the receive side.
-
#connect(endpoint, parent: nil, **opts) ⇒ URI::Generic
Connects to an endpoint.
-
#connection_count ⇒ Integer
Current number of peer connections.
-
#disconnect(endpoint) ⇒ void
Disconnects from an endpoint.
-
#finalize_init ⇒ Object
Yields self if a block was given, then closes.
-
#init_engine(socket_type, send_hwm: nil, recv_hwm: nil, send_timeout: nil, recv_timeout: nil, conflate: false, on_mute: nil, backend: nil) ⇒ Object
Initializes engine and options for a socket type.
-
#initialize(endpoints = nil) {|the| ... } ⇒ Socket
constructor
Use option accessors (e.g. socket.linger = 0) to configure post-construction.
- #inspect ⇒ String
-
#monitor(verbose: false) {|event| ... } ⇒ Async::Task
Yields lifecycle events for this socket.
-
#peer_connected ⇒ Async::Promise
Resolves when first peer completes handshake.
-
#reconnect_enabled=(val) ⇒ void
Enables or disables auto-reconnect for connected endpoints.
-
#set_unbounded ⇒ nil
Set socket to use unbounded pipes (HWM=0).
-
#stop ⇒ nil
Immediate hard stop.
-
#subscriber_joined ⇒ Async::Promise
Resolves when first subscriber joins (PUB/XPUB only).
-
#unbind(endpoint) ⇒ void
Unbinds from an endpoint.
Constructor Details
#initialize(endpoints = nil) {|the| ... } ⇒ Socket
Use option accessors (e.g. socket.linger = 0) to configure post-construction.
# File 'lib/omq/socket.rb', line 78
def initialize(endpoints = nil)
end
Instance Attribute Details
#engine ⇒ Engine (readonly)
Returns the socket’s engine. Exposed for peer tooling (omq-cli, omq-ffi, omq-ractor) that needs to reach into the socket’s internals — not part of the stable user API.
# File 'lib/omq/socket.rb', line 42
def engine
  @engine
end
Class Method Details
.bind(endpoint, **opts) ⇒ Socket
Creates a new socket and binds it to the given endpoint.
# File 'lib/omq/socket.rb', line 17
def self.bind(endpoint, **opts)
  new("@#{endpoint}", **opts)
end
.connect(endpoint, **opts) ⇒ Socket
Creates a new socket and connects it to the given endpoint.
# File 'lib/omq/socket.rb', line 28
def self.connect(endpoint, **opts)
  new(">#{endpoint}", **opts)
end
Instance Method Details
#all_peers_gone ⇒ Async::Promise
Returns a promise that resolves when all peers disconnect (after having had peers).
# File 'lib/omq/socket.rb', line 160
def all_peers_gone
  @engine.all_peers_gone
end
#attach_endpoints(endpoints, default:) ⇒ Object
Connects or binds based on endpoint prefix convention. Called from subclass initializers (including out-of-tree socket types).
# File 'lib/omq/socket.rb', line 283
def attach_endpoints(endpoints, default:)
  return unless endpoints

  if endpoints.is_a?(Array)
    endpoints.each { |ep| attach_endpoints(ep, default: default) }
    return
  end

  case endpoints
  when /\A@(.+)\z/
    bind($1)
  when /\A>(.+)\z/
    connect($1)
  else
    __send__(default, endpoints)
  end
end
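The prefix convention can be tried without a running engine. The following is a minimal sketch, not OMQ code: PrefixDemo is a hypothetical stand-in whose bind and connect only record their arguments, and whose attach_endpoints copies the dispatch logic from the listing above. The endpoint strings are made up for illustration.

```ruby
# Hypothetical stand-in for a socket; bind/connect only record calls.
class PrefixDemo
  attr_reader :calls

  def initialize
    @calls = []
  end

  def bind(endpoint)
    @calls << [:bind, endpoint]
  end

  def connect(endpoint)
    @calls << [:connect, endpoint]
  end

  # Same dispatch as the attach_endpoints listing: "@" binds,
  # ">" connects, anything else goes to the default method.
  def attach_endpoints(endpoints, default:)
    return unless endpoints

    if endpoints.is_a?(Array)
      endpoints.each { |ep| attach_endpoints(ep, default: default) }
      return
    end

    case endpoints
    when /\A@(.+)\z/ then bind($1)
    when /\A>(.+)\z/ then connect($1)
    else __send__(default, endpoints)
    end
  end
end

demo = PrefixDemo.new
demo.attach_endpoints(
  ["@tcp://127.0.0.1:5555", ">tcp://127.0.0.1:5556", "tcp://127.0.0.1:5557"],
  default: :connect
)
demo.calls.each { |action, endpoint| puts "#{action} #{endpoint}" }
```

The prefix is stripped before the call, so bind and connect always receive the bare endpoint. An unprefixed endpoint falls back to whatever method name the caller passes as default:.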
#bind(endpoint, parent: nil, **opts) ⇒ URI::Generic
Binds to an endpoint.
# File 'lib/omq/socket.rb', line 109
def bind(endpoint, parent: nil, **opts)
  ensure_parent_task(parent: parent)
  Reactor.run { @engine.bind(endpoint, **opts) } # TODO: use timeout?
end
#close ⇒ nil
Closes the socket and releases all resources. Drains pending sends up to linger seconds, then cascades teardown through the socket-level Async::Barrier — every connection’s per-connection barrier is stopped, cancelling every pump.
# File 'lib/omq/socket.rb', line 241
def close
  Reactor.run { @engine.close } # TODO: use timeout?
  nil
end
#close_read ⇒ void
This method returns an undefined value.
Signals end-of-stream on the receive side. A subsequent #receive call that would otherwise block returns nil.
# File 'lib/omq/socket.rb', line 176
def close_read
  @engine.dequeue_recv_sentinel
end
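The end-of-stream behaviour can be pictured with a plain queue and a nil sentinel. This is only a sketch under assumptions: DemoReceiver is a hypothetical class built on Ruby's Queue, and it appends the sentinel behind messages that are already queued. How OMQ's engine orders the sentinel relative to pending messages is not stated in this page.

```ruby
# Hypothetical stand-in for a receive side with an end-of-stream sentinel.
class DemoReceiver
  def initialize
    @queue = Queue.new
  end

  # Simulates a message arriving from a peer.
  def deliver(message)
    @queue << message
  end

  # Assumption: the sentinel is a nil pushed behind queued messages.
  def close_read
    @queue << nil
  end

  # Blocks until a message or the sentinel is available.
  def receive
    @queue.pop
  end
end

rx = DemoReceiver.new
rx.deliver("hello")
rx.close_read

# The loop ends when receive returns nil instead of blocking forever.
while (msg = rx.receive)
  puts msg
end
puts "end of stream"
```

A nil return lets a consumer loop of the form while (msg = socket.receive) terminate cleanly instead of blocking on an empty queue.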
#connect(endpoint, parent: nil, **opts) ⇒ URI::Generic
Connects to an endpoint.
# File 'lib/omq/socket.rb', line 121
def connect(endpoint, parent: nil, **opts)
  ensure_parent_task(parent: parent)
  Reactor.run { @engine.connect(endpoint, **opts) } # TODO: use timeout?
end
#connection_count ⇒ Integer
Returns the current number of peer connections.
# File 'lib/omq/socket.rb', line 166
def connection_count
  @engine.connections.size
end
#disconnect(endpoint) ⇒ void
This method returns an undefined value.
Disconnects from an endpoint.
# File 'lib/omq/socket.rb', line 132
def disconnect(endpoint)
  Reactor.run { @engine.disconnect(endpoint) } # TODO: use timeout?
end
#finalize_init ⇒ Object
Yields self if a block was given, then closes. Called by every subclass initializer so OMQ::PUSH.new { |p| … } behaves like File.open: configure, use, auto-close.
# File 'lib/omq/socket.rb', line 86
def finalize_init
  return unless block_given?
  begin
    yield self
  ensure
    close
  end
end
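The File.open-style lifecycle can be sketched in isolation. DemoSocket below is a hypothetical stand-in, not an OMQ class: its close only flips a flag, and its finalize_init has the same shape as the listing above. The initializer forwards its block explicitly, which is an assumption about how a subclass would pass the block along.

```ruby
# Hypothetical stand-in; close only records that it was called.
class DemoSocket
  attr_reader :closed

  def initialize(&block)
    @closed = false
    # Forward the caller's block so block_given? sees it.
    finalize_init(&block)
  end

  def close
    @closed = true
    nil
  end

  private

  # Same shape as the finalize_init listing: yield, then always close.
  def finalize_init
    return unless block_given?

    begin
      yield self
    ensure
      close
    end
  end
end

# Without a block the socket stays open until closed explicitly.
plain = DemoSocket.new
puts plain.closed

# With a block the socket is open inside it and closed afterwards.
scoped = DemoSocket.new { |s| puts s.closed }
puts scoped.closed
```

Because close sits in an ensure clause, it also runs when the block raises, so the socket is released on the error path as well.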
#init_engine(socket_type, send_hwm: nil, recv_hwm: nil, send_timeout: nil, recv_timeout: nil, conflate: false, on_mute: nil, backend: nil) ⇒ Object
Initializes engine and options for a socket type. Called from subclass initializers (including out-of-tree socket types).
# File 'lib/omq/socket.rb', line 307
def init_engine(socket_type, send_hwm: nil, recv_hwm: nil,
                send_timeout: nil, recv_timeout: nil, conflate: false,
                on_mute: nil, backend: nil)
  @options = Options.new
  @options.send_hwm = send_hwm if send_hwm
  @options.recv_hwm = recv_hwm if recv_hwm
  @options.send_timeout = send_timeout if send_timeout
  @options.recv_timeout = recv_timeout if recv_timeout
  @options.conflate = conflate
  @options.on_mute = on_mute if on_mute
  @engine = case backend
            when nil, :ruby
              Engine.new(socket_type, @options)
            when :ffi
              FFI::Engine.new(socket_type, @options)
            else
              raise ArgumentError, "unknown backend: #{backend}"
            end
end
#inspect ⇒ String
# File 'lib/omq/socket.rb', line 272
def inspect
  format("#<%s bound=%p>", self.class, @engine.listeners.keys)
end
#monitor(verbose: false) {|event| ... } ⇒ Async::Task
Yields lifecycle events for this socket.
Spawns a background fiber that reads from an internal event queue. The block receives MonitorEvent instances until the socket is closed or the returned task is stopped.
# File 'lib/omq/socket.rb', line 203
def monitor(verbose: false, &block)
  ensure_parent_task

  queue = Async::LimitedQueue.new(64)
  @engine.monitor_queue = queue
  @engine.verbose_monitor = verbose

  Reactor.run do
    @engine.parent_task.async(transient: true, annotation: "monitor") do
      while (event = queue.dequeue)
        block.call(event)
      end
    rescue Async::Stop
    ensure
      @engine.monitor_queue = nil
      block.call(MonitorEvent.new(type: :monitor_stopped))
    end
  end
end
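The drain-until-closed loop in the listing above can be illustrated with standard-library pieces. The sketch below swaps in a plain Thread and a SizedQueue for the Async task and Async::LimitedQueue, so it shows the pattern rather than OMQ's implementation. DemoEvent, demo_monitor, and the event types :listening and :connected are hypothetical names chosen for the example.

```ruby
# Hypothetical event type; not OMQ's MonitorEvent.
DemoEvent = Struct.new(:type)

# Stand-in for the monitor loop, using a Thread instead of an Async task.
def demo_monitor(queue, &block)
  Thread.new do
    begin
      # pop returns nil once the queue is closed and empty, ending the loop.
      while (event = queue.pop)
        block.call(event)
      end
    ensure
      # Always hand the block a final marker, as the listing does.
      block.call(DemoEvent.new(:monitor_stopped))
    end
  end
end

queue = SizedQueue.new(64) # bounded, like the 64-slot queue in the listing
seen  = []
task  = demo_monitor(queue) { |event| seen << event.type }

queue << DemoEvent.new(:listening)
queue << DemoEvent.new(:connected)
queue.close # stands in for the socket being closed
task.join

puts seen.inspect
```

The bounded queue means a slow block applies back-pressure to the producer rather than letting events pile up without limit, and the ensure clause guarantees the block sees a final :monitor_stopped event however the loop exits.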
#peer_connected ⇒ Async::Promise
Returns a promise that resolves when the first peer completes its handshake.
# File 'lib/omq/socket.rb', line 148
def peer_connected
  @engine.peer_connected
end
#reconnect_enabled=(val) ⇒ void
This method returns an undefined value.
Enables or disables auto-reconnect for connected endpoints.
# File 'lib/omq/socket.rb', line 229
def reconnect_enabled=(val)
  @engine.reconnect_enabled = val
end
#set_unbounded ⇒ nil
Set socket to use unbounded pipes (HWM=0).
# File 'lib/omq/socket.rb', line 263
def set_unbounded
  @options.send_hwm = 0
  @options.recv_hwm = 0
  nil
end
#stop ⇒ nil
Immediate hard stop. Skips the linger drain and cascades teardown through the socket-level Async::Barrier. Intended for crash-path cleanup or when the caller already knows no pending sends matter.
# File 'lib/omq/socket.rb', line 253
def stop
  Reactor.run { @engine.stop } # TODO: use timeout?
  nil
end
#subscriber_joined ⇒ Async::Promise
Returns a promise that resolves when the first subscriber joins (PUB/XPUB only).
# File 'lib/omq/socket.rb', line 154
def subscriber_joined
  @engine.subscriber_joined
end