Class: NNQ::Socket

Inherits:
Object
Defined in:
lib/nnq/socket.rb

Overview

Socket base class. Subclasses (PUSH, PULL, …) wire up a routing strategy and the SP protocol id.

Direct Known Subclasses

BUS0, PAIR0, PUB0, PULL0, PUSH0, REP0, REQ0, RESPONDENT0, SUB0, SURVEYOR0

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(raw: false, linger: Float::INFINITY, send_hwm: Options::DEFAULT_HWM) {|socket| ... } ⇒ Socket

Returns a new instance of Socket.

Yield Parameters:

  • socket (self)

    the socket; when a block is passed, the socket is closed when the block returns (or raises), File.open-style.



# File 'lib/nnq/socket.rb', line 35

def initialize(raw: false, linger: Float::INFINITY, send_hwm: Options::DEFAULT_HWM)
  @raw     = raw
  @options = Options.new(linger: linger, send_hwm: send_hwm)
  @engine  = Engine.new(protocol: protocol, options: @options) { |engine| build_routing(engine) }

  begin
    yield self
  ensure
    close
  end if block_given?
end
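
The block handling in the constructor can be tried in isolation. The sketch below is not NNQ code: `DemoSocket` and its `closed?` predicate are hypothetical stand-ins that copy only the `begin`/`ensure`/`if block_given?` shape from the source above, so that the example runs without the gem or a reactor.

```ruby
# Hypothetical stand-in: reproduces only the block handling of
# Socket#initialize, with no engine, options, or reactor behind it.
class DemoSocket
  def initialize
    @closed = false

    begin
      yield self
    ensure
      close
    end if block_given?
  end

  def close
    @closed = true
    nil
  end

  # Illustration-only predicate; not taken from the NNQ source.
  def closed?
    @closed
  end
end

# Block form: the socket is open inside the block and closed afterwards.
open_inside = nil
block_sock  = DemoSocket.new { |s| open_inside = !s.closed? }

# Block form with an exception: close still runs, and the error propagates.
leaked = nil
error  = begin
  DemoSocket.new do |s|
    leaked = s
    raise ArgumentError, "boom"
  end
rescue ArgumentError => e
  e
end

# Non-block form: nothing is closed automatically; the caller owns the socket.
plain = DemoSocket.new
```

In the non-block form the caller is responsible for calling #close, which is why the block form is usually the safer default for short-lived sockets.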

Instance Attribute Details

#options ⇒ Options (readonly)

Returns:

  • (Options)



# File 'lib/nnq/socket.rb', line 16

def options
  @options
end

Class Method Details

.bind(endpoint, **opts) ⇒ Object



# File 'lib/nnq/socket.rb', line 19

def self.bind(endpoint, **opts)
  sock = new(**opts)
  sock.bind(endpoint)
  sock
end

.connect(endpoint, **opts) ⇒ Object



# File 'lib/nnq/socket.rb', line 26

def self.connect(endpoint, **opts)
  sock = new(**opts)
  sock.connect(endpoint)
  sock
end

Instance Method Details

#all_peers_gone ⇒ Object

Resolves with `true` the first time all peers have disconnected (after at least one peer was connected). Edge-triggered.



# File 'lib/nnq/socket.rb', line 90

def all_peers_gone
  @engine.all_peers_gone
end
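
One reading of "edge-triggered" and "the first time" is a one-shot latch that fires on the transition from some peers to none and then stays resolved. The sketch below illustrates that reading only. `DemoPeerTracker` and its methods are hypothetical, and it returns a plain `nil`/`true` value instead of whatever promise-like object the engine actually hands back.

```ruby
# Hypothetical model of a one-shot "all peers gone" latch.
# It is not the engine's implementation, only an illustration of the wording.
class DemoPeerTracker
  def initialize
    @count = 0
    @gone  = nil # nil means "not resolved yet"
  end

  def peer_added
    @count += 1
  end

  def peer_removed
    @count -= 1
    # Fires on the edge from one peer to zero; never reset afterwards.
    @gone = true if @count.zero?
  end

  def all_peers_gone
    @gone
  end
end

tracker = DemoPeerTracker.new
before_any_peer = tracker.all_peers_gone # no peer was ever connected

tracker.peer_added
while_connected = tracker.all_peers_gone

tracker.peer_removed
after_last_peer = tracker.all_peers_gone

tracker.peer_added                       # a later reconnect does not un-resolve it
after_reconnect = tracker.all_peers_gone
```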

#bind(endpoint) ⇒ Object



# File 'lib/nnq/socket.rb', line 53

def bind(endpoint)
  ensure_parent_task
  Reactor.run { @engine.bind(endpoint) }
end

#close ⇒ Object



# File 'lib/nnq/socket.rb', line 65

def close
  Reactor.run { @engine.close }
  nil
end

#close_read ⇒ Object

Closes the receive side only. Buffered messages drain, after which #receive returns nil. The send side stays open.



# File 'lib/nnq/socket.rb', line 107

def close_read
  Reactor.run { @engine.close_read }
  nil
end

#connect(endpoint) ⇒ Object



# File 'lib/nnq/socket.rb', line 59

def connect(endpoint)
  ensure_parent_task
  Reactor.run { @engine.connect(endpoint) }
end

#connection_count ⇒ Object



# File 'lib/nnq/socket.rb', line 76

def connection_count
  @engine.connections.size
end

#frozen_binary(body) ⇒ Object

Coerces body to a frozen binary string. Called by every send method so a caller can’t mutate the string after it’s been enqueued (the body sits in a send queue or per-peer queue until the pump writes it, and an unfrozen caller-owned buffer could be appended to mid-flight).

Fast-path: already frozen + binary → returned as-is.



# File 'lib/nnq/socket.rb', line 149

def frozen_binary(body)
  return body if body.frozen? && body.encoding == Encoding::BINARY
  body.b.freeze
end
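
The two paths through #frozen_binary can be observed with plain strings. The sketch below copies the method body from the source above into a top-level method so that it runs without NNQ; the variable names are illustrative only.

```ruby
# Standalone copy of the method body shown above, so it runs without NNQ.
def frozen_binary(body)
  return body if body.frozen? && body.encoding == Encoding::BINARY
  body.b.freeze
end

# Fast path: a frozen binary string comes back as the very same object.
ready = "ping".b.freeze
same  = frozen_binary(ready)

# Slow path: a mutable UTF-8 string is copied into a frozen binary string,
# so a later mutation by the caller cannot reach the enqueued bytes.
caller_buf = String.new("héllo", encoding: Encoding::UTF_8)
enqueued   = frozen_binary(caller_buf)
caller_buf << " world"
```

After the append, `caller_buf` has grown while `enqueued` still holds the original bytes, which is the property the send queue relies on.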

#last_endpoint ⇒ Object



# File 'lib/nnq/socket.rb', line 71

def last_endpoint
  @engine.last_endpoint
end

#monitor(verbose: false) {|event| ... } ⇒ Async::Task

Yields lifecycle events for this socket until it’s closed or the returned task is stopped.

Parameters:

  • verbose (Boolean) (defaults to: false)

    when true, also emits :message_sent / :message_received events

Yields:

  • (event)

Yield Parameters:

  • event (MonitorEvent)

Returns:

  • (Async::Task)


# File 'lib/nnq/socket.rb', line 121

def monitor(verbose: false, &block)
  ensure_parent_task

  queue                   = Async::Queue.new
  @engine.monitor_queue   = queue
  @engine.verbose_monitor = verbose

  Reactor.run do
    @engine.spawn_task(annotation: "nnq monitor") do
      while (event = queue.dequeue)
        block.call(event)
      end
    rescue Async::Stop
    ensure
      @engine.monitor_queue = nil
      block.call(MonitorEvent.new(type: :monitor_stopped))
    end
  end
end
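
The drain loop in #monitor has a shape that can be tried without Async: dequeue until the queue yields nil, hand each event to the block, and always emit a final :monitor_stopped event from `ensure`. The sketch below is an approximation under stated assumptions. It swaps in `Thread::Queue` and a plain thread for `Async::Queue` and the engine task, and `DemoEvent` and `demo_monitor` are hypothetical names that model only the `type` field.

```ruby
# Hypothetical stand-in for MonitorEvent; only the :type field is modelled.
DemoEvent = Struct.new(:type, keyword_init: true)

# Drains +queue+ into +block+ on a background thread, mirroring the loop in
# #monitor: dequeue until nil, then always emit :monitor_stopped.
def demo_monitor(queue, &block)
  Thread.new do
    # Thread::Queue#pop returns nil once the queue is closed and empty.
    while (event = queue.pop)
      block.call(event)
    end
  ensure
    block.call(DemoEvent.new(type: :monitor_stopped))
  end
end

queue = Thread::Queue.new
seen  = []
task  = demo_monitor(queue) { |event| seen << event.type }

queue << DemoEvent.new(type: :message_sent)
queue << DemoEvent.new(type: :message_received)
queue.close # stands in for the socket being closed
task.join
```

Because the final event is emitted from `ensure`, the block sees :monitor_stopped whether the loop ends normally or is interrupted, so a consumer can treat it as a reliable end-of-stream marker.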

#peer_connected ⇒ Object

Resolves with the first connected peer (or nil if the socket is closed before any peer connects). Call `.wait` on the result to block until a connection is ready.



# File 'lib/nnq/socket.rb', line 83

def peer_connected
  @engine.peer_connected
end

#raw? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/nnq/socket.rb', line 48

def raw?
  @raw
end

#reconnect_enabled ⇒ Object



# File 'lib/nnq/socket.rb', line 95

def reconnect_enabled
  @engine.reconnect_enabled
end

#reconnect_enabled=(value) ⇒ Object



# File 'lib/nnq/socket.rb', line 100

def reconnect_enabled=(value)
  @engine.reconnect_enabled = value
end