Class: Takagi::Message::RetransmissionManager
- Inherits: Object
  - Object
  - Takagi::Message::RetransmissionManager
- Defined in: lib/takagi/message/retransmission_manager.rb
Overview
Implements CoAP retransmission logic as specified in RFC 7252, Section 4.2.
For CON (Confirmable) messages, the client MUST retransmit the message until it receives an ACK or RST, or until the transmission times out.
- RFC 7252 §4.2: Retransmission uses exponential back-off with a random factor.
- RFC 7252 §4.8: Default transmission parameters.
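The back-off rule from RFC 7252 §4.2 can be sketched as follows. This is a minimal illustration and not the library's `calculate_timeout`, whose body does not appear on this page; the helper names `initial_timeout` and `backoff_schedule` and the `rng` parameter are assumptions made for the example.

```ruby
# Defaults from RFC 7252 §4.8, redeclared so the snippet runs on its own.
ACK_TIMEOUT = 2.0
ACK_RANDOM_FACTOR = 1.5
MAX_RETRANSMIT = 4

# Hypothetical helper: draw the initial timeout once, between
# ACK_TIMEOUT and ACK_TIMEOUT * ACK_RANDOM_FACTOR (2.0 to 3.0 seconds).
def initial_timeout(rng: Random.new)
  rng.rand(ACK_TIMEOUT..(ACK_TIMEOUT * ACK_RANDOM_FACTOR))
end

# Hypothetical helper: the timeout doubles after every transmission.
# Index 0 is the wait after the initial send, index MAX_RETRANSMIT the
# wait after the last retransmission.
def backoff_schedule(initial)
  (0..MAX_RETRANSMIT).map { |attempt| initial * (2**attempt) }
end

schedule = backoff_schedule(initial_timeout)
schedule.each_with_index do |seconds, attempt|
  puts format('attempt %d: wait %.2fs', attempt, seconds)
end
```

Drawing the random value once and doubling it afterwards keeps the schedule consistent with the RFC, which randomizes only the initial timeout.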
Defined Under Namespace
Classes: PendingTransmission
Constant Summary
RFC 7252 §4.8: Transmission parameters
- ACK_TIMEOUT = 2.0
  Initial timeout in seconds
- ACK_RANDOM_FACTOR = 1.5
  Random factor for timeout calculation
- MAX_RETRANSMIT = 4
  Maximum number of retransmissions
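As a worked example of what these defaults imply, the sketch below derives the two worst-case timing figures that RFC 7252 §4.8.2 lists. The constants are redeclared locally so the snippet runs on its own; the derived names follow the RFC and are not constants of this class.

```ruby
ACK_TIMEOUT = 2.0
ACK_RANDOM_FACTOR = 1.5
MAX_RETRANSMIT = 4

# Longest time from the first transmission to the last retransmission.
max_transmit_span = ACK_TIMEOUT * ((2**MAX_RETRANSMIT) - 1) * ACK_RANDOM_FACTOR

# Longest time from the first transmission until the sender stops waiting.
max_transmit_wait = ACK_TIMEOUT * ((2**(MAX_RETRANSMIT + 1)) - 1) * ACK_RANDOM_FACTOR

puts "MAX_TRANSMIT_SPAN = #{max_transmit_span}s" # 45.0s
puts "MAX_TRANSMIT_WAIT = #{max_transmit_wait}s" # 93.0s
```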
Instance Method Summary
- #handle_response(message_id, response_data = nil) ⇒ Object
  Handle incoming ACK or RST to cancel retransmission.
- #initialize(logger: nil) ⇒ RetransmissionManager (constructor)
  A new instance of RetransmissionManager.
- #send_confirmable(message_id, message_data, socket, host, port, &callback) ⇒ Object
  Send a CON message with automatic retransmission.
- #start ⇒ Object
  Start the retransmission manager background thread.
- #stats ⇒ Object
  Get statistics about pending transmissions.
- #stop ⇒ Object
  Stop the retransmission manager.
Constructor Details
#initialize(logger: nil) ⇒ RetransmissionManager
Returns a new instance of RetransmissionManager.
    # File 'lib/takagi/message/retransmission_manager.rb', line 35
    def initialize(logger: nil)
      @pending = {} # message_id => PendingTransmission
      @mutex = Mutex.new
      @logger = logger || Takagi.logger
      @running = false
      @thread = nil
    end
Instance Method Details
#handle_response(message_id, response_data = nil) ⇒ Object
Handle incoming ACK or RST to cancel retransmission
    # File 'lib/takagi/message/retransmission_manager.rb', line 94
    def handle_response(message_id, response_data = nil)
      transmission = nil
      @mutex.synchronize do
        transmission = @pending.delete(message_id)
      end

      return unless transmission

      @logger.debug "Received response for MID: #{message_id}, canceling retransmission"
      transmission.callback&.call(response_data, nil)
    end
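`handle_response` invokes the stored callback as `callback&.call(response_data, nil)`. A caller-side handler might therefore look like the sketch below. Treating the second argument as an error slot that is non-nil on failure (for example, on timeout) is an assumption, since this page only shows the success path. The name `on_result` and the sample payload are illustrative.

```ruby
results = []

# Hypothetical handler: the first argument is the response payload; the
# second is assumed to carry an error when the exchange fails.
on_result = lambda do |response_data, error|
  if error
    results << [:failed, error]
  else
    results << [:ok, response_data]
  end
end

# Mirrors the success-path call made in handle_response.
on_result.call('ACK payload', nil)
```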
#send_confirmable(message_id, message_data, socket, host, port, &callback) ⇒ Object
Send a CON message with automatic retransmission
    # File 'lib/takagi/message/retransmission_manager.rb', line 66
    def send_confirmable(message_id, message_data, socket, host, port, &callback)
      initial_timeout = calculate_timeout(0)

      transmission = PendingTransmission.new(
        message_id,
        message_data,
        socket,
        host,
        port,
        0, # attempt
        initial_timeout,
        Time.now.to_f + initial_timeout,
        callback
      )

      @mutex.synchronize do
        @pending[message_id] = transmission
      end

      # Send initial transmission
      transmit(transmission)

      @logger.debug "Scheduled CON message (MID: #{message_id}) with timeout #{initial_timeout}s"
    end
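`PendingTransmission` is built here from nine positional arguments. One plausible shape for it is a `Struct`, sketched below. Only `callback` is confirmed as an accessor by this page (via `transmission.callback`); every other field name is a guess chosen to match the argument order, and the sample values are illustrative.

```ruby
# Hypothetical reconstruction; field names other than :callback are assumptions.
PendingTransmission = Struct.new(
  :message_id, :message_data, :socket, :host, :port,
  :attempt, :timeout, :next_retry_at, :callback
)

initial_timeout = 2.5
transmission = PendingTransmission.new(
  0x1234,                          # message ID
  'payload',                       # encoded message bytes
  nil,                             # socket omitted in this sketch
  '127.0.0.1',
  5683,                            # default CoAP port
  0,                               # attempt
  initial_timeout,
  Time.now.to_f + initial_timeout, # absolute time of the next retransmission
  ->(response, _error) { response }
)

puts "MID #{transmission.message_id} retries at #{transmission.next_retry_at}"
```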
#start ⇒ Object
Start the retransmission manager background thread
    # File 'lib/takagi/message/retransmission_manager.rb', line 44
    def start
      return if @running

      @running = true
      @thread = Thread.new { run_retransmission_loop }
      @logger.debug 'Retransmission manager started'
    end
#stats ⇒ Object
Get statistics about pending transmissions
    # File 'lib/takagi/message/retransmission_manager.rb', line 108
    def stats
      @mutex.synchronize do
        {
          pending_count: @pending.size,
          message_ids: @pending.keys
        }
      end
    end
#stop ⇒ Object
Stop the retransmission manager
    # File 'lib/takagi/message/retransmission_manager.rb', line 53
    def stop
      @running = false
      @thread&.join
      @logger.debug 'Retransmission manager stopped'
    end
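The `start`/`stop` pair follows a common flag-controlled worker pattern: `start` returns early if the manager is already running, and `stop` clears the flag and then joins the thread, so it blocks until the loop notices the change. The sketch below shows that pattern in isolation. `Worker`, `ticks`, and the 10 ms sleep are illustrative and do not reproduce `run_retransmission_loop`, which is not shown on this page.

```ruby
# Illustrative stand-in for a background manager; not part of Takagi.
class Worker
  attr_reader :ticks

  def initialize
    @running = false
    @thread = nil
    @ticks = 0
  end

  def running?
    @running
  end

  # Idempotent: a second call while running does nothing.
  def start
    return if @running

    @running = true
    @thread = Thread.new do
      while @running
        @ticks += 1 # placeholder for "check pending transmissions"
        sleep 0.01
      end
    end
  end

  # Clears the flag, then waits for the loop to exit.
  def stop
    @running = false
    @thread&.join
    @thread = nil
  end
end

worker = Worker.new
worker.start
worker.start # no effect, already running
sleep 0.1
worker.stop
puts "worker ran #{worker.ticks} iterations"
```

Because `stop` joins the thread, the longest it can block is roughly one loop iteration, which is worth keeping in mind when choosing the loop's sleep interval.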