Class: ZZQ::Socket

Inherits:
Object
Defined in:
lib/zzq/socket.rb

Overview

Abstract base class for Client and Broker. Owns the Engine and exposes the infrastructure API (bind / connect / close / monitor / peer_connected / all_peers_gone). The role-specific API (subscribe / publish / ingest) lives on the subclasses.

ZZQ is pure-Async: every public method that performs I/O must run inside an Async::Task. Either wrap calls in `Async do … end`, or use the block form of #connect / #bind, which enters `Async{}` for you and closes the socket on block exit.

Direct Known Subclasses

Broker, Client

Constructor Details

#initialize(**opts) ⇒ Socket

Returns a new instance of Socket.



# File 'lib/zzq/socket.rb', line 46

def initialize(**opts)
  @options = Options.new(**opts)
  @engine  = Engine.new(options: @options) { |engine| build_routing(engine) }
end

Instance Attribute Details

#options ⇒ Object (readonly)

Returns the value of attribute options.



# File 'lib/zzq/socket.rb', line 21

def options
  @options
end

Class Method Details

.bind(endpoint, **opts, &block) ⇒ Object



# File 'lib/zzq/socket.rb', line 24

def self.bind(endpoint, **opts, &block)
  sock = new(**opts)
  if block
    sock.bind(endpoint, &block)
  else
    sock.bind(endpoint)
    sock
  end
end

.connect(endpoint, **opts, &block) ⇒ Object



# File 'lib/zzq/socket.rb', line 35

def self.connect(endpoint, **opts, &block)
  sock = new(**opts)
  if block
    sock.connect(endpoint, &block)
  else
    sock.connect(endpoint)
    sock
  end
end

Instance Method Details

#all_peers_gone ⇒ Object



# File 'lib/zzq/socket.rb', line 80

def all_peers_gone       = @engine.all_peers_gone

#bind(endpoint, **opts, &block) ⇒ Object

Binds to endpoint. If a block is given, wraps the call in `Async do … end` (if not already inside one), yields the owning Async::Task to the block, and closes the socket on block exit.



# File 'lib/zzq/socket.rb', line 55

def bind(endpoint, **opts, &block)
  block ? run_in_async(endpoint, :bind, opts, &block) : run_sync(:bind, endpoint, **opts)
end
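
The block form described above has the same shape as File.open: enter a scope, yield, and close on exit. The sketch below models only that shape. DemoSocket and the endpoint string are hypothetical, there is no Async dependency, and because run_in_async is not shown on this page, the ensure-based close is an assumption drawn from the description. The real method also yields the owning Async::Task, which this stand-in omits.

```ruby
# Hypothetical stand-in; not ZZQ::Socket and not tied to Async.
class DemoSocket
  attr_reader :endpoint

  def initialize
    @closed = false
  end

  def closed?
    @closed
  end

  # Without a block: bind and leave the socket open for the caller.
  # With a block: bind, yield, then close even if the block raises.
  def bind(endpoint)
    @endpoint = endpoint
    return self unless block_given?

    begin
      yield
    ensure
      close
    end
  end

  def close
    @closed = true
    nil
  end
end

manual = DemoSocket.new
manual.bind("tcp://127.0.0.1:1883")   # illustrative endpoint only
puts manual.closed?                   # false: the caller must call #close

scoped = DemoSocket.new
result = scoped.bind("tcp://127.0.0.1:1883") { :work_done }
puts scoped.closed?                   # true: closed on block exit
```

The ensure clause is what makes the block form safe to use when the block raises: the socket is still closed before the exception propagates.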

#close ⇒ Object



# File 'lib/zzq/socket.rb', line 67

def close
  if Async::Task.current?
    @engine.close
  else
    Async { @engine.close }.wait
  end
  nil
end

#coerce_binary(body) ⇒ Object

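Coerces body to a frozen BINARY String. MQTT payloads are opaque bytes, so a uniform frozen+BINARY contract is used, mirroring NNQ. An unfrozen String is re-tagged as BINARY and frozen in place, so the caller's object is mutated. An already-frozen String is returned as-is, whatever its encoding.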



# File 'lib/zzq/socket.rb', line 103

def coerce_binary(body)
  body = body.to_str unless body.is_a?(String)
  body.force_encoding(Encoding::BINARY) unless body.frozen? || body.encoding == Encoding::BINARY
  body.freeze
end
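
To make the behavior of the listing above concrete, the sketch below copies the method body into a top-level helper and exercises it on three inputs. DemoPayload is a hypothetical wrapper type added only to show the #to_str path; it is not part of ZZQ.

```ruby
# Copy of the method body shown above, lifted to a top-level helper
# so that it runs standalone.
def coerce_binary(body)
  body = body.to_str unless body.is_a?(String)
  body.force_encoding(Encoding::BINARY) unless body.frozen? || body.encoding == Encoding::BINARY
  body.freeze
end

# Hypothetical wrapper: any object responding to #to_str is accepted.
class DemoPayload
  def initialize(text)
    @text = text
  end

  def to_str
    @text
  end
end

mutable = "h\u00e9llo".dup                  # unfrozen UTF-8: 5 characters, 6 bytes
coerced = coerce_binary(mutable)

puts coerced.encoding == Encoding::BINARY   # true
puts coerced.frozen?                        # true
puts coerced.equal?(mutable)                # true: same object, re-tagged in place
puts coerced.bytesize                       # 6

frozen_utf8 = "h\u00e9llo".freeze
untouched   = coerce_binary(frozen_utf8)
puts untouched.encoding == Encoding::UTF_8  # true: frozen input skips the re-tag

wrapped = coerce_binary(DemoPayload.new("abc".dup))
puts wrapped.encoding == Encoding::BINARY   # true
```

Callers that need to keep their original String unfrozen and in its original encoding would have to pass a copy, for example `body.dup`.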

#connect(endpoint, **opts, &block) ⇒ Object

Connects to endpoint. Block form yields the owning Async::Task and auto-closes on block exit, same as #bind.



# File 'lib/zzq/socket.rb', line 62

def connect(endpoint, **opts, &block)
  block ? run_in_async(endpoint, :connect, opts, &block) : run_sync(:connect, endpoint, **opts)
end

#connection_count ⇒ Object



# File 'lib/zzq/socket.rb', line 78

def connection_count     = @engine.connections.size

#last_endpoint ⇒ Object



# File 'lib/zzq/socket.rb', line 77

def last_endpoint        = @engine.last_endpoint

#monitor(&block) ⇒ Object

Yields lifecycle events for this socket until it’s closed. Must be called from within an Async::Task.

Raises:

  • (NotInAsyncContext) if called outside an Async::Task



# File 'lib/zzq/socket.rb', line 85

def monitor(&block)
  raise NotInAsyncContext unless Async::Task.current?
  queue = Async::Queue.new
  @engine.monitor_queue = queue
  @engine.spawn_task(annotation: "zzq monitor") do
    while (event = queue.dequeue)
      block.call(event)
    end
  rescue Async::Stop
  ensure
    @engine.monitor_queue = nil
    block.call(MonitorEvent.new(type: :monitor_stopped))
  end
end
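
The drain loop in the listing above can be tried without Async. The sketch below is an analogy only: it swaps Async::Queue and @engine.spawn_task for the standard library's Queue and Thread, and DemoEvent and demo_monitor are hypothetical names. How the engine ends the real loop is not shown on this page, so closing the queue here is an assumed stand-in for that shutdown signal.

```ruby
# Stand-in for ZZQ's MonitorEvent; the real class may carry more fields.
DemoEvent = Struct.new(:type, keyword_init: true)

# Drains +queue+ on a background thread, yielding each event to +block+.
# A closed, empty Queue makes #pop return nil, which ends the loop.
def demo_monitor(queue, &block)
  Thread.new do
    while (event = queue.pop)
      block.call(event)
    end
  ensure
    # Mirrors the listing: a final event tells the consumer the monitor ended.
    block.call(DemoEvent.new(type: :monitor_stopped))
  end
end

queue  = Queue.new
seen   = []
worker = demo_monitor(queue) { |event| seen << event.type }

queue << DemoEvent.new(type: :connected)
queue << DemoEvent.new(type: :disconnected)
queue.close    # no more events: the consumer loop ends
worker.join

p seen         # [:connected, :disconnected, :monitor_stopped]
```

Because the final event is emitted from ensure, the consumer block sees :monitor_stopped whether the loop ends normally or is interrupted.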

#peer_connected ⇒ Object



# File 'lib/zzq/socket.rb', line 79

def peer_connected       = @engine.peer_connected