Class: ZZQ::Engine::Keepalive
- Inherits:
-
Object
- Object
- ZZQ::Engine::Keepalive
- Defined in:
- lib/zzq/engine/keepalive.rb
Overview
Keepalive driver for a single connection.
role:
* +:sender+ (client default) — sends PINGREQ if locally idle
for keep_alive/2; closes if nothing received for 1.5 × keep_alive.
* +:watcher+ (broker default) — does not send; just closes if
nothing received for 1.5 × keep_alive. MQTT-3.1.2-22 is
enforced by the broker.
One fiber per connection, attached to the connection’s barrier so it tears down with the connection.
Instance Attribute Summary collapse
-
#last_received_at ⇒ Object
Returns the value of attribute last_received_at.
-
#last_sent_at ⇒ Object
Returns the value of attribute last_sent_at.
Instance Method Summary collapse
-
#initialize(conn, keep_alive:, on_timeout:, role: :sender) ⇒ Keepalive
constructor
A new instance of Keepalive.
-
#note_received ⇒ Object
Call this every time a packet is read, to reset the recv-idle timer.
-
#note_sent ⇒ Object
Call this every time a packet is written, to reset the send-idle timer.
- #run ⇒ Object
Constructor Details
#initialize(conn, keep_alive:, on_timeout:, role: :sender) ⇒ Keepalive
Returns a new instance of Keepalive.
24 25 26 27 28 29 30 31 32 |
# File 'lib/zzq/engine/keepalive.rb', line 24 def initialize(conn, keep_alive:, on_timeout:, role: :sender) @conn = conn @keep_alive = keep_alive @on_timeout = on_timeout @role = role now = Async::Clock.now @last_sent_at = now @last_received_at = now end |
Instance Attribute Details
#last_received_at ⇒ Object
Returns the value of attribute last_received_at.
21 22 23 |
# File 'lib/zzq/engine/keepalive.rb', line 21 def last_received_at @last_received_at end |
#last_sent_at ⇒ Object
Returns the value of attribute last_sent_at.
21 22 23 |
# File 'lib/zzq/engine/keepalive.rb', line 21 def last_sent_at @last_sent_at end |
Instance Method Details
#note_received ⇒ Object
Call this every time a packet is read, to reset the recv-idle timer.
44 45 46 |
# File 'lib/zzq/engine/keepalive.rb', line 44 def note_received @last_received_at = Async::Clock.now end |
#note_sent ⇒ Object
Call this every time a packet is written, to reset the send-idle timer.
37 38 39 |
# File 'lib/zzq/engine/keepalive.rb', line 37 def note_sent @last_sent_at = Async::Clock.now end |
#run ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/zzq/engine/keepalive.rb', line 49 def run return if @keep_alive.zero? # 0 disables keepalive loop do sleep(@keep_alive / 2.0) now = Async::Clock.now if (now - @last_received_at) > (@keep_alive * 1.5) @on_timeout.call break end next unless @role == :sender if (now - @last_sent_at) >= (@keep_alive / 2.0) begin @conn.write_packet(Protocol::MQTT::Packet::Pingreq.new) note_sent rescue IOError, Errno::EPIPE, Errno::ECONNRESET break end end end rescue Async::Stop end |