Class: ZZQ::Socket
- Inherits: Object
  - Object
  - ZZQ::Socket
- Defined in: lib/zzq/socket.rb
Overview
Abstract base class for Client and Broker. Owns the Engine and exposes the infrastructure API (bind / connect / close / monitor / peer_connected / all_peers_gone). The role-specific API (subscribe / publish / ingest) lives on the subclasses.
ZZQ is pure-Async: every public method that performs I/O must run inside an Async::Task. Either wrap calls in `Async do … end`, or use the block form of #connect / #bind, which enters `Async{}` for you and closes the socket on block exit.
Instance Attribute Summary
- #options ⇒ Object (readonly)
  Returns the value of attribute options.
Class Method Summary
- .bind(endpoint, **opts, &block) ⇒ Object
- .connect(endpoint, **opts, &block) ⇒ Object
Instance Method Summary
- #all_peers_gone ⇒ Object
- #bind(endpoint, **opts, &block) ⇒ Object
  Binds to endpoint.
- #close ⇒ Object
- #coerce_binary(body) ⇒ Object
  Coerces body to a frozen BINARY String.
- #connect(endpoint, **opts, &block) ⇒ Object
  Connects to endpoint.
- #connection_count ⇒ Object
- #initialize(**opts) ⇒ Socket (constructor)
  A new instance of Socket.
- #last_endpoint ⇒ Object
- #monitor(&block) ⇒ Object
  Yields lifecycle events for this socket until it’s closed.
- #peer_connected ⇒ Object
Constructor Details
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
    # File 'lib/zzq/socket.rb', line 21
    def options
      @options
    end
Class Method Details
.bind(endpoint, **opts, &block) ⇒ Object
    # File 'lib/zzq/socket.rb', line 24
    def self.bind(endpoint, **opts, &block)
      sock = new(**opts)
      if block
        sock.bind(endpoint, &block)
      else
        sock.bind(endpoint)
        sock
      end
    end
.connect(endpoint, **opts, &block) ⇒ Object
    # File 'lib/zzq/socket.rb', line 35
    def self.connect(endpoint, **opts, &block)
      sock = new(**opts)
      if block
        sock.connect(endpoint, &block)
      else
        sock.connect(endpoint)
        sock
      end
    end
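The two class-level helpers share one shape: build a socket, then either hand the block to the instance method or return the socket to the caller. The sketch below mimics that dispatch with a hypothetical `DemoSocket` stand-in so it runs without ZZQ or Async. The endpoint string and the `keepalive:` option are made up for illustration, and the stand-in yields the socket itself, whereas the real block form is documented to yield the owning Async::Task.

```ruby
# Hypothetical stand-in for illustration only; not the real ZZQ::Socket.
class DemoSocket
  attr_reader :options, :endpoint

  def initialize(**opts)
    @options = opts
    @closed = false
  end

  # Same dispatch shape as .connect in the listing above.
  def self.connect(endpoint, **opts, &block)
    sock = new(**opts)
    if block
      sock.connect(endpoint, &block)
    else
      sock.connect(endpoint)
      sock
    end
  end

  # Assumption: the block form yields, then always closes.
  # The real method also enters an Async task, which is omitted here.
  def connect(endpoint)
    @endpoint = endpoint
    return self unless block_given?

    begin
      yield self
    ensure
      close
    end
  end

  def close
    @closed = true
    nil
  end

  def closed?
    @closed
  end
end

# Non-block form: the caller owns the socket and must close it.
sock = DemoSocket.connect("tcp://127.0.0.1:1883", keepalive: 30)
sock.closed? # => false
sock.close

# Block form: the socket is closed when the block exits.
seen = nil
DemoSocket.connect("tcp://127.0.0.1:1883") { |s| seen = s }
seen.closed? # => true
```

The practical difference is ownership: without a block the caller is responsible for calling #close, while the block form ties the socket's lifetime to the block.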
Instance Method Details
#all_peers_gone ⇒ Object
    # File 'lib/zzq/socket.rb', line 80
    def all_peers_gone = @engine.all_peers_gone
#bind(endpoint, **opts, &block) ⇒ Object
Binds to endpoint. If a block is given, wraps the call in `Async do … end` (if not already inside one), yields the owning Async::Task to the block, and closes the socket on block exit.

    # File 'lib/zzq/socket.rb', line 55
    def bind(endpoint, **opts, &block)
      block ? run_in_async(endpoint, :bind, opts, &block) : run_sync(:bind, endpoint, **opts)
    end
#close ⇒ Object
    # File 'lib/zzq/socket.rb', line 67
    def close
      if Async::Task.current?
        @engine.close
      else
        Async { @engine.close }.wait
      end
      nil
    end
#coerce_binary(body) ⇒ Object
Coerces body to a frozen BINARY String. MQTT payloads are opaque bytes; the uniform frozen+BINARY contract mirrors NNQ.
    # File 'lib/zzq/socket.rb', line 103
    def coerce_binary(body)
      body = body.to_str unless body.is_a?(String)
      body.force_encoding(Encoding::BINARY) unless body.frozen? || body.encoding == Encoding::BINARY
      body.freeze
    end
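To see what the coercion does to a caller's string, the sketch below copies the method body from the listing into a top-level method so it runs without ZZQ. Two details follow from the code as listed and are worth checking against your own usage: the input string is retagged and frozen in place rather than copied, and a string that is already frozen skips the retagging step and so keeps its original encoding.

```ruby
# Standalone copy of the logic in the listing above, so it runs without ZZQ.
def coerce_binary(body)
  body = body.to_str unless body.is_a?(String)
  body.force_encoding(Encoding::BINARY) unless body.frozen? || body.encoding == Encoding::BINARY
  body.freeze
end

# A mutable UTF-8 string is retagged in place and frozen; no copy is made.
payload = +"h\u00E9llo"
out = coerce_binary(payload)
out.equal?(payload)              # => true (same object)
out.encoding == Encoding::BINARY # => true
out.frozen?                      # => true
out.bytesize                     # => 6 (bytes are unchanged)

# An already-frozen UTF-8 string is returned as-is and keeps its encoding.
frozen = "h\u00E9llo".freeze
coerce_binary(frozen).encoding == Encoding::UTF_8 # => true
```

Callers that need to keep a mutable copy of the payload would have to `dup` it before passing it in.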
#connect(endpoint, **opts, &block) ⇒ Object
Connects to endpoint. Block form yields the owning Async::Task and auto-closes on block exit, same as #bind.
    # File 'lib/zzq/socket.rb', line 62
    def connect(endpoint, **opts, &block)
      block ? run_in_async(endpoint, :connect, opts, &block) : run_sync(:connect, endpoint, **opts)
    end
#connection_count ⇒ Object
    # File 'lib/zzq/socket.rb', line 78
    def connection_count = @engine.connections.size
#last_endpoint ⇒ Object
    # File 'lib/zzq/socket.rb', line 77
    def last_endpoint = @engine.last_endpoint
#monitor(&block) ⇒ Object
Yields lifecycle events for this socket until it’s closed. Must be called from within an Async::Task.
    # File 'lib/zzq/socket.rb', line 85
    def monitor(&block)
      raise NotInAsyncContext unless Async::Task.current?
      queue = Async::Queue.new
      @engine.monitor_queue = queue
      @engine.spawn_task(annotation: "zzq monitor") do
        while (event = queue.dequeue)
          block.call(event)
        end
      rescue Async::Stop
      ensure
        @engine.monitor_queue = nil
        block.call(MonitorEvent.new(type: :monitor_stopped))
      end
    end
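The core of #monitor is a drain loop: dequeue events until a nil arrives, pass each one to the block, and always finish with a `:monitor_stopped` event. The sketch below reproduces only that loop using the standard library, as a rough model rather than the real implementation. `Thread::Queue` stands in for `Async::Queue`, the `MonitorEvent` Struct is a hypothetical replacement for ZZQ's own class (whose fields beyond `type` are not shown in this section), and the `:connected` / `:disconnected` event names are invented for the example.

```ruby
# Hypothetical stand-ins: Thread::Queue replaces Async::Queue, and this Struct
# replaces ZZQ's MonitorEvent.
MonitorEvent = Struct.new(:type, keyword_init: true)

def drain_events(queue, &block)
  # Thread::Queue#pop returns nil once the queue is closed and empty,
  # which ends the loop the same way a nil dequeue does in #monitor.
  while (event = queue.pop)
    block.call(event)
  end
ensure
  # Mirrors the listing: the handler always receives a final :monitor_stopped.
  block.call(MonitorEvent.new(type: :monitor_stopped))
end

queue = Thread::Queue.new
queue << MonitorEvent.new(type: :connected)    # illustrative event name
queue << MonitorEvent.new(type: :disconnected) # illustrative event name
queue.close

seen = []
drain_events(queue) { |event| seen << event.type }
seen # => [:connected, :disconnected, :monitor_stopped]
```

A handler can therefore treat `:monitor_stopped` as the signal to release anything it set up for monitoring.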
#peer_connected ⇒ Object
    # File 'lib/zzq/socket.rb', line 79
    def peer_connected = @engine.peer_connected