Class: ZZQ::Client

Inherits:
Socket
Defined in:
lib/zzq/client.rb

Overview

MQTT client. Owns exactly one MQTT session at a time (possibly persisted across reconnects), its subscriptions, and a QoS tracker for in-flight PUBLISH flows.

M3 stub: only the constructor and #build_routing stand. The CONNECT/CONNACK handshake and publish/subscribe plumbing come in M4 / M5.

Instance Attribute Summary

Attributes inherited from Socket

#options

Instance Method Summary

Methods inherited from Socket

#all_peers_gone, bind, #bind, #close, #coerce_binary, connect, #connect, #connection_count, #initialize, #last_endpoint, #monitor, #peer_connected

Constructor Details

This class inherits a constructor from ZZQ::Socket

Instance Method Details

#build_routing(engine) ⇒ Object



# File 'lib/zzq/client.rb', line 14

def build_routing(engine)
  Routing::Client.new(engine)
end

#last_connack ⇒ Object

Most recent CONNACK received from the broker, or nil before the first successful connect.



# File 'lib/zzq/client.rb', line 21

def last_connack = @engine.routing.last_connack

#publish(topic, payload, qos: 0, retain: false, properties: {}) ⇒ Object

Publish a message. For QoS 0 returns once the packet is written. For QoS 1 blocks until PUBACK; for QoS 2 blocks until PUBCOMP.

Raises:

  • (NotInAsyncContext) when called outside an Async task



# File 'lib/zzq/client.rb', line 28

def publish(topic, payload, qos: 0, retain: false, properties: {})
  raise NotInAsyncContext unless Async::Task.current?
  @engine.routing.publish(
    topic: topic, payload: coerce_binary(payload),
    qos: qos, retain: retain, properties: properties,
  )
end
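
The blocking behaviour described for #publish follows the standard MQTT acknowledgement flows. As a rough illustration only (this is not ZZQ's internal code), the sketch below maps each QoS level to the packet whose arrival completes the flow. The helper name `terminal_ack_for` and the symbols it returns are hypothetical and introduced here purely for the example.

```ruby
# Hypothetical helper, not part of ZZQ: returns the packet type whose
# arrival completes a publish flow at the given QoS level, following
# the MQTT specification.
def terminal_ack_for(qos)
  case qos
  when 0 then nil       # at-most-once: done once the PUBLISH is written
  when 1 then :puback   # at-least-once: done when PUBACK arrives
  when 2 then :pubcomp  # exactly-once: PUBREC, PUBREL, then done on PUBCOMP
  else raise ArgumentError, "invalid QoS: #{qos.inspect}"
  end
end
```

A caller choosing between QoS levels can read this as the latency cost of each: QoS 0 needs no round trip, QoS 1 needs one, and QoS 2 needs two before #publish would return.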

#session_present? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/zzq/client.rb', line 22

def session_present? = @engine.routing.session_present?

#subscribe(filter, qos: 0, no_local: false, retain_as_published: false, retain_handling: 0) ⇒ Object

Subscribe to a topic filter. Returns a Subscription whose queue receives matching messages until it is closed.

Raises:

  • (NotInAsyncContext) when called outside an Async task



# File 'lib/zzq/client.rb', line 39

def subscribe(filter, qos: 0, no_local: false, retain_as_published: false,
              retain_handling: 0)
  raise NotInAsyncContext unless Async::Task.current?
  @engine.routing.subscribe(
    filter, qos: qos, no_local: no_local,
    retain_as_published: retain_as_published,
    retain_handling: retain_handling,
  )
end
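
To make "matching messages" concrete, here is a minimal sketch of MQTT topic-filter matching as the MQTT specification describes it. It is an assumption that ZZQ follows these rules; the method name `topic_matches?` is hypothetical and this is not the library's routing code.

```ruby
# Illustrative matcher for MQTT wildcard rules; not ZZQ's implementation.
# `+` matches exactly one topic level. `#` matches all remaining levels
# (including none) and is only valid as the last level of the filter.
def topic_matches?(filter, topic)
  f_levels = filter.split("/", -1)  # -1 keeps trailing empty levels
  t_levels = topic.split("/", -1)

  # Per the MQTT spec, a filter starting with a wildcard must not match
  # topics that begin with `$` (for example `$SYS/...`).
  return false if topic.start_with?("$") && %w[+ #].include?(f_levels.first)

  f_levels.each_with_index do |level, i|
    # `#` matches the rest, but only when it is the final filter level.
    return i == f_levels.size - 1 if level == "#"
    return false if i >= t_levels.size
    next if level == "+"
    return false unless level == t_levels[i]
  end

  f_levels.size == t_levels.size
end
```

Under these rules, a filter such as `sport/#` also matches the parent topic `sport`, while `sport/+` does not.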