Class: NNQ::Socket

Inherits:
Object
Defined in:
lib/nnq/socket.rb

Overview

Socket base class. Subclasses (PUSH, PULL, …) wire up a routing strategy and the SP protocol id.

Direct Known Subclasses

BUS0, PAIR0, PUB0, PULL0, PUSH0, REP0, REQ0, RESPONDENT0, SUB0, SURVEYOR0

Constructor Details

#initialize(raw: false, linger: Float::INFINITY, send_hwm: Options::DEFAULT_HWM, recv_hwm: Options::DEFAULT_HWM) {|socket| ... } ⇒ Socket

Returns a new instance of Socket.

Yield Parameters:

  • socket (self)

    the socket itself; when a block is passed, the socket is closed (via #close) when the block returns or raises, File.open-style.



# File 'lib/nnq/socket.rb', line 30

def initialize(raw: false, linger: Float::INFINITY, send_hwm: Options::DEFAULT_HWM, recv_hwm: Options::DEFAULT_HWM)
  @raw     = raw
  @options = Options.new(linger: linger, send_hwm: send_hwm, recv_hwm: recv_hwm)
  @engine  = Engine.new(protocol: protocol, options: @options) { |engine| build_routing(engine) }

  begin
    yield self
  ensure
    close
  end if block_given?
end
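
The block form can be illustrated without the library. The sketch below uses `DemoSocket`, a hypothetical stand-in class (not part of NNQ) that copies only the `begin` / `ensure` / `end if block_given?` shape of the constructor, to show that the socket is closed even when the block raises and that the exception still reaches the caller.

```ruby
# DemoSocket is a hypothetical stand-in, not an NNQ class. It mirrors only
# the yield-then-close shape of the constructor shown above.
class DemoSocket
  attr_reader :closed

  def initialize
    @closed = false

    begin
      yield self
    ensure
      close
    end if block_given?
  end

  def close
    @closed = true
    nil
  end
end

# Without a block, the caller owns the socket and must close it.
manual = DemoSocket.new
manual_closed_before = manual.closed
manual.close

# With a block, the socket is closed on the way out, even on error.
leaked = nil
begin
  DemoSocket.new do |sock|
    leaked = sock
    raise IOError, "boom"
  end
rescue IOError
  # The exception still propagates to the caller after close has run.
end
```

After this runs, `manual_closed_before` is false while `leaked.closed` is true, which is the same guarantee `File.open` gives with a block.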

Instance Attribute Details

#options ⇒ Options (readonly)

Returns:

  • (Options)



# File 'lib/nnq/socket.rb', line 11

def options
  @options
end

Class Method Details

.bind(endpoint, **opts) ⇒ Object



# File 'lib/nnq/socket.rb', line 14

def self.bind(endpoint, **opts)
  sock = new(**opts)
  sock.bind(endpoint)
  sock
end

.connect(endpoint, **opts) ⇒ Object



# File 'lib/nnq/socket.rb', line 21

def self.connect(endpoint, **opts)
  sock = new(**opts)
  sock.connect(endpoint)
  sock
end

Instance Method Details

#all_peers_gone ⇒ Object

Resolves with `true` the first time all peers have disconnected (after at least one peer was connected). Edge-triggered.



# File 'lib/nnq/socket.rb', line 85

def all_peers_gone
  @engine.all_peers_gone
end
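
The engine behind `all_peers_gone` is not shown on this page, so the following is only a sketch of what "edge-triggered" means here. `PeerTracker` and all of its methods are hypothetical: the flag flips once, on the first transition from "some peers" to "no peers", and is unaffected by later connects and disconnects.

```ruby
# PeerTracker is a hypothetical illustration of an edge-triggered,
# one-shot signal. It is not NNQ's engine.
class PeerTracker
  attr_reader :fired

  def initialize
    @peers    = 0
    @resolved = false
    @fired    = 0     # how many times the signal has fired
  end

  def peer_added
    @peers += 1
  end

  def peer_removed
    @peers -= 1
    return unless @peers.zero? && !@resolved

    @resolved = true  # fires only on the first drop to zero
    @fired   += 1
  end

  def all_peers_gone?
    @resolved
  end
end

tracker = PeerTracker.new
before_any_peer = tracker.all_peers_gone?   # no peer was ever connected

2.times { tracker.peer_added }
tracker.peer_removed
one_peer_left = tracker.all_peers_gone?

tracker.peer_removed
after_last_peer = tracker.all_peers_gone?

# A later connect/disconnect cycle does not fire the signal again.
tracker.peer_added
tracker.peer_removed
```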

#bind(endpoint, **opts) ⇒ Object



# File 'lib/nnq/socket.rb', line 48

def bind(endpoint, **opts)
  ensure_parent_task
  Reactor.run { @engine.bind(endpoint, **opts) }
end

#close ⇒ Object



# File 'lib/nnq/socket.rb', line 60

def close
  Reactor.run { @engine.close }
  nil
end

#close_read ⇒ Object

Closes the recv side only. Buffered messages drain, then #receive returns nil. Send side stays open.



# File 'lib/nnq/socket.rb', line 102

def close_read
  Reactor.run { @engine.close_read }
  nil
end
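
The drain-then-nil behaviour described for `close_read` matches how Ruby's standard `Thread::Queue` behaves once closed. The sketch below uses a `Thread::Queue` as a stand-in for the receive buffer. This is an assumption made for illustration; the engine's actual buffer type is not shown on this page.

```ruby
# Thread::Queue stands in for the socket's receive buffer (an assumption).
recv_buffer = Thread::Queue.new
recv_buffer << "a" << "b"

# Analogous to close_read: nothing new is accepted, buffered items remain.
recv_buffer.close

drained = []
while (msg = recv_buffer.pop)   # pop returns nil once closed and empty
  drained << msg
end

# Further pushes are rejected after the close.
push_rejected =
  begin
    recv_buffer << "c"
    false
  rescue ClosedQueueError
    true
  end
```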

#connect(endpoint, **opts) ⇒ Object



# File 'lib/nnq/socket.rb', line 54

def connect(endpoint, **opts)
  ensure_parent_task
  Reactor.run { @engine.connect(endpoint, **opts) }
end

#connection_count ⇒ Object



# File 'lib/nnq/socket.rb', line 71

def connection_count
  @engine.connections.size
end

#frozen_binary(body) ⇒ Object

Coerces body to a frozen binary string. Called by every send method so a caller can’t mutate the string after it’s been enqueued (the body sits in a send queue or per-peer queue until the pump writes it, and an unfrozen caller-owned buffer could be appended to mid-flight).

Fast-path: already frozen + binary → returned as-is.



# File 'lib/nnq/socket.rb', line 145

def frozen_binary(body)
  return body if body.frozen? && body.encoding == Encoding::BINARY
  body.b.freeze
end
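
To see why the copy matters, the method can be exercised on its own. The sketch below repeats the two-line body from the listing above as a top-level method (outside any socket) and shows that mutating the caller's string afterwards leaves the queued body unchanged, while an already frozen binary string takes the fast path and is returned as the same object.

```ruby
# Same body as the method above, lifted out of the class for illustration.
def frozen_binary(body)
  return body if body.frozen? && body.encoding == Encoding::BINARY
  body.b.freeze
end

text   = +"h\u00e9llo"          # mutable UTF-8 string: 5 characters, 6 bytes
queued = frozen_binary(text)    # binary copy, frozen

text << " world"                # the caller keeps mutating its own buffer

ready     = "raw".b.freeze      # already frozen and binary
fast_path = frozen_binary(ready)
```

Here `queued` still holds the original 6 bytes after `text` has grown, and `fast_path.equal?(ready)` is true because no copy was made.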

#last_endpoint ⇒ Object



# File 'lib/nnq/socket.rb', line 66

def last_endpoint
  @engine.last_endpoint
end

#monitor(verbose: false) {|event| ... } ⇒ Async::Task

Yields lifecycle events for this socket until it’s closed or the returned task is stopped.

Parameters:

  • verbose (Boolean) (defaults to: false)

    when true, also emits :message_sent / :message_received events

Yields:

  • (event)

Yield Parameters:

Returns:

  • (Async::Task)


# File 'lib/nnq/socket.rb', line 116

def monitor(verbose: false, &block)
  ensure_parent_task

  queue                   = Async::Queue.new
  @engine.monitor_queue   = queue
  @engine.verbose_monitor = verbose

  Reactor.run do
    @engine.monitor_task = @engine.spawn_task(annotation: "nnq monitor") do
      while (event = queue.dequeue)
        block.call(event)
      end
    rescue Async::Stop
    ensure
      @engine.monitor_queue = nil
      @engine.monitor_task  = nil
      block.call(MonitorEvent.new(type: :monitor_stopped))
    end
  end
end
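
The consumer loop in `monitor` can be approximated with the standard library alone. The sketch below swaps in a plain `Thread` and `Thread::Queue` for the Async task and `Async::Queue`, and uses a hypothetical `Event` struct in place of `MonitorEvent`. The `:connected` and `:disconnected` event types are made up for the demo; only `:monitor_stopped` appears in the listing above.

```ruby
# Event is a hypothetical stand-in for MonitorEvent.
Event = Struct.new(:type, keyword_init: true)

# Thread-based approximation of the monitor task: forward each event to the
# block, then always emit a final :monitor_stopped event.
def start_monitor(queue, &block)
  Thread.new do
    while (event = queue.pop)   # nil (queue closed and empty) ends the loop
      block.call(event)
    end
  ensure
    block.call(Event.new(type: :monitor_stopped))
  end
end

queue = Thread::Queue.new
seen  = []
task  = start_monitor(queue) { |event| seen << event.type }

queue << Event.new(type: :connected)      # hypothetical event type
queue << Event.new(type: :disconnected)   # hypothetical event type
queue.close                               # stands in for the socket closing
task.join
```

Because the final event is emitted from `ensure`, the block sees `:monitor_stopped` last whether the loop ends normally or the task is stopped early.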

#peer_connected ⇒ Object

Resolves with the first connected peer (or nil on close without any peers). Block on `.wait` to wait until a connection is ready.



# File 'lib/nnq/socket.rb', line 78

def peer_connected
  @engine.peer_connected
end

#raw? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/nnq/socket.rb', line 43

def raw?
  @raw
end

#reconnect_enabled ⇒ Object



# File 'lib/nnq/socket.rb', line 90

def reconnect_enabled
  @engine.reconnect_enabled
end

#reconnect_enabled=(value) ⇒ Object



# File 'lib/nnq/socket.rb', line 95

def reconnect_enabled=(value)
  @engine.reconnect_enabled = value
end