Class: ZZQ::Routing::PacketIdAllocator
- Inherits:
- Object
- Defined in:
- lib/zzq/routing/packet_id_allocator.rb
Overview
Allocator for 16-bit (u16) packet ids with an optional receive-maximum cap. Not thread-safe: it is designed to be owned by a single connection’s writer fiber. Packet id 0 is reserved (MQTT-2.2.1-3), so ids wrap within 1..65535.
When the outstanding count reaches receive_maximum, #acquire blocks the caller fiber until a later #release frees a slot.
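The acquire/release lifecycle can be sketched without the Async dependency. The class below is a hypothetical, simplified stand-in (`TinyAllocator` and `try_acquire` are names invented for this illustration, not part of ZZQ): it uses the same id bookkeeping, but returns nil when the quota is exhausted instead of blocking the fiber.

```ruby
# Hypothetical stand-in for illustration only; this is NOT the ZZQ class.
# Assumption: when the quota is full it returns nil instead of blocking.
class TinyAllocator
  def initialize(receive_maximum: 65_535)
    @receive_maximum = receive_maximum
    @outstanding = {} # packet_id => true
    @next = 0
  end

  # Returns a fresh id, or nil when receive_maximum ids are outstanding.
  def try_acquire
    return nil if @outstanding.size >= @receive_maximum

    id = loop do
      @next = (@next % 0xFFFF) + 1
      break @next unless @outstanding.key?(@next)
    end
    @outstanding[id] = true
    id
  end

  # Returns true if the id was outstanding, false otherwise.
  def release(id)
    !@outstanding.delete(id).nil?
  end

  def outstanding?(id)
    @outstanding.key?(id)
  end

  def size
    @outstanding.size
  end
end

alloc = TinyAllocator.new(receive_maximum: 2)
a = alloc.try_acquire                # 1
b = alloc.try_acquire                # 2
full = alloc.try_acquire             # nil: both slots are in use
released = alloc.release(a)          # true: id 1 was outstanding
released_again = alloc.release(a)    # false: id 1 is already free
c = alloc.try_acquire                # 3: the counter moves on, it does not reuse 1 yet
```

Note that the freed id 1 is not handed out again immediately; the counter keeps advancing and only returns to low ids after wrapping.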
Instance Method Summary
- #acquire ⇒ Object
  Acquire a fresh packet id.
- #initialize(receive_maximum: 65_535) ⇒ PacketIdAllocator (constructor)
  A new instance of PacketIdAllocator.
- #outstanding?(id) ⇒ Boolean
- #release(id) ⇒ Object
  Mark id free.
- #size ⇒ Object
Constructor Details
#initialize(receive_maximum: 65_535) ⇒ PacketIdAllocator
Returns a new instance of PacketIdAllocator.
    # File 'lib/zzq/routing/packet_id_allocator.rb', line 15
    def initialize(receive_maximum: 65_535)
      @receive_maximum = receive_maximum
      @outstanding = {} # packet_id => true
      @next = 0
      @free_cond = Async::Condition.new
    end
Instance Method Details
#acquire ⇒ Object
Acquire a fresh packet id. Blocks the calling fiber if the receive-maximum quota is exhausted.
    # File 'lib/zzq/routing/packet_id_allocator.rb', line 25
    def acquire
      while @outstanding.size >= @receive_maximum
        @free_cond.wait
      end
      id = loop do
        @next = (@next % 0xFFFF) + 1
        break @next unless @outstanding.key?(@next)
      end
      @outstanding[id] = true
      id
    end
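The increment step in #acquire, `(@next % 0xFFFF) + 1`, is what keeps ids inside 1..65535 and skips the reserved id 0. A small standalone check of that arithmetic, using a hypothetical helper `next_packet_id` that is not part of the library:

```ruby
# Hypothetical helper mirroring the increment expression used in #acquire.
def next_packet_id(current)
  (current % 0xFFFF) + 1
end

first_id   = next_packet_id(0)      # the counter starts at 0, so the first id is 1
wrapped_id = next_packet_id(0xFFFF) # the largest id wraps back to 1, never to 0

# Walk one full cycle and record every id produced along the way.
seen = []
id = 0
0xFFFF.times do
  id = next_packet_id(id)
  seen << id
end
```

One full cycle visits each value in 1..65535 exactly once, which is why the inner loop in #acquire is guaranteed to find a free id whenever fewer than 65535 ids are outstanding.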
#outstanding?(id) ⇒ Boolean
    # File 'lib/zzq/routing/packet_id_allocator.rb', line 46
    def outstanding?(id)
      @outstanding.key?(id)
    end
#release(id) ⇒ Object
Mark id free. Wakes one waiter if any.
    # File 'lib/zzq/routing/packet_id_allocator.rb', line 39
    def release(id)
      return false unless @outstanding.delete(id)
      @free_cond.signal
      true
    end
#size ⇒ Object
    # File 'lib/zzq/routing/packet_id_allocator.rb', line 51
    def size = @outstanding.size