Class: ZZQ::Routing::Client
- Inherits: Object
- Defined in: lib/zzq/routing/client.rb
Overview
Client-side routing.
Responsibilities:
* CONNECT / CONNACK handshake on +handle_connected+.
* Per-connection read loop dispatching PINGRESP, DISCONNECT,
PUBLISH (fan-out to matching subscriptions), SUBACK/UNSUBACK
(resolve pending acks), and PUBACK/PUBREC/PUBREL/PUBCOMP
(drive outbound QoS 1/2 state machines).
* Keepalive fiber.
* QoS 0/1/2 publish and subscribe.
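The read-loop dispatch listed above can be pictured as a single case expression over the packet type. This is only a minimal sketch: the SketchPacket struct, the symbolic type names, and the returned action labels are hypothetical stand-ins, not the actual ZZQ or Protocol::MQTT classes.

```ruby
# Hypothetical stand-in for a decoded MQTT packet; not the real Protocol::MQTT type.
SketchPacket = Struct.new(:type, :packet_id, keyword_init: true)

# Map a packet type to the action the read loop would take for it.
# The action labels are illustrative names only.
def sketch_dispatch(packet)
  case packet.type
  when :pingresp then :note_keepalive
  when :disconnect then :close_connection
  when :publish then :fan_out_to_subscriptions
  when :suback, :unsuback then :resolve_pending_ack
  when :puback, :pubrec, :pubrel, :pubcomp then :advance_qos_state
  else :protocol_error
  end
end

action = sketch_dispatch(SketchPacket.new(type: :suback, packet_id: 7))
puts action
```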
Constant Summary
- HANDSHAKE_TIMEOUT = 10
  seconds to wait for CONNACK
- SUBSCRIBE_TIMEOUT = 30
  seconds to wait for SUBACK / UNSUBACK
- PUBLISH_TIMEOUT = 30
  seconds to wait for terminal ack on qos>0
Instance Attribute Summary
- #last_connack ⇒ Protocol::MQTT::Packet::Connack? (readonly)
  The most recent CONNACK received, or nil before the first connect.
Instance Method Summary
- #close ⇒ Object
- #connection_added(_conn) ⇒ Object
- #connection_removed(conn) ⇒ Object
- #handle_connected(mqtt, lifecycle) ⇒ Object
  Called by Engine#handle_connected.
- #initialize(engine) ⇒ Client (constructor)
  A new instance of Client.
- #publish(topic:, payload:, qos: 0, retain: false, properties: {}) ⇒ Object
  Send a PUBLISH.
- #session_present? ⇒ Boolean
- #subscribe(filter, qos: 0, no_local: false, retain_as_published: false, retain_handling: 0) ⇒ Object
  Send a SUBSCRIBE, wait for SUBACK, return a Subscription.
Constructor Details
#initialize(engine) ⇒ Client
Returns a new instance of Client.
# File 'lib/zzq/routing/client.rb', line 36
def initialize(engine)
  @engine = engine
  @conn = nil
  @keepalive = nil
  @last_connack = nil
  @subscriptions = [] # ZZQ::Subscription
  @packet_ids = PacketIdAllocator.new(
    receive_maximum: @engine..receive_maximum,
  )
  @qos_tracker = QosTracker.new
  @pending_ctl = {} # packet_id => Async::Promise (SUBACK / UNSUBACK)
  @inbound_qos2 = {} # packet_id => Message awaiting PUBREL (manual-ack path; M9)
end
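The constructor hands a receive_maximum to PacketIdAllocator, whose source is not shown on this page. As a rough illustration of what such an allocator might do, the sketch below hands out non-zero 16-bit packet identifiers and caps the number in flight. The class name SketchPacketIdAllocator and its exact behaviour (sequential ids, raising when full) are assumptions, not the library's implementation.

```ruby
# Hypothetical sketch; the real PacketIdAllocator may behave differently.
class SketchPacketIdAllocator
  MAX_ID = 0xFFFF # MQTT packet identifiers are non-zero 16-bit integers

  def initialize(receive_maximum:)
    @limit = [receive_maximum, MAX_ID].min
    @in_use = {}
    @next_id = 1
  end

  # Return a free id, or raise when the in-flight limit is reached.
  def acquire
    raise "too many in-flight packets" if @in_use.size >= @limit
    # Skip over ids that are still in flight, wrapping 65535 back to 1.
    @next_id = (@next_id % MAX_ID) + 1 while @in_use.key?(@next_id)
    id = @next_id
    @in_use[id] = true
    @next_id = (id % MAX_ID) + 1
    id
  end

  # Mark an id as free again; returns nil.
  def release(id)
    @in_use.delete(id)
    nil
  end
end

ids = SketchPacketIdAllocator.new(receive_maximum: 2)
first = ids.acquire
second = ids.acquire
ids.release(first)
third = ids.acquire
```

In this sketch a released id is not reused immediately; the counter keeps moving forward, which makes a late ack for an old id less likely to be mistaken for a new exchange.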
Instance Attribute Details
#last_connack ⇒ Protocol::MQTT::Packet::Connack? (readonly)
Returns the most recent CONNACK received, or nil before the first connect.
# File 'lib/zzq/routing/client.rb', line 33
def last_connack
  @last_connack
end
Instance Method Details
#close ⇒ Object
# File 'lib/zzq/routing/client.rb', line 104
def close
  return unless @conn
  begin
    @conn.write_packet(Protocol::MQTT::Packet::Disconnect.new)
  rescue IOError, Errno::EPIPE, Errno::ECONNRESET, ClosedError
    # peer already gone; fine
  end
  @conn = nil
end
#connection_added(_conn) ⇒ Object
# File 'lib/zzq/routing/client.rb', line 85
def connection_added(_conn); end
#connection_removed(conn) ⇒ Object
# File 'lib/zzq/routing/client.rb', line 88
def connection_removed(conn)
  return unless @conn.equal?(conn)
  @conn = nil
  @subscriptions.each(&:close)
  @subscriptions.clear
  @pending_ctl.each_value { |p| p.resolve(nil) unless p.resolved? }
  @pending_ctl.clear
  # Wake any fiber blocked on a publish ack. Don't clear the
  # tracker yet; M6 retransmit-on-reconnect will want to inspect
  # +inflight_outbound+. For now, just unblock with nil.
  @qos_tracker.inflight_outbound.each do |pending|
    pending.promise.resolve(nil) unless pending.promise.resolved?
  end
end
#handle_connected(mqtt, lifecycle) ⇒ Object
Called by Engine#handle_connected. Runs inside the caller’s Async task (Socket#connect → Engine.connect → Transport.connect → Engine.handle_connected), so we can synchronously drive the CONNECT/CONNACK exchange and raise to the caller on refusal.
# File 'lib/zzq/routing/client.rb', line 60
def handle_connected(mqtt, lifecycle)
  opts = @engine.
  mqtt.version = opts.version
  connect_packet = build_connect(opts)
  Async::Task.current.with_timeout(HANDSHAKE_TIMEOUT) do
    mqtt.write_packet(connect_packet)
    reply = mqtt.read_packet
    raise ConnectionRefused.new(0, "no CONNACK from broker") if reply.nil?
    raise ConnectionRefused.new(0, "expected CONNACK, got #{reply.class.name}") unless reply.is_a?(Protocol::MQTT::Packet::Connack)
    unless success_connack?(reply)
      raise ConnectionRefused.new(reply.reason_code)
    end
    @last_connack = reply
  end
  @conn = mqtt
  lifecycle.handshake_succeeded!
  start_read_loop(mqtt, lifecycle)
  start_keepalive(mqtt, lifecycle)
end
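The three refusal checks in the handshake (no reply, wrong packet type, non-success reason code) can be sketched without a network. SketchConnack and SketchConnectionRefused below are hypothetical stand-ins, and the success test assumes that reason code 0 means success, as it does in MQTT 3.1.1 and 5.0; the real success_connack? helper is not shown on this page and may check more.

```ruby
# Hypothetical stand-ins; not the real ZZQ or Protocol::MQTT classes.
SketchConnack = Struct.new(:reason_code, :session_present)

class SketchConnectionRefused < StandardError
  attr_reader :reason_code

  def initialize(reason_code, message = nil)
    @reason_code = reason_code
    super(message || format("connection refused: reason=0x%02x", reason_code))
  end
end

# Validate the first packet read after CONNECT, mirroring the three checks
# in the listing above. Returns the CONNACK on success.
def sketch_check_connack(reply)
  raise SketchConnectionRefused.new(0, "no CONNACK from broker") if reply.nil?
  unless reply.is_a?(SketchConnack)
    raise SketchConnectionRefused.new(0, "expected CONNACK, got #{reply.class.name}")
  end
  # Assumption: success means reason code 0.
  raise SketchConnectionRefused.new(reply.reason_code) unless reply.reason_code.zero?
  reply
end

accepted = sketch_check_connack(SketchConnack.new(0, false))

refused_code =
  begin
    sketch_check_connack(SketchConnack.new(0x87, false))
    nil
  rescue SketchConnectionRefused => e
    e.reason_code
  end
```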
#publish(topic:, payload:, qos: 0, retain: false, properties: {}) ⇒ Object
Send a PUBLISH. For QoS 0 returns immediately after write. For QoS 1 blocks the fiber until PUBACK. For QoS 2 blocks until PUBCOMP.
# File 'lib/zzq/routing/client.rb', line 118
def publish(topic:, payload:, qos: 0, retain: false, properties: {})
  raise ClosedError, "client not connected" unless @conn
  if qos.zero?
    pkt = Protocol::MQTT::Packet::Publish.new(
      topic: topic, payload: payload, qos: 0, retain: retain, properties: properties,
    )
    @conn.write_packet(pkt)
    @keepalive&.note_sent
    return nil
  end
  packet_id = @packet_ids.acquire
  pkt = Protocol::MQTT::Packet::Publish.new(
    topic: topic, payload: payload, qos: qos, retain: retain,
    packet_id: packet_id, properties: properties,
  )
  pending = qos == 1 \
    ? @qos_tracker.track_qos1(packet_id, pkt) \
    : @qos_tracker.track_qos2(packet_id, pkt)
  begin
    @conn.write_packet(pkt)
    @keepalive&.note_sent
    result = Async::Task.current.with_timeout(PUBLISH_TIMEOUT) { pending.promise.wait }
    raise ClosedError, "connection lost before publish ack" if result.nil?
  ensure
    @packet_ids.release(packet_id)
  end
  nil
end
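Which ack ends the wait depends on the QoS level: PUBACK for QoS 1, and PUBREC followed by PUBCOMP for QoS 2, with the sender answering PUBREC with PUBREL. The sketch below models only that ordering. SketchOutboundFlow is a hypothetical class, not the QosTracker used above, whose internals are not shown on this page.

```ruby
# Hypothetical sketch of outbound ack ordering; not the real QosTracker.
class SketchOutboundFlow
  # Acks expected, in order, after the PUBLISH has been written.
  EXPECTED = {
    1 => [:puback],
    2 => [:pubrec, :pubcomp],
  }.freeze

  def initialize(qos)
    @remaining = EXPECTED.fetch(qos).dup
  end

  # Feed an incoming ack; returns the packet type to send in response, if any.
  def on_ack(type)
    raise ArgumentError, "unexpected #{type}" unless @remaining.first == type
    @remaining.shift
    type == :pubrec ? :pubrel : nil
  end

  # True once the terminal ack has been seen.
  def complete?
    @remaining.empty?
  end
end

flow = SketchOutboundFlow.new(2)
response = flow.on_ack(:pubrec) # sender must answer with PUBREL
flow.on_ack(:pubcomp)
done = flow.complete?
```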
#session_present? ⇒ Boolean
# File 'lib/zzq/routing/client.rb', line 51
def session_present?
  @last_connack&.session_present || false
end
#subscribe(filter, qos: 0, no_local: false, retain_as_published: false, retain_handling: 0) ⇒ Object
Send a SUBSCRIBE, wait for SUBACK, return a Subscription.
# File 'lib/zzq/routing/client.rb', line 151
def subscribe(filter, qos: 0, no_local: false, retain_as_published: false, retain_handling: 0)
  raise ClosedError, "client not connected" unless @conn
  packet_id = @packet_ids.acquire
  filter_entry = {
    filter: filter,
    qos: qos,
    no_local: no_local,
    retain_as_published: retain_as_published,
    retain_handling: retain_handling,
  }
  subscription = Subscription.new(
    filters: [filter],
    hwm: @engine..recv_hwm,
    on_close: ->(sub) { send_unsubscribe(sub) },
  )
  @subscriptions << subscription
  promise = Async::Promise.new
  @pending_ctl[packet_id] = promise
  begin
    @conn.write_packet(Protocol::MQTT::Packet::Subscribe.new(
      packet_id: packet_id,
      filters: [filter_entry],
    ))
    @keepalive&.note_sent
    suback = Async::Task.current.with_timeout(SUBSCRIBE_TIMEOUT) { promise.wait }
    raise ClosedError, "connection lost before SUBACK" if suback.nil?
    reason = suback.reason_codes.first
    if reason >= 0x80
      @subscriptions.delete(subscription)
      raise Error, "SUBSCRIBE refused: reason=0x#{reason.to_s(16)}"
    end
  ensure
    @packet_ids.release(packet_id)
  end
  subscription
end
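The SUBACK check treats any reason code of 0x80 or above as a refusal; lower values (0x00 to 0x02) carry the granted QoS. A small sketch of that rule and the resulting message follows. The helper name sketch_suback_error is hypothetical and exists only for illustration.

```ruby
# Hypothetical helper mirroring the reason-code check in #subscribe.
# Returns nil when the subscription was granted, otherwise the error text.
def sketch_suback_error(reason)
  return nil if reason < 0x80 # 0x00..0x02 carry the granted QoS
  "SUBSCRIBE refused: reason=0x#{reason.to_s(16)}"
end

granted = sketch_suback_error(0x01)
refused = sketch_suback_error(0x87)
puts refused
```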