Class: ZZQ::Routing::PacketIdAllocator

Inherits:
Object
  • Object
show all
Defined in:
lib/zzq/routing/packet_id_allocator.rb

Overview

u16 packet-id allocator with an optional receive-maximum cap. Thread-unsafe (designed to be owned by a single connection’s writer fiber). Packet id 0 is reserved (MQTT-2.2.1-3); we wrap 1..65535.

When the outstanding count reaches receive_maximum, #acquire blocks the caller fiber until a later #release frees a slot.

Instance Method Summary collapse

Constructor Details

#initialize(receive_maximum: 65_535) ⇒ PacketIdAllocator

Returns a new instance of PacketIdAllocator.



15
16
17
18
19
20
# File 'lib/zzq/routing/packet_id_allocator.rb', line 15

def initialize(receive_maximum: 65_535)
  @receive_maximum = receive_maximum
  @outstanding     = {}  # packet_id => true
  @next            = 0
  @free_cond       = Async::Condition.new
end

Instance Method Details

#acquireObject

Acquire a fresh packet id. Blocks the calling fiber if the receive-maximum quota is exhausted.



25
26
27
28
29
30
31
32
33
34
35
# File 'lib/zzq/routing/packet_id_allocator.rb', line 25

def acquire
  while @outstanding.size >= @receive_maximum
    @free_cond.wait
  end
  id = loop do
    @next = (@next % 0xFFFF) + 1
    break @next unless @outstanding.key?(@next)
  end
  @outstanding[id] = true
  id
end

#outstanding?(id) ⇒ Boolean

Returns:

  • (Boolean)


46
47
48
# File 'lib/zzq/routing/packet_id_allocator.rb', line 46

def outstanding?(id)
  @outstanding.key?(id)
end

#release(id) ⇒ Object

Mark id free. Wakes one waiter if any.



39
40
41
42
43
# File 'lib/zzq/routing/packet_id_allocator.rb', line 39

def release(id)
  return false unless @outstanding.delete(id)
  @free_cond.signal
  true
end

#sizeObject



51
# File 'lib/zzq/routing/packet_id_allocator.rb', line 51

def size = @outstanding.size