Class: ZZQ::Routing::Broker
- Inherits:
-
Object
- Object
- ZZQ::Routing::Broker
- Defined in:
- lib/zzq/routing/broker.rb
Overview
Broker-side routing.
Current scope:
* Read CONNECT, send CONNACK.
* Per-connection read loop: PINGREQ/PINGRESP, DISCONNECT,
PUBLISH fan-out to matching sessions (QoS 0/1/2), SUBSCRIBE /
UNSUBSCRIBE with SUBACK / UNSUBACK, and PUBACK/PUBREC/PUBREL/
PUBCOMP to drive outbound QoS state per session.
* Simple linear session map; proper topic trie + retained store
come in M7.
Defined Under Namespace
Classes: Session
Constant Summary collapse
- CONNECT_TIMEOUT =
seconds to wait for the initial CONNECT
10
Instance Method Summary collapse
- #close ⇒ Object
- #connection_added(_conn) ⇒ Object
- #connection_removed(conn) ⇒ Object
-
#dispatch_message(message, origin: nil) ⇒ Object
Fan out a Message to every session subscribed to a matching filter.
- #handle_accepted(mqtt, lifecycle) ⇒ Object
-
#initialize(engine) ⇒ Broker
constructor
A new instance of Broker.
-
#publish_will(session) ⇒ Object
Publish the client’s Last Will and Testament via the normal dispatch path.
-
#session_count ⇒ Object
Used by tests.
Constructor Details
#initialize(engine) ⇒ Broker
Returns a new instance of Broker.
37 38 39 40 41 42 43 |
# File 'lib/zzq/routing/broker.rb', line 37 def initialize(engine) @engine = engine @sessions = {} # client_id => Session @by_conn = {} # Connection => Session @trie = TopicTrie.new @retained = RetainedStore.new(persistence: engine..persistence) end |
Instance Method Details
#close ⇒ Object
109 |
# File 'lib/zzq/routing/broker.rb', line 109 def close; end |
#connection_added(_conn) ⇒ Object
78 |
# File 'lib/zzq/routing/broker.rb', line 78 def connection_added(_conn); end |
#connection_removed(conn) ⇒ Object
81 82 83 84 85 86 87 |
# File 'lib/zzq/routing/broker.rb', line 81 def connection_removed(conn) session = @by_conn.delete(conn) return unless session publish_will(session) unless session.clean_disconnect @sessions.delete(session.client_id) @trie.remove_session(session) end |
#dispatch_message(message, origin: nil) ⇒ Object
Fan out a Message to every session subscribed to a matching filter. Used by both inbound PUBLISH handling and Broker#ingest from a non-MQTT source.
119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/zzq/routing/broker.rb', line 119 def (, origin: nil) delivered = {} # session.object_id => true (one copy per session) @trie.match(.topic).each do |hit| # v5 No Local (MQTT-3.8.3-3): when set on the subscription, # don't echo a publish back to the publishing client. next if hit.entry[:no_local] && origin && hit.session.conn.equal?(origin) next if delivered[hit.session.object_id] delivered[hit.session.object_id] = true forward_publish(hit.session, , hit.entry) end end |
#handle_accepted(mqtt, lifecycle) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/zzq/routing/broker.rb', line 46 def handle_accepted(mqtt, lifecycle) connect = nil Async::Task.current.with_timeout(CONNECT_TIMEOUT) do connect = mqtt.read_connect_packet raise ConnectionRejected, "no CONNECT received" if connect.nil? client_id = connect.client_id if client_id.empty? && mqtt.version == 3 && !connect.clean_start write_connack(mqtt, reason: Protocol::MQTT::ReasonCodes::V3Connack::IDENTIFIER_REJECTED) raise ConnectionRejected, "empty ClientId with clean_session=false is illegal in v3" end assigned = client_id.empty? client_id = generate_client_id if assigned # v5 Receive Maximum: client advertises the max number of # outstanding QoS > 0 publishes it is willing to accept. Sized # on the packet-id allocator used for fan-out to this session. client_rcv_max = connect.properties[:receive_maximum] || 65_535 session = build_session(client_id, mqtt, will: connect.will, receive_maximum: client_rcv_max) @sessions[client_id] = session @by_conn[mqtt] = session write_connack(mqtt, reason: 0, assigned_client_id: assigned ? client_id : nil) end lifecycle.handshake_succeeded! start_read_loop(mqtt, lifecycle) start_keepalive(mqtt, lifecycle, connect.keep_alive) end |
#publish_will(session) ⇒ Object
Publish the client’s Last Will and Testament via the normal dispatch path. Called when the session is torn down without having seen a clean DISCONNECT.
93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/zzq/routing/broker.rb', line 93 def publish_will(session) will = session.will return unless will msg = Message.new( topic: will.fetch(:topic), payload: will.fetch(:payload), qos: will.fetch(:qos, 0), retain: will.fetch(:retain, false), properties: will.fetch(:properties, {}) || {}, ) # Also honor RETAIN on wills. apply_retain(msg) if msg.retain (msg) end |
#session_count ⇒ Object
Used by tests.
113 |
# File 'lib/zzq/routing/broker.rb', line 113 def session_count = @sessions.size |