Class: ZZQ::Routing::Broker

Inherits:
Object
  • Object
show all
Defined in:
lib/zzq/routing/broker.rb

Overview

Broker-side routing.

Current scope:

* Read CONNECT, send CONNACK.
* Per-connection read loop: PINGREQ/PINGRESP, DISCONNECT,
  PUBLISH fan-out to matching sessions (QoS 0/1/2), SUBSCRIBE /
  UNSUBSCRIBE with SUBACK / UNSUBACK, and PUBACK/PUBREC/PUBREL/
  PUBCOMP to drive outbound QoS state per session.
* Simple linear session map; proper topic trie + retained store
  come in M7.

Defined Under Namespace

Classes: Session

Constant Summary collapse

CONNECT_TIMEOUT =

seconds to wait for the initial CONNECT

10

Instance Method Summary collapse

Constructor Details

#initialize(engine) ⇒ Broker

Returns a new instance of Broker.



37
38
39
40
41
42
43
# File 'lib/zzq/routing/broker.rb', line 37

def initialize(engine)
  @engine     = engine
  @sessions   = {}  # client_id  => Session
  @by_conn    = {}  # Connection => Session
  @trie       = TopicTrie.new
  @retained   = RetainedStore.new(persistence: engine.options.persistence)
end

Instance Method Details

#closeObject



109
# File 'lib/zzq/routing/broker.rb', line 109

def close; end

#connection_added(_conn) ⇒ Object



78
# File 'lib/zzq/routing/broker.rb', line 78

def connection_added(_conn); end

#connection_removed(conn) ⇒ Object



81
82
83
84
85
86
87
# File 'lib/zzq/routing/broker.rb', line 81

def connection_removed(conn)
  session = @by_conn.delete(conn)
  return unless session
  publish_will(session) unless session.clean_disconnect
  @sessions.delete(session.client_id)
  @trie.remove_session(session)
end

#dispatch_message(message, origin: nil) ⇒ Object

Fan out a Message to every session subscribed to a matching filter. Used by both inbound PUBLISH handling and Broker#ingest from a non-MQTT source.



119
120
121
122
123
124
125
126
127
128
129
# File 'lib/zzq/routing/broker.rb', line 119

def dispatch_message(message, origin: nil)
  delivered = {}  # session.object_id => true  (one copy per session)
  @trie.match(message.topic).each do |hit|
    # v5 No Local (MQTT-3.8.3-3): when set on the subscription,
    # don't echo a publish back to the publishing client.
    next if hit.entry[:no_local] && origin && hit.session.conn.equal?(origin)
    next if delivered[hit.session.object_id]
    delivered[hit.session.object_id] = true
    forward_publish(hit.session, message, hit.entry)
  end
end

#handle_accepted(mqtt, lifecycle) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/zzq/routing/broker.rb', line 46

def handle_accepted(mqtt, lifecycle)
  connect = nil
  Async::Task.current.with_timeout(CONNECT_TIMEOUT) do
    connect = mqtt.read_connect_packet
    raise ConnectionRejected, "no CONNECT received" if connect.nil?

    client_id = connect.client_id
    if client_id.empty? && mqtt.version == 3 && !connect.clean_start
      write_connack(mqtt, reason: Protocol::MQTT::ReasonCodes::V3Connack::IDENTIFIER_REJECTED)
      raise ConnectionRejected, "empty ClientId with clean_session=false is illegal in v3"
    end
    assigned = client_id.empty?
    client_id = generate_client_id if assigned

    # v5 Receive Maximum: client advertises the max number of
    # outstanding QoS > 0 publishes it is willing to accept. Sized
    # on the packet-id allocator used for fan-out to this session.
    client_rcv_max = connect.properties[:receive_maximum] || 65_535
    session = build_session(client_id, mqtt, will: connect.will,
                             receive_maximum: client_rcv_max)
    @sessions[client_id] = session
    @by_conn[mqtt] = session

    write_connack(mqtt, reason: 0, assigned_client_id: assigned ? client_id : nil)
  end

  lifecycle.handshake_succeeded!
  start_read_loop(mqtt, lifecycle)
  start_keepalive(mqtt, lifecycle, connect.keep_alive)
end

#publish_will(session) ⇒ Object

Publish the client’s Last Will and Testament via the normal dispatch path. Called when the session is torn down without having seen a clean DISCONNECT.



93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/zzq/routing/broker.rb', line 93

def publish_will(session)
  will = session.will
  return unless will
  msg = Message.new(
    topic:      will.fetch(:topic),
    payload:    will.fetch(:payload),
    qos:        will.fetch(:qos, 0),
    retain:     will.fetch(:retain, false),
    properties: will.fetch(:properties, {}) || {},
  )
  # Also honor RETAIN on wills.
  apply_retain(msg) if msg.retain
  dispatch_message(msg)
end

#session_countObject

Used by tests.



113
# File 'lib/zzq/routing/broker.rb', line 113

def session_count = @sessions.size