Class: ZZQ::Routing::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/zzq/routing/client.rb

Overview

Client-side routing.

Responsibilities:

* CONNECT / CONNACK handshake on +handle_connected+.
* Per-connection read loop dispatching PINGRESP, DISCONNECT,
  PUBLISH (fan-out to matching subscriptions), SUBACK/UNSUBACK
  (resolve pending acks), and PUBACK/PUBREC/PUBREL/PUBCOMP
  (drive outbound QoS 1/2 state machines).
* Keepalive fiber.
* QoS 0/1/2 publish and subscribe.

Constant Summary collapse

HANDSHAKE_TIMEOUT =

seconds to wait for CONNACK

10
SUBSCRIBE_TIMEOUT =

seconds to wait for SUBACK / UNSUBACK

30
PUBLISH_TIMEOUT =

seconds to wait for terminal ack on qos>0

30

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(engine) ⇒ Client

Returns a new instance of Client.



36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/zzq/routing/client.rb', line 36

# Builds a client router bound to +engine+. No I/O happens here; every piece
# of per-connection state starts out empty.
#
# @param engine [Object] owning engine; its +options.receive_maximum+ is
#   handed to the packet-id allocator
def initialize(engine)
  @engine = engine

  # Nothing is connected yet: no transport, no keepalive, no CONNACK seen.
  @conn = nil
  @keepalive = nil
  @last_connack = nil

  # Live ZZQ::Subscription objects.
  @subscriptions = []

  # Outbound bookkeeping for packet ids and QoS 1/2 flows.
  receive_maximum = engine.options.receive_maximum
  @packet_ids = PacketIdAllocator.new(receive_maximum: receive_maximum)
  @qos_tracker = QosTracker.new

  # packet_id => Async::Promise awaiting SUBACK / UNSUBACK.
  @pending_ctl = {}
  # packet_id => Message awaiting PUBREL (manual-ack path; M9).
  @inbound_qos2 = {}
end

Instance Attribute Details

#last_connack ⇒ Protocol::MQTT::Packet::Connack? (readonly)

Returns the most recent CONNACK received, or nil before the first connect.

Returns:

  • (Protocol::MQTT::Packet::Connack, nil)

    the most recent CONNACK received, or nil before the first connect.



33
34
35
# File 'lib/zzq/routing/client.rb', line 33

# @return [Protocol::MQTT::Packet::Connack, nil] the most recent CONNACK
#   received, or nil before the first connect
def last_connack = @last_connack

Instance Method Details

#close ⇒ Object



104
105
106
107
108
109
110
111
112
# File 'lib/zzq/routing/client.rb', line 104

# Sends a best-effort DISCONNECT on the active connection and then forgets
# it. Does nothing when no connection is set.
#
# @return [nil]
def close
  if @conn
    begin
      farewell = Protocol::MQTT::Packet::Disconnect.new
      @conn.write_packet(farewell)
    rescue IOError, Errno::EPIPE, Errno::ECONNRESET, ClosedError
      # The peer is already gone, so there is nobody left to notify.
    end
    @conn = nil
  end
end

#connection_added(_conn) ⇒ Object



85
# File 'lib/zzq/routing/client.rb', line 85

# Hook invoked when a connection is added. This class takes no action here.
#
# @param _conn [Object] ignored
# @return [nil]
def connection_added(_conn)
  # Intentionally empty.
end

#connection_removed(conn) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/zzq/routing/client.rb', line 88

# Tears down per-connection state when +conn+ is the connection currently in
# use. Calls for any other connection are ignored.
#
# @param conn [Object] the connection that went away
def connection_removed(conn)
  if @conn.equal?(conn)
    @conn = nil

    # Close every subscription, then drop them all.
    @subscriptions.each { |subscription| subscription.close }
    @subscriptions.clear

    # Release fibers still waiting on SUBACK / UNSUBACK by resolving with nil.
    @pending_ctl.each_value do |promise|
      next if promise.resolved?

      promise.resolve(nil)
    end
    @pending_ctl.clear

    # Release fibers blocked on a publish ack. The tracker itself is left
    # intact: M6 retransmit-on-reconnect will want to inspect
    # +inflight_outbound+, so for now waiters are only unblocked with nil.
    @qos_tracker.inflight_outbound.each do |entry|
      next if entry.promise.resolved?

      entry.promise.resolve(nil)
    end
  end
end

#handle_connected(mqtt, lifecycle) ⇒ Object

Called by Engine#handle_connected. Runs inside the caller’s Async task (Socket#connect → Engine.connect → Transport.connect → Engine.handle_connected), so we can synchronously drive the CONNECT/CONNACK exchange and raise to the caller on refusal.



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/zzq/routing/client.rb', line 60

# Drives the CONNECT / CONNACK exchange on a freshly connected transport and,
# once the broker accepts, adopts +mqtt+ as the active connection and starts
# the read loop and keepalive.
#
# The whole exchange is bounded by HANDSHAKE_TIMEOUT seconds.
#
# @param mqtt [Object] packet-level connection; must respond to +version=+,
#   +write_packet+ and +read_packet+
# @param lifecycle [Object] told about success via +handshake_succeeded!+
# @raise [ConnectionRefused] when no packet arrives, the reply is not a
#   CONNACK, or the CONNACK does not indicate success
def handle_connected(mqtt, lifecycle)
  options = @engine.options
  mqtt.version = options.version
  connect = build_connect(options)

  Async::Task.current.with_timeout(HANDSHAKE_TIMEOUT) do
    mqtt.write_packet(connect)

    case (connack = mqtt.read_packet)
    when nil
      raise ConnectionRefused.new(0, "no CONNACK from broker")
    when Protocol::MQTT::Packet::Connack
      raise ConnectionRefused.new(connack.reason_code) unless success_connack?(connack)

      @last_connack = connack
    else
      raise ConnectionRefused.new(0, "expected CONNACK, got #{connack.class.name}")
    end
  end

  # Only a fully accepted handshake makes this the active connection.
  @conn = mqtt
  lifecycle.handshake_succeeded!

  start_read_loop(mqtt, lifecycle)
  start_keepalive(mqtt, lifecycle)
end

#publish(topic:, payload:, qos: 0, retain: false, properties: {}) ⇒ Object

Send a PUBLISH. For QoS 0 returns immediately after write. For QoS 1 blocks the fiber until PUBACK. For QoS 2 blocks until PUBCOMP.

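Raises:

  • (ClosedError)

    if the client is not connected, or the connection is lost before the publish ack arrives.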



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/zzq/routing/client.rb', line 118

# Sends a PUBLISH.
#
# QoS 0 returns as soon as the packet is written. For any other QoS a packet
# id is acquired, the packet is registered with the QoS tracker, and the
# calling fiber blocks (up to PUBLISH_TIMEOUT seconds) until the tracker's
# promise for that packet resolves.
#
# The packet id is released on every exit path, including failures while
# building or registering the packet.
#
# NOTE(review): any qos other than 0 or 1 is tracked as QoS 2; the value is
# not validated here.
#
# @param topic [String] topic to publish to
# @param payload [String] message body
# @param qos [Integer] 0, 1 or 2
# @param retain [Boolean] RETAIN flag for the packet
# @param properties [Hash] properties passed through to the packet
# @return [nil]
# @raise [ClosedError] if the client is not connected, or the pending promise
#   resolves to nil (connection lost before the ack)
def publish(topic:, payload:, qos: 0, retain: false, properties: {})
  raise ClosedError, "client not connected" unless @conn

  if qos.zero?
    pkt = Protocol::MQTT::Packet::Publish.new(
      topic: topic, payload: payload, qos: 0, retain: retain,
      properties: properties,
    )
    @conn.write_packet(pkt)
    @keepalive&.note_sent
    return nil
  end

  packet_id = @packet_ids.acquire
  begin
    # Everything after the acquire sits inside the ensure so that a failure
    # while building or tracking the packet cannot leak the packet id.
    pkt = Protocol::MQTT::Packet::Publish.new(
      topic: topic, payload: payload, qos: qos, retain: retain,
      packet_id: packet_id, properties: properties,
    )
    pending =
      if qos == 1
        @qos_tracker.track_qos1(packet_id, pkt)
      else
        @qos_tracker.track_qos2(packet_id, pkt)
      end

    @conn.write_packet(pkt)
    @keepalive&.note_sent
    result = Async::Task.current.with_timeout(PUBLISH_TIMEOUT) { pending.promise.wait }
    # A nil result is how connection teardown wakes waiters.
    raise ClosedError, "connection lost before publish ack" if result.nil?
  ensure
    @packet_ids.release(packet_id)
  end
  nil
end

#session_present? ⇒ Boolean

Returns:

  • (Boolean)


51
52
53
# File 'lib/zzq/routing/client.rb', line 51

# Whether the most recent CONNACK reported an existing session.
#
# @return [Boolean] false when no CONNACK has been stored yet or its
#   +session_present+ value is falsy
def session_present?
  connack = @last_connack
  return false if connack.nil?

  connack.session_present || false
end

#subscribe(filter, qos: 0, no_local: false, retain_as_published: false, retain_handling: 0) ⇒ Object

Send a SUBSCRIBE, wait for SUBACK, return a Subscription.

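Raises:

  • (ClosedError)

    if the client is not connected, or the connection is lost before the SUBACK arrives.

  • (Error)

    if the broker refuses the subscription (SUBACK reason code >= 0x80).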



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/zzq/routing/client.rb', line 151

# Sends a SUBSCRIBE for a single filter, waits (up to SUBSCRIBE_TIMEOUT
# seconds) for the SUBACK, and returns the resulting Subscription.
#
# The subscription is registered before the packet is written. If the call
# fails for any reason (refusal, timeout, write error, connection lost), all
# of its registrations are undone: the subscription is removed from the live
# list, the pending-ack promise is dropped, and the packet id is released.
#
# @param filter [String] topic filter to subscribe to
# @param qos [Integer] requested maximum QoS
# @param no_local [Boolean] subscription option sent with the filter
# @param retain_as_published [Boolean] subscription option sent with the filter
# @param retain_handling [Integer] subscription option sent with the filter
# @return [Subscription] the registered subscription
# @raise [ClosedError] if the client is not connected, or the pending promise
#   resolves to nil (connection lost before SUBACK)
# @raise [Error] if the first SUBACK reason code is >= 0x80
def subscribe(filter, qos: 0, no_local: false, retain_as_published: false,
              retain_handling: 0)
  raise ClosedError, "client not connected" unless @conn

  filter_entry = {
    filter: filter, qos: qos, no_local: no_local,
    retain_as_published: retain_as_published,
    retain_handling: retain_handling,
  }
  # Build everything that can raise before a packet id is taken, so a
  # construction failure cannot leak the id.
  subscription = Subscription.new(
    filters: [filter],
    hwm: @engine.options.recv_hwm,
    on_close: ->(sub) { send_unsubscribe(sub) },
  )
  promise = Async::Promise.new

  packet_id = @packet_ids.acquire
  accepted = false
  begin
    @subscriptions << subscription
    @pending_ctl[packet_id] = promise

    @conn.write_packet(Protocol::MQTT::Packet::Subscribe.new(
      packet_id: packet_id, filters: [filter_entry],
    ))
    @keepalive&.note_sent

    suback = Async::Task.current.with_timeout(SUBSCRIBE_TIMEOUT) { promise.wait }
    # A nil result is how connection teardown wakes waiters.
    raise ClosedError, "connection lost before SUBACK" if suback.nil?

    reason = suback.reason_codes.first
    raise Error, "SUBSCRIBE refused: reason=0x#{reason.to_s(16)}" if reason >= 0x80

    accepted = true
  ensure
    # The subscription is removed but not closed: closing would trigger the
    # on_close callback and send an UNSUBSCRIBE for a filter never granted.
    @subscriptions.delete(subscription) unless accepted
    # Drop our pending-ack entry before the id can be reused, and only if it
    # is still ours.
    @pending_ctl.delete(packet_id) if @pending_ctl[packet_id].equal?(promise)
    @packet_ids.release(packet_id)
  end
  subscription
end