Class: NNQ::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/nnq/connection.rb

Overview

Per-pipe state: thin wrapper around Protocol::SP::Connection.

Owns no fibers itself — recv loop and send pump are spawned by the Engine and routing strategy respectively.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sp, endpoint: nil) ⇒ Connection

Returns a new instance of Connection.

Parameters:

  • sp (Protocol::SP::Connection)

    handshake-completed SP connection

  • endpoint (String, nil) (defaults to: nil)


22
23
24
25
26
# File 'lib/nnq/connection.rb', line 22

def initialize(sp, endpoint: nil)
  @sp       = sp
  @endpoint = endpoint
  @closed   = false
end

Instance Attribute Details

#endpointString? (readonly)

Returns endpoint URI we connected to / accepted from.

Returns:

  • (String, nil)

    endpoint URI we connected to / accepted from



17
18
19
# File 'lib/nnq/connection.rb', line 17

def endpoint
  @endpoint
end

#spProtocol::SP::Connection (readonly)

Returns:

  • (Protocol::SP::Connection)


13
14
15
# File 'lib/nnq/connection.rb', line 13

def sp
  @sp
end

Instance Method Details

#closeObject

Closes the underlying SP connection. Safe to call twice.



90
91
92
93
94
# File 'lib/nnq/connection.rb', line 90

def close
  return if @closed
  @closed = true
  @sp.close
end

#closed?Boolean

Returns:

  • (Boolean)


84
85
86
# File 'lib/nnq/connection.rb', line 84

def closed?
  @closed
end

#flushvoid

This method returns an undefined value.

Flushes the SP connection’s send buffer to the socket.



70
71
72
# File 'lib/nnq/connection.rb', line 70

def flush
  @sp.flush
end

#peer_protocolInteger

Returns peer protocol id (e.g. Protocols::PULL_V0).

Returns:

  • (Integer)

    peer protocol id (e.g. Protocols::PULL_V0)



30
31
32
# File 'lib/nnq/connection.rb', line 30

def peer_protocol
  @sp.peer_protocol
end

#receive_messageString

Reads one message body off the wire. Blocks the calling fiber.

Returns:

  • (String)


78
79
80
# File 'lib/nnq/connection.rb', line 78

def receive_message
  @sp.receive_message
end

#send_message(body) ⇒ void

This method returns an undefined value.

Writes one message AND flushes immediately. Used by REQ/REP where each call is request-paced and there’s nothing to batch.

Parameters:

  • body (String)

Raises:



61
62
63
64
# File 'lib/nnq/connection.rb', line 61

def send_message(body)
  raise ClosedError, "connection closed" if @closed
  @sp.send_message(body)
end

#write_message(body) ⇒ void

This method returns an undefined value.

Writes one message into the SP connection’s send buffer (no flush).

Parameters:

  • body (String)

Raises:



39
40
41
42
# File 'lib/nnq/connection.rb', line 39

def write_message(body)
  raise ClosedError, "connection closed" if @closed
  @sp.write_message(body)
end

#write_messages(bodies) ⇒ void

This method returns an undefined value.

Writes a batch of bodies under a single SP mutex acquisition. Used by the work-stealing send pump hot path.

Parameters:

  • bodies (Array<String>)

Raises:



50
51
52
53
# File 'lib/nnq/connection.rb', line 50

def write_messages(bodies)
  raise ClosedError, "connection closed" if @closed
  @sp.write_messages(bodies)
end