Class: Takagi::Message::RetransmissionManager

Inherits:
Object
Defined in:
lib/takagi/message/retransmission_manager.rb

Overview

Implements CoAP retransmission logic as per RFC 7252 Section 4.2

For CON (Confirmable) messages, the client MUST retransmit the message until it receives an ACK or RST, or until the transmission times out.

RFC 7252 §4.2: Retransmission uses exponential back-off with a random factor.
RFC 7252 §4.8: Default transmission parameters.
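The back-off rule from RFC 7252 §4.2 can be sketched as follows. This is a minimal illustration, not Takagi's implementation: the private `calculate_timeout` method is not shown on this page, and the module and method names below (`BackoffSketch`, `initial_timeout`, `schedule`) are hypothetical. It assumes the RFC defaults: the initial timeout is drawn uniformly between ACK_TIMEOUT and ACK_TIMEOUT * ACK_RANDOM_FACTOR, and each retransmission doubles it.

```ruby
# Hypothetical sketch of RFC 7252 §4.2 back-off; not Takagi's own code.
module BackoffSketch
  ACK_TIMEOUT = 2.0        # seconds (RFC 7252 §4.8 default)
  ACK_RANDOM_FACTOR = 1.5  # RFC 7252 §4.8 default
  MAX_RETRANSMIT = 4       # RFC 7252 §4.8 default

  module_function

  # Draw the initial timeout uniformly from
  # [ACK_TIMEOUT, ACK_TIMEOUT * ACK_RANDOM_FACTOR].
  def initial_timeout(rng: Random.new)
    rng.rand(ACK_TIMEOUT..(ACK_TIMEOUT * ACK_RANDOM_FACTOR))
  end

  # Timeouts waited after the initial send (attempt 0) and after each
  # retransmission (attempts 1..MAX_RETRANSMIT); each one doubles the last.
  def schedule(initial)
    (0..MAX_RETRANSMIT).map { |attempt| initial * (2**attempt) }
  end
end

p BackoffSketch.schedule(2.0) # → [2.0, 4.0, 8.0, 16.0, 32.0]
```

Drawing the random value once and doubling it, rather than re-drawing per attempt, follows the wording of the RFC; whether Takagi does the same is not confirmed by this page.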

Defined Under Namespace

Classes: PendingTransmission

Constant Summary

ACK_TIMEOUT =

Initial timeout in seconds (RFC 7252 §4.8: Transmission parameters)

2.0
ACK_RANDOM_FACTOR =

Random factor for timeout calculation

1.5
MAX_RETRANSMIT =

Maximum number of retransmissions

4
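For orientation, the worst-case timing bounds that RFC 7252 §4.8.2 derives from these three parameters can be computed directly. The sketch below is illustrative only: the module name `TransmissionTimingSketch` is hypothetical, and this page does not indicate that Takagi defines the derived constants itself.

```ruby
# Hypothetical sketch; derived values follow the formulas in RFC 7252 §4.8.2.
module TransmissionTimingSketch
  ACK_TIMEOUT = 2.0
  ACK_RANDOM_FACTOR = 1.5
  MAX_RETRANSMIT = 4

  # Longest time from the first transmission to the last retransmission.
  MAX_TRANSMIT_SPAN = ACK_TIMEOUT * ((2**MAX_RETRANSMIT) - 1) * ACK_RANDOM_FACTOR

  # Longest time from the first transmission until the sender gives up
  # waiting for an ACK or RST.
  MAX_TRANSMIT_WAIT = ACK_TIMEOUT * ((2**(MAX_RETRANSMIT + 1)) - 1) * ACK_RANDOM_FACTOR
end

puts TransmissionTimingSketch::MAX_TRANSMIT_SPAN # → 45.0
puts TransmissionTimingSketch::MAX_TRANSMIT_WAIT # → 93.0
```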

Constructor Details

#initialize(logger: nil) ⇒ RetransmissionManager

Returns a new instance of RetransmissionManager.



# File 'lib/takagi/message/retransmission_manager.rb', line 35

def initialize(logger: nil)
  @pending = {}
  @mutex = Mutex.new
  @logger = logger || Takagi.logger
  @running = false
  @thread = nil
end

Instance Method Details

#handle_response(message_id, response_data = nil) ⇒ Object

Handle incoming ACK or RST to cancel retransmission

Parameters:

  • message_id (Integer)

    CoAP Message ID

  • response_data (String) (defaults to: nil)

    Response message data



# File 'lib/takagi/message/retransmission_manager.rb', line 94

def handle_response(message_id, response_data = nil)
  transmission = nil

  @mutex.synchronize do
    transmission = @pending.delete(message_id)
  end

  return unless transmission

  @logger.debug "Received response for MID: #{message_id}, canceling retransmission"
  transmission.callback&.call(response_data, nil)
end
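The pattern used above (remove the entry while holding the mutex, then invoke the callback after releasing it) can be shown in isolation. The toy class below is a sketch under stated assumptions: `PendingTableSketch`, `register`, and `resolve` are hypothetical names that only mirror the shape of `handle_response`, and the callback receives `(response_data, error)` with `error` set to `nil` on success, as in the listing above.

```ruby
# Toy model of a pending-transmission table; names are hypothetical
# and this is not Takagi's implementation.
class PendingTableSketch
  def initialize
    @pending = {}
    @mutex = Mutex.new
  end

  # Remember a callback for the given Message ID.
  def register(message_id, &callback)
    @mutex.synchronize { @pending[message_id] = callback || proc {} }
  end

  # Returns true if a pending entry was resolved, false for unknown
  # or already-resolved (duplicate) Message IDs.
  def resolve(message_id, response_data = nil)
    callback = @mutex.synchronize { @pending.delete(message_id) }
    return false unless callback

    # Called outside the lock: Ruby's Mutex is not reentrant, so a callback
    # that touches the table again would otherwise raise ThreadError.
    callback.call(response_data, nil)
    true
  end
end

table = PendingTableSketch.new
table.register(0x1234) { |data, error| puts "data=#{data.inspect} error=#{error.inspect}" }
table.resolve(0x1234, "ACK") # callback runs once
table.resolve(0x1234, "ACK") # duplicate ACK: no entry left, callback not invoked
```

Because the entry is deleted atomically, a duplicate ACK or RST for the same Message ID finds nothing to cancel and is ignored.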

#send_confirmable(message_id, message_data, socket, host, port, &callback) ⇒ Object

Send a CON message with automatic retransmission

Parameters:

  • message_id (Integer)

    CoAP Message ID

  • message_data (String)

    Serialized message bytes

  • socket (UDPSocket)

    Socket to send on

  • host (String)

    Destination host

  • port (Integer)

    Destination port

  • callback (Proc)

    Called with response or timeout error



# File 'lib/takagi/message/retransmission_manager.rb', line 66

def send_confirmable(message_id, message_data, socket, host, port, &callback)
  initial_timeout = calculate_timeout(0)

  transmission = PendingTransmission.new(
    message_id,
    message_data,
    socket,
    host,
    port,
    0, # attempt
    initial_timeout,
    Time.now.to_f + initial_timeout,
    callback
  )

  @mutex.synchronize do
    @pending[message_id] = transmission
  end

  # Send initial transmission
  transmit(transmission)

  @logger.debug "Scheduled CON message (MID: #{message_id}) with timeout #{initial_timeout}s"
end

#start ⇒ Object

Start the retransmission manager background thread



# File 'lib/takagi/message/retransmission_manager.rb', line 44

def start
  return if @running

  @running = true
  @thread = Thread.new { run_retransmission_loop }
  @logger.debug 'Retransmission manager started'
end

#stats ⇒ Object

Get statistics about pending transmissions



# File 'lib/takagi/message/retransmission_manager.rb', line 108

def stats
  @mutex.synchronize do
    {
      pending_count: @pending.size,
      message_ids: @pending.keys
    }
  end
end

#stop ⇒ Object

Stop the retransmission manager



# File 'lib/takagi/message/retransmission_manager.rb', line 53

def stop
  @running = false
  @thread&.join
  @logger.debug 'Retransmission manager stopped'
end