Class: NNQ::Socket
- Inherits:
-
Object
show all
- Defined in:
- lib/nnq/socket.rb
Overview
Socket base class. Subclasses (PUSH, PULL, …) wire up a routing strategy and the SP protocol id.
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(raw: false, linger: Float::INFINITY, send_hwm: Options::DEFAULT_HWM, recv_hwm: Options::DEFAULT_HWM) {|the| ... } ⇒ Socket
Returns a new instance of Socket.
30
31
32
33
34
35
36
37
38
39
40
|
# File 'lib/nnq/socket.rb', line 30
# Builds the socket's options and engine. When a block is given, the socket
# is yielded to it and #close is called once the block finishes, even if
# the block raises.
#
# @param raw [Boolean] stored as-is and reported by #raw?
# @param linger [Numeric] forwarded to Options.new
# @param send_hwm [Integer] forwarded to Options.new
# @param recv_hwm [Integer] forwarded to Options.new
# @yieldparam socket [Socket] this socket
def initialize(raw: false, linger: Float::INFINITY, send_hwm: Options::DEFAULT_HWM, recv_hwm: Options::DEFAULT_HWM)
  @raw     = raw
  @options = Options.new(linger: linger, send_hwm: send_hwm, recv_hwm: recv_hwm)
  # The engine hands itself back so the routing strategy can be built against it.
  @engine  = Engine.new(protocol: protocol, options: @options) { |eng| build_routing(eng) }
  return unless block_given?

  begin
    yield self
  ensure
    close
  end
end
|
Instance Attribute Details
11
12
13
|
# File 'lib/nnq/socket.rb', line 11
# Returns the Options instance stored when the socket was constructed.
def options = @options
|
Class Method Details
.bind(endpoint, **opts) ⇒ Object
14
15
16
17
18
|
# File 'lib/nnq/socket.rb', line 14
# Convenience constructor: creates a socket with +opts+, binds it to
# +endpoint+ and returns the socket (not the result of the bind call).
#
# @param endpoint the endpoint passed to #bind
# @param opts keyword options forwarded to .new
def self.bind(endpoint, **opts)
  new(**opts).tap { |socket| socket.bind(endpoint) }
end
|
.connect(endpoint, **opts) ⇒ Object
21
22
23
24
25
|
# File 'lib/nnq/socket.rb', line 21
# Convenience constructor: creates a socket with +opts+, connects it to
# +endpoint+ and returns the socket (not the result of the connect call).
#
# @param endpoint the endpoint passed to #connect
# @param opts keyword options forwarded to .new
def self.connect(endpoint, **opts)
  new(**opts).tap { |socket| socket.connect(endpoint) }
end
|
Instance Method Details
#all_peers_gone ⇒ Object
Resolves with `true` the first time all peers have disconnected (after at least one peer was connected). Edge-triggered.
85
86
87
|
# File 'lib/nnq/socket.rb', line 85
# Returns the engine's all_peers_gone object, which is described as
# resolving with true the first time every peer has disconnected.
def all_peers_gone = @engine.all_peers_gone
|
#bind(endpoint, **opts) ⇒ Object
48
49
50
51
|
# File 'lib/nnq/socket.rb', line 48
# Binds the socket to +endpoint+. Calls ensure_parent_task first, then
# performs the engine bind inside Reactor.run and returns its result.
#
# @param endpoint the endpoint handed to the engine
# @param opts keyword options forwarded to the engine's bind
def bind(endpoint, **opts)
  ensure_parent_task
  Reactor.run do
    @engine.bind(endpoint, **opts)
  end
end
|
#close ⇒ Object
60
61
62
63
|
# File 'lib/nnq/socket.rb', line 60
# Closes the engine inside Reactor.run. Always returns nil, regardless of
# what the engine's close returns.
def close
  Reactor.run do
    @engine.close
  end
  nil
end
|
#close_read ⇒ Object
Closes the recv side only. Buffered messages drain, then #receive returns nil. Send side stays open.
102
103
104
105
|
# File 'lib/nnq/socket.rb', line 102
# Closes only the receive side by calling the engine's close_read inside
# Reactor.run. Always returns nil.
def close_read
  Reactor.run do
    @engine.close_read
  end
  nil
end
|
#coerce_binary(body) ⇒ Object
Coerces body to a frozen `Encoding::BINARY`-tagged String and returns it. Every send method runs its body through this so the receiver sees a uniform frozen+BINARY contract across transports (mutation bugs raise `FrozenError` instead of silently corrupting a shared reference on the inproc fast path).
Fast-path: unfrozen non-BINARY strings are re-tagged in place (force_encoding is a flag flip, no copy). The pathological case of a frozen non-BINARY body (e.g. a `# frozen_string_literal: true` literal) can't be re-tagged in place — the inproc Pipe handles that with a copy so the receive contract stays uniform.
149
150
151
152
153
|
# File 'lib/nnq/socket.rb', line 149
# Normalizes a message body into a frozen String, re-tagging it as
# Encoding::BINARY when that can be done in place.
#
# Non-String bodies are converted with #to_str. The String is mutated
# (re-tagged and frozen) rather than copied. A body that is already frozen
# keeps whatever encoding it has, because force_encoding would raise on it.
#
# @param body [String, #to_str] the message body
# @return [String] the same String object (or the #to_str result), frozen
def coerce_binary(body)
  payload = body.is_a?(String) ? body : body.to_str
  retaggable = !payload.frozen? && payload.encoding != Encoding::BINARY
  payload.force_encoding(Encoding::BINARY) if retaggable
  payload.freeze
end
|
#connect(endpoint, **opts) ⇒ Object
54
55
56
57
|
# File 'lib/nnq/socket.rb', line 54
# Connects the socket to +endpoint+. Calls ensure_parent_task first, then
# performs the engine connect inside Reactor.run and returns its result.
#
# @param endpoint the endpoint handed to the engine
# @param opts keyword options forwarded to the engine's connect
def connect(endpoint, **opts)
  ensure_parent_task
  Reactor.run do
    @engine.connect(endpoint, **opts)
  end
end
|
#connection_count ⇒ Object
71
72
73
|
# File 'lib/nnq/socket.rb', line 71
# Returns the size of the engine's connections collection.
def connection_count = @engine.connections.size
|
#last_endpoint ⇒ Object
66
67
68
|
# File 'lib/nnq/socket.rb', line 66
# Returns the engine's last_endpoint value.
def last_endpoint = @engine.last_endpoint
|
#monitor(verbose: false) {|event| ... } ⇒ Async::Task
Yields lifecycle events for this socket until it’s closed or the returned task is stopped.
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
# File 'lib/nnq/socket.rb', line 116
# Yields lifecycle events for this socket to the given block until the
# socket is closed or the returned task is stopped.
#
# A new Async::Queue is installed as the engine's monitor queue, and a task
# annotated "nnq monitor" drains it, calling the block once per event. When
# the drain loop ends (a falsy dequeue, or Async::Stop) the engine's
# monitor queue and task are cleared and the block receives one final
# MonitorEvent of type :monitor_stopped.
#
# @param verbose [Boolean] stored on the engine as verbose_monitor
# @yieldparam event each dequeued event, then the :monitor_stopped event
# @return the value of Reactor.run — documented as the Async::Task; TODO confirm
# @raise [ArgumentError] if no block is given
def monitor(verbose: false, &block)
  # Fail fast: without a block the task would only blow up later with a
  # NoMethodError on nil, after the engine's monitor state was already touched.
  raise ArgumentError, "monitor requires a block" unless block

  ensure_parent_task
  queue = Async::Queue.new
  @engine.monitor_queue = queue
  @engine.verbose_monitor = verbose
  Reactor.run do
    @engine.monitor_task = @engine.spawn_task(annotation: "nnq monitor") do
      while (event = queue.dequeue)
        block.call(event)
      end
    rescue Async::Stop
      # Stopping the monitor task is a normal way to end monitoring.
    ensure
      @engine.monitor_queue = nil
      @engine.monitor_task = nil
      block.call(MonitorEvent.new(type: :monitor_stopped))
    end
  end
end
|
#peer_connected ⇒ Object
Resolves with the first connected peer (or nil on close without any peers). Block on `.wait` to wait until a connection is ready.
78
79
80
|
# File 'lib/nnq/socket.rb', line 78
# Returns the engine's peer_connected object, which is described as
# resolving with the first connected peer.
def peer_connected = @engine.peer_connected
|
#raw? ⇒ Boolean
43
44
45
|
# File 'lib/nnq/socket.rb', line 43
# Returns the +raw+ value given at construction, unmodified.
def raw? = @raw
|
#reconnect_enabled ⇒ Object
90
91
92
|
# File 'lib/nnq/socket.rb', line 90
# Returns the engine's current reconnect_enabled setting.
def reconnect_enabled = @engine.reconnect_enabled
|
#reconnect_enabled=(value) ⇒ Object
95
96
97
|
# File 'lib/nnq/socket.rb', line 95
# Stores +value+ as the engine's reconnect_enabled setting.
#
# @param value the new setting, passed to the engine unchanged
def reconnect_enabled=(value)
  @engine.reconnect_enabled = value
end
|