Class: ZZQ::Routing::QosTracker
- Inherits:
-
Object
- Object
- ZZQ::Routing::QosTracker
- Defined in:
- lib/zzq/routing/qos_tracker.rb
Overview
Outbound inflight tracking + inbound QoS 2 dedup for one session.
Each side of a QoS 1/2 exchange owns one tracker. The publisher side records a PendingPublish per outbound packet id, awaiting a PUBACK (QoS 1) or completing via PUBREC/PUBREL/PUBCOMP (QoS 2). The subscriber side remembers which QoS 2 packet ids have been received-but-not-yet-released so duplicate PUBLISH packets with the same id are dropped.
Defined Under Namespace
Classes: PendingPublish
Instance Method Summary collapse
- #inbound_size ⇒ Object
-
#inflight_outbound ⇒ Object
Snapshot of still-pending outbound publications, used on reconnect for retransmission.
-
#initialize ⇒ QosTracker
constructor
A new instance of QosTracker.
-
#on_puback(packet_id) ⇒ Object
Called on inbound PUBACK.
-
#on_pubcomp(packet_id) ⇒ Object
Called on inbound PUBCOMP.
-
#on_pubrec(packet_id) ⇒ Object
Called on inbound PUBREC.
-
#on_pubrel(packet_id) ⇒ Object
Inbound PUBREL: peer tells us to release the held qos2 id.
-
#on_qos2_publish(packet_id) ⇒ Object
Inbound QoS 2 PUBLISH dedup.
- #outbound_size ⇒ Object
-
#track_qos1(packet_id, packet) ⇒ Object
Register an outbound QoS 1 publish.
-
#track_qos2(packet_id, packet) ⇒ Object
Register an outbound QoS 2 publish (awaiting PUBREC).
Constructor Details
#initialize ⇒ QosTracker
Returns a new instance of QosTracker.
26 27 28 29 |
# File 'lib/zzq/routing/qos_tracker.rb', line 26 def initialize @outbound = {} # packet_id => PendingPublish @inbound_qos2 = {} # packet_id => true (received, awaiting PUBREL) end |
Instance Method Details
#inbound_size ⇒ Object
105 |
# File 'lib/zzq/routing/qos_tracker.rb', line 105 def inbound_size = @inbound_qos2.size |
#inflight_outbound ⇒ Object
Snapshot of still-pending outbound publications, used on reconnect for retransmission.
101 |
# File 'lib/zzq/routing/qos_tracker.rb', line 101 def inflight_outbound = @outbound.values |
#on_puback(packet_id) ⇒ Object
Called on inbound PUBACK. Returns the PendingPublish that completed, or nil if the id was unknown / stage mismatched.
54 55 56 57 58 59 60 |
# File 'lib/zzq/routing/qos_tracker.rb', line 54 def on_puback(packet_id) pending = @outbound[packet_id] return nil unless pending && pending.stage == :waiting_puback @outbound.delete(packet_id) pending.promise.resolve(:acked) unless pending.promise.resolved? pending end |
#on_pubcomp(packet_id) ⇒ Object
Called on inbound PUBCOMP. Completes a QoS 2 exchange.
75 76 77 78 79 80 81 |
# File 'lib/zzq/routing/qos_tracker.rb', line 75 def on_pubcomp(packet_id) pending = @outbound[packet_id] return nil unless pending && pending.stage == :waiting_pubcomp @outbound.delete(packet_id) pending.promise.resolve(:acked) unless pending.promise.resolved? pending end |
#on_pubrec(packet_id) ⇒ Object
Called on inbound PUBREC. Transitions the entry to :waiting_pubcomp and returns it so the caller can send PUBREL. Returns nil if unknown / stage mismatched.
66 67 68 69 70 71 |
# File 'lib/zzq/routing/qos_tracker.rb', line 66 def on_pubrec(packet_id) pending = @outbound[packet_id] return nil unless pending && pending.stage == :waiting_pubrec pending.stage = :waiting_pubcomp pending end |
#on_pubrel(packet_id) ⇒ Object
Inbound PUBREL: peer tells us to release the held qos2 id.
94 95 96 |
# File 'lib/zzq/routing/qos_tracker.rb', line 94 def on_pubrel(packet_id) @inbound_qos2.delete(packet_id) end |
#on_qos2_publish(packet_id) ⇒ Object
Inbound QoS 2 PUBLISH dedup. Returns true if this id is a new (non-duplicate) reception; false if it’s a duplicate.
86 87 88 89 90 |
# File 'lib/zzq/routing/qos_tracker.rb', line 86 def on_qos2_publish(packet_id) return false if @inbound_qos2.key?(packet_id) @inbound_qos2[packet_id] = true true end |
#outbound_size ⇒ Object
104 |
# File 'lib/zzq/routing/qos_tracker.rb', line 104 def outbound_size = @outbound.size |
#track_qos1(packet_id, packet) ⇒ Object
Register an outbound QoS 1 publish. Returns the PendingPublish.
33 34 35 36 37 38 39 |
# File 'lib/zzq/routing/qos_tracker.rb', line 33 def track_qos1(packet_id, packet) promise = Async::Promise.new @outbound[packet_id] = PendingPublish.new( packet_id: packet_id, packet: packet, stage: :waiting_puback, promise: promise, ) end |
#track_qos2(packet_id, packet) ⇒ Object
Register an outbound QoS 2 publish (awaiting PUBREC).
43 44 45 46 47 48 49 |
# File 'lib/zzq/routing/qos_tracker.rb', line 43 def track_qos2(packet_id, packet) promise = Async::Promise.new @outbound[packet_id] = PendingPublish.new( packet_id: packet_id, packet: packet, stage: :waiting_pubrec, promise: promise, ) end |