Class: NNQ::Socket
- Inherits:
-
Object
show all
- Defined in:
- lib/nnq/socket.rb
Overview
Socket base class. Subclasses (PUSH, PULL, …) wire up a routing strategy and the SP protocol id.
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(raw: false, linger: Float::INFINITY, send_hwm: Options::DEFAULT_HWM, recv_hwm: Options::DEFAULT_HWM) {|the| ... } ⇒ Socket
Returns a new instance of Socket.
30
31
32
33
34
35
36
37
38
39
40
|
# File 'lib/nnq/socket.rb', line 30
def initialize(raw: false, linger: Float::INFINITY, send_hwm: Options::DEFAULT_HWM, recv_hwm: Options::DEFAULT_HWM)
@raw = raw
@options = Options.new(linger: linger, send_hwm: send_hwm, recv_hwm: recv_hwm)
@engine = Engine.new(protocol: protocol, options: @options) { |engine| build_routing(engine) }
begin
yield self
ensure
close
end if block_given?
end
|
Instance Attribute Details
11
12
13
|
# File 'lib/nnq/socket.rb', line 11
def options
@options
end
|
Class Method Details
.bind(endpoint, **opts) ⇒ Object
14
15
16
17
18
|
# File 'lib/nnq/socket.rb', line 14
def self.bind(endpoint, **opts)
sock = new(**opts)
sock.bind(endpoint)
sock
end
|
.connect(endpoint, **opts) ⇒ Object
21
22
23
24
25
|
# File 'lib/nnq/socket.rb', line 21
def self.connect(endpoint, **opts)
sock = new(**opts)
sock.connect(endpoint)
sock
end
|
Instance Method Details
#all_peers_gone ⇒ Object
Resolves with ‘true` the first time all peers have disconnected (after at least one peer was connected). Edge-triggered.
85
86
87
|
# File 'lib/nnq/socket.rb', line 85
def all_peers_gone
@engine.all_peers_gone
end
|
#bind(endpoint, **opts) ⇒ Object
48
49
50
51
|
# File 'lib/nnq/socket.rb', line 48
def bind(endpoint, **opts)
ensure_parent_task
Reactor.run { @engine.bind(endpoint, **opts) }
end
|
#close ⇒ Object
60
61
62
63
|
# File 'lib/nnq/socket.rb', line 60
def close
Reactor.run { @engine.close }
nil
end
|
#close_read ⇒ Object
Closes the recv side only. Buffered messages drain, then #receive returns nil. Send side stays open.
102
103
104
105
|
# File 'lib/nnq/socket.rb', line 102
def close_read
Reactor.run { @engine.close_read }
nil
end
|
#connect(endpoint, **opts) ⇒ Object
54
55
56
57
|
# File 'lib/nnq/socket.rb', line 54
def connect(endpoint, **opts)
ensure_parent_task
Reactor.run { @engine.connect(endpoint, **opts) }
end
|
#connection_count ⇒ Object
71
72
73
|
# File 'lib/nnq/socket.rb', line 71
def connection_count
@engine.connections.size
end
|
#frozen_binary(body) ⇒ Object
Coerces body to a frozen binary string. Called by every send method so a caller can’t mutate the string after it’s been enqueued (the body sits in a send queue or per-peer queue until the pump writes it, and an unfrozen caller-owned buffer could be appended to mid-flight).
Fast-path: already frozen + binary → returned as-is.
145
146
147
148
|
# File 'lib/nnq/socket.rb', line 145
def frozen_binary(body)
return body if body.frozen? && body.encoding == Encoding::BINARY
body.b.freeze
end
|
#last_endpoint ⇒ Object
66
67
68
|
# File 'lib/nnq/socket.rb', line 66
def last_endpoint
@engine.last_endpoint
end
|
#monitor(verbose: false) {|event| ... } ⇒ Async::Task
Yields lifecycle events for this socket until it’s closed or the returned task is stopped.
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
# File 'lib/nnq/socket.rb', line 116
def monitor(verbose: false, &block)
ensure_parent_task
queue = Async::Queue.new
@engine.monitor_queue = queue
@engine.verbose_monitor = verbose
Reactor.run do
@engine.monitor_task = @engine.spawn_task(annotation: "nnq monitor") do
while (event = queue.dequeue)
block.call(event)
end
rescue Async::Stop
ensure
@engine.monitor_queue = nil
@engine.monitor_task = nil
block.call(MonitorEvent.new(type: :monitor_stopped))
end
end
end
|
#peer_connected ⇒ Object
Resolves with the first connected peer (or nil on close without any peers). Block on ‘.wait` to wait until a connection is ready.
78
79
80
|
# File 'lib/nnq/socket.rb', line 78
def peer_connected
@engine.peer_connected
end
|
#raw? ⇒ Boolean
43
44
45
|
# File 'lib/nnq/socket.rb', line 43
def raw?
@raw
end
|
#reconnect_enabled ⇒ Object
90
91
92
|
# File 'lib/nnq/socket.rb', line 90
def reconnect_enabled
@engine.reconnect_enabled
end
|
#reconnect_enabled=(value) ⇒ Object
95
96
97
|
# File 'lib/nnq/socket.rb', line 95
def reconnect_enabled=(value)
@engine.reconnect_enabled = value
end
|