Class: NNQ::Socket
- Inherits:
-
Object
show all
- Defined in:
- lib/nnq/socket.rb
Overview
Socket base class. Subclasses (PUSH, PULL, …) wire up a routing strategy and the SP protocol id.
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(raw: false, linger: Float::INFINITY, send_hwm: Options::DEFAULT_HWM) {|socket| ... } ⇒ Socket
Returns a new instance of Socket.
35
36
37
38
39
40
41
42
43
44
45
|
# File 'lib/nnq/socket.rb', line 35
# Builds the socket's options and engine. When a block is given, the socket
# is yielded to it and #close is called once the block exits, whether it
# returns normally or raises.
#
# @param raw [Object] stored as-is and reported by #raw?
# @param linger [Object] forwarded to Options.new (defaults to Float::INFINITY)
# @param send_hwm [Object] forwarded to Options.new
# @yield [socket] this socket
def initialize(raw: false, linger: Float::INFINITY, send_hwm: Options::DEFAULT_HWM)
  @raw     = raw
  @options = Options.new(linger: linger, send_hwm: send_hwm)
  # The engine hands itself to the block so routing can be built against it.
  @engine  = Engine.new(protocol: protocol, options: @options) do |engine|
    build_routing(engine)
  end
  return unless block_given?

  begin
    yield self
  ensure
    close
  end
end
|
Instance Attribute Details
16
17
18
|
# File 'lib/nnq/socket.rb', line 16
# Reader for the options object held in @options.
#
# @return [Object] the current value of @options
def options = @options
|
Class Method Details
.bind(endpoint, **opts) ⇒ Object
19
20
21
22
23
|
# File 'lib/nnq/socket.rb', line 19
# Convenience constructor: creates a socket from +opts+ and binds it.
#
# @param endpoint [Object] passed unchanged to the instance's #bind
# @param opts [Hash] keyword arguments forwarded to .new
# @return [Object] the new socket itself (the result of #bind is discarded)
def self.bind(endpoint, **opts)
  new(**opts).tap { |sock| sock.bind(endpoint) }
end
|
.connect(endpoint, **opts) ⇒ Object
26
27
28
29
30
|
# File 'lib/nnq/socket.rb', line 26
# Convenience constructor: creates a socket from +opts+ and connects it.
#
# @param endpoint [Object] passed unchanged to the instance's #connect
# @param opts [Hash] keyword arguments forwarded to .new
# @return [Object] the new socket itself (the result of #connect is discarded)
def self.connect(endpoint, **opts)
  new(**opts).tap { |sock| sock.connect(endpoint) }
end
|
Instance Method Details
#all_peers_gone ⇒ Object
Resolves with `true` the first time all peers have disconnected (after at least one peer was connected). Edge-triggered.
90
91
92
|
# File 'lib/nnq/socket.rb', line 90
# Delegates to the engine's all_peers_gone.
#
# @return [Object] whatever the engine returns for all_peers_gone
def all_peers_gone = @engine.all_peers_gone
|
#bind(endpoint) ⇒ Object
53
54
55
56
|
# File 'lib/nnq/socket.rb', line 53
# Binds the engine to +endpoint+.
#
# Calls ensure_parent_task first, then performs the engine bind inside
# Reactor.run.
#
# @param endpoint [Object] passed unchanged to the engine
# @return [Object] whatever Reactor.run returns for the bind
def bind(endpoint)
  ensure_parent_task
  Reactor.run do
    @engine.bind(endpoint)
  end
end
|
#close ⇒ Object
65
66
67
68
|
# File 'lib/nnq/socket.rb', line 65
# Closes the engine inside Reactor.run.
#
# @return [nil] always nil; the engine's own result is discarded
def close
  Reactor.run do
    @engine.close
  end

  nil
end
|
#close_read ⇒ Object
Closes the recv side only. Buffered messages drain, then #receive returns nil. Send side stays open.
107
108
109
110
|
# File 'lib/nnq/socket.rb', line 107
# Asks the engine to close its receive side, inside Reactor.run.
#
# @return [nil] always nil; the engine's own result is discarded
def close_read
  Reactor.run do
    @engine.close_read
  end

  nil
end
|
#connect(endpoint) ⇒ Object
59
60
61
62
|
# File 'lib/nnq/socket.rb', line 59
# Connects the engine to +endpoint+.
#
# Calls ensure_parent_task first, then performs the engine connect inside
# Reactor.run.
#
# @param endpoint [Object] passed unchanged to the engine
# @return [Object] whatever Reactor.run returns for the connect
def connect(endpoint)
  ensure_parent_task
  Reactor.run do
    @engine.connect(endpoint)
  end
end
|
#connection_count ⇒ Object
76
77
78
|
# File 'lib/nnq/socket.rb', line 76
# Number of entries in the engine's connections collection.
#
# @return [Object] the result of calling size on engine.connections
def connection_count = @engine.connections.size
|
#frozen_binary(body) ⇒ Object
Coerces body to a frozen binary string. Called by every send method so a caller can’t mutate the string after it’s been enqueued (the body sits in a send queue or per-peer queue until the pump writes it, and an unfrozen caller-owned buffer could be appended to mid-flight).
Fast-path: already frozen + binary → returned as-is.
149
150
151
152
|
# File 'lib/nnq/socket.rb', line 149
# Returns +body+ as a frozen, binary-encoded string.
#
# A string that is already frozen AND binary is handed back untouched (no
# allocation). Anything else is copied into a binary string via String#b and
# that copy is frozen, so the caller's original object is left unmodified.
#
# @param body [String] message payload
# @return [String] frozen string with Encoding::BINARY
def frozen_binary(body)
  already_safe = body.frozen? && body.encoding == Encoding::BINARY
  already_safe ? body : body.b.freeze
end
|
#last_endpoint ⇒ Object
71
72
73
|
# File 'lib/nnq/socket.rb', line 71
# Delegates to the engine's last_endpoint.
#
# @return [Object] whatever the engine reports as its last endpoint
def last_endpoint = @engine.last_endpoint
|
#monitor(verbose: false) {|event| ... } ⇒ Async::Task
Yields lifecycle events for this socket until it’s closed or the returned task is stopped.
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
# File 'lib/nnq/socket.rb', line 121
# Forwards engine monitor events to +block+ from a task spawned on the engine.
#
# A fresh Async::Queue is installed as the engine's monitor_queue and the
# +verbose+ flag is stored on the engine; the spawned task then dequeues
# events and passes each one to +block+ until the queue yields a falsy value
# or the task is stopped.
#
# NOTE(review): each call overwrites the engine's monitor_queue, so
# presumably only one monitor is supported at a time - confirm.
#
# @param verbose [Boolean] stored on the engine as verbose_monitor
# @yield [event] each dequeued event, then a final MonitorEvent of type
#   :monitor_stopped
# @return [Object] whatever Reactor.run returns (presumably the spawned task -
#   TODO confirm)
def monitor(verbose: false, &block)
ensure_parent_task
queue = Async::Queue.new
@engine.monitor_queue = queue
@engine.verbose_monitor = verbose
Reactor.run do
@engine.spawn_task(annotation: "nnq monitor") do
# Loop ends when dequeue returns nil/false.
while (event = queue.dequeue)
block.call(event)
end
# Async::Stop is deliberately swallowed; the ensure below still runs.
rescue Async::Stop
ensure
# Detach the queue before emitting the final event to the block.
@engine.monitor_queue = nil
block.call(MonitorEvent.new(type: :monitor_stopped))
end
end
end
|
#peer_connected ⇒ Object
Resolves with the first connected peer (or nil on close without any peers). Call `.wait` on the result to block until a connection is ready.
83
84
85
|
# File 'lib/nnq/socket.rb', line 83
# Delegates to the engine's peer_connected.
#
# @return [Object] whatever the engine returns for peer_connected
def peer_connected = @engine.peer_connected
|
#raw? ⇒ Boolean
48
49
50
|
# File 'lib/nnq/socket.rb', line 48
# Reports the value stored in @raw (the +raw:+ flag given at construction).
#
# @return [Object] the stored flag, returned as-is without boolean coercion
def raw? = @raw
|
#reconnect_enabled ⇒ Object
95
96
97
|
# File 'lib/nnq/socket.rb', line 95
# Reads the engine's reconnect_enabled value.
#
# @return [Object] whatever the engine currently reports
def reconnect_enabled = @engine.reconnect_enabled
|
#reconnect_enabled=(value) ⇒ Object
100
101
102
|
# File 'lib/nnq/socket.rb', line 100
# Writes +value+ through to the engine's reconnect_enabled.
#
# @param value [Object] forwarded unchanged to the engine
# @return [Object] +value+
def reconnect_enabled=(value)
  @engine.reconnect_enabled = value
end
|