Class: ZZQ::Engine::Keepalive

Inherits:
Object
Defined in:
lib/zzq/engine/keepalive.rb

Overview

Keepalive driver for a single connection.

role:

* +:sender+ (client default) — sends PINGREQ if locally idle
  for keep_alive/2; closes if nothing received for 1.5 × keep_alive.
* +:watcher+ (broker default) — does not send; just closes if
  nothing received for 1.5 × keep_alive. MQTT-3.1.2-22 is
  enforced by the broker.

One fiber per connection, attached to the connection’s barrier so it tears down with the connection.
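To make the two thresholds above concrete, here is a minimal sketch that derives them from a keep_alive value in seconds. The helper name `keepalive_thresholds` is hypothetical and not part of ZZQ; it only restates the arithmetic from the role descriptions.

```ruby
# Hypothetical helper, not part of ZZQ: derives the two idle thresholds
# described in the overview from a keep_alive value in seconds.
def keepalive_thresholds(keep_alive)
  {
    ping_after: keep_alive / 2.0,  # :sender sends PINGREQ after this much send-idle time
    close_after: keep_alive * 1.5  # either role closes after this much receive-idle time
  }
end

thresholds = keepalive_thresholds(60)
puts thresholds[:ping_after]   # 30.0
puts thresholds[:close_after]  # 90.0
```

With a 60 second keep_alive, a :sender pings after 30 seconds without sending, and either role closes the connection once more than 90 seconds pass without receiving.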

Constructor Details

#initialize(conn, keep_alive:, on_timeout:, role: :sender) ⇒ Keepalive

Returns a new instance of Keepalive.



# File 'lib/zzq/engine/keepalive.rb', line 24

def initialize(conn, keep_alive:, on_timeout:, role: :sender)
  @conn = conn
  @keep_alive = keep_alive
  @on_timeout = on_timeout
  @role = role
  now = Async::Clock.now
  @last_sent_at = now
  @last_received_at = now
end

Instance Attribute Details

#last_received_at ⇒ Object

Returns the value of attribute last_received_at.



# File 'lib/zzq/engine/keepalive.rb', line 21

def last_received_at
  @last_received_at
end

#last_sent_at ⇒ Object

Returns the value of attribute last_sent_at.



# File 'lib/zzq/engine/keepalive.rb', line 21

def last_sent_at
  @last_sent_at
end

Instance Method Details

#note_received ⇒ Object

Call this every time a packet is read, to reset the recv-idle timer.



# File 'lib/zzq/engine/keepalive.rb', line 44

def note_received
  @last_received_at = Async::Clock.now
end

#note_sent ⇒ Object

Call this every time a packet is written, to reset the send-idle timer.



# File 'lib/zzq/engine/keepalive.rb', line 37

def note_sent
  @last_sent_at = Async::Clock.now
end
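One way to guarantee that note_received and note_sent are called on every packet is to route all I/O through a thin wrapper. The sketch below is an assumption about how a caller might do that, not how ZZQ wires it: the class name `TrackedConnection` is hypothetical, and it assumes the wrapped connection responds to `read_packet` as well as `write_packet` (only `write_packet` appears in the source on this page).

```ruby
# Hypothetical wrapper, not part of ZZQ. It assumes the wrapped connection
# responds to read_packet and write_packet; only write_packet is confirmed
# by the source shown on this page.
class TrackedConnection
  def initialize(conn, keepalive)
    @conn = conn
    @keepalive = keepalive
  end

  # Read one packet, then reset the recv-idle timer.
  def read_packet
    packet = @conn.read_packet
    @keepalive.note_received
    packet
  end

  # Write one packet, then reset the send-idle timer.
  def write_packet(packet)
    @conn.write_packet(packet)
    @keepalive.note_sent
  end
end
```

Each timer is updated only after the underlying call returns, so a read or write that raises leaves the timestamps untouched.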

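#run ⇒ Object

Runs the keepalive loop until the connection times out, a PINGREQ write fails, or the fiber is stopped. Returns immediately if keep_alive is 0 (keepalive disabled).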



# File 'lib/zzq/engine/keepalive.rb', line 49

def run
  return if @keep_alive.zero?  # 0 disables keepalive
  loop do
    sleep(@keep_alive / 2.0)
    now = Async::Clock.now

    if (now - @last_received_at) > (@keep_alive * 1.5)
      @on_timeout.call
      break
    end

    next unless @role == :sender

    if (now - @last_sent_at) >= (@keep_alive / 2.0)
      begin
        @conn.write_packet(Protocol::MQTT::Packet::Pingreq.new)
        note_sent
      rescue IOError, Errno::EPIPE, Errno::ECONNRESET
        break
      end
    end
  end
rescue Async::Stop
end
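The decision made on each wake-up of the loop above can be restated as a pure function, which makes the thresholds easy to check without fibers or a real connection. This is an illustrative sketch only: `keepalive_decision` and its return symbols are hypothetical and not part of ZZQ.

```ruby
# Hypothetical pure function mirroring one iteration of the loop in #run.
# All times are in seconds. Returns :disabled, :timeout, :ping, or :idle.
def keepalive_decision(now:, last_sent_at:, last_received_at:, keep_alive:, role: :sender)
  return :disabled if keep_alive.zero?                                  # 0 disables keepalive
  return :timeout if (now - last_received_at) > (keep_alive * 1.5)      # peer silent too long
  return :idle unless role == :sender                                   # :watcher never pings
  (now - last_sent_at) >= (keep_alive / 2.0) ? :ping : :idle            # locally idle: send PINGREQ
end

# Sender idle for 40 s with keep_alive 60: a ping is due.
p keepalive_decision(now: 100, last_sent_at: 60, last_received_at: 95, keep_alive: 60)
# Watcher that has heard nothing for 100 s with keep_alive 60: timeout.
p keepalive_decision(now: 100, last_sent_at: 0, last_received_at: 0, keep_alive: 60, role: :watcher)
```

Because the loop sleeps for keep_alive / 2 between checks, a timeout is detected on the first wake-up after the 1.5 × keep_alive threshold has passed, which can be up to roughly keep_alive / 2 later than the threshold itself.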