Class: ZZQ::Client
Overview
MQTT client. Owns exactly one MQTT session at a time (possibly persisted across reconnects), its subscriptions, and a QoS tracker for in-flight PUBLISH flows.
M3 stub: only the constructor and #build_routing stand. The CONNECT/CONNACK handshake and publish/subscribe plumbing come in M4 / M5.
Instance Attribute Summary
Attributes inherited from Socket
Instance Method Summary
- #build_routing(engine) ⇒ Object
- #last_connack ⇒ Object
  Most recent CONNACK received from the broker, or nil before the first successful connect.
- #publish(topic, payload, qos: 0, retain: false, properties: {}) ⇒ Object
  Publish a message.
- #session_present? ⇒ Boolean
- #subscribe(filter, qos: 0, no_local: false, retain_as_published: false, retain_handling: 0) ⇒ Object
  Subscribe to a topic filter.
Methods inherited from Socket
#all_peers_gone, bind, #bind, #close, #coerce_binary, connect, #connect, #connection_count, #initialize, #last_endpoint, #monitor, #peer_connected
Constructor Details
This class inherits a constructor from ZZQ::Socket
Instance Method Details
#build_routing(engine) ⇒ Object
# File 'lib/zzq/client.rb', line 14
def build_routing(engine)
  Routing::Client.new(engine)
end
#last_connack ⇒ Object
Most recent CONNACK received from the broker, or nil before the first successful connect.
# File 'lib/zzq/client.rb', line 21
def last_connack = @engine.routing.last_connack
#publish(topic, payload, qos: 0, retain: false, properties: {}) ⇒ Object
Publish a message. For QoS 0, returns once the packet is written. For QoS 1, blocks until PUBACK; for QoS 2, blocks until PUBCOMP. Raises NotInAsyncContext when called outside an Async task.
# File 'lib/zzq/client.rb', line 28
def publish(topic, payload, qos: 0, retain: false, properties: {})
  raise NotInAsyncContext unless Async::Task.current?
  @engine.routing.publish(
    topic: topic,
    payload: coerce_binary(payload),
    qos: qos,
    retain: retain,
    properties: properties,
  )
end
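The blocking behavior described for #publish follows from the packet exchange MQTT defines for each QoS level. The sketch below only models that exchange to show which packet ends each flow; `QOS_FLOWS` and `completes_publish?` are hypothetical names for illustration and are not part of ZZQ.

```ruby
# Packet sequence per QoS level, as defined by the MQTT specification.
# QOS_FLOWS and completes_publish? are illustrative names, not ZZQ API.
QOS_FLOWS = {
  0 => %i[publish],                        # fire and forget
  1 => %i[publish puback],                 # at least once
  2 => %i[publish pubrec pubrel pubcomp],  # exactly once
}.freeze

# True when +packet+ is the last packet of the flow for +qos+, i.e. the
# point at which a blocking publish at that QoS level could return.
def completes_publish?(qos, packet)
  flow = QOS_FLOWS.fetch(qos) { raise ArgumentError, "invalid QoS: #{qos.inspect}" }
  flow.last == packet
end

completes_publish?(1, :puback)  # => true
completes_publish?(2, :pubrec)  # => false, PUBREL and PUBCOMP still follow
completes_publish?(2, :pubcomp) # => true
```

A QoS 2 publish therefore waits for two round trips, which is worth keeping in mind when choosing a QoS level for latency-sensitive messages.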
#session_present? ⇒ Boolean
# File 'lib/zzq/client.rb', line 22
def session_present? = @engine.routing.session_present?
#subscribe(filter, qos: 0, no_local: false, retain_as_published: false, retain_handling: 0) ⇒ Object
Subscribe to a topic filter. Returns a Subscription whose queue receives matching messages until it is closed. Raises NotInAsyncContext when called outside an Async task.
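To make "matching messages" concrete, here is a minimal sketch of the standard MQTT wildcard rules: `+` matches exactly one topic level, `#` matches all remaining levels (including none), and a filter that starts with a wildcard does not match topics beginning with `$`. `topic_matches?` is a hypothetical helper written for this illustration; it is not how ZZQ or any particular broker implements matching.

```ruby
# Illustrative MQTT topic-filter matcher (hypothetical helper, not ZZQ API).
def topic_matches?(filter, topic)
  filter_levels = filter.split("/", -1) # -1 keeps trailing empty levels
  topic_levels  = topic.split("/", -1)

  # Wildcard-led filters never match $-prefixed topics such as $SYS/...
  return false if topic.start_with?("$") && %w[+ #].include?(filter_levels.first)

  filter_levels.each_with_index do |level, i|
    return true if level == "#"              # multi-level wildcard: rest matches
    return false if i >= topic_levels.length # topic ran out of levels
    next if level == "+"                     # single-level wildcard
    return false unless level == topic_levels[i]
  end

  filter_levels.length == topic_levels.length
end

topic_matches?("sport/#", "sport")                   # => true
topic_matches?("sport/+", "sport")                   # => false
topic_matches?("+/tennis/#", "sport/tennis/player1") # => true
topic_matches?("#", "$SYS/uptime")                   # => false
```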
# File 'lib/zzq/client.rb', line 39
def subscribe(filter, qos: 0, no_local: false, retain_as_published: false, retain_handling: 0)
  raise NotInAsyncContext unless Async::Task.current?
  @engine.routing.subscribe(
    filter,
    qos: qos,
    no_local: no_local,
    retain_as_published: retain_as_published,
    retain_handling: retain_handling,
  )
end
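The keyword arguments of #subscribe mirror the MQTT 5 subscription options, which the protocol packs into one byte per topic filter in the SUBSCRIBE packet. The sketch below shows that bit layout under the assumption that ZZQ targets MQTT 5; `subscription_options_byte` is a hypothetical helper for illustration, and the source does not show how ZZQ itself encodes these values.

```ruby
# MQTT 5 subscription options byte (hypothetical helper, not ZZQ API):
#   bits 0-1  maximum QoS
#   bit  2    No Local
#   bit  3    Retain As Published
#   bits 4-5  Retain Handling
#   bits 6-7  reserved, must be 0
def subscription_options_byte(qos: 0, no_local: false, retain_as_published: false, retain_handling: 0)
  raise ArgumentError, "qos must be 0, 1, or 2" unless [0, 1, 2].include?(qos)
  raise ArgumentError, "retain_handling must be 0, 1, or 2" unless [0, 1, 2].include?(retain_handling)

  byte = qos
  byte |= 0x04 if no_local
  byte |= 0x08 if retain_as_published
  byte | (retain_handling << 4)
end

subscription_options_byte                                          # => 0
subscription_options_byte(qos: 1, no_local: true, retain_handling: 2) # => 37 (0x25)
```

The defaults in the #subscribe signature correspond to an options byte of zero: QoS 0, messages from the same client delivered, the RETAIN flag not preserved on forwarded messages, and retained messages sent at subscribe time.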