Class: ZZQ::Routing::QosTracker

Inherits:
Object
Defined in:
lib/zzq/routing/qos_tracker.rb

Overview

Tracks outbound in-flight publishes and deduplicates inbound QoS 2 publishes for one session.

Each side of a QoS 1/2 exchange owns one tracker. The publisher side records a PendingPublish per outbound packet id, awaiting a PUBACK (QoS 1) or completing via PUBREC/PUBREL/PUBCOMP (QoS 2). The subscriber side remembers which QoS 2 packet ids have been received-but-not-yet-released so duplicate PUBLISH packets with the same id are dropped.
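
The publisher-side stages described above can be read as a small state machine. The sketch below is only an illustration: `NEXT_STAGE`, `advance`, and the `:done` marker are hypothetical names that are not part of QosTracker, which removes a completed entry rather than marking it.

```ruby
# Illustrative transition table for the publisher side. The stage names
# match the tracker; :done is a hypothetical marker for "entry removed".
NEXT_STAGE = {
  [:waiting_puback,  :puback]  => :done,             # QoS 1 completes
  [:waiting_pubrec,  :pubrec]  => :waiting_pubcomp,  # QoS 2, caller sends PUBREL
  [:waiting_pubcomp, :pubcomp] => :done,             # QoS 2 completes
}.freeze

# Returns the next stage, or nil when the ack does not match the current
# stage (the tracker signals this case by returning nil).
def advance(stage, ack)
  NEXT_STAGE[[stage, ack]]
end

# Walk a full QoS 2 exchange: PUBREC first, then PUBCOMP.
final = [:pubrec, :pubcomp].reduce(:waiting_pubrec) { |stage, ack| advance(stage, ack) }
p final  # prints :done
```

A PUBACK arriving for an entry in `:waiting_pubrec` has no row in the table, so `advance` returns nil, which mirrors the "stage mismatched" case in the method descriptions.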

Defined Under Namespace

Classes: PendingPublish

Constructor Details

#initialize ⇒ QosTracker

Returns a new instance of QosTracker.



# File 'lib/zzq/routing/qos_tracker.rb', line 26

def initialize
  @outbound       = {}   # packet_id => PendingPublish
  @inbound_qos2   = {}   # packet_id => true (received, awaiting PUBREL)
end

Instance Method Details

#inbound_size ⇒ Object

Number of inbound QoS 2 packet ids currently held (received, awaiting PUBREL).

# File 'lib/zzq/routing/qos_tracker.rb', line 105

def inbound_size  = @inbound_qos2.size

#inflight_outbound ⇒ Object

Snapshot of still-pending outbound publications, used on reconnect for retransmission.



# File 'lib/zzq/routing/qos_tracker.rb', line 101

def inflight_outbound = @outbound.values
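
As a rough illustration of how such a snapshot might drive retransmission after a reconnect, the sketch below maps each pending entry to the packet that would be resent. `PendingSketch` and `retransmit_plan` are hypothetical stand-ins, not part of the library. The rule applied (resend the PUBLISH with the DUP flag set, unless a PUBREC has already arrived, in which case resend the PUBREL) is taken from the MQTT specification's delivery-retry behaviour, not from this class.

```ruby
# Hypothetical stand-in carrying the fields the tracker passes to
# PendingPublish.new (the promise is omitted for brevity).
PendingSketch = Struct.new(:packet_id, :packet, :stage, keyword_init: true)

# Decide what to resend for each entry of an inflight_outbound-style snapshot.
def retransmit_plan(entries)
  entries.map do |entry|
    if entry.stage == :waiting_pubcomp
      # PUBREC was already received, so the PUBREL is what gets resent.
      [:pubrel, entry.packet_id]
    else
      # :waiting_puback or :waiting_pubrec: resend the PUBLISH with DUP set.
      [:publish_dup, entry.packet_id]
    end
  end
end

snapshot = [
  PendingSketch.new(packet_id: 1, packet: "a", stage: :waiting_puback),
  PendingSketch.new(packet_id: 2, packet: "b", stage: :waiting_pubcomp),
]
p retransmit_plan(snapshot)  # prints [[:publish_dup, 1], [:pubrel, 2]]
```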

#on_puback(packet_id) ⇒ Object

Called on inbound PUBACK. Returns the PendingPublish that completed, or nil if the packet id is unknown or the entry is not awaiting a PUBACK.



# File 'lib/zzq/routing/qos_tracker.rb', line 54

def on_puback(packet_id)
  pending = @outbound[packet_id]
  return nil unless pending && pending.stage == :waiting_puback
  @outbound.delete(packet_id)
  pending.promise.resolve(:acked) unless pending.promise.resolved?
  pending
end

#on_pubcomp(packet_id) ⇒ Object

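Called on inbound PUBCOMP. Completes a QoS 2 exchange. Returns the PendingPublish that completed, or nil if the packet id is unknown or the entry is not awaiting a PUBCOMP.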



# File 'lib/zzq/routing/qos_tracker.rb', line 75

def on_pubcomp(packet_id)
  pending = @outbound[packet_id]
  return nil unless pending && pending.stage == :waiting_pubcomp
  @outbound.delete(packet_id)
  pending.promise.resolve(:acked) unless pending.promise.resolved?
  pending
end

#on_pubrec(packet_id) ⇒ Object

Called on inbound PUBREC. Transitions the entry to :waiting_pubcomp and returns it so the caller can send PUBREL. Returns nil if the packet id is unknown or the entry is not awaiting a PUBREC.



# File 'lib/zzq/routing/qos_tracker.rb', line 66

def on_pubrec(packet_id)
  pending = @outbound[packet_id]
  return nil unless pending && pending.stage == :waiting_pubrec
  pending.stage = :waiting_pubcomp
  pending
end
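
A caller might use the return value along these lines. This is a sketch only: `handle_pubrec`, the `outbox` array, `StubEntry`, and `StubTracker` are hypothetical, and the stub merely mimics the #on_pubrec behaviour shown above so the example runs without the library.

```ruby
StubEntry = Struct.new(:packet_id, :stage)

# Minimal stand-in that mimics the documented #on_pubrec contract.
class StubTracker
  def initialize(entries)
    @outbound = entries.to_h { |entry| [entry.packet_id, entry] }
  end

  def on_pubrec(packet_id)
    pending = @outbound[packet_id]
    return nil unless pending && pending.stage == :waiting_pubrec
    pending.stage = :waiting_pubcomp
    pending
  end
end

# Queue a PUBREL only when the tracker recognised the PUBREC.
def handle_pubrec(tracker, packet_id, outbox)
  pending = tracker.on_pubrec(packet_id)
  return false unless pending
  outbox << [:pubrel, pending.packet_id]
  true
end

tracker = StubTracker.new([StubEntry.new(5, :waiting_pubrec)])
outbox  = []
handle_pubrec(tracker, 5, outbox)  # recognised, PUBREL queued
handle_pubrec(tracker, 5, outbox)  # repeated PUBREC, stage no longer matches
handle_pubrec(tracker, 9, outbox)  # unknown packet id
p outbox  # prints [[:pubrel, 5]]
```

Because a nil return covers both an unknown id and a stage mismatch, a caller that needs to tell the two apart would have to look at the tracker's state separately.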

#on_pubrel(packet_id) ⇒ Object

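Called on inbound PUBREL. The peer tells us to release the held QoS 2 packet id, so a later PUBLISH reusing that id is treated as new. Returns true if the id was held, nil otherwise.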



# File 'lib/zzq/routing/qos_tracker.rb', line 94

def on_pubrel(packet_id)
  @inbound_qos2.delete(packet_id)
end

#on_qos2_publish(packet_id) ⇒ Object

Inbound QoS 2 PUBLISH dedup. Returns true if this id is a new (non-duplicate) reception; false if it’s a duplicate.



# File 'lib/zzq/routing/qos_tracker.rb', line 86

def on_qos2_publish(packet_id)
  return false if @inbound_qos2.key?(packet_id)
  @inbound_qos2[packet_id] = true
  true
end
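
To make the dedup window concrete, here is a minimal sketch of a receive loop that delivers a payload only on first reception. `InboundDedupSketch` copies the logic of #on_qos2_publish and #on_pubrel so the example is self-contained; `deliver_once` and the `[type, packet_id, payload]` event tuples are hypothetical and not part of the library.

```ruby
# Stand-in for the inbound half of the tracker (same logic as the
# documented #on_qos2_publish and #on_pubrel).
class InboundDedupSketch
  def initialize
    @held = {}  # packet_id => true (received, awaiting PUBREL)
  end

  def on_qos2_publish(packet_id)
    return false if @held.key?(packet_id)
    @held[packet_id] = true
    true
  end

  def on_pubrel(packet_id)
    @held.delete(packet_id)
  end
end

# Hypothetical receive loop: collect payloads that should reach the application.
def deliver_once(dedup, events)
  delivered = []
  events.each do |type, packet_id, payload|
    case type
    when :publish
      # A duplicate is not delivered again; a real handler would normally
      # still answer it with PUBREC.
      delivered << payload if dedup.on_qos2_publish(packet_id)
    when :pubrel
      dedup.on_pubrel(packet_id)
    end
  end
  delivered
end

events = [
  [:publish, 7, "first"],
  [:publish, 7, "first"],   # retransmission before PUBREL: dropped
  [:pubrel,  7, nil],       # id released
  [:publish, 7, "second"],  # id reused for a new message: delivered
]
p deliver_once(InboundDedupSketch.new, events)  # prints ["first", "second"]
```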

#outbound_size ⇒ Object

Number of outbound publishes still awaiting acknowledgement.

# File 'lib/zzq/routing/qos_tracker.rb', line 104

def outbound_size = @outbound.size

#track_qos1(packet_id, packet) ⇒ Object

Register an outbound QoS 1 publish. Returns the PendingPublish.



# File 'lib/zzq/routing/qos_tracker.rb', line 33

def track_qos1(packet_id, packet)
  promise = Async::Promise.new
  @outbound[packet_id] = PendingPublish.new(
    packet_id: packet_id, packet: packet, stage: :waiting_puback,
    promise: promise,
  )
end

#track_qos2(packet_id, packet) ⇒ Object

Register an outbound QoS 2 publish (awaiting PUBREC). Returns the PendingPublish.



# File 'lib/zzq/routing/qos_tracker.rb', line 43

def track_qos2(packet_id, packet)
  promise = Async::Promise.new
  @outbound[packet_id] = PendingPublish.new(
    packet_id: packet_id, packet: packet, stage: :waiting_pubrec,
    promise: promise,
  )
end